Class Materialized<K,V,S extends StateStore>
- Type Parameters:
K - type of record key
V - type of record value
S - type of state store (note: state stores always have key/value types <Bytes,byte[]>)
Used to describe how a StateStore should be materialized.
You can either provide a custom StateStore backend through one of the provided methods accepting a supplier
or use the default RocksDB backends by providing just a store name.
For example, you can read a topic as a KTable and force a state store materialization to access the content
via the Interactive Queries API:
StreamsBuilder builder = new StreamsBuilder();
KTable<Integer, Integer> table = builder.table(
"topicName",
Materialized.as("queryable-store-name"));
Correct Usage When Providing Serde:
To configure both the name of the store and the Serde for the key and value,
you should use the following pattern:
Materialized.<KeyType, ValueType, StateStore>as("MyStoreName")
.withKeySerde(keySerde)
.withValueSerde(valueSerde);
This ensures that the store name is retained while configuring the key and value serde.
Warning: If you use the (static) with(Serde, Serde) method after calling
as(String), the instance created by as(String) will be replaced by a new
Materialized instance, and any configuration set on the first instance (e.g., store name, logging settings)
will be lost.
For example, the following code is incorrect because it discards the configuration of the store name (calling static methods on an object/instance is an anti-pattern):
// This will not work as expected:
Materialized.<KeyType, ValueType, StateStore>as("MyStoreName")
.with(keySerde, valueSerde); // The store name "MyStoreName" is lost
Instead, use the proper pattern of chaining withKeySerde and withValueSerde.
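The pitfall described above is ordinary Java behavior, not something specific to Kafka: when a static method is invoked through an instance expression, the instance is evaluated and then ignored. The sketch below reproduces it with a small stand-in class. `StoreConfig` and `StaticFactoryPitfall` are hypothetical names invented for this illustration and are not part of Kafka Streams; serdes are modeled as plain strings so that the block compiles without any dependency.

```java
import java.util.Objects;

class StaticFactoryPitfall {

    /** Stand-in for illustration only; this is NOT Kafka's Materialized class. */
    static final class StoreConfig {
        private final String storeName; // null means no name was set
        private String keySerde;
        private String valueSerde;

        private StoreConfig(String storeName) {
            this.storeName = storeName;
        }

        // Static factory: returns a new instance that carries the store name.
        static StoreConfig as(String storeName) {
            return new StoreConfig(Objects.requireNonNull(storeName));
        }

        // Static factory: returns a new, unnamed instance holding only the serdes.
        static StoreConfig with(String keySerde, String valueSerde) {
            StoreConfig config = new StoreConfig(null);
            config.keySerde = keySerde;
            config.valueSerde = valueSerde;
            return config;
        }

        // Instance methods: modify this object and return it, so earlier settings survive.
        StoreConfig withKeySerde(String keySerde) {
            this.keySerde = keySerde;
            return this;
        }

        StoreConfig withValueSerde(String valueSerde) {
            this.valueSerde = valueSerde;
            return this;
        }

        String storeName() {
            return storeName;
        }
    }

    // Chaining instance methods keeps the name set by as(...).
    static String nameAfterChaining() {
        return StoreConfig.as("MyStoreName")
                .withKeySerde("keySerde")
                .withValueSerde("valueSerde")
                .storeName();
    }

    // Compiles (with a warning), but with(...) is static: the instance returned
    // by as(...) is discarded and a fresh, unnamed instance is returned instead.
    @SuppressWarnings("static-access")
    static String nameAfterStaticWith() {
        return StoreConfig.as("MyStoreName")
                .with("keySerde", "valueSerde")
                .storeName();
    }

    public static void main(String[] args) {
        System.out.println(nameAfterChaining());   // prints "MyStoreName"
        System.out.println(nameAfterStaticWith()); // prints "null"
    }
}
```

Most compilers and IDEs flag the second call with a "static method accessed via instance reference" warning, which is a useful signal to look for in real code.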
Nested Class Summary
Nested Classes -
Method Summary
Modifier and Type | Method | Description
static <K,V,S extends StateStore> Materialized<K,V,S> | as(String storeName) | Materialize a StateStore with the given name.
static <K,V,S extends StateStore> Materialized<K,V,S> | as(DslStoreSuppliers storeSuppliers) | Materialize a StateStore with the given DslStoreSuppliers.
static <K,V> Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> | as(KeyValueBytesStoreSupplier supplier) | Materialize a KeyValueStore using the provided KeyValueBytesStoreSupplier.
static <K,V> Materialized<K,V,SessionStore<org.apache.kafka.common.utils.Bytes,byte[]>> | as(SessionBytesStoreSupplier supplier) | Materialize a SessionStore using the provided SessionBytesStoreSupplier.
static <K,V> Materialized<K,V,WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>> | as(WindowBytesStoreSupplier supplier) | Materialize a WindowStore using the provided WindowBytesStoreSupplier.
static <K,V,S extends StateStore> Materialized<K,V,S> | with(Serde<K> keySerde, Serde<V> valueSerde) | Materialize a StateStore with the provided key and value Serdes.
Materialized<K,V,S> | withCachingDisabled() | Disable caching for the materialized StateStore.
Materialized<K,V,S> | withCachingEnabled() | Enable caching for the materialized StateStore.
Materialized<K,V,S> | withKeySerde(Serde<K> keySerde) | Set the key Serde the materialized StateStore will use.
Materialized<K,V,S> | withLoggingDisabled() | Disable change logging for the materialized StateStore.
Materialized<K,V,S> | withLoggingEnabled(Map<String,String> config) | Indicates that a changelog should be created for the store.
Materialized<K,V,S> | withRetention(Duration retention) | Configure retention period for window and session stores.
Materialized<K,V,S> | withStoreType(DslStoreSuppliers storeSuppliers) | Set the type of the materialized StateStore.
Materialized<K,V,S> | withValueSerde(Serde<V> valueSerde) | Set the value Serde the materialized StateStore will use.
-
Method Details
-
as
Materialize a StateStore with the given DslStoreSuppliers.
- Type Parameters:
K - key type of the store
V - value type of the store
S - type of the StateStore
- Parameters:
storeSuppliers - the type of the state store
- Returns:
a new Materialized instance with the given store suppliers
-
as
Materialize a StateStore with the given name.
This method sets the name of the state store to be used during materialization. You can provide additional configurations like key and value Serdes using withKeySerde(Serde) and withValueSerde(Serde).
Warning: Do not use with(Serde, Serde) after calling this method, as it creates a new Materialized instance, which discards the store name and any other configurations set earlier.
Correct usage:
Materialized.<KeyType, ValueType, StateStore>as("MyStoreName")
.withKeySerde(keySerde)
.withValueSerde(valueSerde);
Incorrect usage (store name is lost):
Materialized.<KeyType, ValueType, StateStore>as("MyStoreName")
.with(keySerde, valueSerde); // Store name is lost
- Type Parameters:
K - key type of the store
V - value type of the store
S - type of the StateStore
- Parameters:
storeName - the name of the underlying KTable state store; valid characters are ASCII alphanumerics, '.', '_' and '-'.
- Returns:
a new Materialized instance with the given storeName
-
as
public static <K,V> Materialized<K,V,WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>> as(WindowBytesStoreSupplier supplier)
Materialize a WindowStore using the provided WindowBytesStoreSupplier.
Important: Custom subclasses are allowed here, but they should respect the retention contract: Window stores are required to retain windows at least as long as (window size + window grace period). Stores constructed via Stores already satisfy this contract.
- Type Parameters:
K - key type of the store
V - value type of the store
- Parameters:
supplier - the WindowBytesStoreSupplier used to materialize the store
- Returns:
a new Materialized instance with the given supplier
-
as
public static <K,V> Materialized<K,V,SessionStore<org.apache.kafka.common.utils.Bytes,byte[]>> as(SessionBytesStoreSupplier supplier)
Materialize a SessionStore using the provided SessionBytesStoreSupplier.
Important: Custom subclasses are allowed here, but they should respect the retention contract: Session stores are required to retain windows at least as long as (session inactivity gap + session grace period). Stores constructed via Stores already satisfy this contract.
- Type Parameters:
K - key type of the store
V - value type of the store
- Parameters:
supplier - the SessionBytesStoreSupplier used to materialize the store
- Returns:
a new Materialized instance with the given supplier
-
as
public static <K,V> Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> as(KeyValueBytesStoreSupplier supplier)
Materialize a KeyValueStore using the provided KeyValueBytesStoreSupplier.
- Type Parameters:
K - key type of the store
V - value type of the store
- Parameters:
supplier - the KeyValueBytesStoreSupplier used to materialize the store
- Returns:
a new Materialized instance with the given supplier
-
with
public static <K,V,S extends StateStore> Materialized<K,V,S> with(Serde<K> keySerde, Serde<V> valueSerde)
Materialize a StateStore with the provided key and value Serdes.
Note: If this method is used after as(String), the original Materialized instance will be replaced with a new instance, and any configuration on the first instance (e.g., store name) will be lost. To configure both a store name and key/value serde, use withKeySerde(Serde) and withValueSerde(Serde) instead.
Correct usage with Serde:
Materialized.<KeyType, ValueType, StateStore>as("MyStoreName")
.withKeySerde(keySerde)
.withValueSerde(valueSerde);
Incorrect usage (store name will be lost):
Materialized.<KeyType, ValueType, StateStore>as("MyStoreName")
.with(keySerde, valueSerde); // Store name is lost
- Type Parameters:
K - key type
V - value type
S - store type
- Parameters:
keySerde - the key Serde to use. If the Serde is null, then the default key serde from configs will be used
valueSerde - the value Serde to use. If the Serde is null, then the default value serde from configs will be used
- Returns:
a new Materialized instance with the given key and value serdes
-
withValueSerde
Set the value Serde the materialized StateStore will use.
-
withKeySerde
Set the key Serde the materialized StateStore will use.
-
withLoggingEnabled
Indicates that a changelog should be created for the store. The changelog will be created with the provided configs.
Note: Any unrecognized configs will be ignored.
- Parameters:
config - any configs that should be applied to the changelog
- Returns:
itself
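As a rough sketch of what the config argument can look like, the block below builds a String-to-String map of topic-level overrides. It assumes that `min.insync.replicas` and `segment.bytes` are the settings you want on the changelog topic; the class and helper names are hypothetical, and the Kafka call is shown only in a comment so that the block compiles without the Kafka Streams dependency.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ChangelogConfigSketch {

    // Hypothetical helper: collects topic-level overrides as String/String pairs,
    // which is the shape withLoggingEnabled(Map<String, String>) accepts.
    static Map<String, String> changelogConfig(int minInSyncReplicas, long segmentBytes) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("min.insync.replicas", Integer.toString(minInSyncReplicas));
        config.put("segment.bytes", Long.toString(segmentBytes));
        return config;
    }

    public static void main(String[] args) {
        Map<String, String> config = changelogConfig(2, 64L * 1024 * 1024);
        // With Kafka Streams on the classpath, the map would be passed along as:
        //   Materialized.<KeyType, ValueType, StateStore>as("MyStoreName")
        //       .withLoggingEnabled(config);
        System.out.println(config);
    }
}
```

Because unrecognized configs are ignored rather than rejected, a misspelled key fails silently, so it is worth double-checking key names against the topic configuration reference.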
-
withLoggingDisabled
Disable change logging for the materialized StateStore.
- Returns:
itself
-
withCachingEnabled
Enable caching for the materialized StateStore.
- Returns:
itself
-
withCachingDisabled
Disable caching for the materialized StateStore.
- Returns:
itself
-
withRetention
Configure retention period for window and session stores. Ignored for key/value stores. Overridden by pre-configured store suppliers (as(SessionBytesStoreSupplier) or as(WindowBytesStoreSupplier)).
Note that the retention period must be at least long enough to contain the windowed data's entire life cycle, from window-start through window-end, and for the entire grace period. If not specified, the retention period is set to the window length (from window-start through window-end) plus the grace period.
- Parameters:
retention - the retention time
- Returns:
itself
- Throws:
IllegalArgumentException - if retention is negative or can't be represented as long milliseconds
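The retention arithmetic above can be sketched with plain `java.time.Duration` values. This is an illustration of the documented rules only, not Kafka's internal implementation; `RetentionCheck` and its methods are hypothetical names.

```java
import java.time.Duration;

class RetentionCheck {

    // Documented default: window length plus grace period.
    static Duration defaultRetention(Duration windowSize, Duration grace) {
        return windowSize.plus(grace);
    }

    // True when the chosen retention covers the window's full life cycle.
    static boolean coversLifeCycle(Duration retention, Duration windowSize, Duration grace) {
        return retention.compareTo(defaultRetention(windowSize, grace)) >= 0;
    }

    // Mirrors the documented rejection conditions: a negative retention, or one
    // that cannot be represented as long milliseconds.
    static long toValidMillis(Duration retention) {
        if (retention.isNegative()) {
            throw new IllegalArgumentException("retention must not be negative");
        }
        try {
            return retention.toMillis(); // throws ArithmeticException on overflow
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException("retention does not fit in long milliseconds", e);
        }
    }

    public static void main(String[] args) {
        Duration window = Duration.ofMinutes(5);
        Duration grace = Duration.ofMinutes(1);
        System.out.println(defaultRetention(window, grace));                        // PT6M
        System.out.println(coversLifeCycle(Duration.ofMinutes(5), window, grace)); // false
        System.out.println(coversLifeCycle(Duration.ofHours(1), window, grace));   // true
        System.out.println(toValidMillis(Duration.ofHours(1)));                    // 3600000
    }
}
```

A retention equal to the window size alone fails the check because it leaves no room for the grace period.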
-
withStoreType
public Materialized<K,V,S> withStoreType(DslStoreSuppliers storeSuppliers) throws IllegalArgumentException
Set the type of the materialized StateStore.
- Parameters:
storeSuppliers - the store type Materialized.StoreType to use.
- Returns:
itself
- Throws:
IllegalArgumentException - if store supplier is also pre-configured
-