public final class Stores extends Object
When using the high-level DSL, i.e., StreamsBuilder
, users create
StoreSupplier
s that can be further customized via
Materialized
.
For example, a topic read as KTable
can be materialized into an
in-memory store with custom key/value serdes and caching disabled:
StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("queryable-store-name");
KTable<Long,String> table = builder.table(
"topicName",
Materialized.<Long,String>as(storeSupplier)
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.String())
.withCachingDisabled());
When using the Processor API, i.e., Topology
, users create
StoreBuilder
s that can be attached to Processor
s.
For example, you can create a windowed
RocksDB store with custom
changelog topic configuration like:
Topology topology = new Topology();
topology.addProcessor("processorName", ...);
Map<String,String> topicConfig = new HashMap<>();
StoreBuilder<WindowStore<Integer, Long>> storeBuilder = Stores
.windowStoreBuilder(
Stores.persistentWindowStore("queryable-store-name", ...),
Serdes.Integer(),
Serdes.Long())
.withLoggingEnabled(topicConfig);
topology.addStateStore(storeBuilder, "processorName");
Constructor and Description |
---|
Stores() |
public static KeyValueBytesStoreSupplier persistentKeyValueStore(String name)
KeyValueBytesStoreSupplier
.
This store supplier can be passed into a keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)
.
If you want to create a TimestampedKeyValueStore
you should use
persistentTimestampedKeyValueStore(String)
to create a store supplier instead.
name
- name of the store (cannot be null
)KeyValueBytesStoreSupplier
that can be used
to build a persistent key-value storepublic static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(String name)
KeyValueBytesStoreSupplier
.
This store supplier can be passed into a
timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)
.
If you want to create a KeyValueStore
you should use
persistentKeyValueStore(String)
to create a store supplier instead.
name
- name of the store (cannot be null
)KeyValueBytesStoreSupplier
that can be used
to build a persistent key-(timestamp/value) storepublic static KeyValueBytesStoreSupplier inMemoryKeyValueStore(String name)
KeyValueBytesStoreSupplier
.
This store supplier can be passed into a keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)
or timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)
.
name
- name of the store (cannot be null
)KeyValueBytesStoreSupplier
than can be used to
build an in-memory storepublic static KeyValueBytesStoreSupplier lruMap(String name, int maxCacheSize)
KeyValueBytesStoreSupplier
.
This store supplier can be passed into a keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)
or timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)
.
name
- name of the store (cannot be null
)maxCacheSize
- maximum number of items in the LRU (cannot be negative)KeyValueBytesStoreSupplier
that can be used to build
an LRU Map based storeIllegalArgumentException
- if maxCacheSize
is negative@Deprecated public static WindowBytesStoreSupplier persistentWindowStore(String name, long retentionPeriod, int numSegments, long windowSize, boolean retainDuplicates)
persistentWindowStore(String, Duration, Duration, boolean)
insteadWindowBytesStoreSupplier
.name
- name of the store (cannot be null
)retentionPeriod
- length of time to retain data in the store (cannot be negative)
(note that the retention period must be at least long enough to contain the
windowed data's entire life cycle, from window-start through window-end,
and for the entire grace period)numSegments
- number of db segments (cannot be zero or negative)windowSize
- size of the windows that are stored (cannot be negative). Note: the window size
is not stored with the records, so this value is used to compute the keys that
the store returns. No effort is made to validate this parameter, so you must be
careful to set it the same as the windowed keys you're actually storing.retainDuplicates
- whether or not to retain duplicates. Turning this on will automatically disable
caching and means that null values will be ignored.WindowBytesStoreSupplier
public static WindowBytesStoreSupplier persistentWindowStore(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates) throws IllegalArgumentException
WindowBytesStoreSupplier
.
This store supplier can be passed into a windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)
.
If you want to create a TimestampedWindowStore
you should use
persistentTimestampedWindowStore(String, Duration, Duration, boolean)
to create a store supplier instead.
name
- name of the store (cannot be null
)retentionPeriod
- length of time to retain data in the store (cannot be negative)
(note that the retention period must be at least long enough to contain the
windowed data's entire life cycle, from window-start through window-end,
and for the entire grace period)windowSize
- size of the windows (cannot be negative)retainDuplicates
- whether or not to retain duplicates. Turning this on will automatically disable
caching and means that null values will be ignored.WindowBytesStoreSupplier
IllegalArgumentException
- if retentionPeriod
or windowSize
can't be represented as long milliseconds
IllegalArgumentException
- if retentionPeriod
is smaller than windowSize
public static WindowBytesStoreSupplier persistentTimestampedWindowStore(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates) throws IllegalArgumentException
WindowBytesStoreSupplier
.
This store supplier can be passed into a
timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)
.
If you want to create a WindowStore
you should use
persistentWindowStore(String, Duration, Duration, boolean)
to create a store supplier instead.
name
- name of the store (cannot be null
)retentionPeriod
- length of time to retain data in the store (cannot be negative)
(note that the retention period must be at least long enough to contain the
windowed data's entire life cycle, from window-start through window-end,
and for the entire grace period)windowSize
- size of the windows (cannot be negative)retainDuplicates
- whether or not to retain duplicates. Turning this on will automatically disable
caching and means that null values will be ignored.WindowBytesStoreSupplier
IllegalArgumentException
- if retentionPeriod
or windowSize
can't be represented as long milliseconds
IllegalArgumentException
- if retentionPeriod
is smaller than windowSize
public static WindowBytesStoreSupplier inMemoryWindowStore(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates) throws IllegalArgumentException
WindowBytesStoreSupplier
.
This store supplier can be passed into a windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)
or
timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)
.
name
- name of the store (cannot be null
)retentionPeriod
- length of time to retain data in the store (cannot be negative)
Note that the retention period must be at least long enough to contain the
windowed data's entire life cycle, from window-start through window-end,
and for the entire grace period.windowSize
- size of the windows (cannot be negative)retainDuplicates
- whether or not to retain duplicates. Turning this on will automatically disable
caching and means that null values will be ignored.WindowBytesStoreSupplier
IllegalArgumentException
- if retentionPeriod
or windowSize
can't be represented as long milliseconds
IllegalArgumentException
- if retentionPeriod
is smaller than windowSize
@Deprecated public static SessionBytesStoreSupplier persistentSessionStore(String name, long retentionPeriodMs)
persistentSessionStore(String, Duration)
insteadSessionBytesStoreSupplier
.name
- name of the store (cannot be null
)retentionPeriodMs
- length of time to retain data in the store (cannot be negative)
(note that the retention period must be at least as long enough to
contain the inactivity gap of the session and the entire grace period.)SessionBytesStoreSupplier
public static SessionBytesStoreSupplier persistentSessionStore(String name, Duration retentionPeriod)
SessionBytesStoreSupplier
.name
- name of the store (cannot be null
)retentionPeriod
- length of time to retain data in the store (cannot be negative)
(note that the retention period must be at least as long enough to
contain the inactivity gap of the session and the entire grace period.)SessionBytesStoreSupplier
public static SessionBytesStoreSupplier inMemorySessionStore(String name, Duration retentionPeriod)
SessionBytesStoreSupplier
.name
- name of the store (cannot be null
)retentionPeriod
- length ot time to retain data in the store (cannot be negative)
(note that the retention period must be at least as long enough to
contain the inactivity gap of the session and the entire grace period.)SessionBytesStoreSupplier
public static <K,V> StoreBuilder<KeyValueStore<K,V>> keyValueStoreBuilder(KeyValueBytesStoreSupplier supplier, Serde<K> keySerde, Serde<V> valueSerde)
StoreBuilder
that can be used to build a KeyValueStore
.
The provided supplier should not be a supplier for
TimestampedKeyValueStores
.
K
- key typeV
- value typesupplier
- a KeyValueBytesStoreSupplier
(cannot be null
)keySerde
- the key serde to usevalueSerde
- the value serde to use; if the serialized bytes is null
for put operations,
it is treated as deleteStoreBuilder
that can build a KeyValueStore
public static <K,V> StoreBuilder<TimestampedKeyValueStore<K,V>> timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier supplier, Serde<K> keySerde, Serde<V> valueSerde)
StoreBuilder
that can be used to build a TimestampedKeyValueStore
.
The provided supplier should not be a supplier for
KeyValueStores
. For this case, passed in timestamps will be dropped and not stored in the
key-value-store. On read, no valid timestamp but a dummy timestamp will be returned.
K
- key typeV
- value typesupplier
- a KeyValueBytesStoreSupplier
(cannot be null
)keySerde
- the key serde to usevalueSerde
- the value serde to use; if the serialized bytes is null
for put operations,
it is treated as deleteStoreBuilder
that can build a KeyValueStore
public static <K,V> StoreBuilder<WindowStore<K,V>> windowStoreBuilder(WindowBytesStoreSupplier supplier, Serde<K> keySerde, Serde<V> valueSerde)
StoreBuilder
that can be used to build a WindowStore
.
The provided supplier should not be a supplier for
TimestampedWindowStores
.
K
- key typeV
- value typesupplier
- a WindowBytesStoreSupplier
(cannot be null
)keySerde
- the key serde to usevalueSerde
- the value serde to use; if the serialized bytes is null
for put operations,
it is treated as deleteStoreBuilder
than can build a WindowStore
public static <K,V> StoreBuilder<TimestampedWindowStore<K,V>> timestampedWindowStoreBuilder(WindowBytesStoreSupplier supplier, Serde<K> keySerde, Serde<V> valueSerde)
StoreBuilder
that can be used to build a TimestampedWindowStore
.
The provided supplier should not be a supplier for
WindowStores
. For this case, passed in timestamps will be dropped and not stored in the
window-store. On read, no valid timestamp but a dummy timestamp will be returned.
K
- key typeV
- value typesupplier
- a WindowBytesStoreSupplier
(cannot be null
)keySerde
- the key serde to usevalueSerde
- the value serde to use; if the serialized bytes is null
for put operations,
it is treated as deleteStoreBuilder
that can build a TimestampedWindowStore
public static <K,V> StoreBuilder<SessionStore<K,V>> sessionStoreBuilder(SessionBytesStoreSupplier supplier, Serde<K> keySerde, Serde<V> valueSerde)
StoreBuilder
that can be used to build a SessionStore
.K
- key typeV
- value typesupplier
- a SessionBytesStoreSupplier
(cannot be null
)keySerde
- the key serde to usevalueSerde
- the value serde to use; if the serialized bytes is null
for put operations,
it is treated as deleteStoreBuilder
than can build a SessionStore