Class Topology
A logical representation of a ProcessorTopology.
A topology is an acyclic graph of sources, processors, and sinks.
A SourceNode is a node in the graph that consumes one or more Kafka topics and forwards them to its successor nodes.
A Processor is a node in the graph that receives input records from upstream nodes, processes the records, and optionally forwards new records to one, multiple, or all of its downstream nodes.
Finally, a SinkNode is a node in the graph that receives records from upstream nodes and writes them to a Kafka topic.
A Topology allows you to construct a graph of these nodes, which can then be passed into a new KafkaStreams instance that will then begin consuming, processing, and producing records.
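For orientation, here is a minimal sketch of building a Topology and handing it to KafkaStreams. The topic names, node names, and configuration values are illustrative assumptions, not defaults of this API:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class TopologyExample {
    public static void main(String[] args) {
        // Build a simple source -> sink graph (topic and node names are illustrative).
        Topology topology = new Topology();
        topology.addSource("source", "input-topic");
        topology.addSink("sink", "output-topic", "source");

        // Minimal configuration required to start a KafkaStreams instance.
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Pass the topology to KafkaStreams, which then consumes, processes, and produces records.
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}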
Nested Class Summary
Nested Classes
static enum Topology.AutoOffsetReset - Deprecated. Since 4.0.
Constructor Summary
Constructors
Topology()
Topology(TopologyConfig topologyConfigs)
Method Summary
Modifier and Type | Method | Description
<KIn, VIn> Topology | addGlobalStore(StoreBuilder<?> storeBuilder, String sourceName, Deserializer<KIn> keyDeserializer, Deserializer<VIn> valueDeserializer, String topic, String processorName, ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) | Adds a global state store to the topology.
<KIn, VIn> Topology | addGlobalStore(StoreBuilder<?> storeBuilder, String sourceName, TimestampExtractor timestampExtractor, Deserializer<KIn> keyDeserializer, Deserializer<VIn> valueDeserializer, String topic, String processorName, ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) |
<KIn, VIn, KOut, VOut> Topology | addProcessor(String name, ProcessorSupplier<KIn, VIn, KOut, VOut> supplier, String... parentNames) |
<KIn, VIn> Topology | addReadOnlyStateStore(StoreBuilder<?> storeBuilder, String sourceName, Deserializer<KIn> keyDeserializer, Deserializer<VIn> valueDeserializer, String topic, String processorName, ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) | Adds a read-only state store to the topology.
<KIn, VIn> Topology | addReadOnlyStateStore(StoreBuilder<?> storeBuilder, String sourceName, TimestampExtractor timestampExtractor, Deserializer<KIn> keyDeserializer, Deserializer<VIn> valueDeserializer, String topic, String processorName, ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) |
<K, V> Topology | addSink(String name, String topic, String... parentNames) | Add a sink that sends records from upstream processors or sources to the named Kafka topic.
<K, V> Topology | addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer, String... parentNames) |
<K, V> Topology | addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) |
<K, V> Topology | addSink(String name, String topic, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) |
<K, V> Topology | addSink(String name, TopicNameExtractor<K, V> topicExtractor, String... parentNames) |
<K, V> Topology | addSink(String name, TopicNameExtractor<K, V> topicExtractor, Serializer<K> keySerializer, Serializer<V> valueSerializer, String... parentNames) |
<K, V> Topology | addSink(String name, TopicNameExtractor<K, V> topicExtractor, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) |
<K, V> Topology | addSink(String name, TopicNameExtractor<K, V> topicExtractor, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) |
Topology | addSource(String name, String... topics) | Add a source that consumes the named topics and forwards the records to child processors and sinks.
Topology | addSource(String name, Pattern topicPattern) |
Topology | addSource(String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics) |
Topology | addSource(String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern) |
Topology | addSource(AutoOffsetReset offsetReset, String name, String... topics) |
Topology | addSource(AutoOffsetReset offsetReset, String name, Pattern topicPattern) |
Topology | addSource(AutoOffsetReset offsetReset, String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics) |
Topology | addSource(AutoOffsetReset offsetReset, String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern) |
Topology | addSource(AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics) |
Topology | addSource(AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern) |
Topology | addSource(AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, String... topics) |
Topology | addSource(AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, Pattern topicPattern) |
Topology | addSource(TimestampExtractor timestampExtractor, String name, String... topics) |
Topology | addSource(TimestampExtractor timestampExtractor, String name, Pattern topicPattern) |
Topology | addSource(Topology.AutoOffsetReset offsetReset, String name, String... topics) | Deprecated. Since 4.0.
Topology | addSource(Topology.AutoOffsetReset offsetReset, String name, Pattern topicPattern) | Deprecated. Since 4.0.
Topology | addSource(Topology.AutoOffsetReset offsetReset, String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics) | Deprecated. Since 4.0.
Topology | addSource(Topology.AutoOffsetReset offsetReset, String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern) | Deprecated. Since 4.0.
Topology | addSource(Topology.AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics) | Deprecated. Since 4.0.
Topology | addSource(Topology.AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern) | Deprecated. Since 4.0.
Topology | addSource(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, String... topics) | Deprecated. Since 4.0.
Topology | addSource(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, Pattern topicPattern) | Deprecated. Since 4.0.
Topology | addStateStore(StoreBuilder<?> storeBuilder, String... processorNames) | Add a state store to the topology, and optionally connect it to one or more processors.
Topology | connectProcessorAndStateStores(String processorName, String... stateStoreNames) | Connect a processor to one or more state stores.
TopologyDescription | describe() | Returns a description of the specified Topology.
-
Constructor Details
-
Topology
public Topology()
-
Topology
public Topology(TopologyConfig topologyConfigs)
-
-
Method Details
-
addSource
public Topology addSource(String name, String... topics)
Add a source that consumes the named topics and forwards the records to child processors and sinks.
The source will use the default values from StreamsConfig. If you want to specify a source specific auto.offset.reset strategy, TimestampExtractor, or key/value Deserializer, use the corresponding overloaded addSource(...) method.
- Parameters:
name - the unique name of the source used to reference this node when adding processor or sink children
topics - the name of one or more Kafka topics that this source is to consume
- Returns:
- itself
- Throws:
TopologyException - if the provided source name is not unique, no topics are specified, or a topic has already been registered by another source, read-only state store, or global state store
NullPointerException - if name or topics is null, or topics contains a null topic
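For illustration, a minimal usage sketch of this overload (the node and topic names are assumptions):

Topology topology = new Topology();
// One source node consuming two topics (illustrative names).
topology.addSource("source", "orders", "payments");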
-
addSource
public Topology addSource(String name, Pattern topicPattern)
See addSource(String, String...).
Takes a Pattern (cannot be null) to match topics to consume from, instead of a list of topic names.
-
addSource
@Deprecated
public Topology addSource(Topology.AutoOffsetReset offsetReset, String name, String... topics)
Deprecated. Since 4.0. Use addSource(org.apache.kafka.streams.AutoOffsetReset, String, String...) instead.
-
addSource
public Topology addSource(AutoOffsetReset offsetReset, String name, String... topics)
-
addSource
@Deprecated
public Topology addSource(Topology.AutoOffsetReset offsetReset, String name, Pattern topicPattern)
Deprecated. Since 4.0. Use addSource(org.apache.kafka.streams.AutoOffsetReset, String, Pattern) instead.
-
addSource
public Topology addSource(AutoOffsetReset offsetReset, String name, Pattern topicPattern)
-
addSource
public Topology addSource(TimestampExtractor timestampExtractor, String name, String... topics)
-
addSource
public Topology addSource(TimestampExtractor timestampExtractor, String name, Pattern topicPattern)
-
addSource
@Deprecated
public Topology addSource(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, String... topics)
Deprecated. Since 4.0. Use addSource(org.apache.kafka.streams.AutoOffsetReset, TimestampExtractor, String, String...) instead.
-
addSource
public Topology addSource(AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, String... topics) -
addSource
@Deprecated
public Topology addSource(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, Pattern topicPattern)
Deprecated. Since 4.0. Use addSource(org.apache.kafka.streams.AutoOffsetReset, TimestampExtractor, String, Pattern) instead.
-
addSource
public Topology addSource(AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, Pattern topicPattern) -
addSource
public Topology addSource(String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics) -
addSource
public Topology addSource(String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern) -
addSource
@Deprecated
public Topology addSource(Topology.AutoOffsetReset offsetReset, String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics)
Deprecated. Since 4.0. Use addSource(org.apache.kafka.streams.AutoOffsetReset, String, Deserializer, Deserializer, String...) instead.
-
addSource
public Topology addSource(AutoOffsetReset offsetReset, String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics) -
addSource
@Deprecated
public Topology addSource(Topology.AutoOffsetReset offsetReset, String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern)
Deprecated. Since 4.0. Use addSource(org.apache.kafka.streams.AutoOffsetReset, String, Deserializer, Deserializer, Pattern) instead.
-
addSource
public Topology addSource(AutoOffsetReset offsetReset, String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern) -
addSource
@Deprecated
public Topology addSource(Topology.AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics)
Deprecated. Since 4.0. Use addSource(org.apache.kafka.streams.AutoOffsetReset, String, TimestampExtractor, Deserializer, Deserializer, String...) instead.
-
addSource
public Topology addSource(AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics) -
addSource
@Deprecated
public Topology addSource(Topology.AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern)
Deprecated. Since 4.0. Use addSource(org.apache.kafka.streams.AutoOffsetReset, String, TimestampExtractor, Deserializer, Deserializer, Pattern) instead.
-
addSource
public Topology addSource(AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern) -
addSink
public <K,V> Topology addSink(String name, String topic, String... parentNames)
Add a sink that sends records from upstream processors or sources to the named Kafka topic. The specified topic should be created before the KafkaStreams instance is started.
The sink will use the default values from StreamsConfig. Furthermore, the producer's configured partitioner is used to write into the topic. If you want to specify a sink specific key or value Serializer, or use a different partitioner, use the corresponding overloaded addSink(...) method.
- Parameters:
name - the unique name of the sink
topic - the name of the Kafka topic to which this sink should write its records
parentNames - the name of one or more processors or sources, whose output records this sink should consume and write to the specified output topic
- Returns:
- itself
- Throws:
TopologyException - if the provided sink name is not unique, or if a parent processor/source name is unknown or specifies a sink
NullPointerException - if name, topic, or parentNames is null, or parentNames contains a null parent name
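A short usage sketch; the topic and node names are assumptions, and a source node named "source" is assumed to have been added earlier:

// Plain overload: rely on the default serializers from StreamsConfig.
topology.addSink("sink", "output-topic", "source");

// Overload with explicit key/value serializers.
topology.addSink("typed-sink", "output-topic",
    Serdes.String().serializer(), Serdes.String().serializer(), "source");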
-
addSink
public <K,V> Topology addSink(String name, String topic, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) -
addSink
public <K,V> Topology addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer, String... parentNames) -
addSink
public <K,V> Topology addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) -
addSink
public <K,V> Topology addSink(String name, TopicNameExtractor<K, V> topicExtractor, String... parentNames)
See addSink(String, String, String...).
Takes a TopicNameExtractor (cannot be null) that computes topic names to send records into, instead of a single topic name. The topic name extractor is called for every result record and may compute a different topic name each time. All topics that the topic name extractor may compute should be created before the KafkaStreams instance is started. Returning null as topic name is invalid and will result in a runtime exception.
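For illustration, a hedged sketch of a dynamic sink; the routing rule and all names are assumptions (TopicNameExtractor is from org.apache.kafka.streams.processor):

// Route records by key prefix; the extractor is called once per result record.
TopicNameExtractor<String, String> extractor =
    (key, value, recordContext) -> key.startsWith("err-") ? "errors" : "events";
topology.addSink("dynamic-sink", extractor, "source");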
addSink
public <K,V> Topology addSink(String name, TopicNameExtractor<K, V> topicExtractor, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) -
addSink
public <K,V> Topology addSink(String name, TopicNameExtractor<K, V> topicExtractor, Serializer<K> keySerializer, Serializer<V> valueSerializer, String... parentNames) -
addSink
public <K,V> Topology addSink(String name, TopicNameExtractor<K, V> topicExtractor, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) -
addProcessor
public <KIn,VIn,KOut,VOut> Topology addProcessor(String name, ProcessorSupplier<KIn, VIn, KOut, VOut> supplier, String... parentNames)
Add a processor that receives and processes records from one or more parent processors or sources. Any record output by this processor will be forwarded to its child processors and sinks.
By default, the processor is stateless. There are three different kinds of state stores, which can be connected to a processor:
- state stores for processing (i.e., read/write access)
- read-only state stores
- global state stores (read-only)
If the supplier provides state stores via ConnectedStoreProvider.stores(), the corresponding StoreBuilders will be added to the topology and connected to this processor automatically.
- Parameters:
name - the unique name of the processor used to reference this node when adding other processor or sink children
supplier - the supplier used to obtain Processor instances
parentNames - the name of one or more processors or sources, whose output records this processor should receive and process
- Returns:
- itself
- Throws:
TopologyException - if the provided processor name is not unique, or if a parent processor/source name is unknown or specifies a sink
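As a hedged sketch of a stateless processor (Processor, ProcessorContext, and Record are from org.apache.kafka.streams.processor.api; the node names and the uppercase logic are illustrative):

topology.addProcessor(
    "uppercase",
    () -> new Processor<String, String, String, String>() {
        private ProcessorContext<String, String> context;

        @Override
        public void init(final ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(final Record<String, String> record) {
            // Forward a transformed copy of each input record to all children.
            context.forward(record.withValue(record.value().toUpperCase()));
        }
    },
    "source");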
-
addStateStore
public Topology addStateStore(StoreBuilder<?> storeBuilder, String... processorNames)
Add a state store to the topology, and optionally connect it to one or more processors. State stores are sharded and the number of shards is determined at runtime by the number of input topic partitions and the structure of the topology. Each connected Processor instance in the topology has access to a single shard of the state store. Additionally, the state store can be accessed from "outside" using "Interactive Queries" (cf., KafkaStreams.store(StoreQueryParameters) and KafkaStreams.query(StateQueryRequest)). If you need access to all data in a state store inside a Processor, you can use a (read-only) global state store.
If no processorNames are specified, the state store can be connected to one or more processors later.
Note, if a state store is never connected to any processor, the state store is "dangling" and would not be added to the created ProcessorTopology when KafkaStreams is started. In this case, the state store is not available for "Interactive Queries". If you want to add a state store only for "Interactive Queries", you can use a read-only state store.
For failure and recovery, a state store may be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "storeName" is provided by the store builder, and "-changelog" is a fixed suffix.
You can verify the created ProcessorTopology and added state stores, and retrieve all generated internal topic names, via describe().
- Parameters:
storeBuilder - the StoreBuilder used to obtain state store instances (one per shard)
processorNames - the names of the processors that should be able to access the provided state store
- Returns:
- itself
- Throws:
TopologyException - if the state store was already added, or if a processor name is unknown or specifies a source or sink
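A minimal sketch using the Stores factory; the store name, serdes, and processor name are assumptions:

// Build a persistent key-value store named "counts" (illustrative).
StoreBuilder<KeyValueStore<String, Long>> countsStore =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("counts"),
        Serdes.String(),
        Serdes.Long());

// Add the store and connect it to the "uppercase" processor from the earlier sketch;
// that processor could then call context.getStateStore("counts") in init().
topology.addStateStore(countsStore, "uppercase");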
-
addReadOnlyStateStore
public <KIn,VIn> Topology addReadOnlyStateStore(StoreBuilder<?> storeBuilder, String sourceName, TimestampExtractor timestampExtractor, Deserializer<KIn> keyDeserializer, Deserializer<VIn> valueDeserializer, String topic, String processorName, ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) -
addReadOnlyStateStore
public <KIn,VIn> Topology addReadOnlyStateStore(StoreBuilder<?> storeBuilder, String sourceName, Deserializer<KIn> keyDeserializer, Deserializer<VIn> valueDeserializer, String topic, String processorName, ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier)
Adds a read-only state store to the topology. The state store will be populated with data from the named source topic. State stores are sharded and the number of shards is determined at runtime by the number of input topic partitions for the source topic and the connected processors (if any). Read-only state stores can be accessed from "outside" using "Interactive Queries" (cf., KafkaStreams.store(StoreQueryParameters) and KafkaStreams.query(StateQueryRequest)).
The auto.offset.reset property will be set to "earliest" for the source topic. If you want to specify a source specific TimestampExtractor, you can use addReadOnlyStateStore(StoreBuilder, String, TimestampExtractor, Deserializer, Deserializer, String, String, ProcessorSupplier).
Connecting a read-only state store to processors is optional. If not connected to any processor, the state store will still be created and can be queried via KafkaStreams.store(StoreQueryParameters) or KafkaStreams.query(StateQueryRequest). If the state store is connected to another processor, each corresponding Processor instance in the topology has read-only access to a single shard of the state store. If you need write access to a state store, you can use a "regular" state store instead. If you need access to all data in a state store inside a Processor, you can use a (read-only) global state store.
The provided ProcessorSupplier will be used to create Processor instances which will be used to process the records from the source topic. These processors are the only ones with write access to the state store, and should contain logic to keep the StateStore up-to-date.
Read-only state stores are always enabled for fault-tolerance and recovery. In contrast to "regular" state stores, no dedicated changelog topic will be created in Kafka; instead, the source topic is used for recovery. Thus, the source topic should be configured with log compaction.
- Parameters:
storeBuilder - the StoreBuilder used to obtain state store instances (one per shard)
sourceName - the unique name of the internally added source
keyDeserializer - the Deserializer for record keys (can be null to use the default key deserializer from StreamsConfig)
valueDeserializer - the Deserializer for record values (can be null to use the default value deserializer from StreamsConfig)
topic - the source topic to read the data from
processorName - the unique name of the internally added processor which maintains the state store
stateUpdateSupplier - the supplier used to obtain Processor instances, which maintain the state store
- Returns:
- itself
- Throws:
TopologyException - if the state store was already added, or if the source or processor names are not unique, or if the source topic has already been registered by another source, read-only state store, or global state store
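A hedged sketch; all names are assumptions, and the state-update processor simply mirrors the (log-compacted) source topic into the store:

StoreBuilder<KeyValueStore<String, String>> profileStore =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("profiles"),
        Serdes.String(),
        Serdes.String());

topology.addReadOnlyStateStore(
    profileStore,
    "profiles-source",
    Serdes.String().deserializer(),
    Serdes.String().deserializer(),
    "profiles-topic",
    "profiles-updater",
    () -> new Processor<String, String, Void, Void>() {
        private KeyValueStore<String, String> store;

        @Override
        public void init(final ProcessorContext<Void, Void> context) {
            store = context.getStateStore("profiles");
        }

        @Override
        public void process(final Record<String, String> record) {
            // Keep the store up-to-date with the source topic.
            store.put(record.key(), record.value());
        }
    });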
-
addGlobalStore
public <KIn,VIn> Topology addGlobalStore(StoreBuilder<?> storeBuilder, String sourceName, Deserializer<KIn> keyDeserializer, Deserializer<VIn> valueDeserializer, String topic, String processorName, ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier)
Adds a global state store to the topology. The state store will be populated with data from the named source topic. Global state stores are read-only, and contain data from all partitions of the specified source topic. Thus, each KafkaStreams instance has a full copy of the data; the source topic records are effectively broadcast to all instances. In contrast to read-only state stores, global state stores are "bootstrapped" on startup, and are maintained by a separate thread. Thus, updates to a global store are not "stream-time synchronized", which may lead to non-deterministic results. Like all other stores, global state stores can be accessed from "outside" using "Interactive Queries" (cf., KafkaStreams.store(StoreQueryParameters) and KafkaStreams.query(StateQueryRequest)).
The auto.offset.reset property will be set to "earliest" for the source topic. If you want to specify a source specific TimestampExtractor, you can use addGlobalStore(StoreBuilder, String, TimestampExtractor, Deserializer, Deserializer, String, String, ProcessorSupplier).
All processors of the topology automatically have read-only access to the global store; it is not necessary to connect them. If you need write access to a state store, you can use a "regular" state store instead.
The provided ProcessorSupplier will be used to create Processor instances which will be used to process the records from the source topic. These processors are the only ones with write access to the state store, and should contain logic to keep the StateStore up-to-date.
Global state stores are always enabled for fault-tolerance and recovery. In contrast to "regular" state stores, no dedicated changelog topic will be created in Kafka; instead, the source topic is used for recovery. Thus, the source topic should be configured with log compaction.
- Parameters:
storeBuilder - the StoreBuilder used to obtain the state store (one per KafkaStreams instance)
sourceName - the unique name of the internally added source
keyDeserializer - the Deserializer for record keys (can be null to use the default key deserializer from StreamsConfig)
valueDeserializer - the Deserializer for record values (can be null to use the default value deserializer from StreamsConfig)
topic - the source topic to read the data from
processorName - the unique name of the internally added processor which maintains the state store
stateUpdateSupplier - the supplier used to obtain Processor instances, which maintain the state store
- Returns:
- itself
- Throws:
TopologyException - if the state store was already added, or if the source or processor names are not unique, or if the source topic has already been registered by another source, read-only state store, or global state store
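The call shape mirrors the addReadOnlyStateStore sketch above; a hedged, abbreviated example with illustrative names. Disabling changelog logging on the store builder reflects the recovery-from-source-topic behavior described above (an assumption about the intended store configuration):

topology.addGlobalStore(
    Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("reference-data"),
            Serdes.String(),
            Serdes.String())
        .withLoggingDisabled(), // recovery uses the source topic, not a changelog
    "reference-source",
    Serdes.String().deserializer(),
    Serdes.String().deserializer(),
    "reference-topic",
    "reference-updater",
    () -> new Processor<String, String, Void, Void>() {
        private KeyValueStore<String, String> store;

        @Override
        public void init(final ProcessorContext<Void, Void> context) {
            store = context.getStateStore("reference-data");
        }

        @Override
        public void process(final Record<String, String> record) {
            store.put(record.key(), record.value());
        }
    });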
-
addGlobalStore
public <KIn,VIn> Topology addGlobalStore(StoreBuilder<?> storeBuilder, String sourceName, TimestampExtractor timestampExtractor, Deserializer<KIn> keyDeserializer, Deserializer<VIn> valueDeserializer, String topic, String processorName, ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) -
connectProcessorAndStateStores
public Topology connectProcessorAndStateStores(String processorName, String... stateStoreNames)
Connect a processor to one or more state stores. The state stores must have been previously added to the topology via addStateStore(StoreBuilder, String...), or addReadOnlyStateStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier).
- Parameters:
processorName - the name of the processor
stateStoreNames - the names of state stores that the processor should be able to access
- Returns:
- itself
- Throws:
TopologyException - if the processor name or a state store name is unknown, or if the processor name specifies a source or sink
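Continuing the earlier sketches (the names are illustrative):

// Grant the "uppercase" processor access to the previously added "counts" store.
topology.connectProcessorAndStateStores("uppercase", "counts");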
-
describe
public TopologyDescription describe()
Returns a description of the specified Topology.
- Returns:
- A description of the topology.
-
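For example, the description can be printed for inspection (a minimal sketch):

// The returned TopologyDescription renders sub-topologies, nodes, and
// generated internal topic names via toString().
System.out.println(topology.describe());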