K
- type of record keyV
- type of record valueS
- type of state store (note: state stores always have key/value types <Bytes,byte[]>
public class Materialized<K,V,S extends StateStore> extends Object
StateStore
should be materialized.
You can either provide a custom StateStore
backend through one of the provided methods accepting a supplier
or use the default RocksDB backends by providing just a store name.
For example, you can read a topic as KTable
and force a state store materialization to access the content
via Interactive Queries API:
StreamsBuilder builder = new StreamsBuilder();
KTable<Integer, Integer> table = builder.table(
"topicName",
Materialized.as("queryable-store-name"));
Stores
Modifier and Type | Class and Description |
---|---|
static class |
Materialized.StoreType |
Modifier and Type | Field and Description |
---|---|
protected boolean |
cachingEnabled |
protected DslStoreSuppliers |
dslStoreSuppliers |
protected Serde<K> |
keySerde |
protected boolean |
loggingEnabled |
protected Duration |
retention |
protected String |
storeName |
protected StoreSupplier<S> |
storeSupplier |
protected Map<String,String> |
topicConfig |
protected Serde<V> |
valueSerde |
Modifier | Constructor and Description |
---|---|
protected |
Materialized(Materialized<K,V,S> materialized)
Copy constructor.
|
Modifier and Type | Method and Description |
---|---|
static <K,V,S extends StateStore> |
as(DslStoreSuppliers storeSuppliers)
Materialize a
StateStore with the given DslStoreSuppliers . |
static <K,V> Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> |
as(KeyValueBytesStoreSupplier supplier)
Materialize a
KeyValueStore using the provided KeyValueBytesStoreSupplier . |
static <K,V> Materialized<K,V,SessionStore<org.apache.kafka.common.utils.Bytes,byte[]>> |
as(SessionBytesStoreSupplier supplier)
Materialize a
SessionStore using the provided SessionBytesStoreSupplier . |
static <K,V,S extends StateStore> |
as(String storeName)
Materialize a
StateStore with the given name. |
static <K,V> Materialized<K,V,WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>> |
as(WindowBytesStoreSupplier supplier)
Materialize a
WindowStore using the provided WindowBytesStoreSupplier . |
static <K,V,S extends StateStore> |
with(Serde<K> keySerde,
Serde<V> valueSerde)
Materialize a
StateStore with the provided key and value Serde s. |
Materialized<K,V,S> |
withCachingDisabled()
Disable caching for the materialized
StateStore . |
Materialized<K,V,S> |
withCachingEnabled()
Enable caching for the materialized
StateStore . |
Materialized<K,V,S> |
withKeySerde(Serde<K> keySerde)
Set the keySerde the materialize
StateStore will use. |
Materialized<K,V,S> |
withLoggingDisabled()
Disable change logging for the materialized
StateStore . |
Materialized<K,V,S> |
withLoggingEnabled(Map<String,String> config)
Indicates that a changelog should be created for the store.
|
Materialized<K,V,S> |
withRetention(Duration retention)
Configure retention period for window and session stores.
|
Materialized<K,V,S> |
withStoreType(DslStoreSuppliers storeSuppliers)
Set the type of the materialized
StateStore . |
Materialized<K,V,S> |
withValueSerde(Serde<V> valueSerde)
Set the valueSerde the materialized
StateStore will use. |
protected StoreSupplier<S extends StateStore> storeSupplier
protected String storeName
protected boolean loggingEnabled
protected boolean cachingEnabled
protected Duration retention
protected DslStoreSuppliers dslStoreSuppliers
protected Materialized(Materialized<K,V,S> materialized)
materialized
- the Materialized
instance to copy.public static <K,V,S extends StateStore> Materialized<K,V,S> as(DslStoreSuppliers storeSuppliers)
StateStore
with the given DslStoreSuppliers
.K
- key type of the storeV
- value type of the storeS
- type of the StateStore
storeSuppliers
- the type of the state storeMaterialized
instance with the given storeNamepublic static <K,V,S extends StateStore> Materialized<K,V,S> as(String storeName)
StateStore
with the given name.K
- key type of the storeV
- value type of the storeS
- type of the StateStore
storeName
- the name of the underlying KTable
state store; valid characters are ASCII
alphanumerics, '.', '_' and '-'.Materialized
instance with the given storeNamepublic static <K,V> Materialized<K,V,WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>> as(WindowBytesStoreSupplier supplier)
WindowStore
using the provided WindowBytesStoreSupplier
.
Important: Custom subclasses are allowed here, but they should respect the retention contract:
Window stores are required to retain windows at least as long as (window size + window grace period).
Stores constructed via Stores
already satisfy this contract.K
- key type of the storeV
- value type of the storesupplier
- the WindowBytesStoreSupplier
used to materialize the storeMaterialized
instance with the given supplierpublic static <K,V> Materialized<K,V,SessionStore<org.apache.kafka.common.utils.Bytes,byte[]>> as(SessionBytesStoreSupplier supplier)
SessionStore
using the provided SessionBytesStoreSupplier
.
Important: Custom subclasses are allowed here, but they should respect the retention contract:
Session stores are required to retain windows at least as long as (session inactivity gap + session grace period).
Stores constructed via Stores
already satisfy this contract.K
- key type of the storeV
- value type of the storesupplier
- the SessionBytesStoreSupplier
used to materialize the storeMaterialized
instance with the given sup
plierpublic static <K,V> Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> as(KeyValueBytesStoreSupplier supplier)
KeyValueStore
using the provided KeyValueBytesStoreSupplier
.K
- key type of the storeV
- value type of the storesupplier
- the KeyValueBytesStoreSupplier
used to materialize the storeMaterialized
instance with the given supplierpublic static <K,V,S extends StateStore> Materialized<K,V,S> with(Serde<K> keySerde, Serde<V> valueSerde)
StateStore
with the provided key and value Serde
s.
An internal name will be used for the store.K
- key typeV
- value typeS
- store typekeySerde
- the key Serde
to use. If the Serde
is null, then the default key
serde from configs will be usedvalueSerde
- the value Serde
to use. If the Serde
is null, then the default value
serde from configs will be usedMaterialized
instance with the given key and value serdespublic Materialized<K,V,S> withValueSerde(Serde<V> valueSerde)
StateStore
will use.public Materialized<K,V,S> withKeySerde(Serde<K> keySerde)
StateStore
will use.public Materialized<K,V,S> withLoggingEnabled(Map<String,String> config)
Note: Any unrecognized configs will be ignored.
config
- any configs that should be applied to the changelogpublic Materialized<K,V,S> withLoggingDisabled()
StateStore
.public Materialized<K,V,S> withCachingEnabled()
StateStore
.public Materialized<K,V,S> withCachingDisabled()
StateStore
.public Materialized<K,V,S> withRetention(Duration retention) throws IllegalArgumentException
as(SessionBytesStoreSupplier)
or as(WindowBytesStoreSupplier)
).
Note that the retention period must be at least long enough to contain the windowed data's entire life cycle,
from window-start through window-end, and for the entire grace period. If not specified, the retention
period would be set as the window length (from window-start through window-end) plus the grace period.retention
- the retention timeIllegalArgumentException
- if retention is negative or can't be represented as long milliseconds
public Materialized<K,V,S> withStoreType(DslStoreSuppliers storeSuppliers) throws IllegalArgumentException
StateStore
.storeSuppliers
- the store type Materialized.StoreType
to use.IllegalArgumentException
- if store supplier is also pre-configured