Class StreamsBuilder

java.lang.Object
org.apache.kafka.streams.StreamsBuilder

public class StreamsBuilder extends Object
StreamsBuilder provides the high-level Kafka Streams DSL to specify a Kafka Streams topology.

The processing logic (Topology) must be defined deterministically: the order in which operators are added must be predictable and identical across all application instances. Topologies are only identical if all operators are added in the same order. If different KafkaStreams instances of the same application build different topologies, the result may be incompatible runtime code and unexpected results or errors.
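A minimal sketch of what "deterministic" means in practice, assuming the kafka-streams library is on the classpath. The class name and the topic names "input-topic" and "output-topic" are hypothetical. Building the same operators in the same order twice should yield identical topology descriptions, because generated node names depend on the order of the calls.

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public class DeterministicTopologyExample {

    // Adds operators in a fixed order; the topic names are hypothetical.
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input-topic")
               .filter((key, value) -> value != null)
               .mapValues(value -> value.toUpperCase())
               .to("output-topic");
        return builder.build();
    }

    public static void main(String[] args) {
        // Two independent builds stand in for two application instances.
        String first = buildTopology().describe().toString();
        String second = buildTopology().describe().toString();
        System.out.println(first);
        System.out.println("identical: " + first.equals(second));
    }
}
```

Comparing the output of Topology#describe() across deployments is one way to check that all instances build the same topology.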

  • Field Details

    • topology

      protected final Topology topology
      The actual topology that is constructed by this StreamsBuilder.
    • internalTopologyBuilder

      protected final org.apache.kafka.streams.processor.internals.InternalTopologyBuilder internalTopologyBuilder
      The topology's internal builder.
    • internalStreamsBuilder

      protected final org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder internalStreamsBuilder
  • Constructor Details

    • StreamsBuilder

      public StreamsBuilder()
    • StreamsBuilder

      public StreamsBuilder(TopologyConfig topologyConfigs)
      Create a StreamsBuilder instance.
      Parameters:
      topologyConfigs - the Streams configs that apply at the topology level; see TopologyConfig for details
  • Method Details

    • getNewTopology

      protected Topology getNewTopology(TopologyConfig topologyConfigs)
    • stream

      public <K, V> KStream<K,V> stream(String topic)
      Create a KStream from the specified topic. The default "auto.offset.reset" strategy, default TimestampExtractor, and default key and value deserializers as specified in the config are used.

      If multiple topics are specified there is no ordering guarantee for records from different topics.

      Note that the specified input topic must be partitioned by key. If this is not the case it is the user's responsibility to repartition the data before any key based operation (like aggregation or join) is applied to the returned KStream.

      Parameters:
      topic - the topic name; cannot be null
      Returns:
      a KStream for the specified topic
    • stream

      public <K, V> KStream<K,V> stream(String topic, Consumed<K,V> consumed)
      Create a KStream from the specified topic. The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers are defined by the options in Consumed.

      Note that the specified input topic must be partitioned by key. If this is not the case it is the user's responsibility to repartition the data before any key based operation (like aggregation or join) is applied to the returned KStream.

      Parameters:
      topic - the topic name; cannot be null
      consumed - the instance of Consumed used to define optional parameters
      Returns:
      a KStream for the specified topic
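      A minimal sketch of this overload, assuming the kafka-streams library is on the classpath. The class name and the topic names "clicks" and "positive-clicks" are hypothetical. Consumed carries the serdes and, optionally, the offset reset policy; Topology.AutoOffsetReset is assumed to be available, as in the 3.x line.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class StreamWithConsumedExample {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Serdes and the reset policy come from Consumed, not from the global config.
        KStream<String, Long> clicks = builder.stream(
            "clicks",
            Consumed.with(Serdes.String(), Serdes.Long())
                    .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));

        // Keep only records with a positive count and write them to a second topic.
        clicks.filter((user, count) -> count != null && count > 0)
              .to("positive-clicks", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(buildTopology().describe());
    }
}
```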
    • stream

      public <K, V> KStream<K,V> stream(Collection<String> topics)
      Create a KStream from the specified topics. The default "auto.offset.reset" strategy, default TimestampExtractor, and default key and value deserializers as specified in the config are used.

      If multiple topics are specified there is no ordering guarantee for records from different topics.

      Note that the specified input topics must be partitioned by key. If this is not the case it is the user's responsibility to repartition the data before any key based operation (like aggregation or join) is applied to the returned KStream.

      Parameters:
      topics - the topic names; must contain at least one topic name
      Returns:
      a KStream for the specified topics
    • stream

      public <K, V> KStream<K,V> stream(Collection<String> topics, Consumed<K,V> consumed)
      Create a KStream from the specified topics. The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers are defined by the options in Consumed.

      If multiple topics are specified there is no ordering guarantee for records from different topics.

      Note that the specified input topics must be partitioned by key. If this is not the case it is the user's responsibility to repartition the data before any key based operation (like aggregation or join) is applied to the returned KStream.

      Parameters:
      topics - the topic names; must contain at least one topic name
      consumed - the instance of Consumed used to define optional parameters
      Returns:
      a KStream for the specified topics
    • stream

      public <K, V> KStream<K,V> stream(Pattern topicPattern)
      Create a KStream from the specified topic pattern. The default "auto.offset.reset" strategy, default TimestampExtractor, and default key and value deserializers as specified in the config are used.

      If multiple topics are matched by the specified pattern, the created KStream will read data from all of them and there is no ordering guarantee between records from different topics. This also means that the work will not be parallelized for multiple topics, and the number of tasks will scale with the maximum partition count of any matching topic rather than the total number of partitions across all topics.

      Note that the specified input topics must be partitioned by key. If this is not the case it is the user's responsibility to repartition the data before any key based operation (like aggregation or join) is applied to the returned KStream.

      Parameters:
      topicPattern - the pattern to match for topic names
      Returns:
      a KStream for topics matching the regex pattern.
    • stream

      public <K, V> KStream<K,V> stream(Pattern topicPattern, Consumed<K,V> consumed)
      Create a KStream from the specified topic pattern. The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers are defined by the options in Consumed.

      If multiple topics are matched by the specified pattern, the created KStream will read data from all of them and there is no ordering guarantee between records from different topics. This also means that the work will not be parallelized for multiple topics, and the number of tasks will scale with the maximum partition count of any matching topic rather than the total number of partitions across all topics.

      Note that the specified input topics must be partitioned by key. If this is not the case it is the user's responsibility to repartition the data before any key based operation (like aggregation or join) is applied to the returned KStream.

      Parameters:
      topicPattern - the pattern to match for topic names
      consumed - the instance of Consumed used to define optional parameters
      Returns:
      a KStream for topics matching the regex pattern.
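      A minimal sketch of pattern-based subscription, assuming the kafka-streams library is on the classpath. The class name, the regex "events-.*", and the output topic "all-events" are hypothetical. The output topic is chosen so that it does not match the input pattern, which avoids feeding the stream back into itself.

```java
import java.util.regex.Pattern;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class PatternStreamExample {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Reads from every topic whose full name matches the pattern.
        // Records from different matching topics have no ordering guarantee.
        KStream<String, String> events = builder.stream(
            Pattern.compile("events-.*"),
            Consumed.with(Serdes.String(), Serdes.String()));

        events.to("all-events", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(buildTopology().describe());
    }
}
```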
    • table

      public <K, V> KTable<K,V> table(String topic, Consumed<K,V> consumed, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Create a KTable for the specified topic. The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers are defined by the options in Consumed. Input records with null key will be dropped.

      Note that the specified input topic must be partitioned by key. If this is not the case the returned KTable will be corrupted.

      The resulting KTable will be materialized in a local KeyValueStore using the given Materialized instance. An internal changelog topic is created by default. Because the source topic can be used for recovery, you can avoid creating the changelog topic by setting the "topology.optimization" to "all" in the StreamsConfig.

      You should only specify serdes in the Consumed instance as these will also be used to overwrite the serdes in Materialized, i.e.,

       streamBuilder.table(topic, Consumed.with(Serdes.String(), Serdes.String()), Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName))

      To query the local ReadOnlyKeyValueStore, obtain it via KafkaStreams#store(...):

       KafkaStreams streams = ...
       ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(
           StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.<K, V>timestampedKeyValueStore()));
       K key = "some-key";
       ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is partitioned across all running Kafka Streams instances)

      For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients() to query the value of the key on a parallel running instance of your Kafka Streams application.
      Parameters:
      topic - the topic name; cannot be null
      consumed - the instance of Consumed used to define optional parameters; cannot be null
      materialized - the instance of Materialized used to materialize a state store; cannot be null
      Returns:
      a KTable for the specified topic
    • table

      public <K, V> KTable<K,V> table(String topic)
      Create a KTable for the specified topic. The default "auto.offset.reset" strategy and default key and value deserializers as specified in the config are used. Input records with null key will be dropped.

      Note that the specified input topic must be partitioned by key. If this is not the case the returned KTable will be corrupted.

      The resulting KTable will be materialized in a local KeyValueStore with an internal store name. Note that the store name may not be queryable through Interactive Queries. An internal changelog topic is created by default. Because the source topic can be used for recovery, you can avoid creating the changelog topic by setting the "topology.optimization" to "all" in the StreamsConfig.

      Parameters:
      topic - the topic name; cannot be null
      Returns:
      a KTable for the specified topic
    • table

      public <K, V> KTable<K,V> table(String topic, Consumed<K,V> consumed)
      Create a KTable for the specified topic. The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers are defined by the options in Consumed. Input records with null key will be dropped.

      Note that the specified input topic must be partitioned by key. If this is not the case the returned KTable will be corrupted.

      The resulting KTable will be materialized in a local KeyValueStore with an internal store name. Note that the store name may not be queryable through Interactive Queries. An internal changelog topic is created by default. Because the source topic can be used for recovery, you can avoid creating the changelog topic by setting the "topology.optimization" to "all" in the StreamsConfig.

      Parameters:
      topic - the topic name; cannot be null
      consumed - the instance of Consumed used to define optional parameters; cannot be null
      Returns:
      a KTable for the specified topic
    • table

      public <K, V> KTable<K,V> table(String topic, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Create a KTable for the specified topic. The default "auto.offset.reset" strategy as specified in the config is used. Key and value deserializers as defined by the options in Materialized are used. Input records with null key will be dropped.

      Note that the specified input topic must be partitioned by key. If this is not the case the returned KTable will be corrupted.

      The resulting KTable will be materialized in a local KeyValueStore using the Materialized instance. An internal changelog topic is created by default. Because the source topic can be used for recovery, you can avoid creating the changelog topic by setting the "topology.optimization" to "all" in the StreamsConfig.

      Parameters:
      topic - the topic name; cannot be null
      materialized - the instance of Materialized used to materialize a state store; cannot be null
      Returns:
      a KTable for the specified topic
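      A minimal sketch of this overload, assuming the kafka-streams library is on the classpath. The class name, the topic "users", and the store name "users-store" are hypothetical. Naming the store through Materialized is what makes it addressable later by name.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class MaterializedTableExample {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // The serdes set on Materialized are also used to deserialize the input topic.
        KTable<String, String> users = builder.table(
            "users",
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("users-store")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.String()));

        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(buildTopology().describe());
    }
}
```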
    • globalTable

      public <K, V> GlobalKTable<K,V> globalTable(String topic, Consumed<K,V> consumed)
      Create a GlobalKTable for the specified topic. Input records with null key will be dropped.

      The resulting GlobalKTable will be materialized in a local KeyValueStore with an internal store name. Note that the store name may not be queryable through Interactive Queries. No internal changelog topic is created since the original input topic can be used for recovery (cf. methods of KGroupedStream and KGroupedTable that return a KTable).

      Note that GlobalKTable always applies "auto.offset.reset" strategy "earliest" regardless of the specified value in StreamsConfig or Consumed.

      Parameters:
      topic - the topic name; cannot be null
      consumed - the instance of Consumed used to define optional parameters
      Returns:
      a GlobalKTable for the specified topic
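      A minimal sketch of a typical use of a GlobalKTable, assuming the kafka-streams library is on the classpath: enriching a stream through a join. The class name, the topic names, and the record layout (order id as key, product id as value) are hypothetical.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class GlobalTableJoinExample {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Every application instance holds a full copy of this table.
        GlobalKTable<String, String> products = builder.globalTable(
            "products", Consumed.with(Serdes.String(), Serdes.String()));

        // Hypothetical layout: key = order id, value = product id.
        KStream<String, String> orders = builder.stream(
            "orders", Consumed.with(Serdes.String(), Serdes.String()));

        orders.join(
                products,
                (orderId, productId) -> productId,                          // look up the table by product id
                (productId, productName) -> productId + ":" + productName)  // combine both values
              .to("enriched-orders", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(buildTopology().describe());
    }
}
```

      Because the table is fully replicated, the stream does not need to be co-partitioned with the table's topic for this join.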
    • globalTable

      public <K, V> GlobalKTable<K,V> globalTable(String topic)
      Create a GlobalKTable for the specified topic. The default key and value deserializers as specified in the config are used. Input records with null key will be dropped.

      The resulting GlobalKTable will be materialized in a local KeyValueStore with an internal store name. Note that the store name may not be queryable through Interactive Queries. No internal changelog topic is created since the original input topic can be used for recovery (cf. methods of KGroupedStream and KGroupedTable that return a KTable).

      Note that GlobalKTable always applies "auto.offset.reset" strategy "earliest" regardless of the specified value in StreamsConfig.

      Parameters:
      topic - the topic name; cannot be null
      Returns:
      a GlobalKTable for the specified topic
    • globalTable

      public <K, V> GlobalKTable<K,V> globalTable(String topic, Consumed<K,V> consumed, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Create a GlobalKTable for the specified topic. Input KeyValue pairs with null key will be dropped.

      The resulting GlobalKTable will be materialized in a local KeyValueStore configured with the provided instance of Materialized. However, no internal changelog topic is created since the original input topic can be used for recovery (cf. methods of KGroupedStream and KGroupedTable that return a KTable).

      You should only specify serdes in the Consumed instance as these will also be used to overwrite the serdes in Materialized, i.e.,

       streamBuilder.globalTable(topic, Consumed.with(Serdes.String(), Serdes.String()), Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName))

      To query the local ReadOnlyKeyValueStore, obtain it via KafkaStreams#store(...):

       KafkaStreams streams = ...
       ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(
           StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.<K, V>timestampedKeyValueStore()));
       K key = "some-key";
       ValueAndTimestamp<V> valueForKey = localStore.get(key);

      Note that GlobalKTable always applies "auto.offset.reset" strategy "earliest" regardless of the specified value in StreamsConfig or Consumed.
      Parameters:
      topic - the topic name; cannot be null
      consumed - the instance of Consumed used to define optional parameters; can't be null
      materialized - the instance of Materialized used to materialize a state store; cannot be null
      Returns:
      a GlobalKTable for the specified topic
    • globalTable

      public <K, V> GlobalKTable<K,V> globalTable(String topic, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Create a GlobalKTable for the specified topic. Input KeyValue pairs with null key will be dropped.

      The resulting GlobalKTable will be materialized in a local KeyValueStore configured with the provided instance of Materialized. However, no internal changelog topic is created since the original input topic can be used for recovery (cf. methods of KGroupedStream and KGroupedTable that return a KTable).

      To query the local ReadOnlyKeyValueStore, obtain it via KafkaStreams#store(...):

       KafkaStreams streams = ...
       ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(
           StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.<K, V>timestampedKeyValueStore()));
       K key = "some-key";
       ValueAndTimestamp<V> valueForKey = localStore.get(key);

      Note that GlobalKTable always applies "auto.offset.reset" strategy "earliest" regardless of the specified value in StreamsConfig.
      Parameters:
      topic - the topic name; cannot be null
      materialized - the instance of Materialized used to materialize a state store; cannot be null
      Returns:
      a GlobalKTable for the specified topic
    • addStateStore

      public StreamsBuilder addStateStore(StoreBuilder<?> builder)
      Adds a state store to the underlying Topology.

      It is required to connect state stores to Processors, Transformers, or ValueTransformers before they can be used.

      Parameters:
      builder - the builder used to obtain the StateStore instance
      Returns:
      itself
      Throws:
      TopologyException - if state store supplier is already added
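      A minimal sketch of adding a store and connecting it to a processor by name, assuming the kafka-streams library is on the classpath and the typed processor API (org.apache.kafka.streams.processor.api) is available. The class names, the topic "words", and the store name "counts" are hypothetical.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class AddStateStoreExample {

    // Counts how often each record value has been seen; forwards nothing downstream.
    static class CountingProcessor implements Processor<String, String, Void, Void> {
        private KeyValueStore<String, Long> store;

        @Override
        public void init(ProcessorContext<Void, Void> context) {
            // Only works because the store is connected by name in process(...) below.
            store = context.getStateStore("counts");
        }

        @Override
        public void process(Record<String, String> record) {
            Long current = store.get(record.value());
            store.put(record.value(), current == null ? 1L : current + 1L);
        }
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        StoreBuilder<KeyValueStore<String, Long>> counts = Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("counts"), Serdes.String(), Serdes.Long());
        builder.addStateStore(counts);

        // A typed variable avoids ambiguity with older process(...) overloads.
        ProcessorSupplier<String, String, Void, Void> supplier = CountingProcessor::new;
        builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()))
               .process(supplier, "counts");

        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(buildTopology().describe());
    }
}
```

      The constructor reference creates a new processor on every ProcessorSupplier.get() call, which keeps each task's processor instance separate.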
    • addGlobalStore

      @Deprecated public <K, V> StreamsBuilder addGlobalStore(StoreBuilder<?> storeBuilder, String topic, Consumed<K,V> consumed, ProcessorSupplier<K,V> stateUpdateSupplier)
      Adds a global StateStore to the topology. The StateStore sources its data from all partitions of the provided input topic. There will be exactly one instance of this StateStore per Kafka Streams instance.

      A SourceNode will be added to consume the data arriving from the partitions of the input topic.

      The provided ProcessorSupplier will be used to create a ProcessorNode that will receive all records forwarded from the SourceNode. NOTE: you should not use the Processor to insert transformed records into the global state store. This store uses the source topic as changelog and during restore will insert records directly from the source. This ProcessorNode should be used to keep the StateStore up-to-date. The default TimestampExtractor as specified in the config is used.

      It is not required to connect a global store to Processors, Transformers, or ValueTransformers; those have read-only access to all global stores by default.

      The supplier should always generate a new instance each time ProcessorSupplier.get() gets called. Creating a single Processor object and returning the same object reference in ProcessorSupplier.get() would be a violation of the supplier pattern and leads to runtime exceptions.

      Parameters:
      storeBuilder - user defined StoreBuilder; can't be null
      topic - the topic to source the data from
      consumed - the instance of Consumed used to define optional parameters; can't be null
      stateUpdateSupplier - the instance of ProcessorSupplier
      Returns:
      itself
      Throws:
      TopologyException - if the processor of state is already registered
    • addGlobalStore

      public <KIn, VIn> StreamsBuilder addGlobalStore(StoreBuilder<?> storeBuilder, String topic, Consumed<KIn,VIn> consumed, ProcessorSupplier<KIn,VIn,Void,Void> stateUpdateSupplier)
      Adds a global StateStore to the topology. The StateStore sources its data from all partitions of the provided input topic. There will be exactly one instance of this StateStore per Kafka Streams instance.

      A SourceNode will be added to consume the data arriving from the partitions of the input topic.

      The provided ProcessorSupplier will be used to create a Processor that will receive all records forwarded from the SourceNode. The supplier should always generate a new instance. Creating a single Processor object and returning the same object reference in ProcessorSupplier.get() is a violation of the supplier pattern and leads to runtime exceptions. NOTE: you should not use the Processor to insert transformed records into the global state store. This store uses the source topic as changelog and during restore will insert records directly from the source. This Processor should be used to keep the StateStore up-to-date. The default TimestampExtractor as specified in the config is used.

      It is not required to connect a global store to Processors, Transformers, or ValueTransformers; those have read-only access to all global stores by default.

      Parameters:
      storeBuilder - user defined StoreBuilder; can't be null
      topic - the topic to source the data from
      consumed - the instance of Consumed used to define optional parameters; can't be null
      stateUpdateSupplier - the instance of ProcessorSupplier
      Returns:
      itself
      Throws:
      TopologyException - if the processor of state is already registered
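      A minimal sketch of the non-deprecated overload, assuming the kafka-streams library is on the classpath. The class names, the topic "config-topic", and the store name "config-store" are hypothetical. The processor writes each record into the store unchanged, in line with the note above about not inserting transformed records. Logging is disabled on the store builder explicitly; this is a defensive assumption, since the source topic already serves as the changelog.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class AddGlobalStoreExample {

    // Copies each input record into the global store without transforming it.
    static class ConfigUpdater implements Processor<String, String, Void, Void> {
        private KeyValueStore<String, String> store;

        @Override
        public void init(ProcessorContext<Void, Void> context) {
            store = context.getStateStore("config-store");
        }

        @Override
        public void process(Record<String, String> record) {
            store.put(record.key(), record.value());
        }
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // A typed variable selects the overload that takes the typed ProcessorSupplier.
        ProcessorSupplier<String, String, Void, Void> supplier = ConfigUpdater::new;

        builder.addGlobalStore(
            Stores.keyValueStoreBuilder(
                      Stores.inMemoryKeyValueStore("config-store"),
                      Serdes.String(),
                      Serdes.String())
                  .withLoggingDisabled(),
            "config-topic",
            Consumed.with(Serdes.String(), Serdes.String()),
            supplier);

        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(buildTopology().describe());
    }
}
```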
    • build

      public Topology build()
      Returns the Topology that represents the specified processing logic. Note that using this method means no optimizations are performed.
      Returns:
      the Topology that represents the specified processing logic
    • build

      public Topology build(Properties props)
      Returns the Topology that represents the specified processing logic and accepts a Properties instance used to indicate whether to optimize topology or not.
      Parameters:
      props - the Properties used for building possibly optimized topology
      Returns:
      the Topology that represents the specified processing logic
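      A minimal sketch of requesting optimizations at build time, assuming the kafka-streams library is on the classpath. The class name and the topic names "users" and "users-copy" are hypothetical. The same Properties would normally also be passed to the KafkaStreams constructor, together with required settings such as the application id, which are omitted here.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class OptimizedBuildExample {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // A table read directly from a topic; with optimizations enabled the
        // source topic can be reused for recovery instead of a separate changelog.
        builder.table("users", Consumed.with(Serdes.String(), Serdes.String()))
               .toStream()
               .to("users-copy", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        // Sets "topology.optimization" to "all".
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

        // build() without arguments would skip optimizations entirely.
        return builder.build(props);
    }

    public static void main(String[] args) {
        System.out.println(buildTopology().describe());
    }
}
```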