Class StreamsBuilder
StreamsBuilder provides the high-level Kafka Streams DSL to specify a Kafka Streams topology.
 
 It is a requirement that the processing logic (Topology) be defined in a deterministic way,
 that is, the order in which all operators are added must be predictable and the same across all application
 instances.
 Topologies are only identical if all operators are added in the same order.
 If different KafkaStreams instances of the same application build different topologies, the result may be
 incompatible runtime code and unexpected results or errors.
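One way to keep the operator order identical across instances is to build the whole topology in a single method that every instance calls. The following is a minimal sketch, assuming the kafka-streams library is on the classpath. The class name `DeterministicTopology` and the topics `input-topic` and `output-topic` are hypothetical. Two independent builds of the same method should produce identical topology descriptions, including auto-generated node names such as `KSTREAM-SOURCE-0000000000`.

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

class DeterministicTopology {

    // Every application instance calls this one method, so operators are always
    // added in the same order: source, mapValues, sink.
    // Topic names are hypothetical.
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");
        input.mapValues(value -> value.toUpperCase())
             .to("output-topic");
        return builder.build();
    }

    public static void main(String[] args) {
        // Two independent builds yield identical descriptions.
        String first = buildTopology().describe().toString();
        String second = buildTopology().describe().toString();
        System.out.println("identical: " + first.equals(second));
    }
}
```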
- 
Field Summary
Fields (Modifier and Type, Field, Description):
- protected final org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder internalStreamsBuilder
- protected final org.apache.kafka.streams.processor.internals.InternalTopologyBuilder internalTopologyBuilder: The topology's internal builder.
- protected final Topology topology: The actual topology that is constructed by this StreamsBuilder.
- 
Constructor Summary
Constructors (Constructor, Description):
- StreamsBuilder()
- StreamsBuilder(TopologyConfig topologyConfigs): Create a StreamsBuilder instance.
- 
Method Summary (Modifier and Type, Method, Description):
- <K,V> StreamsBuilder addGlobalStore(StoreBuilder<?> storeBuilder, String topic, Consumed<K, V> consumed, ProcessorSupplier<K, V> stateUpdateSupplier): Deprecated.
- <KIn,VIn> StreamsBuilder addGlobalStore(StoreBuilder<?> storeBuilder, String topic, Consumed<KIn, VIn> consumed, ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier): Adds a global StateStore to the topology.
- StreamsBuilder addStateStore(StoreBuilder<?> builder): Adds a state store to the underlying Topology.
- Topology build(): Returns the Topology that represents the specified processing logic.
- Topology build(Properties props): Returns the Topology that represents the specified processing logic and accepts a Properties instance used to indicate whether to optimize the topology or not.
- protected Topology getNewTopology(TopologyConfig topologyConfigs)
- <K,V> GlobalKTable<K, V> globalTable(String topic): Create a GlobalKTable for the specified topic.
- <K,V> GlobalKTable<K, V> globalTable(String topic, Consumed<K, V> consumed): Create a GlobalKTable for the specified topic.
- <K,V> GlobalKTable<K, V> globalTable(String topic, Consumed<K, V> consumed, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized): Create a GlobalKTable for the specified topic.
- <K,V> GlobalKTable<K, V> globalTable(String topic, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized): Create a GlobalKTable for the specified topic.
- <K,V> KStream<K, V>: Create a KStream from the specified topic.
- <K,V> KStream<K, V>: Create a KStream from the specified topic.
- <K,V> KStream<K, V> stream(Collection<String> topics): Create a KStream from the specified topics.
- <K,V> KStream<K, V> stream(Collection<String> topics, Consumed<K, V> consumed): Create a KStream from the specified topics.
- <K,V> KStream<K, V>: Create a KStream from the specified topic pattern.
- <K,V> KStream<K, V>: Create a KStream from the specified topic pattern.
- <K,V> KTable<K, V>: Create a KTable for the specified topic.
- <K,V> KTable<K, V>: Create a KTable for the specified topic.
- <K,V> KTable<K, V> table(String topic, Consumed<K, V> consumed, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized): Create a KTable for the specified topic.
- <K,V> KTable<K, V> table(String topic, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized): Create a KTable for the specified topic.
- 
Field Details
- 
topology
protected final Topology topology
The actual topology that is constructed by this StreamsBuilder.
- 
internalTopologyBuilder
protected final org.apache.kafka.streams.processor.internals.InternalTopologyBuilder internalTopologyBuilder
The topology's internal builder.
- 
internalStreamsBuilder
protected final org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder internalStreamsBuilder
 
- 
- 
Constructor Details
- 
StreamsBuilder
public StreamsBuilder()
- 
StreamsBuilder
public StreamsBuilder(TopologyConfig topologyConfigs)
Create a StreamsBuilder instance.
- Parameters:
- topologyConfigs - the streams configs that apply at the topology level. Please refer to TopologyConfig for more detail.
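A minimal sketch of passing topology-level configs to the builder, assuming a Kafka Streams version that provides the `TopologyConfig(StreamsConfig)` constructor. The application id, bootstrap address, class name, and topic names are hypothetical placeholders; constructing `StreamsConfig` only validates the properties and does not contact a broker.

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyConfig;

class TopologyConfigExample {

    static Topology buildTopology() {
        Properties props = new Properties();
        // Required by StreamsConfig; hypothetical placeholder values.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Topology-level configs derived from the application-wide StreamsConfig.
        TopologyConfig topologyConfig = new TopologyConfig(new StreamsConfig(props));
        StreamsBuilder builder = new StreamsBuilder(topologyConfig);

        // Hypothetical topic names.
        builder.stream("input-topic").to("output-topic");
        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(buildTopology().describe());
    }
}
```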
 
 
- 
- 
Method Details
- 
getNewTopology
protected Topology getNewTopology(TopologyConfig topologyConfigs)
- 
stream
Create a KStream from the specified topic. The default "auto.offset.reset" strategy, default TimestampExtractor, and default key and value deserializers as specified in the config are used.
Note that the specified input topic must be partitioned by key. If this is not the case, it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
- Parameters:
- topic - the topic name; cannot be null
- Returns:
- a KStream for the specified topic
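A minimal sketch of this overload, assuming kafka-streams on the classpath. Default serdes come from the application config (the `Properties` that would be passed to `KafkaStreams`), and the sketch assumes the hypothetical topic `clicks` is keyed but not partitioned by that key. It therefore calls `KStream#repartition()` before the key-based count. The class, application id, and topic names are hypothetical.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

class DefaultSerdeStream {

    // Default key/value serdes used by stream(topic) at runtime.
    // Hypothetical application id and bootstrap address.
    static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "clicks-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        return props;
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> clicks = builder.stream("clicks"); // hypothetical topic
        // Assumption: "clicks" is not partitioned by key, so repartition explicitly
        // before the key-based aggregation.
        clicks.repartition()
              .groupByKey()
              .count();
        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(buildTopology().describe());
    }
}
```

The explicit repartition splits the topology into two sub-topologies that are connected by an internal repartition topic.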
 
- 
stream
Create a KStream from the specified topic. The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers as defined by the options in Consumed are used.
Note that the specified input topic must be partitioned by key. If this is not the case, it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
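A minimal sketch of the `Consumed` overload, assuming kafka-streams on the classpath. `Consumed` supplies the serdes and a `WallclockTimestampExtractor`, and these override the config defaults for this source only. The class and topic names are hypothetical.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

class ConsumedStream {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // Serdes and timestamp extractor are taken from Consumed, not from the config.
        Consumed<String, Long> consumed = Consumed
            .with(Serdes.String(), Serdes.Long())
            .withTimestampExtractor(new WallclockTimestampExtractor());
        // Hypothetical topic names.
        KStream<String, Long> amounts = builder.stream("payments", consumed);
        amounts.filter((key, amount) -> amount != null && amount > 0)
               .to("positive-payments", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(buildTopology().describe());
    }
}
```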
- 
stream
Create a KStream from the specified topics. The default "auto.offset.reset" strategy, default TimestampExtractor, and default key and value deserializers as specified in the config are used.
If multiple topics are specified, there is no ordering guarantee for records from different topics. Note that the specified input topics must be partitioned by key. If this is not the case, it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
- Parameters:
- topics - the topic names; must contain at least one topic name
- Returns:
- a KStream for the specified topics
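A minimal sketch of reading several topics into one `KStream`, assuming kafka-streams on the classpath. The helper `sourceTopics` is hypothetical. It walks `Topology#describe()` to show that a single source node subscribes to both topics. Records from `orders-eu` and `orders-us` (also hypothetical) have no relative ordering guarantee.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.kstream.KStream;

class MultiTopicStream {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // Hypothetical topic names; both feed one KStream.
        KStream<String, String> orders = builder.stream(Arrays.asList("orders-eu", "orders-us"));
        orders.to("orders-all");
        return builder.build();
    }

    // Hypothetical helper: collects topic names of all name-based source nodes.
    static Set<String> sourceTopics(Topology topology) {
        Set<String> topics = new HashSet<>();
        for (TopologyDescription.Subtopology sub : topology.describe().subtopologies()) {
            for (TopologyDescription.Node node : sub.nodes()) {
                if (node instanceof TopologyDescription.Source) {
                    Set<String> names = ((TopologyDescription.Source) node).topicSet();
                    if (names != null) {
                        topics.addAll(names);
                    }
                }
            }
        }
        return topics;
    }

    public static void main(String[] args) {
        System.out.println(sourceTopics(buildTopology()));
    }
}
```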
 
- 
stream
Create a KStream from the specified topics. The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers as defined by the options in Consumed are used.
If multiple topics are specified, there is no ordering guarantee for records from different topics. Note that the specified input topics must be partitioned by key. If this is not the case, it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
- 
stream
Create a KStream from the specified topic pattern. The default "auto.offset.reset" strategy, default TimestampExtractor, and default key and value deserializers as specified in the config are used.
If multiple topics are matched by the specified pattern, the created KStream will read data from all of them, and there is no ordering guarantee between records from different topics. This also means that the work will not be parallelized for multiple topics, and the number of tasks will scale with the maximum partition count of any matching topic rather than the total number of partitions across all topics.
Note that the specified input topics must be partitioned by key. If this is not the case, it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
- Parameters:
- topicPattern - the pattern to match for topic names
- Returns:
- a KStream for topics matching the regex pattern
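A minimal sketch of a pattern subscription, assuming kafka-streams on the classpath. The regex `sensor-.*` and the output topic are hypothetical. The output topic was chosen so it does not match the regex, because the stream would otherwise consume its own output. The hypothetical helper `sourcePattern` reads the regex back from `Topology#describe()`.

```java
import java.util.regex.Pattern;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.kstream.KStream;

class PatternStream {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // Hypothetical regex: every topic whose name matches feeds this KStream.
        KStream<String, String> readings = builder.stream(Pattern.compile("sensor-.*"));
        // Output topic deliberately does not match "sensor-.*".
        readings.to("all-sensor-readings");
        return builder.build();
    }

    // Hypothetical helper: returns the regex of the first pattern-based source, or null.
    static String sourcePattern(Topology topology) {
        for (TopologyDescription.Subtopology sub : topology.describe().subtopologies()) {
            for (TopologyDescription.Node node : sub.nodes()) {
                if (node instanceof TopologyDescription.Source) {
                    Pattern pattern = ((TopologyDescription.Source) node).topicPattern();
                    if (pattern != null) {
                        return pattern.pattern();
                    }
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(sourcePattern(buildTopology()));
    }
}
```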
 
- 
stream
Create a KStream from the specified topic pattern. The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers as defined by the options in Consumed are used.
If multiple topics are matched by the specified pattern, the created KStream will read data from all of them, and there is no ordering guarantee between records from different topics. This also means that the work will not be parallelized for multiple topics, and the number of tasks will scale with the maximum partition count of any matching topic rather than the total number of partitions across all topics.
Note that the specified input topics must be partitioned by key. If this is not the case, it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
- 
table
public <K,V> KTable<K,V> table(String topic, Consumed<K, V> consumed, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Create a KTable for the specified topic. The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers as defined by the options in Consumed are used. Input records with a null key will be dropped.
Note that the specified input topic must be partitioned by key. If this is not the case, the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore using the given Materialized instance. An internal changelog topic is created by default. Because the source topic can be used for recovery, you can avoid creating the changelog topic by setting "topology.optimization" to "all" in the StreamsConfig.
You should only specify serdes in the Consumed instance, as these will also be used to overwrite the serdes in Materialized, i.e.,

    streamBuilder.table(topic, Consumed.with(Serdes.String(), Serdes.String()), Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName))

To query the local ReadOnlyKeyValueStore, it must be obtained via KafkaStreams#store(...):

    KafkaStreams streams = ...
    StoreQueryParameters<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> storeQueryParams =
        StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
    ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> localStore = streams.store(storeQueryParams);
    String key = "some-key";
    ValueAndTimestamp<String> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)

For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients() to query the value of the key on a parallel running instance of your Kafka Streams application.
- Parameters:
- topic - the topic name; cannot be null
- consumed - the instance of Consumed used to define optional parameters; cannot be null
- materialized - the instance of Materialized used to materialize a state store; cannot be null
- Returns:
- a KTable for the specified topic
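A minimal, runnable counterpart to the first snippet above, assuming kafka-streams on the classpath. It gives the serdes once in `Consumed` and names the store with `Materialized.as`, so the store can later be queried by that name. The topic, store, and class names are hypothetical. The helper `storeNames` is also hypothetical and reads store names from `Topology#describe()`.

```java
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

class QueryableTable {

    // Hypothetical store name, used later for KafkaStreams#store(...).
    static final String STORE_NAME = "user-profiles-store";

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // Serdes only in Consumed; they also override the serdes in Materialized.
        builder.table(
            "user-profiles", // hypothetical topic
            Consumed.with(Serdes.String(), Serdes.String()),
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(STORE_NAME));
        return builder.build();
    }

    // Hypothetical helper: collects store names attached to processor nodes.
    static Set<String> storeNames(Topology topology) {
        Set<String> stores = new HashSet<>();
        for (TopologyDescription.Subtopology sub : topology.describe().subtopologies()) {
            for (TopologyDescription.Node node : sub.nodes()) {
                if (node instanceof TopologyDescription.Processor) {
                    stores.addAll(((TopologyDescription.Processor) node).stores());
                }
            }
        }
        return stores;
    }

    public static void main(String[] args) {
        System.out.println(storeNames(buildTopology()));
    }
}
```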
 
- 
table
Create a KTable for the specified topic. The default "auto.offset.reset" strategy and default key and value deserializers as specified in the config are used. Input records with a null key will be dropped.
Note that the specified input topic must be partitioned by key. If this is not the case, the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore with an internal store name. Note that this store name may not be queryable through Interactive Queries. An internal changelog topic is created by default. Because the source topic can be used for recovery, you can avoid creating the changelog topic by setting "topology.optimization" to "all" in the StreamsConfig.
- Parameters:
- topic - the topic name; cannot be null
- Returns:
- a KTable for the specified topic
 
- 
table
Create a KTable for the specified topic. The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers as defined by the options in Consumed are used. Input records with a null key will be dropped.
Note that the specified input topic must be partitioned by key. If this is not the case, the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore with an internal store name. Note that this store name may not be queryable through Interactive Queries. An internal changelog topic is created by default. Because the source topic can be used for recovery, you can avoid creating the changelog topic by setting "topology.optimization" to "all" in the StreamsConfig.
- 
table
public <K,V> KTable<K,V> table(String topic, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Create a KTable for the specified topic. The default "auto.offset.reset" strategy as specified in the config is used. Key and value deserializers as defined by the options in Materialized are used. Input records with a null key will be dropped.
Note that the specified input topic must be partitioned by key. If this is not the case, the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore using the Materialized instance. An internal changelog topic is created by default. Because the source topic can be used for recovery, you can avoid creating the changelog topic by setting "topology.optimization" to "all" in the StreamsConfig.
- Parameters:
- topic - the topic name; cannot be null
- materialized - the instance of Materialized used to materialize a state store; cannot be null
- Returns:
- a KTable for the specified topic
 
- 
globalTable
Create a GlobalKTable for the specified topic. Input records with a null key will be dropped.
The resulting GlobalKTable will be materialized in a local KeyValueStore with an internal store name. Note that this store name may not be queryable through Interactive Queries. No internal changelog topic is created since the original input topic can be used for recovery (cf. methods of KGroupedStream and KGroupedTable that return a KTable).
Note that GlobalKTable always applies "auto.offset.reset" strategy "earliest" regardless of the specified value in StreamsConfig or Consumed. Furthermore, GlobalKTable cannot be a versioned state store.
- Parameters:
- topic - the topic name; cannot be null
- consumed - the instance of Consumed used to define optional parameters
- Returns:
- a GlobalKTable for the specified topic
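A minimal sketch of a typical use of a GlobalKTable, assuming kafka-streams on the classpath. The sketch enriches a stream with a stream-to-global-table join. Because every instance holds the full table, the join key is derived from the record value, and the stream does not need to be co-partitioned with the table. The topics, class name, and the record layout (the order value holding a product id) are hypothetical.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

class GlobalTableJoin {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // Hypothetical topics: "products" maps product id -> product name,
        // "orders" maps order id -> product id.
        GlobalKTable<String, String> products =
            builder.globalTable("products", Consumed.with(Serdes.String(), Serdes.String()));
        KStream<String, String> orders =
            builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()));

        orders.join(products,
                    (orderId, productId) -> productId,                  // key to look up in the global table
                    (productId, productName) -> productId + ":" + productName)
              .to("enriched-orders", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(buildTopology().describe());
    }
}
```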
 
- 
globalTable
Create a GlobalKTable for the specified topic. The default key and value deserializers as specified in the config are used. Input records with a null key will be dropped.
The resulting GlobalKTable will be materialized in a local KeyValueStore with an internal store name. Note that this store name may not be queryable through Interactive Queries. No internal changelog topic is created since the original input topic can be used for recovery (cf. methods of KGroupedStream and KGroupedTable that return a KTable).
Note that GlobalKTable always applies "auto.offset.reset" strategy "earliest" regardless of the specified value in StreamsConfig. Furthermore, GlobalKTable cannot be a versioned state store.
- Parameters:
- topic - the topic name; cannot be null
- Returns:
- a GlobalKTable for the specified topic
 
- 
globalTable
public <K,V> GlobalKTable<K,V> globalTable(String topic, Consumed<K, V> consumed, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Create a GlobalKTable for the specified topic. Input KeyValue pairs with a null key will be dropped.
The resulting GlobalKTable will be materialized in a local KeyValueStore configured with the provided instance of Materialized. However, no internal changelog topic is created since the original input topic can be used for recovery (cf. methods of KGroupedStream and KGroupedTable that return a KTable).
You should only specify serdes in the Consumed instance, as these will also be used to overwrite the serdes in Materialized, i.e.,

    streamBuilder.globalTable(topic, Consumed.with(Serdes.String(), Serdes.String()), Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName))

To query the local ReadOnlyKeyValueStore, it must be obtained via KafkaStreams#store(...):

    KafkaStreams streams = ...
    StoreQueryParameters<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> storeQueryParams =
        StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
    ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> localStore = streams.store(storeQueryParams);
    String key = "some-key";
    ValueAndTimestamp<String> valueForKey = localStore.get(key);

Note that GlobalKTable always applies "auto.offset.reset" strategy "earliest" regardless of the specified value in StreamsConfig or Consumed. Furthermore, GlobalKTable cannot be a versioned state store.
- Parameters:
- topic - the topic name; cannot be null
- consumed - the instance of Consumed used to define optional parameters; can't be null
- materialized - the instance of Materialized used to materialize a state store; cannot be null
- Returns:
- a GlobalKTable for the specified topic
 
- 
globalTable
public <K,V> GlobalKTable<K,V> globalTable(String topic, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Create a GlobalKTable for the specified topic. Input KeyValue pairs with a null key will be dropped.
The resulting GlobalKTable will be materialized in a local KeyValueStore configured with the provided instance of Materialized. However, no internal changelog topic is created since the original input topic can be used for recovery (cf. methods of KGroupedStream and KGroupedTable that return a KTable).
To query the local ReadOnlyKeyValueStore, it must be obtained via KafkaStreams#store(...):

    KafkaStreams streams = ...
    StoreQueryParameters<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> storeQueryParams =
        StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
    ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> localStore = streams.store(storeQueryParams);
    String key = "some-key";
    ValueAndTimestamp<String> valueForKey = localStore.get(key);

Note that GlobalKTable always applies "auto.offset.reset" strategy "earliest" regardless of the specified value in StreamsConfig. Furthermore, GlobalKTable cannot be a versioned state store.
- Parameters:
- topic - the topic name; cannot be null
- materialized - the instance of Materialized used to materialize a state store; cannot be null
- Returns:
- a GlobalKTable for the specified topic
 
- 
addStateStore
Adds a state store to the underlying Topology.
It is required to connect state stores to Processors, Transformers, or ValueTransformers before they can be used.
- Parameters:
- builder - the builder used to obtain this state store's StateStore instance
- Returns:
- itself
- Throws:
- TopologyException - if the state store supplier has already been added
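A minimal sketch of adding a store and connecting it to a processor by name. It assumes Kafka Streams 3.3 or later, where `KStream#process` accepts an `org.apache.kafka.streams.processor.api.ProcessorSupplier`. The store name, topic, and the counting logic are hypothetical illustrations. The supplier is a constructor reference, so each `get()` call returns a new processor instance.

```java
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

class StateStoreExample {

    static final String STORE_NAME = "event-counts"; // hypothetical store name

    // Hypothetical processor: counts events per key in the connected store.
    static class CountingProcessor implements Processor<String, String, Void, Void> {
        private KeyValueStore<String, Long> store;

        @Override
        public void init(ProcessorContext<Void, Void> context) {
            store = context.getStateStore(STORE_NAME);
        }

        @Override
        public void process(Record<String, String> record) {
            Long current = store.get(record.key());
            store.put(record.key(), current == null ? 1L : current + 1);
        }
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        StoreBuilder<KeyValueStore<String, Long>> countsStore = Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore(STORE_NAME), Serdes.String(), Serdes.Long());
        builder.addStateStore(countsStore);
        // The store is connected to the processor by passing its name.
        builder.stream("events", Consumed.with(Serdes.String(), Serdes.String())) // hypothetical topic
               .process(CountingProcessor::new, STORE_NAME);
        return builder.build();
    }

    // Hypothetical helper: collects store names attached to processor nodes.
    static Set<String> storeNames(Topology topology) {
        Set<String> stores = new HashSet<>();
        for (TopologyDescription.Subtopology sub : topology.describe().subtopologies()) {
            for (TopologyDescription.Node node : sub.nodes()) {
                if (node instanceof TopologyDescription.Processor) {
                    stores.addAll(((TopologyDescription.Processor) node).stores());
                }
            }
        }
        return stores;
    }

    public static void main(String[] args) {
        System.out.println(storeNames(buildTopology()));
    }
}
```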
 
- 
addGlobalStore
@Deprecated public <K,V> StreamsBuilder addGlobalStore(StoreBuilder<?> storeBuilder, String topic, Consumed<K, V> consumed, ProcessorSupplier<K, V> stateUpdateSupplier)
Deprecated. Since 2.7.0; use addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier) instead.
Adds a global StateStore to the topology. The StateStore sources its data from all partitions of the provided input topic. There will be exactly one instance of this StateStore per Kafka Streams instance.
A SourceNode will be added to consume the data arriving from the partitions of the input topic.
The provided ProcessorSupplier will be used to create a ProcessorNode that will receive all records forwarded from the SourceNode. NOTE: you should not use the Processor to insert transformed records into the global state store. This store uses the source topic as changelog and during restore will insert records directly from the source. This ProcessorNode should be used to keep the StateStore up-to-date. The default TimestampExtractor as specified in the config is used.
It is not required to connect a global store to Processors, Transformers, or ValueTransformers; those have read-only access to all global stores by default.
The supplier should always generate a new instance each time ProcessorSupplier.get() is called. Creating a single Processor object and returning the same object reference in ProcessorSupplier.get() would violate the supplier pattern and lead to runtime exceptions.
- Parameters:
- storeBuilder - user-defined StoreBuilder; can't be null
- topic - the topic to source the data from
- consumed - the instance of Consumed used to define optional parameters; can't be null
- stateUpdateSupplier - the instance of ProcessorSupplier
- Returns:
- itself
- Throws:
- TopologyException - if the processor of state is already registered
 
- 
addGlobalStore
public <KIn,VIn> StreamsBuilder addGlobalStore(StoreBuilder<?> storeBuilder, String topic, Consumed<KIn, VIn> consumed, ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier)
Adds a global StateStore to the topology. The StateStore sources its data from all partitions of the provided input topic. There will be exactly one instance of this StateStore per Kafka Streams instance.
A SourceNode will be added to consume the data arriving from the partitions of the input topic.
The provided ProcessorSupplier will be used to create a Processor that will receive all records forwarded from the SourceNode. The supplier should always generate a new instance. Creating a single Processor object and returning the same object reference in ProcessorSupplier.get() is a violation of the supplier pattern and leads to runtime exceptions. NOTE: you should not use the Processor to insert transformed records into the global state store. This store uses the source topic as changelog and during restore will insert records directly from the source. This Processor should be used to keep the StateStore up-to-date. The default TimestampExtractor as specified in the config is used.
It is not required to connect a global store to Processors, Transformers, or ValueTransformers; those have read-only access to all global stores by default.
- Parameters:
- storeBuilder - user-defined StoreBuilder; can't be null
- topic - the topic to source the data from
- consumed - the instance of Consumed used to define optional parameters; can't be null
- stateUpdateSupplier - the instance of ProcessorSupplier
- Returns:
- itself
- Throws:
- TopologyException - if the processor of state is already registered
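A minimal sketch of the non-deprecated overload, assuming kafka-streams on the classpath. The hypothetical `RatesUpdater` copies each record unchanged into the store, so the store mirrors the topic it is restored from. The constructor reference returns a fresh processor on every `get()` call. Logging is disabled on the store builder, since the input topic already serves as the changelog. The store name, topic, and class names are hypothetical.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

class GlobalStoreExample {

    static final String STORE_NAME = "rates-store";  // hypothetical store name
    static final String TOPIC = "currency-rates";    // hypothetical input topic

    // Hypothetical state-update processor: writes records as-is, no transformation.
    static class RatesUpdater implements Processor<String, String, Void, Void> {
        private KeyValueStore<String, String> store;

        @Override
        public void init(ProcessorContext<Void, Void> context) {
            store = context.getStateStore(STORE_NAME);
        }

        @Override
        public void process(Record<String, String> record) {
            store.put(record.key(), record.value());
        }
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.addGlobalStore(
            Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore(STORE_NAME), Serdes.String(), Serdes.String())
                // The input topic already acts as the changelog.
                .withLoggingDisabled(),
            TOPIC,
            Consumed.with(Serdes.String(), Serdes.String()),
            RatesUpdater::new); // new Processor instance per get() call
        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(buildTopology().describe());
    }
}
```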
 
- 
build
Returns the Topology that represents the specified processing logic. Note that using this method means no optimizations are performed.
- Returns:
- the Topology that represents the specified processing logic
 
- 
build
Returns the Topology that represents the specified processing logic and accepts a Properties instance used to indicate whether to optimize the topology or not.
- Parameters:
- props - the Properties used for building a possibly optimized topology
- Returns:
- the Topology that represents the specified processing logic
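A minimal sketch of requesting optimizations through `build(Properties)`, assuming kafka-streams on the classpath. It sets `"topology.optimization"` to `"all"` (`StreamsConfig.OPTIMIZE`), the setting that the `table(...)` entries name for avoiding the source table's changelog topic. The application id, bootstrap address, topic, store, and class names are hypothetical placeholders.

```java
import java.util.Properties;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

class OptimizedBuild {

    static Properties config() {
        Properties props = new Properties();
        // Hypothetical placeholder values for the required settings.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "users-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // "topology.optimization" = "all"
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
        return props;
    }

    static Topology buildTopology(Properties props) {
        StreamsBuilder builder = new StreamsBuilder();
        // Hypothetical topic and store names.
        builder.table("users",
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("users-store"));
        // Passing props lets the builder apply the requested optimizations.
        return builder.build(props);
    }

    public static void main(String[] args) {
        System.out.println(buildTopology(config()).describe());
    }
}
```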
 
 