Class Stores
- java.lang.Object
-
- org.apache.kafka.streams.state.Stores
-
public final class Stores extends Object
Factory for creating state stores in Kafka Streams.When using the high-level DSL, i.e.,
StreamsBuilder, users createStoreSuppliers that can be further customized viaMaterialized. For example, a topic read asKTablecan be materialized into an in-memory store with custom key/value serdes and caching disabled:
When using the Processor API, i.e.,StreamsBuilder builder = new StreamsBuilder(); KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("queryable-store-name"); KTable<Long,String> table = builder.table( "topicName", Materialized.<Long,String>as(storeSupplier) .withKeySerde(Serdes.Long()) .withValueSerde(Serdes.String()) .withCachingDisabled());Topology, users createStoreBuilders that can be attached toProcessors. For example, you can create awindowedRocksDB store with custom changelog topic configuration like:Topology topology = new Topology(); topology.addProcessor("processorName", ...); Map<String,String> topicConfig = new HashMap<>(); StoreBuilder<WindowStore<Integer, Long>> storeBuilder = Stores .windowStoreBuilder( Stores.persistentWindowStore("queryable-store-name", ...), Serdes.Integer(), Serdes.Long()) .withLoggingEnabled(topicConfig); topology.addStateStore(storeBuilder, "processorName");
-
-
Constructor Summary
Constructors Constructor Description Stores()
-
Method Summary
-
-
-
Method Detail
-
persistentKeyValueStore
public static KeyValueBytesStoreSupplier persistentKeyValueStore(String name)
Create a persistentKeyValueBytesStoreSupplier.This store supplier can be passed into a
keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde). If you want to create aTimestampedKeyValueStoreyou should usepersistentTimestampedKeyValueStore(String)to create a store supplier instead.- Parameters:
name- name of the store (cannot benull)- Returns:
- an instance of a
KeyValueBytesStoreSupplierthat can be used to build a persistent key-value store
-
persistentTimestampedKeyValueStore
public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(String name)
Create a persistentKeyValueBytesStoreSupplier.This store supplier can be passed into a
timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde). If you want to create aKeyValueStoreyou should usepersistentKeyValueStore(String)to create a store supplier instead.- Parameters:
name- name of the store (cannot benull)- Returns:
- an instance of a
KeyValueBytesStoreSupplierthat can be used to build a persistent key-(timestamp/value) store
-
inMemoryKeyValueStore
public static KeyValueBytesStoreSupplier inMemoryKeyValueStore(String name)
Create an in-memoryKeyValueBytesStoreSupplier.This store supplier can be passed into a
keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)ortimestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde).- Parameters:
name- name of the store (cannot benull)- Returns:
- an instance of a
KeyValueBytesStoreSupplierthan can be used to build an in-memory store
-
lruMap
public static KeyValueBytesStoreSupplier lruMap(String name, int maxCacheSize)
Create a LRU MapKeyValueBytesStoreSupplier.This store supplier can be passed into a
keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)ortimestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde).- Parameters:
name- name of the store (cannot benull)maxCacheSize- maximum number of items in the LRU (cannot be negative)- Returns:
- an instance of a
KeyValueBytesStoreSupplierthat can be used to build an LRU Map based store - Throws:
IllegalArgumentException- ifmaxCacheSizeis negative
-
persistentWindowStore
@Deprecated public static WindowBytesStoreSupplier persistentWindowStore(String name, long retentionPeriod, int numSegments, long windowSize, boolean retainDuplicates)
Deprecated.since 2.1 UsepersistentWindowStore(String, Duration, Duration, boolean)insteadCreate a persistentWindowBytesStoreSupplier.- Parameters:
name- name of the store (cannot benull)retentionPeriod- length of time to retain data in the store (cannot be negative) (note that the retention period must be at least long enough to contain the windowed data's entire life cycle, from window-start through window-end, and for the entire grace period)numSegments- number of db segments (cannot be zero or negative)windowSize- size of the windows that are stored (cannot be negative). Note: the window size is not stored with the records, so this value is used to compute the keys that the store returns. No effort is made to validate this parameter, so you must be careful to set it the same as the windowed keys you're actually storing.retainDuplicates- whether or not to retain duplicates. Turning this on will automatically disable caching and means that null values will be ignored.- Returns:
- an instance of
WindowBytesStoreSupplier
-
persistentWindowStore
public static WindowBytesStoreSupplier persistentWindowStore(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates) throws IllegalArgumentException
Create a persistentWindowBytesStoreSupplier.This store supplier can be passed into a
windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde). If you want to create aTimestampedWindowStoreyou should usepersistentTimestampedWindowStore(String, Duration, Duration, boolean)to create a store supplier instead.- Parameters:
name- name of the store (cannot benull)retentionPeriod- length of time to retain data in the store (cannot be negative) (note that the retention period must be at least long enough to contain the windowed data's entire life cycle, from window-start through window-end, and for the entire grace period)windowSize- size of the windows (cannot be negative)retainDuplicates- whether or not to retain duplicates. Turning this on will automatically disable caching and means that null values will be ignored.- Returns:
- an instance of
WindowBytesStoreSupplier - Throws:
IllegalArgumentException- ifretentionPeriodorwindowSizecan't be represented aslong millisecondsIllegalArgumentException- ifretentionPeriodis smaller thanwindowSize
-
persistentTimestampedWindowStore
public static WindowBytesStoreSupplier persistentTimestampedWindowStore(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates) throws IllegalArgumentException
Create a persistentWindowBytesStoreSupplier.This store supplier can be passed into a
timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde). If you want to create aWindowStoreyou should usepersistentWindowStore(String, Duration, Duration, boolean)to create a store supplier instead.- Parameters:
name- name of the store (cannot benull)retentionPeriod- length of time to retain data in the store (cannot be negative) (note that the retention period must be at least long enough to contain the windowed data's entire life cycle, from window-start through window-end, and for the entire grace period)windowSize- size of the windows (cannot be negative)retainDuplicates- whether or not to retain duplicates. Turning this on will automatically disable caching and means that null values will be ignored.- Returns:
- an instance of
WindowBytesStoreSupplier - Throws:
IllegalArgumentException- ifretentionPeriodorwindowSizecan't be represented aslong millisecondsIllegalArgumentException- ifretentionPeriodis smaller thanwindowSize
-
inMemoryWindowStore
public static WindowBytesStoreSupplier inMemoryWindowStore(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates) throws IllegalArgumentException
Create an in-memoryWindowBytesStoreSupplier.This store supplier can be passed into a
windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)ortimestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde).- Parameters:
name- name of the store (cannot benull)retentionPeriod- length of time to retain data in the store (cannot be negative) Note that the retention period must be at least long enough to contain the windowed data's entire life cycle, from window-start through window-end, and for the entire grace period.windowSize- size of the windows (cannot be negative)retainDuplicates- whether or not to retain duplicates. Turning this on will automatically disable caching and means that null values will be ignored.- Returns:
- an instance of
WindowBytesStoreSupplier - Throws:
IllegalArgumentException- ifretentionPeriodorwindowSizecan't be represented aslong millisecondsIllegalArgumentException- ifretentionPeriodis smaller thanwindowSize
-
persistentSessionStore
@Deprecated public static SessionBytesStoreSupplier persistentSessionStore(String name, long retentionPeriodMs)
Deprecated.since 2.1 UsepersistentSessionStore(String, Duration)insteadCreate a persistentSessionBytesStoreSupplier.- Parameters:
name- name of the store (cannot benull)retentionPeriodMs- length of time to retain data in the store (cannot be negative) (note that the retention period must be at least as long enough to contain the inactivity gap of the session and the entire grace period.)- Returns:
- an instance of a
SessionBytesStoreSupplier
-
persistentSessionStore
public static SessionBytesStoreSupplier persistentSessionStore(String name, Duration retentionPeriod)
Create a persistentSessionBytesStoreSupplier.- Parameters:
name- name of the store (cannot benull)retentionPeriod- length of time to retain data in the store (cannot be negative) (note that the retention period must be at least as long enough to contain the inactivity gap of the session and the entire grace period.)- Returns:
- an instance of a
SessionBytesStoreSupplier
-
inMemorySessionStore
public static SessionBytesStoreSupplier inMemorySessionStore(String name, Duration retentionPeriod)
Create an in-memorySessionBytesStoreSupplier.- Parameters:
name- name of the store (cannot benull)retentionPeriod- length ot time to retain data in the store (cannot be negative) (note that the retention period must be at least as long enough to contain the inactivity gap of the session and the entire grace period.)- Returns:
- an instance of a
SessionBytesStoreSupplier
-
keyValueStoreBuilder
public static <K,V> StoreBuilder<KeyValueStore<K,V>> keyValueStoreBuilder(KeyValueBytesStoreSupplier supplier, Serde<K> keySerde, Serde<V> valueSerde)
Creates aStoreBuilderthat can be used to build aKeyValueStore.The provided supplier should not be a supplier for
TimestampedKeyValueStores.- Type Parameters:
K- key typeV- value type- Parameters:
supplier- aKeyValueBytesStoreSupplier(cannot benull)keySerde- the key serde to usevalueSerde- the value serde to use; if the serialized bytes isnullfor put operations, it is treated as delete- Returns:
- an instance of a
StoreBuilderthat can build aKeyValueStore
-
timestampedKeyValueStoreBuilder
public static <K,V> StoreBuilder<TimestampedKeyValueStore<K,V>> timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier supplier, Serde<K> keySerde, Serde<V> valueSerde)
Creates aStoreBuilderthat can be used to build aTimestampedKeyValueStore.The provided supplier should not be a supplier for
KeyValueStores. For this case, passed in timestamps will be dropped and not stored in the key-value-store. On read, no valid timestamp but a dummy timestamp will be returned.- Type Parameters:
K- key typeV- value type- Parameters:
supplier- aKeyValueBytesStoreSupplier(cannot benull)keySerde- the key serde to usevalueSerde- the value serde to use; if the serialized bytes isnullfor put operations, it is treated as delete- Returns:
- an instance of a
StoreBuilderthat can build aKeyValueStore
-
windowStoreBuilder
public static <K,V> StoreBuilder<WindowStore<K,V>> windowStoreBuilder(WindowBytesStoreSupplier supplier, Serde<K> keySerde, Serde<V> valueSerde)
Creates aStoreBuilderthat can be used to build aWindowStore.The provided supplier should not be a supplier for
TimestampedWindowStores.- Type Parameters:
K- key typeV- value type- Parameters:
supplier- aWindowBytesStoreSupplier(cannot benull)keySerde- the key serde to usevalueSerde- the value serde to use; if the serialized bytes isnullfor put operations, it is treated as delete- Returns:
- an instance of
StoreBuilderthan can build aWindowStore
-
timestampedWindowStoreBuilder
public static <K,V> StoreBuilder<TimestampedWindowStore<K,V>> timestampedWindowStoreBuilder(WindowBytesStoreSupplier supplier, Serde<K> keySerde, Serde<V> valueSerde)
Creates aStoreBuilderthat can be used to build aTimestampedWindowStore.The provided supplier should not be a supplier for
WindowStores. For this case, passed in timestamps will be dropped and not stored in the window-store. On read, no valid timestamp but a dummy timestamp will be returned.- Type Parameters:
K- key typeV- value type- Parameters:
supplier- aWindowBytesStoreSupplier(cannot benull)keySerde- the key serde to usevalueSerde- the value serde to use; if the serialized bytes isnullfor put operations, it is treated as delete- Returns:
- an instance of
StoreBuilderthat can build aTimestampedWindowStore
-
sessionStoreBuilder
public static <K,V> StoreBuilder<SessionStore<K,V>> sessionStoreBuilder(SessionBytesStoreSupplier supplier, Serde<K> keySerde, Serde<V> valueSerde)
Creates aStoreBuilderthat can be used to build aSessionStore.- Type Parameters:
K- key typeV- value type- Parameters:
supplier- aSessionBytesStoreSupplier(cannot benull)keySerde- the key serde to usevalueSerde- the value serde to use; if the serialized bytes isnullfor put operations, it is treated as delete- Returns:
- an instance of
StoreBuilderthan can build aSessionStore
-
-