Class Materialized<K,V,S extends StateStore>
- Type Parameters:
K
- type of record keyV
- type of record valueS
- type of state store (note: state stores always have key/value types<Bytes,byte[]>
StateStore
should be materialized.
You can either provide a custom StateStore
backend through one of the provided methods accepting a supplier
or use the default RocksDB backends by providing just a store name.
For example, you can read a topic as KTable
and force a state store materialization to access the content
via Interactive Queries API:
StreamsBuilder builder = new StreamsBuilder();
KTable<Integer, Integer> table = builder.table(
"topicName",
Materialized.as("queryable-store-name"));
Correct Usage When Providing Serde:
To configure both the name of the store and the Serde
for the key and value,
you should use the following pattern:
Materialized.<KeyType, ValueType, StateStore>as("MyStoreName")
.withKeySerde(keySerde)
.withValueSerde(valueSerde);
This ensures that the store name is retained while configuring the key and value serde.
Warning: If you use the (static
) with(Serde, Serde)
method after calling
as(String)
, the instance created by as(String)
will be replaced by a new
Materialized
instance, and any configuration set on the first instance (e.g., store name, logging settings)
will be lost.
For example, the following code is incorrect because it discards the configuration of the store name (calling static
) methods on an object/instance is an anti-pattern):
// This will not work as expected:
Materialized.<KeyType, ValueType, StateStore>as("MyStoreName")
.with(keySerde, valueSerde); // The store name "MyStoreName" is lost
Instead, use the proper pattern of chaining withKeySerde
and withValueSerde
.- See Also:
-
Nested Class Summary
Nested Classes -
Method Summary
Modifier and TypeMethodDescriptionstatic <K,
V, S extends StateStore>
Materialized<K, V, S> Materialize aStateStore
with the given name.static <K,
V, S extends StateStore>
Materialized<K, V, S> as
(DslStoreSuppliers storeSuppliers) Materialize aStateStore
with the givenDslStoreSuppliers
.static <K,
V> Materialized <K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> as
(KeyValueBytesStoreSupplier supplier) Materialize aKeyValueStore
using the providedKeyValueBytesStoreSupplier
.static <K,
V> Materialized <K, V, SessionStore<org.apache.kafka.common.utils.Bytes, byte[]>> as
(SessionBytesStoreSupplier supplier) Materialize aSessionStore
using the providedSessionBytesStoreSupplier
.static <K,
V> Materialized <K, V, WindowStore<org.apache.kafka.common.utils.Bytes, byte[]>> as
(WindowBytesStoreSupplier supplier) Materialize aWindowStore
using the providedWindowBytesStoreSupplier
.static <K,
V, S extends StateStore>
Materialized<K, V, S> Materialize aStateStore
with the provided key and valueSerde
s.Materialized
<K, V, S> Disable caching for the materializedStateStore
.Materialized
<K, V, S> Enable caching for the materializedStateStore
.Materialized
<K, V, S> withKeySerde
(Serde<K> keySerde) Set the keySerde the materializedStateStore
will use.Materialized
<K, V, S> Disable change logging for the materializedStateStore
.Materialized
<K, V, S> withLoggingEnabled
(Map<String, String> config) Indicates that a changelog should be created for the store.Materialized
<K, V, S> withRetention
(Duration retention) Configure retention period for window and session stores.Materialized
<K, V, S> withStoreType
(DslStoreSuppliers storeSuppliers) Set the type of the materializedStateStore
.Materialized
<K, V, S> withValueSerde
(Serde<V> valueSerde) Set the valueSerde the materializedStateStore
will use.
-
Method Details
-
as
Materialize aStateStore
with the givenDslStoreSuppliers
.- Type Parameters:
K
- key type of the storeV
- value type of the storeS
- type of theStateStore
- Parameters:
storeSuppliers
- the type of the state store- Returns:
- a new
Materialized
instance with the given storeName
-
as
Materialize aStateStore
with the given name.This method sets the name of the state store to be used during materialization. You can provide additional configurations like key and value
Serde
s usingwithKeySerde(Serde)
andwithValueSerde(Serde)
.Warning: Do not use
with(Serde, Serde)
after calling this method, as it creates a newMaterialized
instance, which discards the store name and any other configurations set earlier.Correct usage:
Materialized.<KeyType, ValueType, StateStore>as("MyStoreName") .withKeySerde(keySerde) .withValueSerde(valueSerde);
Incorrect usage (store name is lost):
Materialized.<KeyType, ValueType, StateStore>as("MyStoreName") .with(keySerde, valueSerde); // Store name is lost
- Type Parameters:
K
- key type of the storeV
- value type of the storeS
- type of theStateStore
- Parameters:
storeName
- the name of the underlyingKTable
state store; valid characters are ASCII alphanumerics, '.', '_' and '-'.- Returns:
- a new
Materialized
instance with the given storeName
-
as
public static <K,V> Materialized<K,V, asWindowStore<org.apache.kafka.common.utils.Bytes, byte[]>> (WindowBytesStoreSupplier supplier) Materialize aWindowStore
using the providedWindowBytesStoreSupplier
. Important: Custom subclasses are allowed here, but they should respect the retention contract: Window stores are required to retain windows at least as long as (window size + window grace period). Stores constructed viaStores
already satisfy this contract.- Type Parameters:
K
- key type of the storeV
- value type of the store- Parameters:
supplier
- theWindowBytesStoreSupplier
used to materialize the store- Returns:
- a new
Materialized
instance with the given supplier
-
as
public static <K,V> Materialized<K,V, asSessionStore<org.apache.kafka.common.utils.Bytes, byte[]>> (SessionBytesStoreSupplier supplier) Materialize aSessionStore
using the providedSessionBytesStoreSupplier
. Important: Custom subclasses are allowed here, but they should respect the retention contract: Session stores are required to retain windows at least as long as (session inactivity gap + session grace period). Stores constructed viaStores
already satisfy this contract.- Type Parameters:
K
- key type of the storeV
- value type of the store- Parameters:
supplier
- theSessionBytesStoreSupplier
used to materialize the store- Returns:
- a new
Materialized
instance with the given supplier
-
as
public static <K,V> Materialized<K,V, asKeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> (KeyValueBytesStoreSupplier supplier) Materialize aKeyValueStore
using the providedKeyValueBytesStoreSupplier
.- Type Parameters:
K
- key type of the storeV
- value type of the store- Parameters:
supplier
- theKeyValueBytesStoreSupplier
used to materialize the store- Returns:
- a new
Materialized
instance with the given supplier
-
with
public static <K,V, Materialized<K,S extends StateStore> V, withS> (Serde<K> keySerde, Serde<V> valueSerde) Materialize aStateStore
with the provided key and valueSerde
s.Note: If this method is used after
as(String)
, the originalMaterialized
instance will be replaced with a new instance, and any configuration on the first instance (e.g., store name) will be lost. To configure both a store name and key/value serde, usewithKeySerde(Serde)
andwithValueSerde(Serde)
instead.Correct usage with
Serde
:Materialized.<KeyType, ValueType, StateStore>as("MyStoreName") .withKeySerde(keySerde) .withValueSerde(valueSerde);
Incorrect usage (store name will be lost):
Materialized.<KeyType, ValueType, StateStore>as("MyStoreName") .with(keySerde, valueSerde); // Store name is lost
- Type Parameters:
K
- key typeV
- value typeS
- store type- Parameters:
keySerde
- the keySerde
to use. If theSerde
is null, then the default key serde from configs will be usedvalueSerde
- the valueSerde
to use. If theSerde
is null, then the default value serde from configs will be used- Returns:
- a new
Materialized
instance with the given key and value serdes
-
withValueSerde
Set the valueSerde the materializedStateStore
will use. -
withKeySerde
Set the keySerde the materializedStateStore
will use. -
withLoggingEnabled
Indicates that a changelog should be created for the store. The changelog will be created with the provided configs.Note: Any unrecognized configs will be ignored.
- Parameters:
config
- any configs that should be applied to the changelog- Returns:
- itself
-
withLoggingDisabled
Disable change logging for the materializedStateStore
.- Returns:
- itself
-
withCachingEnabled
Enable caching for the materializedStateStore
.- Returns:
- itself
-
withCachingDisabled
Disable caching for the materializedStateStore
.- Returns:
- itself
-
withRetention
Configure retention period for window and session stores. Ignored for key/value stores. Overridden by pre-configured store suppliers (as(SessionBytesStoreSupplier)
oras(WindowBytesStoreSupplier)
). Note that the retention period must be at least long enough to contain the windowed data's entire life cycle, from window-start through window-end, and for the entire grace period. If not specified, the retention period would be set as the window length (from window-start through window-end) plus the grace period.- Parameters:
retention
- the retention time- Returns:
- itself
- Throws:
IllegalArgumentException
- if retention is negative or can't be represented aslong milliseconds
-
withStoreType
public Materialized<K,V, withStoreTypeS> (DslStoreSuppliers storeSuppliers) throws IllegalArgumentException Set the type of the materializedStateStore
.- Parameters:
storeSuppliers
- the store typeMaterialized.StoreType
to use.- Returns:
- itself
- Throws:
IllegalArgumentException
- if store supplier is also pre-configured
-