Class StreamsBuilder
StreamsBuilder provides the high-level Kafka Streams DSL to specify a Kafka Streams topology.
It is a requirement that the processing logic (Topology) be defined in a deterministic way: the order in which all operators are added must be predictable and the same across all application instances. Topologies are only identical if all operators are added in the same order. If different KafkaStreams instances of the same application build different topologies, the result may be incompatible runtime code and unexpected results or errors.
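The determinism requirement can be illustrated with a minimal sketch. The topic names and the class name below are hypothetical and chosen only for illustration; the point is that the operators are added in a fixed order, so every call (and every application instance running the same code) produces the same topology description.

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

final class DeterministicTopologyExample {

    // Operators are always added in the same order, so each call yields an identical topology.
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic"); // hypothetical topic name
        input.filter((key, value) -> value != null)
             .mapValues(value -> value.toUpperCase())
             .to("output-topic");                                      // hypothetical topic name
        return builder.build();
    }

    public static void main(String[] args) {
        // Compare the textual descriptions of two independently built topologies.
        String first = buildTopology().describe().toString();
        String second = buildTopology().describe().toString();
        System.out.println(first);
        System.out.println("identical: " + first.equals(second));
    }
}
```

Building operators while iterating over an unordered collection (for example a HashSet of topic names) is one way this property could be lost, because the insertion order may differ between instances.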
Constructor Summary
Constructor and Description
StreamsBuilder()
StreamsBuilder(TopologyConfig topologyConfigs)
- Create a StreamsBuilder instance.
-
Method Summary
Modifier and Type, Method, and Description
<K,V> StreamsBuilder addGlobalStore(StoreBuilder<?> storeBuilder, String topic, Consumed<K, V> consumed, ProcessorSupplier<K, V> stateUpdateSupplier)
- Deprecated.
<KIn,VIn> StreamsBuilder addGlobalStore(StoreBuilder<?> storeBuilder, String topic, Consumed<KIn, VIn> consumed, ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier)
- Adds a global StateStore to the topology.
StreamsBuilder addStateStore(StoreBuilder<?> builder)
- Adds a state store to the underlying Topology.
Topology build()
- Returns the Topology that represents the specified processing logic.
Topology build(Properties props)
- Returns the Topology that represents the specified processing logic and accepts a Properties instance used to indicate whether to optimize topology or not.
<K,V> GlobalKTable<K, V> globalTable(String topic)
- Create a GlobalKTable for the specified topic.
<K,V> GlobalKTable<K, V> globalTable(String topic, Consumed<K, V> consumed)
- Create a GlobalKTable for the specified topic.
<K,V> GlobalKTable<K, V> globalTable(String topic, Consumed<K, V> consumed, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
- Create a GlobalKTable for the specified topic.
<K,V> GlobalKTable<K, V> globalTable(String topic, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
- Create a GlobalKTable for the specified topic.
<K,V> KStream<K, V> stream(String topic)
- Create a KStream from the specified topic.
<K,V> KStream<K, V> stream(String topic, Consumed<K, V> consumed)
- Create a KStream from the specified topic.
<K,V> KStream<K, V> stream(Collection<String> topics)
- Create a KStream from the specified topics.
<K,V> KStream<K, V> stream(Collection<String> topics, Consumed<K, V> consumed)
- Create a KStream from the specified topics.
<K,V> KStream<K, V> stream(Pattern topicPattern)
- Create a KStream from the specified topic pattern.
<K,V> KStream<K, V> stream(Pattern topicPattern, Consumed<K, V> consumed)
- Create a KStream from the specified topic pattern.
<K,V> KTable<K, V> table(String topic)
- Create a KTable for the specified topic.
<K,V> KTable<K, V> table(String topic, Consumed<K, V> consumed)
- Create a KTable for the specified topic.
<K,V> KTable<K, V> table(String topic, Consumed<K, V> consumed, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
- Create a KTable for the specified topic.
<K,V> KTable<K, V> table(String topic, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
- Create a KTable for the specified topic.
-
Constructor Details
-
StreamsBuilder
public StreamsBuilder()
-
StreamsBuilder
public StreamsBuilder(TopologyConfig topologyConfigs)
Create a StreamsBuilder instance.
- Parameters:
topologyConfigs
- the streams configs that apply at the topology level. Please refer to TopologyConfig for more detail
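A minimal sketch of the TopologyConfig constructor follows. It assumes a Kafka Streams 3.3 or later client in which TopologyConfig lives in the org.apache.kafka.streams package and can be created from a StreamsConfig; the application id, bootstrap address, and class name are hypothetical placeholders.

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;

final class TopologyConfigBuilderExample {

    // Creates a builder whose topology-level settings are derived from the application config.
    static StreamsBuilder newBuilder() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo-app");            // hypothetical id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // hypothetical address

        // Assumption: TopologyConfig(StreamsConfig) is available in the client version in use.
        TopologyConfig topologyConfig = new TopologyConfig(new StreamsConfig(props));
        return new StreamsBuilder(topologyConfig);
    }
}
```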
-
-
Method Details
-
stream
public <K,V> KStream<K,V> stream(String topic)
Create a KStream from the specified topic. The default "auto.offset.reset" strategy, default TimestampExtractor, and default key and value deserializers as specified in the config are used.
If multiple topics are specified there is no ordering guarantee for records from different topics.
Note that the specified input topic must be partitioned by key. If this is not the case, it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
- Parameters:
topic
- the topic name; cannot be null
- Returns:
- a KStream for the specified topic
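The repartitioning note can be sketched as follows. This is one possible approach, not the only one: it assumes the input topic is not partitioned by key and uses KStream#repartition() (available since Kafka Streams 2.6) before a key-based aggregation. Topic names and the class name are hypothetical.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

final class RepartitionBeforeAggregationExample {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Default serdes, offset reset strategy, and timestamp extractor come from the config.
        KStream<String, String> clicks = builder.stream("clicks"); // hypothetical topic name

        clicks.repartition()   // writes through an internal topic that is partitioned by key
              .groupByKey()    // safe now: records with the same key share a partition
              .count()
              .toStream()
              .to("click-counts", Produced.with(Serdes.String(), Serdes.Long())); // hypothetical topic

        return builder.build();
    }
}
```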
-
stream
public <K,V> KStream<K,V> stream(String topic, Consumed<K, V> consumed)
Create a KStream from the specified topic. The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers as defined by the options in Consumed are used.
Note that the specified input topic must be partitioned by key. If this is not the case, it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
-
stream
public <K,V> KStream<K,V> stream(Collection<String> topics)
Create a KStream from the specified topics. The default "auto.offset.reset" strategy, default TimestampExtractor, and default key and value deserializers as specified in the config are used.
If multiple topics are specified there is no ordering guarantee for records from different topics.
Note that the specified input topics must be partitioned by key. If this is not the case, it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
- Parameters:
topics
- the topic names; must contain at least one topic name
- Returns:
- a KStream for the specified topics
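A short sketch of reading several topics at once, using both the default overload and the Consumed overload of stream(Collection). Topic names and the class name are hypothetical, and withOffsetResetPolicy(Topology.AutoOffsetReset) assumes a 3.x client.

```java
import java.util.Arrays;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

final class MultiTopicStreamExample {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Default overload: serdes, offset reset, and timestamp extractor come from the config.
        KStream<String, String> events =
            builder.stream(Arrays.asList("events-eu", "events-us")); // hypothetical topics

        // Consumed overload: serdes and offset reset strategy are set explicitly.
        KStream<String, String> audits = builder.stream(
            Arrays.asList("audit-eu", "audit-us"),                   // hypothetical topics
            Consumed.with(Serdes.String(), Serdes.String())
                    .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));

        // No ordering is guaranteed between records that originate from different topics.
        events.merge(audits)
              .to("all-events", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}
```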
-
stream
public <K,V> KStream<K,V> stream(Collection<String> topics, Consumed<K, V> consumed)
Create a KStream from the specified topics. The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers as defined by the options in Consumed are used.
If multiple topics are specified there is no ordering guarantee for records from different topics.
Note that the specified input topics must be partitioned by key. If this is not the case, it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
-
stream
public <K,V> KStream<K,V> stream(Pattern topicPattern)
Create a KStream from the specified topic pattern. The default "auto.offset.reset" strategy, default TimestampExtractor, and default key and value deserializers as specified in the config are used.
If multiple topics are matched by the specified pattern, the created KStream will read data from all of them and there is no ordering guarantee between records from different topics. This also means that the work will not be parallelized for multiple topics, and the number of tasks will scale with the maximum partition count of any matching topic rather than the total number of partitions across all topics.
Note that the specified input topics must be partitioned by key. If this is not the case, it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
- Parameters:
topicPattern
- the pattern to match for topic names
- Returns:
- a KStream for topics matching the regex pattern
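Pattern-based subscription can be sketched as below. The regular expression, the output topic, and the class name are hypothetical; any topic whose name matches the pattern would be read by the same KStream.

```java
import java.util.regex.Pattern;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

final class PatternSubscriptionExample {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Subscribes to every topic whose name starts with "orders-" (hypothetical naming scheme).
        KStream<String, String> orders = builder.stream(Pattern.compile("orders-.*"));

        // Records from all matching topics flow through the same operators.
        orders.filter((key, value) -> key != null)
              .to("all-orders"); // hypothetical output topic

        return builder.build();
    }
}
```

Because the task count follows the largest partition count among the matching topics, co-locating many topics behind one pattern trades parallelism for a simpler topology.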
-
stream
public <K,V> KStream<K,V> stream(Pattern topicPattern, Consumed<K, V> consumed)
Create a KStream from the specified topic pattern. The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers as defined by the options in Consumed are used.
If multiple topics are matched by the specified pattern, the created KStream will read data from all of them and there is no ordering guarantee between records from different topics. This also means that the work will not be parallelized for multiple topics, and the number of tasks will scale with the maximum partition count of any matching topic rather than the total number of partitions across all topics.
Note that the specified input topics must be partitioned by key. If this is not the case, it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
-
table
public <K,V> KTable<K,V> table(String topic, Consumed<K, V> consumed, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Create a KTable for the specified topic. The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers as defined by the options in Consumed are used. Input records with null key will be dropped.
Note that the specified input topic must be partitioned by key. If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore using the given Materialized instance. An internal changelog topic is created by default. Because the source topic can be used for recovery, you can avoid creating the changelog topic by setting "topology.optimization" to "all" in the StreamsConfig.
You should only specify serdes in the Consumed instance as these will also be used to overwrite the serdes in Materialized, i.e.,
    streamBuilder.table(topic, Consumed.with(Serdes.String(), Serdes.String()), Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName))
To query the local ReadOnlyKeyValueStore it must be obtained via KafkaStreams#store(...):
    KafkaStreams streams = ...
    StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
    ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(storeQueryParams);
    K key = "some-key";
    ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients() to query the value of the key on a parallel running instance of your Kafka Streams application.
- Parameters:
topic
- the topic name; cannot be null
consumed
- the instance of Consumed used to define optional parameters; cannot be null
materialized
- the instance of Materialized used to materialize a state store; cannot be null
- Returns:
- a KTable for the specified topic
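The snippets above can be combined into a compilable sketch with concrete types. The topic name, store name, class name, and the lookupLocal helper are hypothetical; the helper only reads keys hosted by the local instance and assumes the KafkaStreams instance is already running.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

final class QueryableTableExample {

    static final String STORE_NAME = "user-profiles-store"; // hypothetical store name

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // Serdes are given once, in Consumed; Materialized only names the store.
        builder.table(
            "user-profiles",                                  // hypothetical topic name
            Consumed.with(Serdes.String(), Serdes.String()),
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(STORE_NAME));
        return builder.build();
    }

    // Hypothetical helper: returns the locally stored value for the key, or null if absent.
    static String lookupLocal(KafkaStreams streams, String key) {
        StoreQueryParameters<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> params =
            StoreQueryParameters.fromNameAndType(
                STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
        ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> store = streams.store(params);
        ValueAndTimestamp<String> result = store.get(key);
        return result == null ? null : result.value();
    }
}
```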
-
table
public <K,V> KTable<K,V> table(String topic)
Create a KTable for the specified topic. The default "auto.offset.reset" strategy and default key and value deserializers as specified in the config are used. Input records with null key will be dropped.
Note that the specified input topic must be partitioned by key. If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore with an internal store name. Note that the store name may not be queryable through Interactive Queries. An internal changelog topic is created by default. Because the source topic can be used for recovery, you can avoid creating the changelog topic by setting "topology.optimization" to "all" in the StreamsConfig.
- Parameters:
topic
- the topic name; cannot be null
- Returns:
- a KTable for the specified topic
-
table
public <K,V> KTable<K,V> table(String topic, Consumed<K, V> consumed)
Create a KTable for the specified topic. The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers as defined by the options in Consumed are used. Input records with null key will be dropped.
Note that the specified input topic must be partitioned by key. If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore with an internal store name. Note that the store name may not be queryable through Interactive Queries. An internal changelog topic is created by default. Because the source topic can be used for recovery, you can avoid creating the changelog topic by setting "topology.optimization" to "all" in the StreamsConfig.
-
table
public <K,V> KTable<K,V> table(String topic, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Create a KTable for the specified topic. The default "auto.offset.reset" strategy as specified in the config is used. Key and value deserializers as defined by the options in Materialized are used. Input records with null key will be dropped.
Note that the specified input topic must be partitioned by key. If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore using the Materialized instance. An internal changelog topic is created by default. Because the source topic can be used for recovery, you can avoid creating the changelog topic by setting "topology.optimization" to "all" in the StreamsConfig.
- Parameters:
topic
- the topic name; cannot be null
materialized
- the instance of Materialized used to materialize a state store; cannot be null
- Returns:
- a KTable for the specified topic
-
globalTable
public <K,V> GlobalKTable<K,V> globalTable(String topic, Consumed<K, V> consumed)
Create a GlobalKTable for the specified topic. Input records with null key will be dropped.
The resulting GlobalKTable will be materialized in a local KeyValueStore with an internal store name. Note that the store name may not be queryable through Interactive Queries. No internal changelog topic is created since the original input topic can be used for recovery (cf. methods of KGroupedStream and KGroupedTable that return a KTable).
Note that GlobalKTable always applies "auto.offset.reset" strategy "earliest" regardless of the specified value in StreamsConfig or Consumed. Furthermore, GlobalKTable cannot be a versioned state store.
- Parameters:
topic
- the topic name; cannot be null
consumed
- the instance of Consumed used to define optional parameters
- Returns:
- a GlobalKTable for the specified topic
-
globalTable
public <K,V> GlobalKTable<K,V> globalTable(String topic)
Create a GlobalKTable for the specified topic. The default key and value deserializers as specified in the config are used. Input records with null key will be dropped.
The resulting GlobalKTable will be materialized in a local KeyValueStore with an internal store name. Note that the store name may not be queryable through Interactive Queries. No internal changelog topic is created since the original input topic can be used for recovery (cf. methods of KGroupedStream and KGroupedTable that return a KTable).
Note that GlobalKTable always applies "auto.offset.reset" strategy "earliest" regardless of the specified value in StreamsConfig. Furthermore, GlobalKTable cannot be a versioned state store.
- Parameters:
topic
- the topic name; cannot be null
- Returns:
- a GlobalKTable for the specified topic
-
globalTable
public <K,V> GlobalKTable<K,V> globalTable(String topic, Consumed<K, V> consumed, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Create a GlobalKTable for the specified topic. Input KeyValue pairs with null key will be dropped.
The resulting GlobalKTable will be materialized in a local KeyValueStore configured with the provided instance of Materialized. However, no internal changelog topic is created since the original input topic can be used for recovery (cf. methods of KGroupedStream and KGroupedTable that return a KTable).
You should only specify serdes in the Consumed instance as these will also be used to overwrite the serdes in Materialized, i.e.,
    streamBuilder.globalTable(topic, Consumed.with(Serdes.String(), Serdes.String()), Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName))
To query the local ReadOnlyKeyValueStore it must be obtained via KafkaStreams#store(...):
    KafkaStreams streams = ...
    StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
    ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(storeQueryParams);
    K key = "some-key";
    ValueAndTimestamp<V> valueForKey = localStore.get(key);
Note that GlobalKTable always applies "auto.offset.reset" strategy "earliest" regardless of the specified value in StreamsConfig or Consumed. Furthermore, GlobalKTable cannot be a versioned state store.
- Parameters:
topic
- the topic name; cannot be null
consumed
- the instance of Consumed used to define optional parameters; cannot be null
materialized
- the instance of Materialized used to materialize a state store; cannot be null
- Returns:
- a GlobalKTable for the specified topic
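One common use of a GlobalKTable is enriching a stream through a join. The sketch below is an illustration under assumed names: the topics, the store name, and the convention that each order value carries a product id are all hypothetical.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

final class GlobalTableJoinExample {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Every application instance holds a full copy of the "products" topic.
        GlobalKTable<String, String> products = builder.globalTable(
            "products",                                        // hypothetical topic name
            Consumed.with(Serdes.String(), Serdes.String()),
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("products-store"));

        // Assumption: the stream key is an order id and the value is a product id.
        KStream<String, String> orders =
            builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()));

        orders.join(
                  products,
                  (orderId, productId) -> productId,                         // lookup key in the global table
                  (productId, productName) -> productId + ":" + productName) // joined value
              .to("enriched-orders", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}
```

Because the global table is fully replicated, the stream does not need to be co-partitioned with it for this join.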
-
globalTable
public <K,V> GlobalKTable<K,V> globalTable(String topic, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Create a GlobalKTable for the specified topic. Input KeyValue pairs with null key will be dropped.
The resulting GlobalKTable will be materialized in a local KeyValueStore configured with the provided instance of Materialized. However, no internal changelog topic is created since the original input topic can be used for recovery (cf. methods of KGroupedStream and KGroupedTable that return a KTable).
To query the local ReadOnlyKeyValueStore it must be obtained via KafkaStreams#store(...):
    KafkaStreams streams = ...
    StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
    ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(storeQueryParams);
    K key = "some-key";
    ValueAndTimestamp<V> valueForKey = localStore.get(key);
Note that GlobalKTable always applies "auto.offset.reset" strategy "earliest" regardless of the specified value in StreamsConfig. Furthermore, GlobalKTable cannot be a versioned state store.
- Parameters:
topic
- the topic name; cannot be null
materialized
- the instance of Materialized used to materialize a state store; cannot be null
- Returns:
- a GlobalKTable for the specified topic
-
addStateStore
public StreamsBuilder addStateStore(StoreBuilder<?> builder)
Adds a state store to the underlying Topology.
It is required to connect state stores to Processors, Transformers, or ValueTransformers before they can be used.
- Parameters:
builder
- the builder used to obtain the StateStore instance
- Returns:
- itself
- Throws:
TopologyException
- if the state store supplier is already added
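A sketch of adding a state store and connecting it to a processor by name. It assumes a Kafka Streams 3.3 or later client, where KStream#process accepts the org.apache.kafka.streams.processor.api.ProcessorSupplier and returns a KStream. The store name, topics, and the CountingProcessor class are hypothetical.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

final class StateStoreExample {

    static final String STORE_NAME = "seen-counts"; // hypothetical store name

    // Hypothetical processor: counts how often each key has been seen.
    static final class CountingProcessor implements Processor<String, String, String, Long> {
        private ProcessorContext<String, Long> context;
        private KeyValueStore<String, Long> store;

        @Override
        public void init(ProcessorContext<String, Long> context) {
            this.context = context;
            this.store = context.getStateStore(STORE_NAME); // works only because the store is connected below
        }

        @Override
        public void process(Record<String, String> record) {
            Long previous = store.get(record.key());
            Long updated = (previous == null ? 0L : previous) + 1L;
            store.put(record.key(), updated);
            context.forward(record.withValue(updated));
        }
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore(STORE_NAME), Serdes.String(), Serdes.Long());
        builder.addStateStore(storeBuilder);

        // A typed variable avoids overload ambiguity with the older ProcessorSupplier type.
        ProcessorSupplier<String, String, String, Long> supplier = CountingProcessor::new;

        builder.stream("events", Consumed.with(Serdes.String(), Serdes.String())) // hypothetical topic
               .process(supplier, STORE_NAME) // passing the store name connects the store
               .to("event-counts", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}
```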
-
addGlobalStore
@Deprecated public <K,V> StreamsBuilder addGlobalStore(StoreBuilder<?> storeBuilder, String topic, Consumed<K, V> consumed, ProcessorSupplier<K, V> stateUpdateSupplier)
Deprecated. Since 2.7.0; use addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier) instead.
Adds a global StateStore to the topology. The StateStore sources its data from all partitions of the provided input topic. There will be exactly one instance of this StateStore per Kafka Streams instance.
A SourceNode will be added to consume the data arriving from the partitions of the input topic.
The provided ProcessorSupplier will be used to create a ProcessorNode that will receive all records forwarded from the SourceNode. This ProcessorNode should be used to keep the StateStore up-to-date. The default TimestampExtractor as specified in the config is used.
It is not required to connect a global store to Processors, Transformers, or ValueTransformers; those have read-only access to all global stores by default.
The supplier should always generate a new instance each time ProcessorSupplier.get() gets called. Creating a single Processor object and returning the same object reference in ProcessorSupplier.get() would be a violation of the supplier pattern and leads to runtime exceptions.
- Parameters:
storeBuilder
- user defined StoreBuilder; cannot be null
topic
- the topic to source the data from
consumed
- the instance of Consumed used to define optional parameters; cannot be null
stateUpdateSupplier
- the instance of ProcessorSupplier
- Returns:
- itself
- Throws:
TopologyException
- if the processor of state is already registered
-
addGlobalStore
public <KIn,VIn> StreamsBuilder addGlobalStore(StoreBuilder<?> storeBuilder, String topic, Consumed<KIn, VIn> consumed, ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier)
Adds a global StateStore to the topology. The StateStore sources its data from all partitions of the provided input topic. There will be exactly one instance of this StateStore per Kafka Streams instance.
A SourceNode will be added to consume the data arriving from the partitions of the input topic.
The provided ProcessorSupplier will be used to create a Processor that will receive all records forwarded from the SourceNode. The supplier should always generate a new instance. Creating a single Processor object and returning the same object reference in ProcessorSupplier.get() is a violation of the supplier pattern and leads to runtime exceptions. This Processor should be used to keep the StateStore up-to-date. The default TimestampExtractor as specified in the config is used.
It is not required to connect a global store to Processors, Transformers, or ValueTransformers; those have read-only access to all global stores by default.
- Parameters:
storeBuilder
- user defined StoreBuilder; cannot be null
topic
- the topic to source the data from
consumed
- the instance of Consumed used to define optional parameters; cannot be null
stateUpdateSupplier
- the instance of ProcessorSupplier
- Returns:
- itself
- Throws:
TopologyException
- if the processor of state is already registered
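A sketch of the non-deprecated overload. The store name, topic, and the StoreUpdater class are hypothetical. The store builder disables changelogging here on the assumption that global stores are restored from their input topic and must not have logging enabled.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

final class GlobalStoreExample {

    static final String STORE_NAME = "config-store"; // hypothetical store name

    // Hypothetical processor that keeps the global store in sync with the input topic.
    static final class StoreUpdater implements Processor<String, String, Void, Void> {
        private KeyValueStore<String, String> store;

        @Override
        public void init(ProcessorContext<Void, Void> context) {
            store = context.getStateStore(STORE_NAME);
        }

        @Override
        public void process(Record<String, String> record) {
            if (record.value() == null) {
                store.delete(record.key()); // treat a null value as a delete
            } else {
                store.put(record.key(), record.value());
            }
        }
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore(STORE_NAME), Serdes.String(), Serdes.String())
            .withLoggingDisabled(); // assumption: required for global stores

        // The constructor reference yields a new Processor on every get() call.
        ProcessorSupplier<String, String, Void, Void> supplier = StoreUpdater::new;

        builder.addGlobalStore(
            storeBuilder,
            "config-topic",                                   // hypothetical topic name
            Consumed.with(Serdes.String(), Serdes.String()),
            supplier);

        return builder.build();
    }
}
```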
-
build
public Topology build()
Returns the Topology that represents the specified processing logic. Note that using this method means no optimizations are performed.
- Returns:
- the Topology that represents the specified processing logic
-
build
public Topology build(Properties props)
Returns the Topology that represents the specified processing logic and accepts a Properties instance used to indicate whether to optimize topology or not.
- Parameters:
props
- the Properties used for building possibly optimized topology
- Returns:
- the Topology that represents the specified processing logic
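A minimal sketch of building with optimization enabled. It assumes the StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG and StreamsConfig.OPTIMIZE constants (the latter corresponding to the value "all"); the topic, store, and class names are hypothetical.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

final class OptimizedBuildExample {

    static Topology buildOptimized() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.table(
            "inventory",                                       // hypothetical topic name
            Consumed.with(Serdes.String(), Serdes.String()),
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("inventory-store"));

        Properties props = new Properties();
        // Ask the builder to apply optimizations such as reusing the source topic for recovery.
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

        // build() without arguments would skip all optimizations.
        return builder.build(props);
    }
}
```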