Class Topology
A logical representation of a ProcessorTopology.
A topology is a graph of sources, processors, and sinks.
A SourceNode is a node in the graph that consumes one or more Kafka topics and forwards them to its
successor nodes.
A Processor is a node in the graph that receives input records from upstream nodes, processes the
records, and optionally forwards new records to one, multiple, or all of its downstream nodes.
Finally, a SinkNode is a node in the graph that receives records from upstream nodes and writes them to
a Kafka topic.
A Topology allows you to construct a graph of these nodes, which can then be passed into a new
KafkaStreams instance that will begin consuming, processing, and producing records.
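For example, a minimal sketch that builds a two-node topology and passes it into a new KafkaStreams instance (topic names, application ID, and bootstrap server are illustrative):

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;

    public class TopologyExample {
        public static void main(final String[] args) {
            // Build a graph with one source and one sink; both topics must already exist.
            final Topology topology = new Topology();
            topology.addSource("Source", "input-topic");
            topology.addSink("Sink", "output-topic", "Source");

            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Default serdes, used by sources/sinks that don't set their own.
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            // Pass the topology into a new KafkaStreams instance and start it.
            final KafkaStreams streams = new KafkaStreams(topology, props);
            streams.start();
        }
    }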
Nested Class Summary
Nested Classes
- static enum Topology.AutoOffsetReset
  Deprecated. Since 4.0.
Constructor Summary
Constructors
- Topology()
Method Summary
- <KIn,VIn> Topology addGlobalStore(StoreBuilder<?> storeBuilder, String sourceName, Deserializer<KIn> keyDeserializer, Deserializer<VIn> valueDeserializer, String topic, String processorName, ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier)
  Adds a global state store to the topology.
- <KIn,VIn> Topology addGlobalStore(StoreBuilder<?> storeBuilder, String sourceName, TimestampExtractor timestampExtractor, Deserializer<KIn> keyDeserializer, Deserializer<VIn> valueDeserializer, String topic, String processorName, ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier)
- <KIn,VIn,KOut,VOut> Topology addProcessor(String name, ProcessorSupplier<KIn, VIn, KOut, VOut> supplier, String... parentNames)
  Add a processor that receives and processes records from one or more parent processors or sources.
- <KIn,VIn> Topology addReadOnlyStateStore(StoreBuilder<?> storeBuilder, String sourceName, Deserializer<KIn> keyDeserializer, Deserializer<VIn> valueDeserializer, String topic, String processorName, ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier)
  Adds a read-only state store to the topology.
- <KIn,VIn> Topology addReadOnlyStateStore(StoreBuilder<?> storeBuilder, String sourceName, TimestampExtractor timestampExtractor, Deserializer<KIn> keyDeserializer, Deserializer<VIn> valueDeserializer, String topic, String processorName, ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier)
- Topology addSink(String name, String topic, String... parentNames)
  Add a sink that sends records from upstream processors or sources to the named Kafka topic.
- <K,V> Topology addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer, String... parentNames)
- <K,V> Topology addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames)
- <K,V> Topology addSink(String name, String topic, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames)
- <K,V> Topology addSink(String name, TopicNameExtractor<K, V> topicExtractor, String... parentNames)
- <K,V> Topology addSink(String name, TopicNameExtractor<K, V> topicExtractor, Serializer<K> keySerializer, Serializer<V> valueSerializer, String... parentNames)
- <K,V> Topology addSink(String name, TopicNameExtractor<K, V> topicExtractor, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames)
- <K,V> Topology addSink(String name, TopicNameExtractor<K, V> topicExtractor, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames)
- Topology addSource(String name, String... topics)
  Add a source that consumes the named topics and forwards the records to child processors and sinks.
- Topology addSource(String name, Pattern topicPattern)
- Topology addSource(String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics)
- Topology addSource(String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern)
- Topology addSource(AutoOffsetReset offsetReset, String name, String... topics)
- Topology addSource(AutoOffsetReset offsetReset, String name, Pattern topicPattern)
- Topology addSource(AutoOffsetReset offsetReset, String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics)
- Topology addSource(AutoOffsetReset offsetReset, String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern)
- Topology addSource(AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics)
- Topology addSource(AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern)
- Topology addSource(AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, String... topics)
- Topology addSource(AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, Pattern topicPattern)
- Topology addSource(TimestampExtractor timestampExtractor, String name, String... topics)
- Topology addSource(TimestampExtractor timestampExtractor, String name, Pattern topicPattern)
- Topology addSource(Topology.AutoOffsetReset offsetReset, String name, String... topics)
  Deprecated. Since 4.0.
- Topology addSource(Topology.AutoOffsetReset offsetReset, String name, Pattern topicPattern)
  Deprecated. Since 4.0.
- Topology addSource(Topology.AutoOffsetReset offsetReset, String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics)
  Deprecated. Since 4.0.
- Topology addSource(Topology.AutoOffsetReset offsetReset, String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern)
  Deprecated. Since 4.0.
- Topology addSource(Topology.AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics)
  Deprecated. Since 4.0.
- Topology addSource(Topology.AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern)
  Deprecated. Since 4.0.
- Topology addSource(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, String... topics)
  Deprecated. Since 4.0.
- Topology addSource(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, Pattern topicPattern)
  Deprecated. Since 4.0.
- Topology addStateStore(StoreBuilder<?> storeBuilder, String... processorNames)
  Add a state store to the topology, and optionally connect it to one or more processors.
- Topology connectProcessorAndStateStores(String processorName, String... stateStoreNames)
  Connect a processor to one or more state stores.
- TopologyDescription describe()
  Returns a description of the specified Topology.
-
Constructor Details
-
Topology
public Topology() -
Method Details
-
addSource
public Topology addSource(String name, String... topics)
Add a source that consumes the named topics and forwards the records to child processors and sinks. The source will use the default values from StreamsConfig for the auto.offset.reset strategy, TimestampExtractor, and key/value Deserializer.
If you want to specify a source-specific auto.offset.reset strategy, TimestampExtractor, or key/value Deserializer, use the corresponding overloaded addSource(...) method.
- Parameters:
name - the unique name of the source used to reference this node when adding processor or sink children
topics - the name of one or more Kafka topics that this source is to consume
- Returns:
- itself
- Throws:
TopologyException - if the provided source name is not unique, no topics are specified, or a topic has already been registered by another source, read-only state store, or global state store
NullPointerException - if name or topics is null, or topics contains a null topic
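A minimal sketch (node and topic names are illustrative); child nodes added later refer to the source by its unique name:

    final Topology topology = new Topology();

    // "Source" consumes two topics using the default auto.offset.reset
    // strategy, TimestampExtractor, and deserializers from StreamsConfig.
    topology.addSource("Source", "orders", "payments");

    // Children reference the source by name.
    topology.addSink("Sink", "audit-log", "Source");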
-
addSource
public Topology addSource(String name, Pattern topicPattern)
See addSource(String, String...). Takes a Pattern (cannot be null) to match the topics to consume from, instead of a list of topic names.
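A sketch with an illustrative pattern; every topic whose name matches it is consumed:

    import java.util.regex.Pattern;

    topology.addSource("PatternSource", Pattern.compile("orders-.*"));
-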
addSource
@Deprecated public Topology addSource(Topology.AutoOffsetReset offsetReset, String name, String... topics)
Deprecated. Since 4.0. Use addSource(org.apache.kafka.streams.AutoOffsetReset, String, String...) instead. -
addSource
public Topology addSource(AutoOffsetReset offsetReset, String name, String... topics) -
addSource
@Deprecated public Topology addSource(Topology.AutoOffsetReset offsetReset, String name, Pattern topicPattern)
Deprecated. Since 4.0. Use addSource(org.apache.kafka.streams.AutoOffsetReset, String, Pattern) instead. -
addSource
public Topology addSource(AutoOffsetReset offsetReset, String name, Pattern topicPattern) -
addSource
public Topology addSource(TimestampExtractor timestampExtractor, String name, String... topics) -
addSource
public Topology addSource(TimestampExtractor timestampExtractor, String name, Pattern topicPattern) -
addSource
@Deprecated public Topology addSource(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, String... topics)
Deprecated. Since 4.0. Use addSource(org.apache.kafka.streams.AutoOffsetReset, TimestampExtractor, String, String...) instead. -
addSource
public Topology addSource(AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, String... topics) -
addSource
@Deprecated public Topology addSource(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, Pattern topicPattern)
Deprecated. Since 4.0. Use addSource(org.apache.kafka.streams.AutoOffsetReset, TimestampExtractor, String, Pattern) instead. -
addSource
public Topology addSource(AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, Pattern topicPattern) -
addSource
public Topology addSource(String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics) -
addSource
public Topology addSource(String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern) -
addSource
@Deprecated public Topology addSource(Topology.AutoOffsetReset offsetReset, String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics)
Deprecated. Since 4.0. Use addSource(org.apache.kafka.streams.AutoOffsetReset, String, Deserializer, Deserializer, String...) instead. -
addSource
public Topology addSource(AutoOffsetReset offsetReset, String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics) -
addSource
@Deprecated public Topology addSource(Topology.AutoOffsetReset offsetReset, String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern)
Deprecated. Since 4.0. Use addSource(org.apache.kafka.streams.AutoOffsetReset, String, Deserializer, Deserializer, Pattern) instead. -
addSource
public Topology addSource(AutoOffsetReset offsetReset, String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern) -
addSource
@Deprecated public Topology addSource(Topology.AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics)
Deprecated. Since 4.0. Use addSource(org.apache.kafka.streams.AutoOffsetReset, String, TimestampExtractor, Deserializer, Deserializer, String...) instead. -
addSource
public Topology addSource(AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics) -
addSource
@Deprecated public Topology addSource(Topology.AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern)
Deprecated. Since 4.0. Use addSource(org.apache.kafka.streams.AutoOffsetReset, String, TimestampExtractor, Deserializer, Deserializer, Pattern) instead. -
addSource
public Topology addSource(AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern) -
addSink
public Topology addSink(String name, String topic, String... parentNames)
Add a sink that sends records from upstream processors or sources to the named Kafka topic. The specified topic should be created before the KafkaStreams instance is started. The sink will use the default values from StreamsConfig for the key and value Serializer. Furthermore, the producer's configured partitioner is used to write into the topic.
If you want to specify a sink-specific key or value Serializer, or use a different partitioner, use the corresponding overloaded addSink(...) method.
- Parameters:
name - the unique name of the sink
topic - the name of the Kafka topic to which this sink should write its records
parentNames - the name of one or more processors or sources, whose output records this sink should consume and write to the specified output topic
- Returns:
- itself
- Throws:
TopologyException - if the provided sink name is not unique, or if a parent processor/source name is unknown or specifies a sink
NullPointerException - if name, topic, or parentNames is null, or parentNames contains a null parent name
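A minimal sketch; "Process" stands for a previously added parent processor, and the topic and serializer choices are illustrative:

    import org.apache.kafka.common.serialization.Serdes;

    // Uses the default serializers from StreamsConfig.
    topology.addSink("Sink", "output-topic", "Process");

    // Or pass sink-specific serializers via the overload documented below.
    topology.addSink("TypedSink",
                     "counts-topic",
                     Serdes.String().serializer(),
                     Serdes.Long().serializer(),
                     "Process");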
-
addSink
public <K,V> Topology addSink(String name, String topic, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) -
addSink
public <K,V> Topology addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer, String... parentNames) -
addSink
public <K,V> Topology addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) -
addSink
public <K,V> Topology addSink(String name, TopicNameExtractor<K, V> topicExtractor, String... parentNames)
See addSink(String, String, String...). Takes a TopicNameExtractor (cannot be null) that computes topic names to send records to, instead of a single topic name. The topic name extractor is called for every result record and may compute a different topic name each time. All topics that the topic name extractor may compute should be created before the KafkaStreams instance is started. Returning null as a topic name is invalid and will result in a runtime exception.
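A sketch of a dynamic sink, assuming String keys and values; the routing rule, topic names, and the parent processor "Process" are illustrative, and both target topics must exist before startup:

    import org.apache.kafka.streams.processor.TopicNameExtractor;

    // Called for every result record; must never return null.
    final TopicNameExtractor<String, String> byKey =
        (key, value, recordContext) -> key.startsWith("error") ? "errors" : "events";

    topology.addSink("DynamicSink", byKey, "Process");
-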
addSink
public <K,V> Topology addSink(String name, TopicNameExtractor<K, V> topicExtractor, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) -
addSink
public <K,V> Topology addSink(String name, TopicNameExtractor<K, V> topicExtractor, Serializer<K> keySerializer, Serializer<V> valueSerializer, String... parentNames) -
addSink
public <K,V> Topology addSink(String name, TopicNameExtractor<K, V> topicExtractor, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) -
addProcessor
public <KIn,VIn,KOut,VOut> Topology addProcessor(String name, ProcessorSupplier<KIn, VIn, KOut, VOut> supplier, String... parentNames)
Add a processor that receives and processes records from one or more parent processors or sources. Any record output by this processor will be forwarded to its child processors and sinks.
By default, the processor is stateless. There are three different types of state stores that can be connected to a processor:
- state stores for processing (i.e., read/write access)
- read-only state stores
- global state stores (read-only)
If the supplier provides state stores via ConnectedStoreProvider.stores(), the corresponding StoreBuilders will be added to the topology and connected to this processor automatically.
- Parameters:
name - the unique name of the processor used to reference this node when adding other processor or sink children
supplier - the supplier used to obtain Processor instances
parentNames - the name of one or more processors or sources, whose output records this processor should receive and process
- Returns:
- itself
- Throws:
TopologyException - if the provided processor name is not unique, or if a parent processor/source name is unknown or specifies a sink
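A sketch of a stateless processor using the org.apache.kafka.streams.processor.api package; the node names and String types are illustrative:

    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.ProcessorSupplier;
    import org.apache.kafka.streams.processor.api.Record;

    final ProcessorSupplier<String, String, String, String> uppercase =
        () -> new Processor<>() {
            private ProcessorContext<String, String> context;

            @Override
            public void init(final ProcessorContext<String, String> context) {
                this.context = context;
            }

            @Override
            public void process(final Record<String, String> record) {
                // Forward a new record to all child processors and sinks.
                context.forward(record.withValue(record.value().toUpperCase()));
            }
        };

    topology.addProcessor("UppercaseProcessor", uppercase, "Source");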
-
addStateStore
public Topology addStateStore(StoreBuilder<?> storeBuilder, String... processorNames)
Add a state store to the topology, and optionally connect it to one or more processors. State stores are sharded and the number of shards is determined at runtime by the number of input topic partitions and the structure of the topology. Each connected Processor instance in the topology has access to a single shard of the state store. Additionally, the state store can be accessed from "outside" using "Interactive Queries" (cf. KafkaStreams.store(StoreQueryParameters) and KafkaStreams.query(StateQueryRequest)). If you need access to all data in a state store inside a Processor, you can use a (read-only) global state store.
If no processorNames is specified, the state store can be connected to one or more processors later.
Note, if a state store is never connected to any processor, the state store is "dangling" and would not be added to the created ProcessorTopology when KafkaStreams is started. In this case, the state store is not available for "Interactive Queries". If you want to add a state store only for "Interactive Queries", you can use a read-only state store.
For failure and recovery, a state store may be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "storeName" is provided by the store builder, and "-changelog" is a fixed suffix.
You can verify the created ProcessorTopology and added state stores, and retrieve all generated internal topic names, via describe().
- Parameters:
storeBuilder - the StoreBuilder used to obtain state store instances (one per shard)
processorNames - the names of the processors that should be able to access the provided state store
- Returns:
- itself
- Throws:
TopologyException - if the state store was already added, or if a processor name is unknown or specifies a source or sink
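A sketch using the Stores factory; the store name, key/value types, and the connected processor name "CountProcessor" are illustrative:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    // One shard of this store is created per input topic partition.
    final StoreBuilder<KeyValueStore<String, Long>> countStore =
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("counts"),
            Serdes.String(),
            Serdes.Long());

    // Connect the store to an already-added processor; inside that
    // processor, fetch it via context.getStateStore("counts").
    topology.addStateStore(countStore, "CountProcessor");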
-
addReadOnlyStateStore
public <KIn,VIn> Topology addReadOnlyStateStore(StoreBuilder<?> storeBuilder, String sourceName, TimestampExtractor timestampExtractor, Deserializer<KIn> keyDeserializer, Deserializer<VIn> valueDeserializer, String topic, String processorName, ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) -
addReadOnlyStateStore
public <KIn,VIn> Topology addReadOnlyStateStore(StoreBuilder<?> storeBuilder, String sourceName, Deserializer<KIn> keyDeserializer, Deserializer<VIn> valueDeserializer, String topic, String processorName, ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier)
Adds a read-only state store to the topology. The state store will be populated with data from the named source topic. State stores are sharded and the number of shards is determined at runtime by the number of input topic partitions for the source topic and the connected processors (if any). Read-only state stores can be accessed from "outside" using "Interactive Queries" (cf. KafkaStreams.store(StoreQueryParameters) and KafkaStreams.query(StateQueryRequest)).
The auto.offset.reset property will be set to "earliest" for the source topic. If you want to specify a source-specific TimestampExtractor, you can use addReadOnlyStateStore(StoreBuilder, String, TimestampExtractor, Deserializer, Deserializer, String, String, ProcessorSupplier).
Connecting a read-only state store to processors is optional. If not connected to any processor, the state store will still be created and can be queried via KafkaStreams.store(StoreQueryParameters) or KafkaStreams.query(StateQueryRequest). If the state store is connected to another processor, each corresponding Processor instance in the topology has read-only access to a single shard of the state store. If you need write access to a state store, you can use a "regular" state store instead. If you need access to all data in a state store inside a Processor, you can use a (read-only) global state store.
The provided ProcessorSupplier will be used to create Processor instances which will be used to process the records from the source topic. These processors are the only ones with write access to the state store, and should contain logic to keep the StateStore up-to-date.
Read-only state stores are always enabled for fault-tolerance and recovery. In contrast to "regular" state stores, however, no dedicated changelog topic will be created in Kafka; instead, the source topic is used for recovery. Thus, the source topic should be configured with log compaction.
- Parameters:
storeBuilder - the StoreBuilder used to obtain state store instances (one per shard)
sourceName - the unique name of the internally added source
keyDeserializer - the Deserializer for record keys (can be null to use the default key deserializer from StreamsConfig)
valueDeserializer - the Deserializer for record values (can be null to use the default value deserializer from StreamsConfig)
topic - the source topic to read the data from
processorName - the unique name of the internally added processor which maintains the state store
stateUpdateSupplier - the supplier used to obtain Processor instances, which maintain the state store
- Returns:
- itself
- Throws:
TopologyException - if the state store was already added, or if the source or processor names are not unique, or if the source topic has already been registered by another source, read-only state store, or global state store
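A sketch, assuming a compacted source topic "user-profiles" with String keys and values; all names are illustrative, and withLoggingDisabled() reflects that recovery reads the source topic rather than a changelog:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.ProcessorSupplier;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.Stores;

    // The update processor is the only writer; it mirrors the topic into the store.
    final ProcessorSupplier<String, String, Void, Void> updater =
        () -> new Processor<>() {
            private KeyValueStore<String, String> store;

            @Override
            public void init(final ProcessorContext<Void, Void> context) {
                store = context.getStateStore("profiles");
            }

            @Override
            public void process(final Record<String, String> record) {
                store.put(record.key(), record.value());
            }
        };

    topology.addReadOnlyStateStore(
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("profiles"),
                Serdes.String(),
                Serdes.String())
            .withLoggingDisabled(),
        "profiles-source",
        Serdes.String().deserializer(),
        Serdes.String().deserializer(),
        "user-profiles",
        "profiles-updater",
        updater);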
-
addGlobalStore
public <KIn,VIn> Topology addGlobalStore(StoreBuilder<?> storeBuilder, String sourceName, Deserializer<KIn> keyDeserializer, Deserializer<VIn> valueDeserializer, String topic, String processorName, ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier)
Adds a global state store to the topology. The state store will be populated with data from the named source topic. Global state stores are read-only, and contain data from all partitions of the specified source topic. Thus, each KafkaStreams instance has a full copy of the data; the source topic records are effectively broadcast to all instances. In contrast to read-only state stores, global state stores are "bootstrapped" on startup, and are maintained by a separate thread. Thus, updates to a global store are not "stream-time synchronized", which may lead to non-deterministic results. Like all other stores, global state stores can be accessed from "outside" using "Interactive Queries" (cf. KafkaStreams.store(StoreQueryParameters) and KafkaStreams.query(StateQueryRequest)).
The auto.offset.reset property will be set to "earliest" for the source topic. If you want to specify a source-specific TimestampExtractor, you can use addGlobalStore(StoreBuilder, String, TimestampExtractor, Deserializer, Deserializer, String, String, ProcessorSupplier).
All processors of the topology automatically have read-only access to the global store; it is not necessary to connect them. If you need write access to a state store, you can use a "regular" state store instead.
The provided ProcessorSupplier will be used to create Processor instances which will be used to process the records from the source topic. These processors are the only ones with write access to the state store, and should contain logic to keep the StateStore up-to-date.
Global state stores are always enabled for fault-tolerance and recovery. In contrast to "regular" state stores, however, no dedicated changelog topic will be created in Kafka; instead, the source topic is used for recovery. Thus, the source topic should be configured with log compaction.
- Parameters:
storeBuilder - the StoreBuilder used to obtain the state store (one per KafkaStreams instance)
sourceName - the unique name of the internally added source
keyDeserializer - the Deserializer for record keys (can be null to use the default key deserializer from StreamsConfig)
valueDeserializer - the Deserializer for record values (can be null to use the default value deserializer from StreamsConfig)
topic - the source topic to read the data from
processorName - the unique name of the internally added processor which maintains the state store
stateUpdateSupplier - the supplier used to obtain Processor instances, which maintain the state store
- Returns:
- itself
- Throws:
TopologyException - if the state store was already added, or if the source or processor names are not unique, or if the source topic has already been registered by another source, read-only state store, or global state store
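A sketch, assuming a compacted "app-config" topic; all names are illustrative, the in-memory store is rebuilt from the source topic on startup, and withLoggingDisabled() reflects that no changelog topic is used:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.ProcessorSupplier;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.Stores;

    // Keeps the global store in sync with the source topic.
    final ProcessorSupplier<String, String, Void, Void> configUpdater =
        () -> new Processor<>() {
            private KeyValueStore<String, String> store;

            @Override
            public void init(final ProcessorContext<Void, Void> context) {
                store = context.getStateStore("config");
            }

            @Override
            public void process(final Record<String, String> record) {
                if (record.value() == null) {
                    store.delete(record.key());   // honor tombstones from the compacted topic
                } else {
                    store.put(record.key(), record.value());
                }
            }
        };

    topology.addGlobalStore(
        Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("config"),
                Serdes.String(),
                Serdes.String())
            .withLoggingDisabled(),
        "config-source",
        Serdes.String().deserializer(),
        Serdes.String().deserializer(),
        "app-config",
        "config-updater",
        configUpdater);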
-
addGlobalStore
public <KIn,VIn> Topology addGlobalStore(StoreBuilder<?> storeBuilder, String sourceName, TimestampExtractor timestampExtractor, Deserializer<KIn> keyDeserializer, Deserializer<VIn> valueDeserializer, String topic, String processorName, ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) -
connectProcessorAndStateStores
public Topology connectProcessorAndStateStores(String processorName, String... stateStoreNames)
Connect a processor to one or more state stores. The state stores must have been previously added to the topology via addStateStore(StoreBuilder, String...) or addReadOnlyStateStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier).
- Parameters:
processorName - the name of the processor
stateStoreNames - the names of state stores that the processor should be able to access
- Returns:
- itself
- Throws:
TopologyException - if the processor name or a state store name is unknown, or if the processor name specifies a source or sink
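A sketch of late wiring, assuming countStore and the "CountProcessor" node from the addStateStore example above:

    // Add the store without connecting it to any processor yet.
    topology.addStateStore(countStore);

    // ... add sources, processors, and sinks ...

    // Later, connect an existing processor to the store by name.
    topology.connectProcessorAndStateStores("CountProcessor", "counts");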
-
describe
public TopologyDescription describe()
Returns a description of the specified Topology.
- Returns:
- a description of the topology
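A sketch that prints the description and also walks it programmatically:

    import org.apache.kafka.streams.TopologyDescription;

    final TopologyDescription description = topology.describe();

    // Human-readable dump of all sub-topologies, nodes, and global stores.
    System.out.println(description);

    // Or inspect it programmatically:
    for (final TopologyDescription.Subtopology subtopology : description.subtopologies()) {
        System.out.println("Sub-topology " + subtopology.id() + " has nodes " + subtopology.nodes());
    }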
-