Class Topology
public class Topology extends Object
A logical representation of a ProcessorTopology. A topology is an acyclic graph of sources, processors, and sinks. A source is a node in the graph that consumes one or more Kafka topics and forwards their records to its successor nodes. A processor is a node in the graph that receives input records from upstream nodes, processes the records, and optionally forwards new records to one or all of its downstream nodes. Finally, a sink is a node in the graph that receives records from upstream nodes and writes them to a Kafka topic. A Topology allows you to construct an acyclic graph of these nodes, which is then passed into a new KafkaStreams instance that will then begin consuming, processing, and producing records.
-
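The source, processor, and sink roles described above can be sketched as a three-node topology. This is a minimal illustration, not a reference implementation: the node names, the topic names, and the `UppercaseProcessor` class are all made up for the example, and it assumes a kafka-streams version (2.7 or later) that ships the `org.apache.kafka.streams.processor.api` package.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

public class UppercaseTopology {

    // Hypothetical processor: upper-cases each record value and forwards it downstream.
    static class UppercaseProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            context.forward(record.withValue(record.value().toUpperCase()));
        }
    }

    static Topology build() {
        // The supplier returns a new processor instance on every get() call.
        ProcessorSupplier<String, String, String, String> supplier = UppercaseProcessor::new;

        Topology topology = new Topology();
        // Source: reads "input-topic" (assumed name) with String deserializers.
        topology.addSource("text-source",
                Serdes.String().deserializer(), Serdes.String().deserializer(),
                "input-topic");
        // Processor: its parent is the source added above.
        topology.addProcessor("uppercase", supplier, "text-source");
        // Sink: writes the processor's output to "output-topic" (assumed name).
        topology.addSink("text-sink", "output-topic",
                Serdes.String().serializer(), Serdes.String().serializer(),
                "uppercase");
        return topology;
    }

    public static void main(String[] args) {
        // Prints the node graph; no broker connection is needed for describe().
        System.out.println(build().describe());
    }
}
```

Parents are added before the nodes that reference them, which is why the source comes first and the sink last. Passing the resulting Topology to a KafkaStreams instance is what actually starts consumption.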
Nested Class Summary
Nested Classes
Modifier and Type, Class
    Description
static class Topology.AutoOffsetReset
    Sets the auto.offset.reset configuration when adding a source processor or when creating KStream or KTable via StreamsBuilder.
-
Field Summary
Fields
Modifier and Type, Field
    Description
protected org.apache.kafka.streams.processor.internals.InternalTopologyBuilder internalTopologyBuilder
-
Constructor Summary
Constructors
Constructor
    Description
Topology()
-
Method Summary
Modifier and Type, Method
    Description
<K, V> Topology addGlobalStore(StoreBuilder<?> storeBuilder, String sourceName, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String topic, String processorName, ProcessorSupplier<K,V> stateUpdateSupplier)
    Deprecated. Since 2.7.0.
<KIn, VIn> Topology addGlobalStore(StoreBuilder<?> storeBuilder, String sourceName, Deserializer<KIn> keyDeserializer, Deserializer<VIn> valueDeserializer, String topic, String processorName, ProcessorSupplier<KIn,VIn,Void,Void> stateUpdateSupplier)
    Adds a global StateStore to the topology.
<K, V> Topology addGlobalStore(StoreBuilder<?> storeBuilder, String sourceName, TimestampExtractor timestampExtractor, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String topic, String processorName, ProcessorSupplier<K,V> stateUpdateSupplier)
    Deprecated. Since 2.7.0.
<KIn, VIn> Topology addGlobalStore(StoreBuilder<?> storeBuilder, String sourceName, TimestampExtractor timestampExtractor, Deserializer<KIn> keyDeserializer, Deserializer<VIn> valueDeserializer, String topic, String processorName, ProcessorSupplier<KIn,VIn,Void,Void> stateUpdateSupplier)
    Adds a global StateStore to the topology.
<KIn, VIn, KOut, VOut> Topology addProcessor(String name, ProcessorSupplier<KIn,VIn,KOut,VOut> supplier, String... parentNames)
    Add a new processor node that receives and processes records output by one or more parent source or processor nodes.
Topology addProcessor(String name, ProcessorSupplier supplier, String... parentNames)
    Deprecated. Since 2.7.0. Use addProcessor(String, ProcessorSupplier, String...) instead.
Topology addSink(String name, String topic, String... parentNames)
    Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
<K, V> Topology addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer, String... parentNames)
    Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
<K, V> Topology addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K,? super V> partitioner, String... parentNames)
    Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
<K, V> Topology addSink(String name, String topic, StreamPartitioner<? super K,? super V> partitioner, String... parentNames)
    Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic, using the supplied partitioner.
<K, V> Topology addSink(String name, TopicNameExtractor<K,V> topicExtractor, String... parentNames)
    Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on topicExtractor.
<K, V> Topology addSink(String name, TopicNameExtractor<K,V> topicExtractor, Serializer<K> keySerializer, Serializer<V> valueSerializer, String... parentNames)
    Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on topicExtractor.
<K, V> Topology addSink(String name, TopicNameExtractor<K,V> topicExtractor, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K,? super V> partitioner, String... parentNames)
    Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on topicExtractor.
<K, V> Topology addSink(String name, TopicNameExtractor<K,V> topicExtractor, StreamPartitioner<? super K,? super V> partitioner, String... parentNames)
    Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on topicExtractor, using the supplied partitioner.
Topology addSource(String name, String... topics)
    Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
Topology addSource(String name, Pattern topicPattern)
    Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes.
Topology addSource(String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics)
    Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
Topology addSource(String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern)
    Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes.
Topology addSource(TimestampExtractor timestampExtractor, String name, String... topics)
    Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
Topology addSource(TimestampExtractor timestampExtractor, String name, Pattern topicPattern)
    Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes.
Topology addSource(Topology.AutoOffsetReset offsetReset, String name, String... topics)
    Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
Topology addSource(Topology.AutoOffsetReset offsetReset, String name, Pattern topicPattern)
    Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes.
Topology addSource(Topology.AutoOffsetReset offsetReset, String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics)
    Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
Topology addSource(Topology.AutoOffsetReset offsetReset, String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern)
    Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes.
Topology addSource(Topology.AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics)
    Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
Topology addSource(Topology.AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern)
    Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes.
Topology addSource(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, String... topics)
    Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
Topology addSource(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, Pattern topicPattern)
    Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes.
Topology addStateStore(StoreBuilder<?> storeBuilder, String... processorNames)
    Adds a state store.
Topology connectProcessorAndStateStores(String processorName, String... stateStoreNames)
    Connects the processor and the state stores.
TopologyDescription describe()
    Returns a description of the specified Topology.
-
Field Details
-
internalTopologyBuilder
protected final org.apache.kafka.streams.processor.internals.InternalTopologyBuilder internalTopologyBuilder
-
-
Constructor Details
-
Topology
public Topology()
-
-
Method Details
-
addSource
public Topology addSource(String name, String... topics)
Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. The source will use the default key deserializer and default value deserializer specified in the stream configuration. The default TimestampExtractor as specified in the config is used.
- Parameters:
name - the unique name of the source used to reference this node when adding processor children.
topics - the name of one or more Kafka topics that this source is to consume
- Returns:
itself
- Throws:
TopologyException - if processor is already added or if topics have already been registered by another source
-
addSource
public Topology addSource(String name, Pattern topicPattern)
Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes. The source will use the default key deserializer and default value deserializer specified in the stream configuration. The default TimestampExtractor as specified in the config is used.
- Parameters:
name - the unique name of the source used to reference this node when adding processor children.
topicPattern - regular expression pattern to match Kafka topics that this source is to consume
- Returns:
itself
- Throws:
TopologyException - if processor is already added or if topics have already been registered by another source
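Before passing a topicPattern to addSource, it can help to check which topic names the pattern would pick up. The sketch below uses only java.util.regex and assumes whole-name matching (Matcher.matches()); the topic names are invented, and the matching behavior of your client version is worth confirming separately.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternCheck {

    // Returns the topic names that the pattern matches in full.
    static List<String> matchingTopics(Pattern topicPattern, List<String> topicNames) {
        return topicNames.stream()
                .filter(name -> topicPattern.matcher(name).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Pattern topicPattern = Pattern.compile("orders-.*");
        List<String> topics = Arrays.asList("orders-eu", "orders-us", "payments-eu", "old-orders-eu");
        // prints [orders-eu, orders-us]
        System.out.println(matchingTopics(topicPattern, topics));
    }
}
```

Note that "old-orders-eu" is excluded under whole-name matching, because the pattern is not anchored with a leading wildcard.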
-
addSource
public Topology addSource(Topology.AutoOffsetReset offsetReset, String name, String... topics)
Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. The source will use the default key deserializer and default value deserializer specified in the stream configuration. The default TimestampExtractor as specified in the config is used.
- Parameters:
offsetReset - the auto offset reset policy to use for this source if no committed offsets are found; acceptable values are earliest or latest
name - the unique name of the source used to reference this node when adding processor children.
topics - the name of one or more Kafka topics that this source is to consume
- Returns:
itself
- Throws:
TopologyException - if processor is already added or if topics have already been registered by another source
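A per-source reset policy can be sketched as follows. The node name "replay-source" and topic "events" are assumptions for illustration, and the example relies on the nested Topology.AutoOffsetReset enum described on this page being available in your kafka-streams version.

```java
import org.apache.kafka.streams.Topology;

public class ReplaySourceExample {

    static Topology build() {
        Topology topology = new Topology();
        // With no committed offsets, this source starts from the earliest available offset,
        // regardless of the application-wide auto.offset.reset setting.
        topology.addSource(Topology.AutoOffsetReset.EARLIEST, "replay-source", "events");
        return topology;
    }

    public static void main(String[] args) {
        System.out.println(build().describe());
    }
}
```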
-
addSource
public Topology addSource(Topology.AutoOffsetReset offsetReset, String name, Pattern topicPattern)
Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes. The source will use the default key deserializer and default value deserializer specified in the stream configuration. The default TimestampExtractor as specified in the config is used.
- Parameters:
offsetReset - the auto offset reset policy value for this source if no committed offsets are found; acceptable values are earliest or latest.
name - the unique name of the source used to reference this node when adding processor children.
topicPattern - regular expression pattern to match Kafka topics that this source is to consume
- Returns:
itself
- Throws:
TopologyException - if processor is already added or if topics have already been registered by another source
-
addSource
public Topology addSource(TimestampExtractor timestampExtractor, String name, String... topics)
Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. The source will use the default key deserializer and default value deserializer specified in the stream configuration.
- Parameters:
timestampExtractor - the stateless timestamp extractor used for this source; if not specified, the default extractor defined in the configs will be used
name - the unique name of the source used to reference this node when adding processor children.
topics - the name of one or more Kafka topics that this source is to consume
- Returns:
itself
- Throws:
TopologyException - if processor is already added or if topics have already been registered by another source
-
addSource
public Topology addSource(TimestampExtractor timestampExtractor, String name, Pattern topicPattern)
Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes. The source will use the default key deserializer and default value deserializer specified in the stream configuration.
- Parameters:
timestampExtractor - the stateless timestamp extractor used for this source; if not specified, the default extractor defined in the configs will be used
name - the unique name of the source used to reference this node when adding processor children.
topicPattern - regular expression pattern to match Kafka topics that this source is to consume
- Returns:
itself
- Throws:
TopologyException - if processor is already added or if topics have already been registered by another source
-
addSource
public Topology addSource(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, String... topics)
Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. The source will use the default key deserializer and default value deserializer specified in the stream configuration.
- Parameters:
offsetReset - the auto offset reset policy to use for this source if no committed offsets are found; acceptable values are earliest or latest
timestampExtractor - the stateless timestamp extractor used for this source; if not specified, the default extractor defined in the configs will be used
name - the unique name of the source used to reference this node when adding processor children.
topics - the name of one or more Kafka topics that this source is to consume
- Returns:
itself
- Throws:
TopologyException - if processor is already added or if topics have already been registered by another source
-
addSource
public Topology addSource(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, Pattern topicPattern)
Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes. The source will use the default key deserializer and default value deserializer specified in the stream configuration.
- Parameters:
offsetReset - the auto offset reset policy value for this source if no committed offsets are found; acceptable values are earliest or latest.
timestampExtractor - the stateless timestamp extractor used for this source; if not specified, the default extractor defined in the configs will be used
name - the unique name of the source used to reference this node when adding processor children.
topicPattern - regular expression pattern to match Kafka topics that this source is to consume
- Returns:
itself
- Throws:
TopologyException - if processor is already added or if topics have already been registered by another source
-
addSource
public Topology addSource(String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics)
Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. The source will use the specified key and value deserializers. The default TimestampExtractor as specified in the config is used.
- Parameters:
name - the unique name of the source used to reference this node when adding processor children
keyDeserializer - key deserializer used to read this source; if not specified, the default key deserializer defined in the configs will be used
valueDeserializer - value deserializer used to read this source; if not specified, the default value deserializer defined in the configs will be used
topics - the name of one or more Kafka topics that this source is to consume
- Returns:
itself
- Throws:
TopologyException - if processor is already added or if topics have already been registered by another source
-
addSource
public Topology addSource(String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern)
Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes. The source will use the specified key and value deserializers. The provided deserializers will be used for all matched topics, so care should be taken to specify patterns for topics that share the same key-value data format. The default TimestampExtractor as specified in the config is used.
- Parameters:
name - the unique name of the source used to reference this node when adding processor children
keyDeserializer - key deserializer used to read this source; if not specified, the default key deserializer defined in the configs will be used
valueDeserializer - value deserializer used to read this source; if not specified, the default value deserializer defined in the configs will be used
topicPattern - regular expression pattern to match Kafka topics that this source is to consume
- Returns:
itself
- Throws:
TopologyException - if processor is already added or if topics have already been registered by name
-
addSource
public Topology addSource(Topology.AutoOffsetReset offsetReset, String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics)
Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. The source will use the specified key and value deserializers. The provided deserializers will be used for all the specified topics, so care should be taken to specify only topics that share the same key-value data format.
- Parameters:
offsetReset - the auto offset reset policy to use for this stream if no committed offsets are found; acceptable values are earliest or latest
name - the unique name of the source used to reference this node when adding processor children
keyDeserializer - key deserializer used to read this source; if not specified, the default key deserializer defined in the configs will be used
valueDeserializer - value deserializer used to read this source; if not specified, the default value deserializer defined in the configs will be used
topics - the name of one or more Kafka topics that this source is to consume
- Returns:
itself
- Throws:
TopologyException - if processor is already added or if topics have already been registered by name
-
addSource
public Topology addSource(Topology.AutoOffsetReset offsetReset, String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern)
Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes. The source will use the specified key and value deserializers. The provided deserializers will be used for all matched topics, so care should be taken to specify patterns for topics that share the same key-value data format.
- Parameters:
offsetReset - the auto offset reset policy to use for this stream if no committed offsets are found; acceptable values are earliest or latest
name - the unique name of the source used to reference this node when adding processor children
keyDeserializer - key deserializer used to read this source; if not specified, the default key deserializer defined in the configs will be used
valueDeserializer - value deserializer used to read this source; if not specified, the default value deserializer defined in the configs will be used
topicPattern - regular expression pattern to match Kafka topics that this source is to consume
- Returns:
itself
- Throws:
TopologyException - if processor is already added or if topics have already been registered by name
-
addSource
public Topology addSource(Topology.AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics)
Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. The source will use the specified key and value deserializers.
- Parameters:
offsetReset - the auto offset reset policy to use for this stream if no committed offsets are found; acceptable values are earliest or latest.
name - the unique name of the source used to reference this node when adding processor children.
timestampExtractor - the stateless timestamp extractor used for this source; if not specified, the default extractor defined in the configs will be used
keyDeserializer - key deserializer used to read this source; if not specified, the default key deserializer defined in the configs will be used
valueDeserializer - value deserializer used to read this source; if not specified, the default value deserializer defined in the configs will be used
topics - the name of one or more Kafka topics that this source is to consume
- Returns:
itself
- Throws:
TopologyException - if processor is already added or if topics have already been registered by another source
-
addSource
public Topology addSource(Topology.AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern)
Add a new source that consumes from topics matching the given pattern and forwards the records to child processor and/or sink nodes. The source will use the specified key and value deserializers. The provided deserializers will be used for all matched topics, so care should be taken to specify patterns for topics that share the same key-value data format.
- Parameters:
offsetReset - the auto offset reset policy to use for this stream if no committed offsets are found; acceptable values are earliest or latest
name - the unique name of the source used to reference this node when adding processor children.
timestampExtractor - the stateless timestamp extractor used for this source; if not specified, the default extractor defined in the configs will be used
keyDeserializer - key deserializer used to read this source; if not specified, the default key deserializer defined in the configs will be used
valueDeserializer - value deserializer used to read this source; if not specified, the default value deserializer defined in the configs will be used
topicPattern - regular expression pattern to match Kafka topics that this source is to consume
- Returns:
itself
- Throws:
TopologyException - if processor is already added or if topics have already been registered by name
-
addSink
public Topology addSink(String name, String topic, String... parentNames)
Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic. The sink will use the default key serializer and default value serializer specified in the stream configuration.
- Parameters:
name - the unique name of the sink
topic - the name of the Kafka topic to which this sink should write its records
parentNames - the name of one or more source or processor nodes whose output records this sink should consume and write to its topic
- Returns:
itself
- Throws:
TopologyException - if parent processor is not added yet, or if this processor's name is equal to the parent's name
- See Also:
addSink(String, String, StreamPartitioner, String...), addSink(String, String, Serializer, Serializer, String...), addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
-
addSink
public <K, V> Topology addSink(String name, String topic, StreamPartitioner<? super K,? super V> partitioner, String... parentNames)
Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic, using the supplied partitioner. The sink will use the default key serializer and default value serializer specified in the stream configuration.
The sink will also use the specified StreamPartitioner to determine how records are distributed among the named Kafka topic's partitions. Such control is often useful with topologies that use state stores in their processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute records among partitions using Kafka's default partitioning logic.
- Parameters:
name - the unique name of the sink
topic - the name of the Kafka topic to which this sink should write its records
partitioner - the function that should be used to determine the partition for each record processed by the sink
parentNames - the name of one or more source or processor nodes whose output records this sink should consume and write to its topic
- Returns:
itself
- Throws:
TopologyException - if parent processor is not added yet, or if this processor's name is equal to the parent's name
- See Also:
addSink(String, String, String...), addSink(String, String, Serializer, Serializer, String...), addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
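The partitioning decision itself is ordinary Java and can be developed and tested without a broker. The following is a hedged sketch: the class and method names are hypothetical, the method merely mirrors the parameter shape of StreamPartitioner.partition(topic, key, value, numPartitions), and the wiring into addSink appears only as a comment because it depends on the kafka-streams version on your classpath.

```java
public class KeyHashPartitioning {

    // Hypothetical helper with the same parameters as StreamPartitioner.partition.
    // Returning null asks for the default partitioning logic instead of a fixed partition.
    static Integer partitionFor(String topic, String key, String value, int numPartitions) {
        if (key == null) {
            return null;
        }
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("output-topic", "customer-42", "payload", 6));

        // Wiring sketch (assumed node and topic names; requires kafka-streams):
        // StreamPartitioner<String, String> partitioner = KeyHashPartitioning::partitionFor;
        // topology.addSink("sink", "output-topic", partitioner, "processor");
    }
}
```

Hashing the key this way sends all records with the same key to the same partition, which is the property that matters when downstream processors keep per-key state.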
-
addSink
public <K, V> Topology addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer, String... parentNames)
Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic. The sink will use the specified key and value serializers.
- Parameters:
name - the unique name of the sink
topic - the name of the Kafka topic to which this sink should write its records
keySerializer - the key serializer used when consuming records; may be null if the sink should use the default key serializer specified in the stream configuration
valueSerializer - the value serializer used when consuming records; may be null if the sink should use the default value serializer specified in the stream configuration
parentNames - the name of one or more source or processor nodes whose output records this sink should consume and write to its topic
- Returns:
itself
- Throws:
TopologyException - if parent processor is not added yet, or if this processor's name is equal to the parent's name
- See Also:
addSink(String, String, String...), addSink(String, String, StreamPartitioner, String...), addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
-
addSink
public <K, V> Topology addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K,? super V> partitioner, String... parentNames)
Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic. The sink will use the specified key and value serializers, and the supplied partitioner.
- Parameters:
name - the unique name of the sink
topic - the name of the Kafka topic to which this sink should write its records
keySerializer - the key serializer used when consuming records; may be null if the sink should use the default key serializer specified in the stream configuration
valueSerializer - the value serializer used when consuming records; may be null if the sink should use the default value serializer specified in the stream configuration
partitioner - the function that should be used to determine the partition for each record processed by the sink
parentNames - the name of one or more source or processor nodes whose output records this sink should consume and write to its topic
- Returns:
itself
- Throws:
TopologyException - if parent processor is not added yet, or if this processor's name is equal to the parent's name
- See Also:
addSink(String, String, String...), addSink(String, String, StreamPartitioner, String...), addSink(String, String, Serializer, Serializer, String...)
-
addSink
public <K, V> Topology addSink(String name, TopicNameExtractor<K,V> topicExtractor, String... parentNames)
Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on topicExtractor. Every topic that the sink may send to should be created in advance. The sink will use the default key serializer and default value serializer specified in the stream configuration.
- Parameters:
name - the unique name of the sink
topicExtractor - the extractor to determine the name of the Kafka topic to which this sink should write for each record
parentNames - the name of one or more source or processor nodes whose output records this sink should consume and dynamically write to topics
- Returns:
itself
- Throws:
TopologyException - if parent processor is not added yet, or if this processor's name is equal to the parent's name
- See Also:
addSink(String, String, StreamPartitioner, String...), addSink(String, String, Serializer, Serializer, String...), addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
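Dynamic routing boils down to a function from a record to a topic name. The sketch below shows one possible rule in plain Java; the class name, the "region:id" key format, and the "orders-" topic prefix are all assumptions for illustration, and the adaptation to TopicNameExtractor is given only as a comment.

```java
public class RegionTopicRouting {

    // Hypothetical rule: a key such as "eu:123" routes to "orders-eu";
    // keys without a region prefix fall back to "orders-unknown".
    static String topicFor(String key, String value) {
        int separator = (key == null) ? -1 : key.indexOf(':');
        if (separator <= 0) {
            return "orders-unknown";
        }
        return "orders-" + key.substring(0, separator);
    }

    public static void main(String[] args) {
        // prints orders-eu
        System.out.println(topicFor("eu:123", "payload"));

        // Wiring sketch (assumed node names; requires kafka-streams):
        // TopicNameExtractor<String, String> extractor =
        //         (key, value, recordContext) -> topicFor(key, value);
        // topology.addSink("routing-sink", extractor, "processor");
    }
}
```

Because the set of possible results is open-ended, a fallback topic such as "orders-unknown" keeps the output within topics that can be created ahead of time.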
-
addSink
public <K, V> Topology addSink(String name, TopicNameExtractor<K,V> topicExtractor, StreamPartitioner<? super K,? super V> partitioner, String... parentNames)
Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on topicExtractor, using the supplied partitioner. Every topic that the sink may send to should be created in advance. The sink will use the default key serializer and default value serializer specified in the stream configuration.
The sink will also use the specified StreamPartitioner to determine how records are distributed among the named Kafka topic's partitions. Such control is often useful with topologies that use state stores in their processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute records among partitions using Kafka's default partitioning logic.
- Parameters:
name - the unique name of the sink
topicExtractor - the extractor to determine the name of the Kafka topic to which this sink should write for each record
partitioner - the function that should be used to determine the partition for each record processed by the sink
parentNames - the name of one or more source or processor nodes whose output records this sink should consume and dynamically write to topics
- Returns:
itself
- Throws:
TopologyException - if parent processor is not added yet, or if this processor's name is equal to the parent's name
- See Also:
addSink(String, String, String...), addSink(String, String, Serializer, Serializer, String...), addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
-
addSink
public <K, V> Topology addSink(String name, TopicNameExtractor<K,V> topicExtractor, Serializer<K> keySerializer, Serializer<V> valueSerializer, String... parentNames)
Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on topicExtractor. Every topic that the sink may send to should be created in advance. The sink will use the specified key and value serializers.
- Parameters:
name - the unique name of the sink
topicExtractor - the extractor to determine the name of the Kafka topic to which this sink should write for each record
keySerializer - the key serializer used when consuming records; may be null if the sink should use the default key serializer specified in the stream configuration
valueSerializer - the value serializer used when consuming records; may be null if the sink should use the default value serializer specified in the stream configuration
parentNames - the name of one or more source or processor nodes whose output records this sink should consume and dynamically write to topics
- Returns:
itself
- Throws:
TopologyException - if parent processor is not added yet, or if this processor's name is equal to the parent's name
- See Also:
addSink(String, String, String...), addSink(String, String, StreamPartitioner, String...), addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
-
addSink
public <K, V> Topology addSink(String name, TopicNameExtractor<K,V> topicExtractor, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K,? super V> partitioner, String... parentNames)
Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on topicExtractor. The topics that it may ever send to should be pre-created. The sink will use the specified key and value serializers, and the supplied partitioner.
- Parameters:
name - the unique name of the sink
topicExtractor - the extractor to determine the name of the Kafka topic to which this sink should write for each record
keySerializer - the key serializer used when writing records; may be null if the sink should use the default key serializer specified in the stream configuration
valueSerializer - the value serializer used when writing records; may be null if the sink should use the default value serializer specified in the stream configuration
partitioner - the function that should be used to determine the partition for each record processed by the sink
parentNames - the name of one or more source or processor nodes whose output records this sink should consume and dynamically write to topics
- Returns:
itself
- Throws:
TopologyException - if parent processor is not added yet, or if this processor's name is equal to the parent's name
- See Also:
addSink(String, String, String...)
addSink(String, String, StreamPartitioner, String...)
addSink(String, String, Serializer, Serializer, String...)
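The role of the partitioner parameter can be illustrated with a small stdlib-only sketch. PartitionChoiceSketch and partitionFor are hypothetical names, and the key-hash rule is an assumption chosen for illustration; it is not the StreamPartitioner interface and not the hashing Kafka applies by default.

```java
// Stdlib-only model of choosing a partition for a record; no Kafka classes are used.
class PartitionChoiceSketch {
    // Hypothetical stand-in for a partitioner: derives the partition from the key alone,
    // so records with equal keys always land in the same partition.
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("user-42", 6));
    }
}
```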
-
addProcessor
@Deprecated public Topology addProcessor(String name, ProcessorSupplier supplier, String... parentNames)
Deprecated. Since 2.7.0. Use addProcessor(String, ProcessorSupplier, String...) instead.
Add a new processor node that receives and processes records output by one or more parent source or processor nodes. Any new record output by this processor will be forwarded to its child processor or sink nodes. The supplier should always generate a new instance each time ProcessorSupplier.get() gets called. Creating a single Processor object and returning the same object reference in ProcessorSupplier.get() violates the supplier pattern and leads to runtime exceptions. If supplier provides stores via ConnectedStoreProvider.stores(), the provided StoreBuilders will be added to the topology and connected to this processor automatically.
- Parameters:
name - the unique name of the processor node
supplier - the supplier used to obtain this node's Processor instance
parentNames - the name of one or more source or processor nodes whose output records this processor should receive and process
- Returns:
itself
- Throws:
TopologyException - if parent processor is not added yet, or if this processor's name is equal to the parent's name
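The supplier contract described above (a new instance on every get() call) can be shown with a stdlib-only sketch. java.util.function.Supplier stands in for ProcessorSupplier, and CountingProcessor, freshSupplier, and sharedSupplier are hypothetical names introduced for illustration; the sketch shows only why a shared instance leaks state, not the runtime exception Kafka Streams raises.

```java
import java.util.function.Supplier;

// Stdlib-only model of the supplier pattern; no Kafka classes are used.
class SupplierContractSketch {
    // Hypothetical stand-in for a Processor that keeps per-instance state.
    static final class CountingProcessor {
        private int seen = 0;

        void process(String record) {
            seen++;
        }

        int seen() {
            return seen;
        }
    }

    // Correct: every get() call constructs a new processor.
    static Supplier<CountingProcessor> freshSupplier() {
        return CountingProcessor::new;
    }

    // Incorrect: every get() call hands back the same object, so state is shared.
    static Supplier<CountingProcessor> sharedSupplier() {
        CountingProcessor single = new CountingProcessor();
        return () -> single;
    }

    public static void main(String[] args) {
        Supplier<CountingProcessor> good = freshSupplier();
        Supplier<CountingProcessor> bad = sharedSupplier();
        System.out.println("fresh instances distinct: " + (good.get() != good.get()));
        System.out.println("shared instances distinct: " + (bad.get() != bad.get()));
    }
}
```

With the shared supplier, a record processed through one returned reference is visible through every other, which is the kind of cross-task interference the new-instance rule is meant to prevent.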
-
addProcessor
public <KIn, VIn, KOut, VOut> Topology addProcessor(String name, ProcessorSupplier<KIn,VIn,KOut,VOut> supplier, String... parentNames)
Add a new processor node that receives and processes records output by one or more parent source or processor nodes. Any new record output by this processor will be forwarded to its child processor or sink nodes. If supplier provides stores via ConnectedStoreProvider.stores(), the provided StoreBuilders will be added to the topology and connected to this processor automatically.
- Parameters:
name - the unique name of the processor node
supplier - the supplier used to obtain this node's Processor instance
parentNames - the name of one or more source or processor nodes whose output records this processor should receive and process
- Returns:
itself
- Throws:
TopologyException - if parent processor is not added yet, or if this processor's name is equal to the parent's name
-
addStateStore
Adds a state store.
- Parameters:
storeBuilder - the storeBuilder used to obtain the StateStore instance
processorNames - the names of the processors that should be able to access the provided store
- Returns:
itself
- Throws:
TopologyException - if state store supplier is already added
-
addGlobalStore
@Deprecated public <K, V> Topology addGlobalStore(StoreBuilder<?> storeBuilder, String sourceName, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String topic, String processorName, ProcessorSupplier<K,V> stateUpdateSupplier)
Deprecated. Since 2.7.0. Use addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) instead.
Adds a global StateStore to the topology. The StateStore sources its data from all partitions of the provided input topic. There will be exactly one instance of this StateStore per Kafka Streams instance.
A SourceNode with the provided sourceName will be added to consume the data arriving from the partitions of the input topic.
The provided ProcessorSupplier will be used to create a ProcessorNode that will receive all records forwarded from the SourceNode. The supplier should always generate a new instance each time ProcessorSupplier.get() gets called. Creating a single Processor object and returning the same object reference in ProcessorSupplier.get() violates the supplier pattern and leads to runtime exceptions. This ProcessorNode should be used to keep the StateStore up-to-date. The default TimestampExtractor as specified in the config is used.
- Parameters:
storeBuilder - user defined state store builder
sourceName - name of the SourceNode that will be automatically added
keyDeserializer - the Deserializer to deserialize keys with
valueDeserializer - the Deserializer to deserialize values with
topic - the topic to source the data from
processorName - the name of the ProcessorNode created by the ProcessorSupplier
stateUpdateSupplier - the instance of ProcessorSupplier
- Returns:
itself
- Throws:
TopologyException - if the processor or state store is already registered
-
addGlobalStore
@Deprecated public <K, V> Topology addGlobalStore(StoreBuilder<?> storeBuilder, String sourceName, TimestampExtractor timestampExtractor, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String topic, String processorName, ProcessorSupplier<K,V> stateUpdateSupplier)
Deprecated.
Adds a global StateStore to the topology. The StateStore sources its data from all partitions of the provided input topic. There will be exactly one instance of this StateStore per Kafka Streams instance.
A SourceNode with the provided sourceName will be added to consume the data arriving from the partitions of the input topic.
The provided ProcessorSupplier will be used to create a ProcessorNode that will receive all records forwarded from the SourceNode. The supplier should always generate a new instance each time ProcessorSupplier.get() gets called. Creating a single Processor object and returning the same object reference in ProcessorSupplier.get() violates the supplier pattern and leads to runtime exceptions. This ProcessorNode should be used to keep the StateStore up-to-date.
- Parameters:
storeBuilder - user defined key value store builder
sourceName - name of the SourceNode that will be automatically added
timestampExtractor - the stateless timestamp extractor used for this source; if not specified, the default extractor defined in the configs will be used
keyDeserializer - the Deserializer to deserialize keys with
valueDeserializer - the Deserializer to deserialize values with
topic - the topic to source the data from
processorName - the name of the ProcessorNode created by the ProcessorSupplier
stateUpdateSupplier - the instance of ProcessorSupplier
- Returns:
itself
- Throws:
TopologyException - if the processor or state store is already registered
-
addGlobalStore
public <KIn, VIn> Topology addGlobalStore(StoreBuilder<?> storeBuilder, String sourceName, Deserializer<KIn> keyDeserializer, Deserializer<VIn> valueDeserializer, String topic, String processorName, ProcessorSupplier<KIn,VIn,Void,Void> stateUpdateSupplier)
Adds a global StateStore to the topology. The StateStore sources its data from all partitions of the provided input topic. There will be exactly one instance of this StateStore per Kafka Streams instance.
A SourceNode with the provided sourceName will be added to consume the data arriving from the partitions of the input topic.
The provided ProcessorSupplier will be used to create a ProcessorNode that will receive all records forwarded from the SourceNode. This ProcessorNode should be used to keep the StateStore up-to-date. The default TimestampExtractor as specified in the config is used.
- Parameters:
storeBuilder - user defined state store builder
sourceName - name of the SourceNode that will be automatically added
keyDeserializer - the Deserializer to deserialize keys with
valueDeserializer - the Deserializer to deserialize values with
topic - the topic to source the data from
processorName - the name of the ProcessorNode created by the ProcessorSupplier
stateUpdateSupplier - the instance of ProcessorSupplier
- Returns:
itself
- Throws:
TopologyException - if the processor or state store is already registered
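The relationship between the input partitions, the state-update processor, and the single store instance can be modeled in a few lines of plain Java. This is a rough sketch under stated assumptions: GlobalStoreSketch, process, and load are hypothetical names, in-memory maps stand in for topic partitions and for the StateStore, and none of the Kafka Streams classes are involved.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stdlib-only model of a global store; no Kafka classes are used.
class GlobalStoreSketch {
    // The single store instance, shared by records from every partition.
    private final Map<String, String> store = new LinkedHashMap<>();

    // Hypothetical stand-in for the state-update processor: writes each record into the store.
    void process(String key, String value) {
        store.put(key, value);
    }

    // Feeds the records of all partitions through the same update step.
    static GlobalStoreSketch load(List<Map<String, String>> partitions) {
        GlobalStoreSketch global = new GlobalStoreSketch();
        for (Map<String, String> partition : partitions) {
            partition.forEach(global::process);
        }
        return global;
    }

    String get(String key) {
        return store.get(key);
    }

    int size() {
        return store.size();
    }

    public static void main(String[] args) {
        GlobalStoreSketch global = load(List.of(
            Map.of("eu", "Europe"),
            Map.of("us", "United States")));
        System.out.println(global.size() + " entries, eu=" + global.get("eu"));
    }
}
```

The update step only writes to the store and forwards nothing, which mirrors the Void output types in the ProcessorSupplier<KIn,VIn,Void,Void> parameter of this overload.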
-
addGlobalStore
public <KIn, VIn> Topology addGlobalStore(StoreBuilder<?> storeBuilder, String sourceName, TimestampExtractor timestampExtractor, Deserializer<KIn> keyDeserializer, Deserializer<VIn> valueDeserializer, String topic, String processorName, ProcessorSupplier<KIn,VIn,Void,Void> stateUpdateSupplier)
Adds a global StateStore to the topology. The StateStore sources its data from all partitions of the provided input topic. There will be exactly one instance of this StateStore per Kafka Streams instance.
A SourceNode with the provided sourceName will be added to consume the data arriving from the partitions of the input topic.
The provided ProcessorSupplier will be used to create a ProcessorNode that will receive all records forwarded from the SourceNode. This ProcessorNode should be used to keep the StateStore up-to-date.
- Parameters:
storeBuilder - user defined key value store builder
sourceName - name of the SourceNode that will be automatically added
timestampExtractor - the stateless timestamp extractor used for this source; if not specified, the default extractor defined in the configs will be used
keyDeserializer - the Deserializer to deserialize keys with
valueDeserializer - the Deserializer to deserialize values with
topic - the topic to source the data from
processorName - the name of the ProcessorNode created by the ProcessorSupplier
stateUpdateSupplier - the instance of ProcessorSupplier
- Returns:
itself
- Throws:
TopologyException - if the processor or state store is already registered
-
connectProcessorAndStateStores
Connects the processor and the state stores.
- Parameters:
processorName - the name of the processor
stateStoreNames - the names of state stores that the processor uses
- Returns:
itself
- Throws:
TopologyException - if the processor or a state store is unknown
-
describe
Returns a description of the specified Topology.
- Returns:
a description of the topology
-