public class TopologyBuilder extends Object
ProcessorTopology
. A topology contains an acyclic graph of sources, processors,
and sinks. A source
is a node in the graph that consumes one or more Kafka topics and forwards them to
its child nodes. A processor
is a node in the graph that receives input records from upstream nodes,
processes that records, and optionally forwarding new records to one or all of its children. Finally, a sink
is a node in the graph that receives records from upstream nodes and writes them to a Kafka topic. This builder allows you
to construct an acyclic graph of these nodes, and the builder is then passed into a new KafkaStreams
instance that will then begin consuming, processing, and producing records
.Modifier and Type | Class and Description |
---|---|
static class |
TopologyBuilder.AutoOffsetReset
|
static class |
TopologyBuilder.TopicsInfo |
Constructor and Description |
---|
TopologyBuilder()
Create a new builder.
|
Modifier and Type | Method and Description |
---|---|
TopologyBuilder |
addGlobalStore(StateStore store,
String sourceName,
Deserializer keyDeserializer,
Deserializer valueDeserializer,
String topic,
String processorName,
ProcessorSupplier stateUpdateSupplier)
Adds a global
StateStore to the topology. |
TopologyBuilder |
addInternalTopic(String topicName)
Adds an internal topic
|
TopologyBuilder |
addProcessor(String name,
ProcessorSupplier supplier,
String... parentNames)
Add a new processor node that receives and processes records output by one or more parent source or processor node.
|
<K,V> TopologyBuilder |
addSink(String name,
String topic,
Serializer<K> keySerializer,
Serializer<V> valSerializer,
StreamPartitioner<? super K,? super V> partitioner,
String... parentNames)
Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
|
TopologyBuilder |
addSink(String name,
String topic,
Serializer keySerializer,
Serializer valSerializer,
String... parentNames)
Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
|
TopologyBuilder |
addSink(String name,
String topic,
StreamPartitioner partitioner,
String... parentNames)
Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic, using
the supplied partitioner.
|
TopologyBuilder |
addSink(String name,
String topic,
String... parentNames)
Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
|
TopologyBuilder |
addSource(String name,
Deserializer keyDeserializer,
Deserializer valDeserializer,
Pattern topicPattern)
Add a new source that consumes from topics matching the given pattern
and forwards the records to child processor and/or sink nodes.
|
TopologyBuilder |
addSource(String name,
Deserializer keyDeserializer,
Deserializer valDeserializer,
String... topics)
Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
|
TopologyBuilder |
addSource(String name,
Pattern topicPattern)
Add a new source that consumes from topics matching the given pattern
and forward the records to child processor and/or sink nodes.
|
TopologyBuilder |
addSource(String name,
String... topics)
Add a new source that consumes the named topics and forward the records to child processor and/or sink nodes.
|
TopologyBuilder |
addSource(TopologyBuilder.AutoOffsetReset offsetReset,
String name,
Deserializer keyDeserializer,
Deserializer valDeserializer,
Pattern topicPattern)
Add a new source that consumes from topics matching the given pattern
and forwards the records to child processor and/or sink nodes.
|
TopologyBuilder |
addSource(TopologyBuilder.AutoOffsetReset offsetReset,
String name,
Deserializer keyDeserializer,
Deserializer valDeserializer,
String... topics)
Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
|
TopologyBuilder |
addSource(TopologyBuilder.AutoOffsetReset offsetReset,
String name,
Pattern topicPattern)
Add a new source that consumes from topics matching the given pattern
and forward the records to child processor and/or sink nodes.
|
TopologyBuilder |
addSource(TopologyBuilder.AutoOffsetReset offsetReset,
String name,
String... topics)
Add a new source that consumes the named topics and forward the records to child processor and/or sink nodes.
|
TopologyBuilder |
addStateStore(StateStoreSupplier supplier,
String... processorNames)
Adds a state store
|
ProcessorTopology |
build(Integer topicGroupId)
Build the topology for the specified topic group.
|
ProcessorTopology |
buildGlobalStateTopology()
Builds the topology for any global state stores
|
TopologyBuilder |
connectProcessorAndStateStores(String processorName,
String... stateStoreNames)
Connects the processor and the state stores
|
TopologyBuilder |
connectProcessors(String... processorNames)
Connects a list of processors.
|
protected TopologyBuilder |
connectSourceStoreAndTopic(String sourceStoreName,
String topic)
This is used only for KStreamBuilder: when adding a KTable from a source topic,
we need to add the topic as the KTable's materialized state store's changelog.
|
Collection<Set<String>> |
copartitionGroups()
Returns the copartition groups.
|
TopologyBuilder |
copartitionSources(Collection<String> sourceNodes)
Asserts that the streams of the specified source nodes must be copartitioned.
|
Pattern |
earliestResetTopicsPattern()
Get the Pattern to match all topics requiring to start reading from earliest available offset
|
Map<String,StateStore> |
globalStateStores()
Get any global
StateStore s that are part of the
topology |
Pattern |
latestResetTopicsPattern()
Get the Pattern to match all topics requiring to start reading from latest available offset
|
Map<Integer,Set<String>> |
nodeGroups()
Returns the map of node groups keyed by the topic group id.
|
TopologyBuilder |
setApplicationId(String applicationId)
Set the applicationId to be used for auto-generated internal topics.
|
Pattern |
sourceTopicPattern() |
Map<String,List<String>> |
stateStoreNameToSourceTopics() |
Map<Integer,TopologyBuilder.TopicsInfo> |
topicGroups()
Returns the map of topic groups keyed by the group id.
|
void |
updateSubscriptions(SubscriptionUpdates subscriptionUpdates,
String threadId) |
public final TopologyBuilder setApplicationId(String applicationId)
topicGroups()
, copartitionSources(java.util.Collection<java.lang.String>)
,
stateStoreNameToSourceTopics
and build(Integer)
.applicationId
- the streams applicationId. Should be the same as set by
StreamsConfig.APPLICATION_ID_CONFIG
public final TopologyBuilder addSource(String name, String... topics)
default key deserializer
and
default value deserializer
specified in the
stream configuration
.name
- the unique name of the source used to reference this node when
adding processor children
.topics
- the name of one or more Kafka topics that this source is to consumepublic final TopologyBuilder addSource(TopologyBuilder.AutoOffsetReset offsetReset, String name, String... topics)
default key deserializer
and
default value deserializer
specified in the
stream configuration
.offsetReset
- the auto offset reset policy to use for this source if no committed offsets found; acceptable values earliest or latestname
- the unique name of the source used to reference this node when
adding processor children
.topics
- the name of one or more Kafka topics that this source is to consumepublic final TopologyBuilder addSource(String name, Pattern topicPattern)
default key deserializer
and
default value deserializer
specified in the
stream configuration
.name
- the unique name of the source used to reference this node when
adding processor children
.topicPattern
- regular expression pattern to match Kafka topics that this source is to consumepublic final TopologyBuilder addSource(TopologyBuilder.AutoOffsetReset offsetReset, String name, Pattern topicPattern)
default key deserializer
and
default value deserializer
specified in the
stream configuration
.offsetReset
- the auto offset reset policy value for this source if no committed offsets found; acceptable values earliest or latest.name
- the unique name of the source used to reference this node when
adding processor children
.topicPattern
- regular expression pattern to match Kafka topics that this source is to consumepublic final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics)
name
- the unique name of the source used to reference this node when
adding processor children
.keyDeserializer
- the key deserializer
used when consuming records; may be null if the source
should use the default key deserializer
specified in the
stream configuration
valDeserializer
- the value deserializer
used when consuming records; may be null if the source
should use the default value deserializer
specified in the
stream configuration
topics
- the name of one or more Kafka topics that this source is to consumeTopologyBuilderException
- if processor is already added or if topics have already been registered by another sourcepublic final TopologyBuilder addSource(TopologyBuilder.AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics)
offsetReset
- the auto offset reset policy to use for this stream if no committed offsets found; acceptable values are earliest or latest.name
- the unique name of the source used to reference this node when
adding processor children
.keyDeserializer
- the key deserializer
used when consuming records; may be null if the source
should use the default key deserializer
specified in the
stream configuration
valDeserializer
- the value deserializer
used when consuming records; may be null if the source
should use the default value deserializer
specified in the
stream configuration
topics
- the name of one or more Kafka topics that this source is to consumeTopologyBuilderException
- if processor is already added or if topics have already been registered by another sourcepublic TopologyBuilder addGlobalStore(StateStore store, String sourceName, Deserializer keyDeserializer, Deserializer valueDeserializer, String topic, String processorName, ProcessorSupplier stateUpdateSupplier)
StateStore
to the topology. The StateStore
sources its data
from all partitions of the provided input topic. There will be exactly one instance of this
StateStore
per Kafka Streams instance.
A SourceNode
with the provided sourceName will be added to consume the data arriving
from the partitions of the input topic.
The provided ProcessorSupplier
will be used to create an ProcessorNode
that will
receive all records forwarded from the SourceNode
. This
ProcessorNode
should be used to keep the StateStore
up-to-date.
store
- the instance of StateStore
sourceName
- name of the SourceNode
that will be automatically addedkeyDeserializer
- the Deserializer
to deserialize keys withvalueDeserializer
- the Deserializer
to deserialize values withtopic
- the topic to source the data fromprocessorName
- the name of the ProcessorSupplier
stateUpdateSupplier
- the instance of ProcessorSupplier
public final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern)
name
- the unique name of the source used to reference this node when
adding processor children
.keyDeserializer
- the key deserializer
used when consuming records; may be null if the source
should use the default key deserializer
specified in the
stream configuration
valDeserializer
- the value deserializer
used when consuming records; may be null if the source
should use the default value deserializer
specified in the
stream configuration
topicPattern
- regular expression pattern to match Kafka topics that this source is to consumeTopologyBuilderException
- if processor is already added or if topics have already been registered by namepublic final TopologyBuilder addSource(TopologyBuilder.AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern)
offsetReset
- the auto offset reset policy to use for this stream if no committed offsets found; acceptable values are earliest or latestname
- the unique name of the source used to reference this node when
adding processor children
.keyDeserializer
- the key deserializer
used when consuming records; may be null if the source
should use the default key deserializer
specified in the
stream configuration
valDeserializer
- the value deserializer
used when consuming records; may be null if the source
should use the default value deserializer
specified in the
stream configuration
topicPattern
- regular expression pattern to match Kafka topics that this source is to consumeTopologyBuilderException
- if processor is already added or if topics have already been registered by namepublic final TopologyBuilder addSink(String name, String topic, String... parentNames)
default key serializer
and
default value serializer
specified in the
stream configuration
.name
- the unique name of the sinktopic
- the name of the Kafka topic to which this sink should write its recordsparentNames
- the name of one or more source or processor nodes whose output records this sink should consume
and write to its topicaddSink(String, String, StreamPartitioner, String...)
,
addSink(String, String, Serializer, Serializer, String...)
,
addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
public final TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames)
default key serializer
and
default value serializer
specified in the
stream configuration
.
The sink will also use the specified StreamPartitioner
to determine how records are distributed among
the named Kafka topic's partitions. Such control is often useful with topologies that use
state stores
in its processors. In most other cases, however, a partitioner needs not be specified and Kafka will automatically distribute
records among partitions using Kafka's default partitioning logic.
name
- the unique name of the sinktopic
- the name of the Kafka topic to which this sink should write its recordspartitioner
- the function that should be used to determine the partition for each record processed by the sinkparentNames
- the name of one or more source or processor nodes whose output records this sink should consume
and write to its topicaddSink(String, String, String...)
,
addSink(String, String, Serializer, Serializer, String...)
,
addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames)
name
- the unique name of the sinktopic
- the name of the Kafka topic to which this sink should write its recordskeySerializer
- the key serializer
used when consuming records; may be null if the sink
should use the default key serializer
specified in the
stream configuration
valSerializer
- the value serializer
used when consuming records; may be null if the sink
should use the default value serializer
specified in the
stream configuration
parentNames
- the name of one or more source or processor nodes whose output records this sink should consume
and write to its topicaddSink(String, String, String...)
,
addSink(String, String, StreamPartitioner, String...)
,
addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
public final <K,V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K,? super V> partitioner, String... parentNames)
name
- the unique name of the sinktopic
- the name of the Kafka topic to which this sink should write its recordskeySerializer
- the key serializer
used when consuming records; may be null if the sink
should use the default key serializer
specified in the
stream configuration
valSerializer
- the value serializer
used when consuming records; may be null if the sink
should use the default value serializer
specified in the
stream configuration
partitioner
- the function that should be used to determine the partition for each record processed by the sinkparentNames
- the name of one or more source or processor nodes whose output records this sink should consume
and write to its topicTopologyBuilderException
- if parent processor is not added yet, or if this processor's name is equal to the parent's nameaddSink(String, String, String...)
,
addSink(String, String, StreamPartitioner, String...)
,
addSink(String, String, Serializer, Serializer, String...)
public final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames)
name
- the unique name of the processor nodesupplier
- the supplier used to obtain this node's Processor
instanceparentNames
- the name of one or more source or processor nodes whose output records this processor should receive
and processTopologyBuilderException
- if parent processor is not added yet, or if this processor's name is equal to the parent's namepublic final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames)
supplier
- the supplier used to obtain this state store StateStore
instanceTopologyBuilderException
- if state store supplier is already addedpublic final TopologyBuilder connectProcessorAndStateStores(String processorName, String... stateStoreNames)
processorName
- the name of the processorstateStoreNames
- the names of state stores that the processor usesprotected final TopologyBuilder connectSourceStoreAndTopic(String sourceStoreName, String topic)
public final TopologyBuilder connectProcessors(String... processorNames)
processorNames
- the name of the processorsTopologyBuilderException
- if less than two processors are specified, or if one of the processors is not added yetpublic final TopologyBuilder addInternalTopic(String topicName)
topicName
- the name of the topicpublic final TopologyBuilder copartitionSources(Collection<String> sourceNodes)
sourceNodes
- a set of source node namespublic Map<Integer,Set<String>> nodeGroups()
public ProcessorTopology build(Integer topicGroupId)
KafkaStreams.KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
constructor.public ProcessorTopology buildGlobalStateTopology()
public Map<String,StateStore> globalStateStores()
StateStore
s that are part of the
topologyStateStore
spublic Map<Integer,TopologyBuilder.TopicsInfo> topicGroups()
public Pattern earliestResetTopicsPattern()
public Pattern latestResetTopicsPattern()
public Map<String,List<String>> stateStoreNameToSourceTopics()
public Collection<Set<String>> copartitionGroups()
public Pattern sourceTopicPattern()
public void updateSubscriptions(SubscriptionUpdates subscriptionUpdates, String threadId)