Type Parameters:
K - type of record key
V - type of record value
S - type of state store (note: state stores always have key/value types <Bytes,byte[]>)

public class Materialized<K,V,S extends StateStore> extends Object

Used to describe how a StateStore should be materialized.
You can either provide a custom StateStore backend through one of the provided methods accepting a supplier
or use the default RocksDB backends by providing just a store name.
For example, you can read a topic as a KTable and force materialization of a state store so that its content
can be accessed via the Interactive Queries API:
StreamsBuilder builder = new StreamsBuilder();
KTable<Integer, Integer> table = builder.table(
"topicName",
Materialized.as("queryable-store-name"));
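The store name passed to Materialized.as(String) is documented as accepting only ASCII alphanumerics, '.', '_' and '-'. The following is a minimal sketch of a pre-flight check for that character set. `StoreNameCheck` and `isValidStoreName` are hypothetical helpers, not part of Kafka Streams, and Kafka's own validation may apply further limits (for example on length) that this sketch does not model.

```java
import java.util.regex.Pattern;

public class StoreNameCheck {
    // Hypothetical helper: mirrors only the documented character set
    // (ASCII alphanumerics, '.', '_' and '-'); it is not Kafka's own validator.
    private static final Pattern VALID_NAME = Pattern.compile("[a-zA-Z0-9._-]+");

    public static boolean isValidStoreName(String name) {
        // Reject null and empty names, then require every character to be allowed.
        return name != null && VALID_NAME.matcher(name).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValidStoreName("queryable-store-name")); // true
        System.out.println(isValidStoreName("store name"));           // false (contains a space)
    }
}
```

Running such a check before building the topology surfaces a bad name early, rather than when the store is first created.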
See Also: Stores

| Modifier and Type | Class and Description |
|---|---|
| static class | Materialized.StoreType |

| Modifier and Type | Field and Description |
|---|---|
| protected boolean | cachingEnabled |
| protected DslStoreSuppliers | dslStoreSuppliers |
| protected Serde<K> | keySerde |
| protected boolean | loggingEnabled |
| protected Duration | retention |
| protected String | storeName |
| protected StoreSupplier<S> | storeSupplier |
| protected Map<String,String> | topicConfig |
| protected Serde<V> | valueSerde |

| Modifier | Constructor and Description |
|---|---|
| protected | Materialized(Materialized<K,V,S> materialized): Copy constructor. |

| Modifier and Type | Method and Description |
|---|---|
| static <K,V,S extends StateStore> Materialized<K,V,S> | as(DslStoreSuppliers storeSuppliers): Materialize a StateStore with the given DslStoreSuppliers. |
| static <K,V> Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> | as(KeyValueBytesStoreSupplier supplier): Materialize a KeyValueStore using the provided KeyValueBytesStoreSupplier. |
| static <K,V> Materialized<K,V,SessionStore<org.apache.kafka.common.utils.Bytes,byte[]>> | as(SessionBytesStoreSupplier supplier): Materialize a SessionStore using the provided SessionBytesStoreSupplier. |
| static <K,V,S extends StateStore> Materialized<K,V,S> | as(String storeName): Materialize a StateStore with the given name. |
| static <K,V> Materialized<K,V,WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>> | as(WindowBytesStoreSupplier supplier): Materialize a WindowStore using the provided WindowBytesStoreSupplier. |
| static <K,V,S extends StateStore> Materialized<K,V,S> | with(Serde<K> keySerde, Serde<V> valueSerde): Materialize a StateStore with the provided key and value Serdes. |
| Materialized<K,V,S> | withCachingDisabled(): Disable caching for the materialized StateStore. |
| Materialized<K,V,S> | withCachingEnabled(): Enable caching for the materialized StateStore. |
| Materialized<K,V,S> | withKeySerde(Serde<K> keySerde): Set the keySerde the materialized StateStore will use. |
| Materialized<K,V,S> | withLoggingDisabled(): Disable change logging for the materialized StateStore. |
| Materialized<K,V,S> | withLoggingEnabled(Map<String,String> config): Indicates that a changelog should be created for the store. |
| Materialized<K,V,S> | withRetention(Duration retention): Configure retention period for window and session stores. |
| Materialized<K,V,S> | withStoreType(DslStoreSuppliers storeSuppliers): Set the type of the materialized StateStore. |
| Materialized<K,V,S> | withValueSerde(Serde<V> valueSerde): Set the valueSerde the materialized StateStore will use. |

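The static factories and the `with...` methods summarized above can be chained, since each instance method returns a Materialized<K,V,S>. The following is a minimal sketch, assuming the kafka-streams library is on the classpath. The class name `MaterializedChainingExample`, the String/Long types, the in-memory supplier from Stores, and the changelog config value are illustrative choices, not recommendations from this page.

```java
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

public class MaterializedChainingExample {
    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // Start from a supplier-based factory instead of a bare store name,
        // then layer serdes, caching and changelog settings on top.
        KTable<String, Long> table = builder.table(
            "topicName",
            Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("queryable-store-name"))
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long())
                .withCachingDisabled()
                // Assumed example config for the changelog topic.
                .withLoggingEnabled(Map.of("min.insync.replicas", "2")));
        return builder.build();
    }

    public static void main(String[] args) {
        // Print the topology description to inspect the attached store.
        System.out.println(buildTopology().describe());
    }
}
```

Passing a supplier fixes the store backend explicitly, whereas Materialized.as(String) leaves the choice to the default backend.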
protected StoreSupplier<S extends StateStore> storeSupplier
protected String storeName
protected boolean loggingEnabled
protected boolean cachingEnabled
protected Duration retention
protected DslStoreSuppliers dslStoreSuppliers
protected Materialized(Materialized<K,V,S> materialized)
Copy constructor.
Parameters:
materialized - the Materialized instance to copy.

public static <K,V,S extends StateStore> Materialized<K,V,S> as(DslStoreSuppliers storeSuppliers)
Materialize a StateStore with the given DslStoreSuppliers.
Type Parameters:
K - key type of the store
V - value type of the store
S - type of the StateStore
Parameters:
storeSuppliers - the type of the state store
Returns:
a new Materialized instance with the given storeSuppliers

public static <K,V,S extends StateStore> Materialized<K,V,S> as(String storeName)
Materialize a StateStore with the given name.
Type Parameters:
K - key type of the store
V - value type of the store
S - type of the StateStore
Parameters:
storeName - the name of the underlying KTable state store; valid characters are ASCII alphanumerics, '.', '_' and '-'.
Returns:
a new Materialized instance with the given storeName

public static <K,V> Materialized<K,V,WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>> as(WindowBytesStoreSupplier supplier)
Materialize a WindowStore using the provided WindowBytesStoreSupplier.
Important: Custom subclasses are allowed here, but they should respect the retention contract:
window stores are required to retain windows at least as long as (window size + window grace period).
Stores constructed via Stores already satisfy this contract.
Type Parameters:
K - key type of the store
V - value type of the store
Parameters:
supplier - the WindowBytesStoreSupplier used to materialize the store
Returns:
a new Materialized instance with the given supplier

public static <K,V> Materialized<K,V,SessionStore<org.apache.kafka.common.utils.Bytes,byte[]>> as(SessionBytesStoreSupplier supplier)
Materialize a SessionStore using the provided SessionBytesStoreSupplier.
Important: Custom subclasses are allowed here, but they should respect the retention contract:
session stores are required to retain windows at least as long as (session inactivity gap + session grace period).
Stores constructed via Stores already satisfy this contract.
Type Parameters:
K - key type of the store
V - value type of the store
Parameters:
supplier - the SessionBytesStoreSupplier used to materialize the store
Returns:
a new Materialized instance with the given supplier

public static <K,V> Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> as(KeyValueBytesStoreSupplier supplier)
Materialize a KeyValueStore using the provided KeyValueBytesStoreSupplier.
Type Parameters:
K - key type of the store
V - value type of the store
Parameters:
supplier - the KeyValueBytesStoreSupplier used to materialize the store
Returns:
a new Materialized instance with the given supplier

public static <K,V,S extends StateStore> Materialized<K,V,S> with(Serde<K> keySerde, Serde<V> valueSerde)
Materialize a StateStore with the provided key and value Serdes.
An internal name will be used for the store.
Type Parameters:
K - key type
V - value type
S - store type
Parameters:
keySerde - the key Serde to use. If the Serde is null, then the default key serde from configs will be used
valueSerde - the value Serde to use. If the Serde is null, then the default value serde from configs will be used
Returns:
a new Materialized instance with the given key and value serdes

public Materialized<K,V,S> withValueSerde(Serde<V> valueSerde)
Set the valueSerde the materialized StateStore will use.

public Materialized<K,V,S> withKeySerde(Serde<K> keySerde)
Set the keySerde the materialized StateStore will use.

public Materialized<K,V,S> withLoggingEnabled(Map<String,String> config)
Indicates that a changelog should be created for the store.
Note: Any unrecognized configs will be ignored.
Parameters:
config - any configs that should be applied to the changelog

public Materialized<K,V,S> withLoggingDisabled()
Disable change logging for the materialized StateStore.

public Materialized<K,V,S> withCachingEnabled()
Enable caching for the materialized StateStore.

public Materialized<K,V,S> withCachingDisabled()
Disable caching for the materialized StateStore.

public Materialized<K,V,S> withRetention(Duration retention) throws IllegalArgumentException
Configure retention period for window and session stores.
Overridden by pre-configured store suppliers (as(SessionBytesStoreSupplier) or as(WindowBytesStoreSupplier)).
Note that the retention period must be at least long enough to contain the windowed data's entire life cycle,
from window-start through window-end, and for the entire grace period. If not specified, the retention
period defaults to the window length (from window-start through window-end) plus the grace period.
Parameters:
retention - the retention time
Throws:
IllegalArgumentException - if retention is negative or can't be represented as long milliseconds

public Materialized<K,V,S> withStoreType(DslStoreSuppliers storeSuppliers) throws IllegalArgumentException
Set the type of the materialized StateStore.
Parameters:
storeSuppliers - the store type Materialized.StoreType to use.
Throws:
IllegalArgumentException - if store supplier is also pre-configured
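
The retention rule described for withRetention and for the window and session store suppliers reduces to simple arithmetic: retention must be at least the window size (or session inactivity gap) plus the grace period. The following is a minimal sketch of that check using only java.time.Duration. `RetentionCheck`, `minimumRetention` and `isSufficient` are hypothetical helpers for illustration, not part of Kafka Streams.

```java
import java.time.Duration;

public class RetentionCheck {
    // Minimum retention per the documented contract:
    // window size (or session inactivity gap) plus grace period.
    public static Duration minimumRetention(Duration windowSizeOrGap, Duration grace) {
        return windowSizeOrGap.plus(grace);
    }

    // True when the proposed retention covers the whole window life cycle.
    public static boolean isSufficient(Duration retention, Duration windowSizeOrGap, Duration grace) {
        return retention.compareTo(minimumRetention(windowSizeOrGap, grace)) >= 0;
    }

    public static void main(String[] args) {
        Duration windowSize = Duration.ofMinutes(5);
        Duration grace = Duration.ofMinutes(1);
        System.out.println(minimumRetention(windowSize, grace));                       // PT6M
        System.out.println(isSufficient(Duration.ofMinutes(10), windowSize, grace));   // true
        System.out.println(isSufficient(Duration.ofMinutes(5), windowSize, grace));    // false
    }
}
```

A value that passes this check could then be supplied to withRetention(Duration); a negative Duration would still be rejected by that method with an IllegalArgumentException, as documented above.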