Class Stores
When using the high-level DSL, i.e., StreamsBuilder
, users create
StoreSupplier
s that can be further customized via
Materialized
.
For example, a topic read as KTable
can be materialized into an
in-memory store with custom key/value serdes and caching disabled:
StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("queryable-store-name");
KTable<Long,String> table = builder.table(
"topicName",
Materialized.<Long,String>as(storeSupplier)
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.String())
.withCachingDisabled());
When using the Processor API, i.e., Topology
, users create
StoreBuilder
s that can be attached to Processor
s.
For example, you can create a windowed
RocksDB store with custom
changelog topic configuration like:
Topology topology = new Topology();
topology.addProcessor("processorName", ...);
Map<String,String> topicConfig = new HashMap<>();
StoreBuilder<WindowStore<Integer, Long>> storeBuilder = Stores
.windowStoreBuilder(
Stores.persistentWindowStore("queryable-store-name", ...),
Serdes.Integer(),
Serdes.Long())
.withLoggingEnabled(topicConfig);
topology.addStateStore(storeBuilder, "processorName");
-
Constructor Summary
-
Method Summary
Modifier and TypeMethodDescriptionstatic KeyValueBytesStoreSupplier
inMemoryKeyValueStore
(String name) Create an in-memoryKeyValueBytesStoreSupplier
.static SessionBytesStoreSupplier
inMemorySessionStore
(String name, Duration retentionPeriod) Create an in-memorySessionBytesStoreSupplier
.static WindowBytesStoreSupplier
inMemoryWindowStore
(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates) Create an in-memoryWindowBytesStoreSupplier
.static <K,
V> StoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder
(KeyValueBytesStoreSupplier supplier, Serde<K> keySerde, Serde<V> valueSerde) Creates aStoreBuilder
that can be used to build aKeyValueStore
.static KeyValueBytesStoreSupplier
Create a LRU MapKeyValueBytesStoreSupplier
.static KeyValueBytesStoreSupplier
Create a persistentKeyValueBytesStoreSupplier
.static SessionBytesStoreSupplier
persistentSessionStore
(String name, Duration retentionPeriod) Create a persistentSessionBytesStoreSupplier
.static KeyValueBytesStoreSupplier
Create a persistentKeyValueBytesStoreSupplier
.static WindowBytesStoreSupplier
persistentTimestampedWindowStore
(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates) Create a persistentWindowBytesStoreSupplier
.static VersionedBytesStoreSupplier
persistentVersionedKeyValueStore
(String name, Duration historyRetention) Create a persistent versioned key-value storeVersionedBytesStoreSupplier
.static VersionedBytesStoreSupplier
persistentVersionedKeyValueStore
(String name, Duration historyRetention, Duration segmentInterval) Create a persistent versioned key-value storeVersionedBytesStoreSupplier
.static WindowBytesStoreSupplier
persistentWindowStore
(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates) Create a persistentWindowBytesStoreSupplier
.static <K,
V> StoreBuilder<SessionStore<K, V>> sessionStoreBuilder
(SessionBytesStoreSupplier supplier, Serde<K> keySerde, Serde<V> valueSerde) Creates aStoreBuilder
that can be used to build aSessionStore
.static <K,
V> StoreBuilder<TimestampedKeyValueStore<K, V>> timestampedKeyValueStoreBuilder
(KeyValueBytesStoreSupplier supplier, Serde<K> keySerde, Serde<V> valueSerde) Creates aStoreBuilder
that can be used to build aTimestampedKeyValueStore
.static <K,
V> StoreBuilder<TimestampedWindowStore<K, V>> timestampedWindowStoreBuilder
(WindowBytesStoreSupplier supplier, Serde<K> keySerde, Serde<V> valueSerde) Creates aStoreBuilder
that can be used to build aTimestampedWindowStore
.static <K,
V> StoreBuilder<VersionedKeyValueStore<K, V>> versionedKeyValueStoreBuilder
(VersionedBytesStoreSupplier supplier, Serde<K> keySerde, Serde<V> valueSerde) Creates aStoreBuilder
that can be used to build aVersionedKeyValueStore
.static <K,
V> StoreBuilder<WindowStore<K, V>> windowStoreBuilder
(WindowBytesStoreSupplier supplier, Serde<K> keySerde, Serde<V> valueSerde) Creates aStoreBuilder
that can be used to build aWindowStore
.
-
Constructor Details
-
Stores
public Stores()
-
-
Method Details
-
persistentKeyValueStore
Create a persistentKeyValueBytesStoreSupplier
.This store supplier can be passed into a
keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)
. If you want to create aTimestampedKeyValueStore
orVersionedKeyValueStore
you should usepersistentTimestampedKeyValueStore(String)
orpersistentVersionedKeyValueStore(String, Duration)
, respectively, to create a store supplier instead.- Parameters:
name
- name of the store (cannot benull
)- Returns:
- an instance of a
KeyValueBytesStoreSupplier
that can be used to build a persistent key-value store
-
persistentTimestampedKeyValueStore
Create a persistentKeyValueBytesStoreSupplier
.This store supplier can be passed into a
timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)
. If you want to create aKeyValueStore
or aVersionedKeyValueStore
you should usepersistentKeyValueStore(String)
orpersistentVersionedKeyValueStore(String, Duration)
, respectively, to create a store supplier instead.- Parameters:
name
- name of the store (cannot benull
)- Returns:
- an instance of a
KeyValueBytesStoreSupplier
that can be used to build a persistent key-(timestamp/value) store
-
persistentVersionedKeyValueStore
public static VersionedBytesStoreSupplier persistentVersionedKeyValueStore(String name, Duration historyRetention) Create a persistent versioned key-value storeVersionedBytesStoreSupplier
.This store supplier can be passed into a
versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier, Serde, Serde)
.Note that it is not safe to change the value of
historyRetention
between application restarts without clearing local state from application instances, as this may cause incorrect values to be read from the state store if it impacts the underlying storage format.- Parameters:
name
- name of the store (cannot benull
)historyRetention
- length of time that old record versions are available for query (cannot be negative). If a timestamp bound provided toVersionedKeyValueStore.get(Object, long)
is older than this specified history retention, then the get operation will not return data. This parameter also determines the "grace period" after which out-of-order writes will no longer be accepted.- Returns:
- an instance of
VersionedBytesStoreSupplier
- Throws:
IllegalArgumentException
- ifhistoryRetention
can't be represented aslong milliseconds
-
persistentVersionedKeyValueStore
public static VersionedBytesStoreSupplier persistentVersionedKeyValueStore(String name, Duration historyRetention, Duration segmentInterval) Create a persistent versioned key-value storeVersionedBytesStoreSupplier
.This store supplier can be passed into a
versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier, Serde, Serde)
.Note that it is not safe to change the value of
segmentInterval
between application restarts without clearing local state from application instances, as this may cause incorrect values to be read from the state store otherwise.- Parameters:
name
- name of the store (cannot benull
)historyRetention
- length of time that old record versions are available for query (cannot be negative). If a timestamp bound provided toVersionedKeyValueStore.get(Object, long)
is older than this specified history retention, then the get operation will not return data. This parameter also determines the "grace period" after which out-of-order writes will no longer be accepted.segmentInterval
- size of segments for storing old record versions (must be positive). Old record versions for the same key in a single segment are stored (updated and accessed) together. The only impact of this parameter is performance. If segments are large and a workload results in many record versions for the same key being collected in a single segment, performance may degrade as a result. On the other hand, historical reads (which access older segments) and out-of-order writes may slow down if there are too many segments.- Returns:
- an instance of
VersionedBytesStoreSupplier
- Throws:
IllegalArgumentException
- ifhistoryRetention
orsegmentInterval
can't be represented aslong milliseconds
-
inMemoryKeyValueStore
Create an in-memoryKeyValueBytesStoreSupplier
.This store supplier can be passed into a
keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)
ortimestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)
.- Parameters:
name
- name of the store (cannot benull
)- Returns:
- an instance of a
KeyValueBytesStoreSupplier
than can be used to build an in-memory store
-
lruMap
Create a LRU MapKeyValueBytesStoreSupplier
.This store supplier can be passed into a
keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)
ortimestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)
.- Parameters:
name
- name of the store (cannot benull
)maxCacheSize
- maximum number of items in the LRU (cannot be negative)- Returns:
- an instance of a
KeyValueBytesStoreSupplier
that can be used to build an LRU Map based store - Throws:
IllegalArgumentException
- ifmaxCacheSize
is negative
-
persistentWindowStore
public static WindowBytesStoreSupplier persistentWindowStore(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates) throws IllegalArgumentException Create a persistentWindowBytesStoreSupplier
.This store supplier can be passed into a
windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)
. If you want to create aTimestampedWindowStore
you should usepersistentTimestampedWindowStore(String, Duration, Duration, boolean)
to create a store supplier instead.Note that it is not safe to change the value of
retentionPeriod
between application restarts without clearing local state from application instances, as this may cause incorrect values to be read from the state store if it impacts the underlying storage format.- Parameters:
name
- name of the store (cannot benull
)retentionPeriod
- length of time to retain data in the store (cannot be negative) (note that the retention period must be at least long enough to contain the windowed data's entire life cycle, from window-start through window-end, and for the entire grace period)windowSize
- size of the windows (cannot be negative)retainDuplicates
- whether or not to retain duplicates. Turning this on will automatically disable caching and means that null values will be ignored.- Returns:
- an instance of
WindowBytesStoreSupplier
- Throws:
IllegalArgumentException
- ifretentionPeriod
orwindowSize
can't be represented aslong milliseconds
IllegalArgumentException
- ifretentionPeriod
is smaller thanwindowSize
-
persistentTimestampedWindowStore
public static WindowBytesStoreSupplier persistentTimestampedWindowStore(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates) throws IllegalArgumentException Create a persistentWindowBytesStoreSupplier
.This store supplier can be passed into a
timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)
. If you want to create aWindowStore
you should usepersistentWindowStore(String, Duration, Duration, boolean)
to create a store supplier instead.Note that it is not safe to change the value of
retentionPeriod
between application restarts without clearing local state from application instances, as this may cause incorrect values to be read from the state store if it impacts the underlying storage format.- Parameters:
name
- name of the store (cannot benull
)retentionPeriod
- length of time to retain data in the store (cannot be negative) (note that the retention period must be at least long enough to contain the windowed data's entire life cycle, from window-start through window-end, and for the entire grace period)windowSize
- size of the windows (cannot be negative)retainDuplicates
- whether or not to retain duplicates. Turning this on will automatically disable caching and means that null values will be ignored.- Returns:
- an instance of
WindowBytesStoreSupplier
- Throws:
IllegalArgumentException
- ifretentionPeriod
orwindowSize
can't be represented aslong milliseconds
IllegalArgumentException
- ifretentionPeriod
is smaller thanwindowSize
-
inMemoryWindowStore
public static WindowBytesStoreSupplier inMemoryWindowStore(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates) throws IllegalArgumentException Create an in-memoryWindowBytesStoreSupplier
.This store supplier can be passed into a
windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)
ortimestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)
.- Parameters:
name
- name of the store (cannot benull
)retentionPeriod
- length of time to retain data in the store (cannot be negative) Note that the retention period must be at least long enough to contain the windowed data's entire life cycle, from window-start through window-end, and for the entire grace period.windowSize
- size of the windows (cannot be negative)retainDuplicates
- whether or not to retain duplicates. Turning this on will automatically disable caching and means that null values will be ignored.- Returns:
- an instance of
WindowBytesStoreSupplier
- Throws:
IllegalArgumentException
- ifretentionPeriod
orwindowSize
can't be represented aslong milliseconds
IllegalArgumentException
- ifretentionPeriod
is smaller thanwindowSize
-
persistentSessionStore
public static SessionBytesStoreSupplier persistentSessionStore(String name, Duration retentionPeriod) Create a persistentSessionBytesStoreSupplier
.Note that it is not safe to change the value of
retentionPeriod
between application restarts without clearing local state from application instances, as this may cause incorrect values to be read from the state store if it impacts the underlying storage format.- Parameters:
name
- name of the store (cannot benull
)retentionPeriod
- length of time to retain data in the store (cannot be negative) (note that the retention period must be at least as long enough to contain the inactivity gap of the session and the entire grace period.)- Returns:
- an instance of a
SessionBytesStoreSupplier
-
inMemorySessionStore
Create an in-memorySessionBytesStoreSupplier
.- Parameters:
name
- name of the store (cannot benull
)retentionPeriod
- length of time to retain data in the store (cannot be negative) (note that the retention period must be at least as long enough to contain the inactivity gap of the session and the entire grace period.)- Returns:
- an instance of a
SessionBytesStoreSupplier
-
keyValueStoreBuilder
public static <K,V> StoreBuilder<KeyValueStore<K,V>> keyValueStoreBuilder(KeyValueBytesStoreSupplier supplier, Serde<K> keySerde, Serde<V> valueSerde) Creates aStoreBuilder
that can be used to build aKeyValueStore
.The provided supplier should not be a supplier for
TimestampedKeyValueStores
.- Type Parameters:
K
- key typeV
- value type- Parameters:
supplier
- aKeyValueBytesStoreSupplier
(cannot benull
)keySerde
- the key serde to usevalueSerde
- the value serde to use; if the serialized bytes isnull
for put operations, it is treated as delete- Returns:
- an instance of a
StoreBuilder
that can build aKeyValueStore
-
timestampedKeyValueStoreBuilder
public static <K,V> StoreBuilder<TimestampedKeyValueStore<K,V>> timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier supplier, Serde<K> keySerde, Serde<V> valueSerde) Creates aStoreBuilder
that can be used to build aTimestampedKeyValueStore
.The provided supplier should not be a supplier for
KeyValueStores
. For this case, passed in timestamps will be dropped and not stored in the key-value-store. On read, no valid timestamp but a dummy timestamp will be returned.- Type Parameters:
K
- key typeV
- value type- Parameters:
supplier
- aKeyValueBytesStoreSupplier
(cannot benull
)keySerde
- the key serde to usevalueSerde
- the value serde to use; if the serialized bytes isnull
for put operations, it is treated as delete- Returns:
- an instance of a
StoreBuilder
that can build aKeyValueStore
-
versionedKeyValueStoreBuilder
public static <K,V> StoreBuilder<VersionedKeyValueStore<K,V>> versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier supplier, Serde<K> keySerde, Serde<V> valueSerde) Creates aStoreBuilder
that can be used to build aVersionedKeyValueStore
.- Type Parameters:
K
- key typeV
- value type- Parameters:
supplier
- aVersionedBytesStoreSupplier
(cannot benull
)keySerde
- the key serde to usevalueSerde
- the value serde to use; if the serialized bytes isnull
for put operations, it is treated as a deletion- Returns:
- an instance of a
StoreBuilder
that can build aVersionedKeyValueStore
-
windowStoreBuilder
public static <K,V> StoreBuilder<WindowStore<K,V>> windowStoreBuilder(WindowBytesStoreSupplier supplier, Serde<K> keySerde, Serde<V> valueSerde) Creates aStoreBuilder
that can be used to build aWindowStore
.The provided supplier should not be a supplier for
TimestampedWindowStores
.- Type Parameters:
K
- key typeV
- value type- Parameters:
supplier
- aWindowBytesStoreSupplier
(cannot benull
)keySerde
- the key serde to usevalueSerde
- the value serde to use; if the serialized bytes isnull
for put operations, it is treated as delete- Returns:
- an instance of
StoreBuilder
than can build aWindowStore
-
timestampedWindowStoreBuilder
public static <K,V> StoreBuilder<TimestampedWindowStore<K,V>> timestampedWindowStoreBuilder(WindowBytesStoreSupplier supplier, Serde<K> keySerde, Serde<V> valueSerde) Creates aStoreBuilder
that can be used to build aTimestampedWindowStore
.The provided supplier should not be a supplier for
WindowStores
. For this case, passed in timestamps will be dropped and not stored in the window-store. On read, no valid timestamp but a dummy timestamp will be returned.- Type Parameters:
K
- key typeV
- value type- Parameters:
supplier
- aWindowBytesStoreSupplier
(cannot benull
)keySerde
- the key serde to usevalueSerde
- the value serde to use; if the serialized bytes isnull
for put operations, it is treated as delete- Returns:
- an instance of
StoreBuilder
that can build aTimestampedWindowStore
-
sessionStoreBuilder
public static <K,V> StoreBuilder<SessionStore<K,V>> sessionStoreBuilder(SessionBytesStoreSupplier supplier, Serde<K> keySerde, Serde<V> valueSerde) Creates aStoreBuilder
that can be used to build aSessionStore
.- Type Parameters:
K
- key typeV
- value type- Parameters:
supplier
- aSessionBytesStoreSupplier
(cannot benull
)keySerde
- the key serde to usevalueSerde
- the value serde to use; if the serialized bytes isnull
for put operations, it is treated as delete- Returns:
- an instance of
StoreBuilder
than can build aSessionStore
-