public class StreamsBuilder
extends java.lang.Object

StreamsBuilder provides the high-level Kafka Streams DSL to specify a Kafka Streams topology.

See Also: Topology, KStream, KTable, GlobalKTable

| Constructor | Description |
|---|---|
| StreamsBuilder() | |
| Modifier and Type | Method | Description |
|---|---|---|
| StreamsBuilder | addGlobalStore(StoreBuilder storeBuilder, java.lang.String topic, java.lang.String sourceName, Consumed consumed, java.lang.String processorName, ProcessorSupplier stateUpdateSupplier) | Deprecated. |
| StreamsBuilder | addGlobalStore(StoreBuilder storeBuilder, java.lang.String topic, Consumed consumed, ProcessorSupplier stateUpdateSupplier) | Adds a global StateStore to the topology. |
| StreamsBuilder | addStateStore(StoreBuilder builder) | Adds a state store to the underlying Topology. |
| Topology | build() | Returns the Topology that represents the specified processing logic. |
| <K,V> GlobalKTable<K,V> | globalTable(java.lang.String topic) | Create a GlobalKTable for the specified topic. |
| <K,V> GlobalKTable<K,V> | globalTable(java.lang.String topic, Consumed<K,V> consumed) | Create a GlobalKTable for the specified topic. |
| <K,V> GlobalKTable<K,V> | globalTable(java.lang.String topic, Consumed<K,V> consumed, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized) | Create a GlobalKTable for the specified topic. |
| <K,V> GlobalKTable<K,V> | globalTable(java.lang.String topic, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized) | Create a GlobalKTable for the specified topic. |
| <K,V> KStream<K,V> | stream(java.lang.String topic) | Create a KStream from the specified topic. |
| <K,V> KStream<K,V> | stream(java.lang.String topic, Consumed<K,V> consumed) | Create a KStream from the specified topic. |
| <K,V> KStream<K,V> | stream(java.util.Collection<java.lang.String> topics) | Create a KStream from the specified topics. |
| <K,V> KStream<K,V> | stream(java.util.Collection<java.lang.String> topics, Consumed<K,V> consumed) | Create a KStream from the specified topics. |
| <K,V> KStream<K,V> | stream(java.util.regex.Pattern topicPattern) | Create a KStream from the specified topic pattern. |
| <K,V> KStream<K,V> | stream(java.util.regex.Pattern topicPattern, Consumed<K,V> consumed) | Create a KStream from the specified topic pattern. |
| <K,V> KTable<K,V> | table(java.lang.String topic) | Create a KTable for the specified topic. |
| <K,V> KTable<K,V> | table(java.lang.String topic, Consumed<K,V> consumed) | Create a KTable for the specified topic. |
| <K,V> KTable<K,V> | table(java.lang.String topic, Consumed<K,V> consumed, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized) | Create a KTable for the specified topic. |
| <K,V> KTable<K,V> | table(java.lang.String topic, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized) | Create a KTable for the specified topic. |
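To ground the summary above, here is a minimal end-to-end sketch of typical usage: describe the processing logic with the DSL, build() the Topology, and hand it to KafkaStreams. The topic names, application id, and bootstrap server below are hypothetical.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class UppercaseApp {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        // Hypothetical application id and broker address.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Describe the processing logic with the DSL; "plaintext-input" and
        // "uppercase-output" are hypothetical topic names.
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> input = builder.stream("plaintext-input");
        input.mapValues(v -> v.toUpperCase()).to("uppercase-output");

        // build() returns the Topology that represents the specified logic.
        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```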
public <K,V> KStream<K,V> stream(java.lang.String topic)
Create a KStream from the specified topic.
The default "auto.offset.reset" strategy, default TimestampExtractor, and default key and value
deserializers as specified in the config are used.
Note that the specified input topic must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the data before any key-based operation
(like aggregation or join) is applied to the returned KStream.

Parameters:
topic - the topic name; cannot be null
Returns:
a KStream for the specified topic

public <K,V> KStream<K,V> stream(java.lang.String topic, Consumed<K,V> consumed)
Create a KStream from the specified topic.
The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers
are defined by the options in Consumed.
Note that the specified input topic must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the data before any key-based operation
(like aggregation or join) is applied to the returned KStream.

Parameters:
topic - the topic name; cannot be null
consumed - the instance of Consumed used to define optional parameters; cannot be null
Returns:
a KStream for the specified topic
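As an illustration of the two overloads, a short sketch assuming a hypothetical "orders" topic with String keys and Long values:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

// Relies on the default serdes, TimestampExtractor, and "auto.offset.reset"
// strategy from the application config.
KStream<String, Long> withDefaults = builder.stream("orders");

// Overrides the key and value serdes (and optionally more) for this source
// only, via Consumed.
KStream<String, Long> withConsumed =
    builder.stream("orders", Consumed.with(Serdes.String(), Serdes.Long()));
```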
public <K,V> KStream<K,V> stream(java.util.Collection<java.lang.String> topics)
Create a KStream from the specified topics.
The default "auto.offset.reset" strategy, default TimestampExtractor, and default key and value
deserializers as specified in the config are used.
If multiple topics are specified there is no ordering guarantee for records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the data before any key-based operation
(like aggregation or join) is applied to the returned KStream.

Parameters:
topics - the topic names; must contain at least one topic name
Returns:
a KStream for the specified topics

public <K,V> KStream<K,V> stream(java.util.Collection<java.lang.String> topics, Consumed<K,V> consumed)
Create a KStream from the specified topics.
The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers
are defined by the options in Consumed.
If multiple topics are specified there is no ordering guarantee for records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the data before any key-based operation
(like aggregation or join) is applied to the returned KStream.

Parameters:
topics - the topic names; must contain at least one topic name
consumed - the instance of Consumed used to define optional parameters; cannot be null
Returns:
a KStream for the specified topics
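For illustration, a sketch that merges two hypothetical topics into one stream:

```java
import java.util.Arrays;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

// Records from both (hypothetical) topics are read into a single stream;
// there is no ordering guarantee between records from different topics.
KStream<String, String> merged =
    builder.stream(Arrays.asList("orders-eu", "orders-us"));
```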
public <K,V> KStream<K,V> stream(java.util.regex.Pattern topicPattern)
Create a KStream from the specified topic pattern.
The default "auto.offset.reset" strategy, default TimestampExtractor, and default key and value
deserializers as specified in the config are used.
If multiple topics are matched by the specified pattern, the created KStream will read data from all of
them and there is no ordering guarantee between records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the data before any key-based operation
(like aggregation or join) is applied to the returned KStream.

Parameters:
topicPattern - the pattern to match for topic names
Returns:
a KStream for topics matching the regex pattern

public <K,V> KStream<K,V> stream(java.util.regex.Pattern topicPattern, Consumed<K,V> consumed)
Create a KStream from the specified topic pattern.
The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers
are defined by the options in Consumed.
If multiple topics are matched by the specified pattern, the created KStream will read data from all of
them and there is no ordering guarantee between records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the data before any key-based operation
(like aggregation or join) is applied to the returned KStream.

Parameters:
topicPattern - the pattern to match for topic names
consumed - the instance of Consumed used to define optional parameters; cannot be null
Returns:
a KStream for topics matching the regex pattern
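A brief sketch of pattern-based subscription, assuming hypothetical topic names of the form "orders-*":

```java
import java.util.regex.Pattern;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

// Subscribes to every topic whose name matches the regex, e.g. "orders-eu"
// and "orders-us" (hypothetical names).
KStream<String, String> all = builder.stream(Pattern.compile("orders-.*"));
```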
public <K,V> KTable<K,V> table(java.lang.String topic, Consumed<K,V> consumed, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a KTable for the specified topic.
The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers
are defined by the options in Consumed.
Input records with null key will be dropped.
Note that the specified input topic must be partitioned by key.
If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore using the given
Materialized instance.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
You should only specify serdes in the Consumed instance as these will also be used to overwrite the
serdes in Materialized, i.e.,

streamBuilder.table(topic, Consumed.with(Serdes.String(), Serdes.String()), Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName));
To query the local KeyValueStore it must be obtained via
KafkaStreams#store(...):
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to
query the value of the key on a parallel running instance of your Kafka Streams application.

Parameters:
topic - the topic name; cannot be null
consumed - the instance of Consumed used to define optional parameters; cannot be null
materialized - the instance of Materialized used to materialize a state store; cannot be null
Returns:
a KTable for the specified topic

public <K,V> KTable<K,V> table(java.lang.String topic)
Create a KTable for the specified topic.
The default "auto.offset.reset" strategy and default key and value deserializers as specified in the
config are used.
Input records with null key will be dropped.
Note that the specified input topic must be partitioned by key.
If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore with an internal
store name. Note that the store name may not be queryable through Interactive Queries.
No internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).

Parameters:
topic - the topic name; cannot be null
Returns:
a KTable for the specified topic

public <K,V> KTable<K,V> table(java.lang.String topic, Consumed<K,V> consumed)
Create a KTable for the specified topic.
The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers
are defined by the options in Consumed.
Input records with null key will be dropped.
Note that the specified input topic must be partitioned by key.
If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore with an internal
store name. Note that the store name may not be queryable through Interactive Queries.
No internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).

Parameters:
topic - the topic name; cannot be null
consumed - the instance of Consumed used to define optional parameters; cannot be null
Returns:
a KTable for the specified topic
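A short sketch of creating a changelog-backed table, assuming a hypothetical "user-profiles" topic keyed by user id:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();

// Each input record is treated as an upsert for its key; records with a
// null key are dropped. "user-profiles" is a hypothetical topic name.
KTable<String, String> profiles = builder.table("user-profiles",
    Consumed.with(Serdes.String(), Serdes.String()));
```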
public <K,V> KTable<K,V> table(java.lang.String topic, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a KTable for the specified topic.
The default "auto.offset.reset" strategy as specified in the config is used.
Key and value deserializers as defined by the options in Materialized are used.
Input records with null key will be dropped.
Note that the specified input topic must be partitioned by key.
If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore using the Materialized instance.
No internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).

Parameters:
topic - the topic name; cannot be null
materialized - the instance of Materialized used to materialize a state store; cannot be null
Returns:
a KTable for the specified topic

public <K,V> GlobalKTable<K,V> globalTable(java.lang.String topic, Consumed<K,V> consumed)
Create a GlobalKTable for the specified topic.
Input records with null key will be dropped.
The resulting GlobalKTable will be materialized in a local KeyValueStore with an internal
store name. Note that the store name may not be queryable through Interactive Queries.
No internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
Note that GlobalKTable always applies "auto.offset.reset" strategy "earliest"
regardless of the specified value in StreamsConfig or Consumed.

Parameters:
topic - the topic name; cannot be null
consumed - the instance of Consumed used to define optional parameters
Returns:
a GlobalKTable for the specified topic
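Because a GlobalKTable is fully replicated to every application instance, it is commonly used as the lookup side of a non-co-partitioned join. A sketch, assuming hypothetical "products" and "orders" topics where the order value carries the product id:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

// Hypothetical topics; default serdes from the config are used.
GlobalKTable<String, String> products = builder.globalTable("products");
KStream<String, String> orders = builder.stream("orders");

// The KeyValueMapper extracts the lookup key from each stream record (here
// the order value is assumed to be the product id); the ValueJoiner combines
// the order with the matching product.
KStream<String, String> enriched = orders.join(products,
    (orderId, productId) -> productId,
    (order, product) -> product);
```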
public <K,V> GlobalKTable<K,V> globalTable(java.lang.String topic)

Create a GlobalKTable for the specified topic.
The default key and value deserializers as specified in the config are used.
Input records with null key will be dropped.
The resulting GlobalKTable will be materialized in a local KeyValueStore with an internal
store name. Note that the store name may not be queryable through Interactive Queries.
No internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
Note that GlobalKTable always applies "auto.offset.reset" strategy "earliest"
regardless of the specified value in StreamsConfig.

Parameters:
topic - the topic name; cannot be null
Returns:
a GlobalKTable for the specified topic

public <K,V> GlobalKTable<K,V> globalTable(java.lang.String topic, Consumed<K,V> consumed, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a GlobalKTable for the specified topic.
Input KeyValue pairs with null key will be dropped.
The resulting GlobalKTable will be materialized in a local KeyValueStore configured with
the provided instance of Materialized.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
You should only specify serdes in the Consumed instance as these will also be used to overwrite the
serdes in Materialized, i.e.,

streamBuilder.globalTable(topic, Consumed.with(Serdes.String(), Serdes.String()), Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName));
To query the local KeyValueStore it must be obtained via
KafkaStreams#store(...):
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key);
Note that GlobalKTable always applies "auto.offset.reset" strategy "earliest"
regardless of the specified value in StreamsConfig or Consumed.

Parameters:
topic - the topic name; cannot be null
consumed - the instance of Consumed used to define optional parameters; cannot be null
materialized - the instance of Materialized used to materialize a state store; cannot be null
Returns:
a GlobalKTable for the specified topic

public <K,V> GlobalKTable<K,V> globalTable(java.lang.String topic, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a GlobalKTable for the specified topic.
Input KeyValue pairs with null key will be dropped.
The resulting GlobalKTable will be materialized in a local KeyValueStore configured with
the provided instance of Materialized.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
To query the local KeyValueStore it must be obtained via
KafkaStreams#store(...):
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key);
Note that GlobalKTable always applies "auto.offset.reset" strategy "earliest"
regardless of the specified value in StreamsConfig.

Parameters:
topic - the topic name; cannot be null
materialized - the instance of Materialized used to materialize a state store; cannot be null
Returns:
a GlobalKTable for the specified topic

public StreamsBuilder addStateStore(StoreBuilder builder)
Adds a state store to the underlying Topology.

Parameters:
builder - the builder used to obtain this state store StateStore instance
Throws:
TopologyException - if state store supplier is already added
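A brief sketch of registering a store for later use by a processor; the store name "counts-store" is hypothetical:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

StreamsBuilder builder = new StreamsBuilder();

// Register a persistent key-value store; processors that list "counts-store"
// among their state store names can then access it via the ProcessorContext.
StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("counts-store"),
        Serdes.String(),
        Serdes.Long());
builder.addStateStore(storeBuilder);
```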
@Deprecated
public StreamsBuilder addGlobalStore(StoreBuilder storeBuilder, java.lang.String topic, java.lang.String sourceName, Consumed consumed, java.lang.String processorName, ProcessorSupplier stateUpdateSupplier)

Deprecated. Use addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier) instead.

public StreamsBuilder addGlobalStore(StoreBuilder storeBuilder, java.lang.String topic, Consumed consumed, ProcessorSupplier stateUpdateSupplier)
Adds a global StateStore to the topology.
The StateStore sources its data from all partitions of the provided input topic.
There will be exactly one instance of this StateStore per Kafka Streams instance.
A SourceNode will be added to consume the data arriving from the partitions
of the input topic.
The provided ProcessorSupplier will be used to create a ProcessorNode that will receive all
records forwarded from the SourceNode.
This ProcessorNode should be used to keep the StateStore up-to-date.
The default TimestampExtractor as specified in the config is used.

Parameters:
storeBuilder - user defined StoreBuilder; can't be null
topic - the topic to source the data from
consumed - the instance of Consumed used to define optional parameters; can't be null
stateUpdateSupplier - the instance of ProcessorSupplier
Throws:
TopologyException - if the processor of state is already registered
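A minimal sketch of wiring a global store, assuming a hypothetical "config-topic" input topic and "config-store" store name. Note that change logging is disabled on the store, since the input topic itself serves as the recovery log for a global store:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

StreamsBuilder builder = new StreamsBuilder();

builder.addGlobalStore(
    Stores.keyValueStoreBuilder(
        Stores.inMemoryKeyValueStore("config-store"),
        Serdes.String(),
        Serdes.String()).withLoggingDisabled(),
    "config-topic",
    Consumed.with(Serdes.String(), Serdes.String()),
    // The supplied processor keeps the store up-to-date by mirroring every
    // record from the input topic into it.
    () -> new AbstractProcessor<String, String>() {
        private KeyValueStore<String, String> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            super.init(context);
            store = (KeyValueStore<String, String>) context.getStateStore("config-store");
        }

        @Override
        public void process(final String key, final String value) {
            store.put(key, value);
        }
    });
```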