Interface KStream<K,V>

Type Parameters:
K - the key type of this stream
V - the value type of this stream

public interface KStream<K,V>
KStream is an abstraction of a record stream of key-value pairs, i.e., each record is an independent entity/event in the real world. For example, a user X might buy two items I1 and I2, and thus there might be two records <K:I1>, <K:I2> in the stream.

A KStream is either defined from one or multiple Kafka topics that are consumed message by message or the result of a KStream transformation. A KTable can also be directly converted into a KStream.

A KStream can be transformed record by record, joined with another KStream, KTable, GlobalKTable, or can be aggregated into a KTable. A KStream can also be directly converted into a KTable. Kafka Streams DSL can be mixed-and-matched with the Processor API (PAPI) (cf. Topology) via process(...) and processValues(...).

  • Method Details

    • filter

      KStream<K,V> filter(Predicate<? super K,? super V> predicate)
      Create a new KStream that consists of all records of this stream which satisfy the given predicate. All records that do not satisfy the predicate are dropped. This is a stateless record-by-record operation (cf. processValues(FixedKeyProcessorSupplier, String...) for stateful record processing or if you need access to the record's timestamp, headers, or other metadata).
      Parameters:
      predicate - a filter Predicate that is applied to each record
      Returns:
      A KStream that contains only those records that satisfy the given predicate.
    • filter

      KStream<K,V> filter(Predicate<? super K,? super V> predicate, Named named)
      See filter(Predicate).

      Takes an additional Named parameter that is used to name the processor in the topology.

    • filterNot

      KStream<K,V> filterNot(Predicate<? super K,? super V> predicate)
      Create a new KStream that consists of all records of this stream which do not satisfy the given predicate. All records that do satisfy the predicate are dropped. This is a stateless record-by-record operation (cf. processValues(FixedKeyProcessorSupplier, String...) for stateful record processing or if you need access to the record's timestamp, headers, or other metadata).
      Parameters:
      predicate - a filter Predicate that is applied to each record
      Returns:
      A KStream that contains only those records that do not satisfy the given predicate.
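
      As a rough illustration of how filter and filterNot relate, the sketch below models a stream as an in-memory list of key-value pairs and uses java.util.function.BiPredicate as a stand-in for the Kafka Streams Predicate. The class FilterSketch and its helpers are hypothetical and are not part of the Kafka Streams API; the sketch only shows that every record ends up in exactly one of the two results.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Plain-Java model of filter/filterNot semantics; this is not Kafka Streams code.
final class FilterSketch {

    // Hypothetical helper that builds one key-value record for the demo.
    static Map.Entry<String, Integer> kv(String key, Integer value) {
        return new SimpleEntry<>(key, value);
    }

    // Keeps the records that satisfy the predicate, in their original order.
    static <K, V> List<Map.Entry<K, V>> filter(List<Map.Entry<K, V>> records,
                                               BiPredicate<? super K, ? super V> predicate) {
        List<Map.Entry<K, V>> result = new ArrayList<>();
        for (Map.Entry<K, V> record : records) {
            if (predicate.test(record.getKey(), record.getValue())) {
                result.add(record);
            }
        }
        return result;
    }

    // Keeps the records that do NOT satisfy the predicate.
    static <K, V> List<Map.Entry<K, V>> filterNot(List<Map.Entry<K, V>> records,
                                                  BiPredicate<? super K, ? super V> predicate) {
        return filter(records, (k, v) -> !predicate.test(k, v));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records =
                Arrays.asList(kv("alice", 3), kv("bob", 12), kv("carol", 7));
        System.out.println(filter(records, (k, v) -> v > 5));    // prints [bob=12, carol=7]
        System.out.println(filterNot(records, (k, v) -> v > 5)); // prints [alice=3]
    }
}
```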
    • filterNot

      KStream<K,V> filterNot(Predicate<? super K,? super V> predicate, Named named)
      See filterNot(Predicate).

      Takes an additional Named parameter that is used to name the processor in the topology.

    • selectKey

      <KOut> KStream<KOut,V> selectKey(KeyValueMapper<? super K,? super V,? extends KOut> mapper)
      Create a new KStream that consists of all records of this stream but with a modified key. The provided KeyValueMapper is applied to each input record and computes a new key (possibly of a different type) for it. Thus, an input record <K,V> can be transformed into an output record <K':V>. This is a stateless record-by-record operation (cf. process(ProcessorSupplier, String...) for stateful record processing or if you need access to the record's timestamp, headers, or other metadata).

      For example, you can use this transformation to set a key for a key-less input record <null,V> by extracting a key from the value within your KeyValueMapper. The example below computes the new key as the length of the value string.

      
       KStream<byte[], String> keyLessStream = builder.stream("key-less-topic");
       KStream<Integer, String> keyedStream = keyLessStream.selectKey(new KeyValueMapper<byte[], String, Integer>() {
           @Override
           public Integer apply(byte[] key, String value) {
               // the input key is null; derive the new key from the value
               return value.length();
           }
       });
       
      Setting a new key might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream.
      Type Parameters:
      KOut - the new key type of the result KStream
      Parameters:
      mapper - a KeyValueMapper that computes a new key for each input record
      Returns:
      A KStream that contains records with new key (possibly of a different type) and unmodified value.
    • selectKey

      <KOut> KStream<KOut,V> selectKey(KeyValueMapper<? super K,? super V,? extends KOut> mapper, Named named)
      See selectKey(KeyValueMapper).

      Takes an additional Named parameter that is used to name the processor in the topology.

    • mapValues

      <VOut> KStream<K,VOut> mapValues(ValueMapper<? super V,? extends VOut> mapper)
      Create a new KStream that consists of all records of this stream but with a modified value. The provided ValueMapper is applied to each input record value and computes a new value (possibly of a different type) for it. Thus, an input record <K,V> can be transformed into an output record <K:V'>. If you need read access to the input record key, use mapValues(ValueMapperWithKey). This is a stateless record-by-record operation (cf. processValues(FixedKeyProcessorSupplier, String...) for stateful value processing or if you need access to the record's timestamp, headers, or other metadata).

      The example below counts the number of tokens in the value string.

      
       KStream<String, String> inputStream = builder.stream("topic");
       KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapper<String, Integer>() {
           @Override
           public Integer apply(String value) {
               // number of space-separated tokens in the value
               return value.split(" ").length;
           }
       });
       
      Setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. map(KeyValueMapper)).
      Type Parameters:
      VOut - the value type of the result stream
      Parameters:
      mapper - a ValueMapper that computes a new value for each input record
      Returns:
      A KStream that contains records with unmodified key and new values (possibly of a different type).
    • mapValues

      <VOut> KStream<K,VOut> mapValues(ValueMapper<? super V,? extends VOut> mapper, Named named)
      See mapValues(ValueMapper).

      Takes an additional Named parameter that is used to name the processor in the topology.

    • mapValues

      <VOut> KStream<K,VOut> mapValues(ValueMapperWithKey<? super K,? super V,? extends VOut> mapper)
      See mapValues(ValueMapper).

      Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and incorrect results.

    • mapValues

      <VOut> KStream<K,VOut> mapValues(ValueMapperWithKey<? super K,? super V,? extends VOut> mapper, Named named)
      See mapValues(ValueMapperWithKey).

      Takes an additional Named parameter that is used to name the processor in the topology.

    • map

      <KOut, VOut> KStream<KOut,VOut> map(KeyValueMapper<? super K,? super V,? extends KeyValue<? extends KOut,? extends VOut>> mapper)
      Create a new KStream that consists of a modified record for each record in this stream. The provided KeyValueMapper is applied to each input record and computes a new output record (possibly of a different key and/or value type) for it. Thus, an input record <K,V> can be transformed into an output record <K':V'>. This is a stateless record-by-record operation (cf. process(ProcessorSupplier, String...) for stateful record processing or if you need access to the record's timestamp, headers, or other metadata).

      The example below normalizes the String key to upper-case letters and counts the number of tokens in the value string.

      
       KStream<String, String> inputStream = builder.stream("topic");
       KStream<String, Integer> outputStream = inputStream.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() {
           @Override
           public KeyValue<String, Integer> apply(String key, String value) {
               // new key: upper-cased key; new value: number of space-separated tokens
               return new KeyValue<>(key.toUpperCase(), value.split(" ").length);
           }
       });
       
      The provided KeyValueMapper must return a KeyValue type and must not return null.

      Mapping records might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. mapValues(ValueMapper)).

      Type Parameters:
      KOut - the key type of the result stream
      VOut - the value type of the result stream
      Parameters:
      mapper - a KeyValueMapper that computes a new KeyValue pair for each input record
      Returns:
      A KStream that contains records with new key and new value (possibly of different types).
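
      To illustrate why changing the key might require data redistribution while changing only the value does not, the sketch below models partition assignment as a function of the key alone. The class CoLocationSketch and the helper partitionFor are hypothetical, and hashCode() is only a stand-in; it is not the partitioner that Kafka actually uses.

```java
import java.util.Locale;

// Plain-Java model of key-based partition assignment; this is not Kafka Streams code.
final class CoLocationSketch {

    // Hypothetical stand-in for a key-based partitioner: the partition depends on the key only.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        String key = "a";

        // Original record <a:V>.
        int before = partitionFor(key, numPartitions);
        // mapValues-style change <a:V'>: the key is untouched, so the partition is the same.
        int afterValueChange = partitionFor(key, numPartitions);
        // map-style change <A:V'>: the key is upper-cased, so the target partition may differ.
        int afterKeyChange = partitionFor(key.toUpperCase(Locale.ROOT), numPartitions);

        System.out.println(before + " " + afterValueChange + " " + afterKeyChange); // prints "1 1 2"
    }
}
```

      In this model the record with the upper-cased key belongs on a different partition than the one it currently sits on, which is the situation a repartition topic resolves before a key-based operator runs.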
    • map

      <KOut, VOut> KStream<KOut,VOut> map(KeyValueMapper<? super K,? super V,? extends KeyValue<? extends KOut,? extends VOut>> mapper, Named named)
      See map(KeyValueMapper).

      Takes an additional Named parameter that is used to name the processor in the topology.

    • flatMap

      <KOut, VOut> KStream<KOut,VOut> flatMap(KeyValueMapper<? super K,? super V,? extends Iterable<? extends KeyValue<? extends KOut,? extends VOut>>> mapper)
      Create a new KStream that consists of zero or more records for each record in this stream. The provided KeyValueMapper is applied to each input record and computes zero or more output records (possibly of a different key and/or value type) for it. Thus, an input record <K,V> can be transformed into output records <K':V'>, <K':V'>, .... This is a stateless record-by-record operation (cf. process(ProcessorSupplier, String...) for stateful record processing or if you need access to the record's timestamp, headers, or other metadata).

      The example below splits input records <null:String> containing sentences as values into their words and emits a record <word:1> for each word.

      
       KStream<byte[], String> inputStream = builder.stream("topic");
       KStream<String, Integer> outputStream = inputStream.flatMap(
           new KeyValueMapper<byte[], String, Iterable<KeyValue<String, Integer>>>() {
               @Override
               public Iterable<KeyValue<String, Integer>> apply(byte[] key, String value) {
                   String[] tokens = value.split(" ");
                   List<KeyValue<String, Integer>> result = new ArrayList<>(tokens.length);

                   // emit one <word:1> record per token
                   for (String token : tokens) {
                       result.add(new KeyValue<>(token, 1));
                   }

                   return result;
               }
           });
       
      The provided KeyValueMapper must return an Iterable (e.g., any Collection type) and the return value must not be null.

      Flat-mapping records might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. flatMapValues(ValueMapper)).

      Type Parameters:
      KOut - the key type of the result stream
      VOut - the value type of the result stream
      Parameters:
      mapper - a KeyValueMapper<K, V, Iterable<KeyValue<K', V'>>> that computes zero or more new KeyValue pairs for each input record
      Returns:
      A KStream that contains more or fewer records with new keys and values (possibly of different types).
    • flatMap

      <KR, VOut> KStream<KR,VOut> flatMap(KeyValueMapper<? super K,? super V,? extends Iterable<? extends KeyValue<? extends KR,? extends VOut>>> mapper, Named named)
      See flatMap(KeyValueMapper).

      Takes an additional Named parameter that is used to name the processor in the topology.

    • flatMapValues

      <VOut> KStream<K,VOut> flatMapValues(ValueMapper<? super V,? extends Iterable<? extends VOut>> mapper)
      Create a new KStream that consists of zero or more records with modified value for each record in this stream. The provided ValueMapper is applied to each input record value and computes zero or more output values (possibly of a different type) for it. Thus, an input record <K,V> can be transformed into output records <K:V'>, <K:V'>, .... If you need read access to the input record key, use flatMapValues(ValueMapperWithKey). This is a stateless record-by-record operation (cf. processValues(FixedKeyProcessorSupplier, String...) for stateful record processing or if you need access to the record's timestamp, headers, or other metadata).

      The example below splits input records <null:String> containing sentences as values into their words.

      
       KStream<byte[], String> inputStream = builder.stream("topic");
       KStream<byte[], String> outputStream = inputStream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
           @Override
           public Iterable<String> apply(String value) {
               // one output value per space-separated word
               return Arrays.asList(value.split(" "));
           }
       });
       
      The provided ValueMapper must return an Iterable (e.g., any Collection type) and the return value must not be null.

      Splitting a record into multiple records with the same key preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. flatMap(KeyValueMapper)).

      Type Parameters:
      VOut - the value type of the result stream
      Parameters:
      mapper - a ValueMapper<V, Iterable<V'>> that computes zero or more new values for each input record
      Returns:
      A KStream that contains more or fewer records with unmodified keys but new values (possibly of a different type).
    • flatMapValues

      <VOut> KStream<K,VOut> flatMapValues(ValueMapper<? super V,? extends Iterable<? extends VOut>> mapper, Named named)
      See flatMapValues(ValueMapper).

      Takes an additional Named parameter that is used to name the processor in the topology.

    • flatMapValues

      <VOut> KStream<K,VOut> flatMapValues(ValueMapperWithKey<? super K,? super V,? extends Iterable<? extends VOut>> mapper)
      See flatMapValues(ValueMapper).

      Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and incorrect results.

    • flatMapValues

      <VOut> KStream<K,VOut> flatMapValues(ValueMapperWithKey<? super K,? super V,? extends Iterable<? extends VOut>> mapper, Named named)
      See flatMapValues(ValueMapperWithKey).

      Takes an additional Named parameter that is used to name the processor in the topology.

    • print

      void print(Printed<K,V> printed)
      Print the records of this KStream using the options provided by Printed. Note that this is mainly for debugging/testing purposes, and it will try to flush on each record print. It SHOULD NOT be used in production if performance is a concern.
      Parameters:
      printed - options for printing
    • foreach

      void foreach(ForeachAction<? super K,? super V> action)
      Perform an action on each record of this KStream. This is a stateless record-by-record operation (cf. process(ProcessorSupplier, String...) for stateful record processing or if you need access to the record's timestamp, headers, or other metadata).

      Foreach is a terminal operation that may trigger side effects (such as logging or statistics collection) and returns void (cf. peek(ForeachAction)).

      Note that this operation may execute multiple times for a single record in failure cases, and it is not guarded by "exactly-once processing guarantees".

      Parameters:
      action - an action to perform on each record
    • foreach

      void foreach(ForeachAction<? super K,? super V> action, Named named)
      See foreach(ForeachAction).

      Takes an additional Named parameter that is used to name the processor in the topology.

    • peek

      KStream<K,V> peek(ForeachAction<? super K,? super V> action)
      Perform an action on each record of this KStream. This is a stateless record-by-record operation (cf. process(ProcessorSupplier, String...) for stateful record processing or if you need access to the record's timestamp, headers, or other metadata).

      Peek is a non-terminal operation that may trigger side effects (such as logging or statistics collection) and returns an unchanged KStream (cf. foreach(ForeachAction)).

      Note that this operation may execute multiple times for a single record in failure cases, and it is not guarded by "exactly-once processing guarantees".

      Parameters:
      action - an action to perform on each record
      Returns:
      An unmodified KStream.
    • peek

      KStream<K,V> peek(ForeachAction<? super K,? super V> action, Named named)
      See peek(ForeachAction).

      Takes an additional Named parameter that is used to name the processor in the topology.

    • split

      BranchedKStream<K,V> split()
      Split this KStream into different branches. The returned BranchedKStream instance can be used for routing the records to different branches depending on evaluation against the supplied predicates. Records are evaluated against the predicates in the order they are provided with the first matching predicate accepting the record. Branching is a stateless record-by-record operation. See BranchedKStream for a detailed description and usage example.

      Splitting a KStream guarantees that each input record is sent to at most one result KStream. There is no operator for broadcasting/multicasting records into multiple result KStreams. If you want to broadcast records, you can apply multiple downstream operators to the same KStream instance:

      
       // Broadcasting: every record of `stream` is sent to all three operators for processing
       KStream<...> stream1 = stream.map(...);
       KStream<...> stream2 = stream.mapValues(...);
       KStream<...> stream3 = stream.flatMap(...);
       
      Multicasting can be achieved with broadcasting into multiple filter operations:
      
       // Multicasting: every record of `stream` is sent to all three filters, and thus, may be part of
       // multiple result streams, `stream1`, `stream2`, and/or `stream3`
       KStream<...> stream1 = stream.filter(predicate1);
       KStream<...> stream2 = stream.filter(predicate2);
       KStream<...> stream3 = stream.filter(predicate3);
       
      Returns:
      A BranchedKStream that provides methods for routing the records to different branches.
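
      The first-match routing rule described above can be modeled in plain Java as follows. This is a sketch of the semantics only: SplitSketch, firstMatchingBranch, and examplePredicates are hypothetical names, BiPredicate stands in for the Kafka Streams Predicate, and a return value of -1 simply means that no predicate accepted the record.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

// Plain-Java model of "first matching predicate wins"; this is not Kafka Streams code.
final class SplitSketch {

    // Returns the index of the first predicate that accepts the record, or -1 if none does.
    static <K, V> int firstMatchingBranch(K key, V value, List<BiPredicate<K, V>> predicates) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(key, value)) {
                return i; // later predicates are never consulted for this record
            }
        }
        return -1;
    }

    // Example predicates, evaluated in the order they are listed.
    static List<BiPredicate<String, Integer>> examplePredicates() {
        return Arrays.asList(
                (k, v) -> v < 0,       // branch 0: negative values
                (k, v) -> v % 2 == 0,  // branch 1: even values
                (k, v) -> v > 100);    // branch 2: large values
    }

    public static void main(String[] args) {
        List<BiPredicate<String, Integer>> predicates = examplePredicates();
        System.out.println(firstMatchingBranch("a", -4, predicates));  // prints 0 (also even, but branch 0 wins)
        System.out.println(firstMatchingBranch("b", 200, predicates)); // prints 1 (also large, but branch 1 wins)
        System.out.println(firstMatchingBranch("c", 7, predicates));   // prints -1 (no predicate matches)
    }
}
```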
    • split

      BranchedKStream<K,V> split(Named named)
      See split().

      Takes an additional Named parameter that is used to name the processor in the topology.

    • merge

      KStream<K,V> merge(KStream<K,V> otherStream)
      Merge this KStream and the given KStream.

      There is no ordering guarantee between records from this KStream and records from the provided KStream in the merged stream. Relative order is preserved within each input stream though (i.e., records within one input stream are processed in order).

      Parameters:
      otherStream - a stream which is to be merged into this stream
      Returns:
      A merged stream containing all records from this and the provided KStream
    • merge

      KStream<K,V> merge(KStream<K,V> otherStream, Named named)
      See merge(KStream).

      Takes an additional Named parameter that is used to name the processor in the topology.

    • repartition

      KStream<K,V> repartition()
      Materialize this stream to an auto-generated repartition topic and create a new KStream from the auto-generated topic.

      The created topic is considered an internal topic and is meant to be used only by the current Kafka Streams instance. The topic will be named as "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. The number of partitions for the repartition topic is determined based on the upstream topics partition numbers. Furthermore, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.

      You can retrieve all generated internal topic names via Topology.describe(). To explicitly set key/value serdes, specify the number of used partitions or the partitioning strategy, or to customize the name of the repartition topic, use repartition(Repartitioned).

      Returns:
      A KStream that contains exactly the same records as this KStream, but repartitioned.
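
      As a small illustration of the naming pattern "${applicationId}-<name>-repartition", the sketch below assembles a topic name from its two variable parts. The helper and the example values "orders-app" and "orders-by-customer" are made up for illustration; actual internal names should be looked up via Topology.describe().

```java
// Plain-Java illustration of the repartition topic naming pattern; this is not Kafka Streams code.
final class RepartitionTopicNameSketch {

    // Builds "${applicationId}-<name>-repartition" from its two variable parts.
    static String repartitionTopicName(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    public static void main(String[] args) {
        // Both arguments are hypothetical example values.
        System.out.println(repartitionTopicName("orders-app", "orders-by-customer"));
        // prints "orders-app-orders-by-customer-repartition"
    }
}
```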
    • repartition

      KStream<K,V> repartition(Repartitioned<K,V> repartitioned)
    • to

      void to(String topic)
      Materialize this stream to a topic. The topic should be manually created before it is used (i.e., before the Kafka Streams application is started).

      To explicitly set key/value serdes or the partitioning strategy, use to(String, Produced).

      Parameters:
      topic - the output topic name
    • to

      void to(String topic, Produced<K,V> produced)
    • to

      void to(TopicNameExtractor<K,V> topicExtractor)
      Materialize the records of this stream to different topics. The provided TopicNameExtractor is applied to each input record to compute the output topic name. All topics should be manually created before they are used (i.e., before the Kafka Streams application is started).

      To explicitly set key/value serdes or the partitioning strategy, use to(TopicNameExtractor, Produced).

      Parameters:
      topicExtractor - the extractor to determine the name of the Kafka topic to write to for each record
    • to

      void to(TopicNameExtractor<K,V> topicExtractor, Produced<K,V> produced)
    • toTable

      KTable<K,V> toTable()
      Convert this stream to a KTable. The conversion is a logical operation and only changes the "interpretation" of the records, i.e., each record of this stream is a "fact/event" and is re-interpreted as a "change/update-per-key" now (cf. KStream vs KTable). The resulting KTable is essentially a changelog stream. To "upsert" the records of this stream into a materialized KTable (i.e., into a state store), use toTable(Materialized).

      Note that null keys are not supported by KTables and records with null key will be dropped.

      If a key changing operator was used before this operation (e.g., selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper) or process(ProcessorSupplier, String...)) Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic such that the resulting KTable is correctly partitioned by its key.

      This internal repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. The number of partitions for the repartition topic is determined based on the upstream topics partition numbers. Furthermore, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.

      Note: If the result KTable is materialized, it is not possible to apply "source topic optimization", because repartition topics are considered transient and cannot be used to recover the result KTable in case of a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.

      You can retrieve all generated internal topic names via Topology.describe(). To customize the name of the repartition topic, use toTable(Named). For more control over the repartitioning, use repartition(Repartitioned) before toTable().

      Returns:
      A KTable that contains the same records as this KStream.
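
      The re-interpretation of records as per-key updates can be pictured with a plain-Java model in which each record overwrites the previous value for its key and records with a null key are skipped. ToTableSketch and latestPerKey are hypothetical names; the sketch does not model null values, state stores, or repartitioning.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of "each record is an update per key"; this is not Kafka Streams code.
final class ToTableSketch {

    // Hypothetical helper that builds one key-value record for the demo.
    static Map.Entry<String, Integer> kv(String key, Integer value) {
        return new SimpleEntry<>(key, value);
    }

    // Replays the records in order; the latest value seen for each key wins.
    static <K, V> Map<K, V> latestPerKey(List<Map.Entry<K, V>> records) {
        Map<K, V> table = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : records) {
            if (record.getKey() == null) {
                continue; // records with a null key are dropped
            }
            table.put(record.getKey(), record.getValue());
        }
        return table;
    }

    static Map<String, Integer> demo() {
        List<Map.Entry<String, Integer>> stream = Arrays.asList(
                kv("alice", 1), kv("bob", 5), kv(null, 9), kv("alice", 3));
        return latestPerKey(stream);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {alice=3, bob=5}
    }
}
```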
    • toTable

      KTable<K,V> toTable(Named named)
    • toTable

      KTable<K,V> toTable(Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
    • toTable

      KTable<K,V> toTable(Named named, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
    • groupByKey

      KGroupedStream<K,V> groupByKey()
      Group the records by their current key into a KGroupedStream while preserving the original values. KGroupedStream can be further grouped with other streams to form a CogroupedKStream. (Co-)Grouping a stream on the record key is required before a windowing or aggregation operator can be applied to the data (cf. KGroupedStream). By default, the current key is used as grouping key, but a new grouping key can be set via groupBy(KeyValueMapper). In either case, if the grouping key is null, the record will be dropped.

      If a key changing operator was used before this operation (e.g., selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper) or process(ProcessorSupplier, String...)) Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic such that the resulting KGroupedStream is correctly partitioned by the grouping key, before the downstream windowing/aggregation will be applied.

      This internal repartition topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. The number of partitions for the repartition topic is determined based on the upstream topics partition numbers. Furthermore, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.

      You can retrieve all generated internal topic names via Topology.describe(). To explicitly set key/value serdes or to customize the name of the repartition topic, use groupByKey(Grouped). For more control over the repartitioning, use repartition(Repartitioned) before groupByKey().

      Returns:
      A KGroupedStream that contains the grouped records of the original KStream.
    • groupByKey

      KGroupedStream<K,V> groupByKey(Grouped<K,V> grouped)
      See groupByKey().

      Takes an additional Grouped parameter that allows you to explicitly set key/value serdes or to customize the name of the potentially created internal repartition topic.

    • groupBy

      <KOut> KGroupedStream<KOut,V> groupBy(KeyValueMapper<? super K,? super V,KOut> keySelector)
      Group the records of this KStream on a new key (in contrast to groupByKey()). This operation is semantically equivalent to selectKey(KeyValueMapper) followed by groupByKey().

      Because a new key is selected, an internal repartitioning topic will be created in Kafka. See groupByKey() for more details about auto-repartitioning.

      Type Parameters:
      KOut - the new key type of the result KGroupedStream
      Parameters:
      keySelector - a KeyValueMapper that computes a new key for grouping
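
      The equivalence with selectKey(KeyValueMapper) followed by groupByKey() can be modeled in plain Java: compute the new key for each record, drop the record if that key is null, and collect the original values under the new key. GroupBySketch is a hypothetical name, BiFunction stands in for KeyValueMapper, and the resulting map only shows which values fall under which grouping key; a real KGroupedStream is not a materialized map.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of grouping on a newly selected key; this is not Kafka Streams code.
final class GroupBySketch {

    static <K, V, KOut> Map<KOut, List<V>> groupBy(
            List<Map.Entry<K, V>> records,
            BiFunction<? super K, ? super V, ? extends KOut> keySelector) {
        Map<KOut, List<V>> groups = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : records) {
            // Step 1 (selectKey): compute the grouping key from the record.
            KOut newKey = keySelector.apply(record.getKey(), record.getValue());
            if (newKey == null) {
                continue; // records with a null grouping key are dropped
            }
            // Step 2 (groupByKey): collect the unchanged value under the new key.
            groups.computeIfAbsent(newKey, k -> new ArrayList<>()).add(record.getValue());
        }
        return groups;
    }

    static Map<String, List<String>> demo() {
        List<Map.Entry<String, String>> records = Arrays.asList(
                new SimpleEntry<String, String>("order-1", "apple"),
                new SimpleEntry<String, String>("order-2", "avocado"),
                new SimpleEntry<String, String>("order-3", "banana"),
                new SimpleEntry<String, String>("order-4", ""));
        // Group by the first letter of the value; an empty value yields a null key.
        return groupBy(records, (k, v) -> v.isEmpty() ? null : v.substring(0, 1));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {a=[apple, avocado], b=[banana]}
    }
}
```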
    • groupBy

      <KOut> KGroupedStream<KOut,V> groupBy(KeyValueMapper<? super K,? super V,KOut> keySelector, Grouped<KOut,V> grouped)
      See groupBy(KeyValueMapper).

      Takes an additional Grouped parameter that allows you to explicitly set key/value serdes or to customize the name of the created internal repartition topic.

    • join

      <VRight, VOut> KStream<K,VOut> join(KStream<K,VRight> rightStream, ValueJoiner<? super V,? super VRight,? extends VOut> joiner, JoinWindows windows)
      Join records of this (left) stream with another (right) KStream's records using a windowed inner equi-join. The join is computed using the records' key as join attribute, i.e., leftRecord.key == rightRecord.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.

      For each pair of records meeting both join predicates the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records. If you need read access to the join key, use join(KStream, ValueJoinerWithKey, JoinWindows). If an input record's key or value is null the input record will be dropped, and no join computation is triggered. Similarly, so-called late records, i.e., records with a timestamp belonging to an already closed window (based on stream-time progress, window size, and grace period), will be dropped.

      Example (assuming all input records belong to the correct windows):

      left      right     result
      <K1:A>
      <K2:B>    <K2:b>    <K2:ValueJoiner(B,b)>
                <K3:c>

      Both KStreams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case (and if no auto-repartitioning happens, see further below), you would need to call repartition(Repartitioned) (for at least one of the two KStreams) before doing the join and specify the matching number of partitions via the Repartitioned parameter to align the partition count for both inputs to each other. Furthermore, both KStreams need to be co-partitioned on the join key (i.e., use the same partitioner). Note: Kafka Streams cannot verify the used partitioner, so it is the user's responsibility to ensure that the same partitioner is used for both inputs for the join.

      If a key changing operator was used before this operation on either input stream (e.g., selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper) or process(ProcessorSupplier, String...)) Kafka Streams will automatically repartition the data of the corresponding input stream, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic such that data is correctly partitioned by the join key.

      The repartitioning topic(s) will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. The number of partitions for the repartition topic(s) is determined based on the partition numbers of both upstream topics, and Kafka Streams will automatically align the number of partitions if required for co-partitioning. Furthermore, the topic(s) will be created with infinite retention time and data will be automatically purged by Kafka Streams.

      Both of the joined KStreams will be materialized in local state stores. For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<storeName>" is an internally generated name, and "-changelog" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe(). To explicitly set key/value serdes, to customize the names of the repartition and changelog topic, or to customize the used state store, use join(KStream, ValueJoiner, JoinWindows, StreamJoined). For more control over the repartitioning, use repartition(Repartitioned) on either input before join().

      Type Parameters:
      VRight - the value type of the right stream
      VOut - the value type of the result stream
      Parameters:
      rightStream - the KStream to be joined with this stream
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      windows - the specification of the JoinWindows
      Returns:
      A KStream that contains join-records, one for each matched record-pair, with the corresponding key and a value computed by the given ValueJoiner.
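
      The two join predicates (equal keys and timestamps close to each other) can be modeled in plain Java as shown below. This is a sketch under simplifying assumptions: WindowedJoinSketch and Rec are hypothetical names, the window is treated as a symmetric maximum time difference in milliseconds, the value joiner is fixed to string concatenation, and late records, grace periods, and output ordering are not modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of a windowed inner equi-join; this is not Kafka Streams code.
final class WindowedJoinSketch {

    // Minimal record for the model: key, value, and event timestamp in milliseconds.
    static final class Rec {
        final String key;
        final String value;
        final long timestampMs;

        Rec(String key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    // Emits one result per pair with equal keys whose timestamps differ by at most windowMs.
    static List<String> innerJoin(List<Rec> left, List<Rec> right, long windowMs) {
        List<String> result = new ArrayList<>();
        for (Rec l : left) {
            if (l.key == null || l.value == null) {
                continue; // input records with a null key or value are dropped
            }
            for (Rec r : right) {
                if (r.key == null || r.value == null) {
                    continue;
                }
                boolean sameKey = l.key.equals(r.key);
                boolean closeInTime = Math.abs(l.timestampMs - r.timestampMs) <= windowMs;
                if (sameKey && closeInTime) {
                    // Stand-in for ValueJoiner(leftValue, rightValue).
                    result.add("<" + l.key + ":" + l.value + "+" + r.value + ">");
                }
            }
        }
        return result;
    }

    // Mirrors the example table above: only K2 appears on both sides.
    static List<String> demo() {
        List<Rec> left = Arrays.asList(new Rec("K1", "A", 0L), new Rec("K2", "B", 1_000L));
        List<Rec> right = Arrays.asList(new Rec("K2", "b", 1_500L), new Rec("K3", "c", 2_000L));
        return innerJoin(left, right, 5_000L);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [<K2:B+b>]
    }
}
```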
    • join

      <VRight, VOut> KStream<K,VOut> join(KStream<K,VRight> rightStream, ValueJoinerWithKey<? super K,? super V,? super VRight,? extends VOut> joiner, JoinWindows windows)
      See join(KStream, ValueJoiner, JoinWindows).

      Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and incorrect results.

    • join

      <VRight, VOut> KStream<K,VOut> join(KStream<K,VRight> rightStream, ValueJoiner<? super V,? super VRight,? extends VOut> joiner, JoinWindows windows, StreamJoined<K,V,VRight> streamJoined)
    • join

      <VRight, VOut> KStream<K,VOut> join(KStream<K,VRight> rightStream, ValueJoinerWithKey<? super K,? super V,? super VRight,? extends VOut> joiner, JoinWindows windows, StreamJoined<K,V,VRight> streamJoined)
      See join(KStream, ValueJoiner, JoinWindows).

      Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and incorrect results.

    • leftJoin

      <VRight, VOut> KStream<K,VOut> leftJoin(KStream<K,VRight> rightStream, ValueJoiner<? super V,? super VRight,? extends VOut> joiner, JoinWindows windows)
      Join records of this (left) stream with another (right) KStream's records using a windowed left equi-join. In contrast to an inner join, all records from this stream will produce at least one output record (more details below). The join is computed using the records' key as join attribute, i.e., leftRecord.key == rightRecord.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.

      For each pair of records meeting both join predicates the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Furthermore, for each input record of this KStream that does not have any join-partner in the right stream (i.e., no record with the same key within the join interval), ValueJoiner will be called with a null value for the right stream.

      Note: By default, non-joining records from this stream are buffered until their join window closes, and corresponding left-join results for these records are emitted with some delay. If you want to get left-join results without any delay, you can use JoinWindows#of(Duration) [deprecated] instead. However, such an "eager" left-join result could be a spurious result, because the same record may find actual join partners later, producing additional inner-join results.

      The key of the result record is the same as for both joining input records, or the left input record's key for a left-join result. If you need read access to the join key, use leftJoin(KStream, ValueJoinerWithKey, JoinWindows). If a left input record's value is null the input record will be dropped, and no join computation is triggered. Note that null keys are supported for left input records (in contrast to an inner join), resulting in a left-join result. If a right input record's key or value is null the input record will be dropped, and no join computation is triggered. So-called late records from either input, i.e., records with a timestamp belonging to an already closed window (based on stream-time progress, window size, and grace period), will be dropped.

      Example (assuming all input records belong to the correct windows, not taking actual emit/window-close time for left-join results, or eager/spurious results into account):

      left   | right  | result
      <K1:A> |        | <K1:ValueJoiner(A,null)>
      <K2:B> | <K2:b> | <K2:ValueJoiner(B,b)>
             | <K3:c> |
      For more details about co-partitioning requirements, (auto-)repartitioning, and more, see join(KStream, ValueJoiner, JoinWindows).
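      The left-join example can be reproduced with a small plain-Java model. This is a sketch of the semantics only, not Kafka Streams code: the class and method names are hypothetical, records are modeled as {key, value} string pairs, and all records are assumed to fall into the same join window (so emit timing, grace periods, and eager/spurious results are ignored).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java model of the left-join semantics; this is not Kafka Streams code.
// All records are assumed to belong to the same join window.
final class LeftJoinModel {
    private LeftJoinModel() { }

    // Each record is a {key, value} pair.
    static List<String> leftJoin(final List<String[]> left,
                                 final List<String[]> right,
                                 final BiFunction<String, String, String> joiner) {
        final List<String> results = new ArrayList<>();
        for (final String[] l : left) {
            if (l[1] == null) {
                continue; // left records with a null value are dropped
            }
            boolean matched = false;
            for (final String[] r : right) {
                if (r[0] == null || r[1] == null) {
                    continue; // right records with a null key or value are dropped
                }
                if (r[0].equals(l[0])) {
                    results.add("<" + l[0] + ":" + joiner.apply(l[1], r[1]) + ">");
                    matched = true;
                }
            }
            if (!matched) {
                // No join partner: the joiner is called with null for the right value.
                results.add("<" + l[0] + ":" + joiner.apply(l[1], null) + ">");
            }
        }
        return results;
    }

    // Inputs from the example: left = <K1:A>, <K2:B>; right = <K2:b>, <K3:c>.
    static List<String> example() {
        final List<String[]> left = Arrays.asList(new String[] {"K1", "A"}, new String[] {"K2", "B"});
        final List<String[]> right = Arrays.asList(new String[] {"K2", "b"}, new String[] {"K3", "c"});
        return leftJoin(left, right, (l, r) -> "ValueJoiner(" + l + "," + r + ")");
    }
}
```

      Calling LeftJoinModel.example() yields <K1:ValueJoiner(A,null)> and <K2:ValueJoiner(B,b)>; the right-only record <K3:c> produces no output, which is the difference to an outer join.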
      Returns:
      A KStream that contains join-records, one for each matched record-pair plus one for each non-matching record of this KStream, with the corresponding key and a value computed by the given ValueJoiner.
    • leftJoin

      <VRight, VOut> KStream<K,VOut> leftJoin(KStream<K,VRight> rightStream, ValueJoinerWithKey<? super K,? super V,? super VRight,? extends VOut> joiner, JoinWindows windows)
      See leftJoin(KStream, ValueJoiner, JoinWindows).

      Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and incorrect results.

    • leftJoin

      <VRight, VOut> KStream<K,VOut> leftJoin(KStream<K,VRight> rightStream, ValueJoiner<? super V,? super VRight,? extends VOut> joiner, JoinWindows windows, StreamJoined<K,V,VRight> streamJoined)
    • leftJoin

      <VRight, VOut> KStream<K,VOut> leftJoin(KStream<K,VRight> rightStream, ValueJoinerWithKey<? super K,? super V,? super VRight,? extends VOut> joiner, JoinWindows windows, StreamJoined<K,V,VRight> streamJoined)
      See leftJoin(KStream, ValueJoiner, JoinWindows).

      Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and incorrect results.

    • outerJoin

      <VRight, VOut> KStream<K,VOut> outerJoin(KStream<K,VRight> otherStream, ValueJoiner<? super V,? super VRight,? extends VOut> joiner, JoinWindows windows)
      Join records of this (left) stream with another (right) KStream's records using a windowed outer equi-join. In contrast to an inner join or left join, all records from both streams will produce at least one output record (more details below). The join is computed using the records' key as join attribute, i.e., leftRecord.key == rightRecord.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.

      For each pair of records meeting both join predicates the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Furthermore, for each input record of either KStream that does not have any join-partner in the other stream (i.e., no record with the same key within the join interval), ValueJoiner will be called with a null value for right/left stream, respectively.

      Note: By default, non-joining records from either stream are buffered until their join window closes, and corresponding outer-join results for these records are emitted with some delay. If you want to get outer-join results without any delay, you can use JoinWindows#of(Duration) [deprecated] instead. However, such an "eager" outer-join result could be a spurious result, because the same record may find actual join partners later, producing additional inner-join results.

      The key of the result record is the same as for both joining input records, or the left/right input record's key for an outer-join result, respectively. If you need read access to the join key, use outerJoin(KStream, ValueJoinerWithKey, JoinWindows). If an input record's value is null the input record will be dropped, and no join computation is triggered. Note that input records with null keys are supported (in contrast to an inner join), resulting in left/right join results. So-called late records from either input, i.e., records with a timestamp belonging to an already closed window (based on stream-time progress, window size, and grace period), will be dropped.

      Example (assuming all input records belong to the correct windows, not taking actual emit/window-close time for outer-join result, or eager/spurious results into account):

      left   | right  | result
      <K1:A> |        | <K1:ValueJoiner(A,null)>
      <K2:B> | <K2:b> | <K2:ValueJoiner(B,b)>
             | <K3:c> | <K3:ValueJoiner(null,c)>
      For more details about co-partitioning requirements, (auto-)repartitioning, and more, see join(KStream, ValueJoiner, JoinWindows).
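      To make the two join predicates (equal keys and nearby timestamps) concrete, here is a plain-Java model of an outer join. It is a sketch, not Kafka Streams code. Assumptions: the window is symmetric with inclusive bounds, keys and values are non-null, and grace periods, late records, and emit timing are ignored. The Rec class, the method names, and the K4 records are hypothetical additions for illustration, and the order of the results is an artifact of the model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of a windowed outer join; this is not Kafka Streams code.
final class OuterJoinModel {
    private OuterJoinModel() { }

    static final class Rec {
        final String key;
        final String value;
        final long timestampMs;

        Rec(final String key, final String value, final long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    // Both predicates must hold: same key, and timestamps at most windowMs apart.
    static boolean joins(final Rec left, final Rec right, final long windowMs) {
        return left.key.equals(right.key)
            && Math.abs(left.timestampMs - right.timestampMs) <= windowMs;
    }

    static String format(final String key, final String leftValue, final String rightValue) {
        return "<" + key + ":ValueJoiner(" + leftValue + "," + rightValue + ")>";
    }

    static List<String> outerJoin(final List<Rec> left, final List<Rec> right, final long windowMs) {
        final List<String> results = new ArrayList<>();
        final boolean[] rightMatched = new boolean[right.size()];
        for (final Rec l : left) {
            boolean matched = false;
            for (int i = 0; i < right.size(); i++) {
                if (joins(l, right.get(i), windowMs)) {
                    results.add(format(l.key, l.value, right.get(i).value));
                    matched = true;
                    rightMatched[i] = true;
                }
            }
            if (!matched) {
                results.add(format(l.key, l.value, null)); // left record without partner
            }
        }
        for (int i = 0; i < right.size(); i++) {
            if (!rightMatched[i]) {
                results.add(format(right.get(i).key, null, right.get(i).value)); // right record without partner
            }
        }
        return results;
    }

    // K4 has equal keys on both sides, but the timestamps are 10s apart with a 5s window.
    static List<String> example() {
        final List<Rec> left = Arrays.asList(
            new Rec("K1", "A", 0L), new Rec("K2", "B", 1_000L), new Rec("K4", "D", 0L));
        final List<Rec> right = Arrays.asList(
            new Rec("K2", "b", 1_500L), new Rec("K3", "c", 0L), new Rec("K4", "d", 10_000L));
        return outerJoin(left, right, 5_000L);
    }
}
```

      In this model the K4 records share a key but lie outside the window, so each of them yields its own outer-join result instead of one joined result.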
      Returns:
      A KStream that contains join-records, one for each matched record-pair plus one for each non-matching record of either input KStream, with the corresponding key and a value computed by the given ValueJoiner.
    • outerJoin

      <VRight, VOut> KStream<K,VOut> outerJoin(KStream<K,VRight> otherStream, ValueJoinerWithKey<? super K,? super V,? super VRight,? extends VOut> joiner, JoinWindows windows)
      See outerJoin(KStream, ValueJoiner, JoinWindows).

      Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning.

    • outerJoin

      <VRight, VOut> KStream<K,VOut> outerJoin(KStream<K,VRight> otherStream, ValueJoiner<? super V,? super VRight,? extends VOut> joiner, JoinWindows windows, StreamJoined<K,V,VRight> streamJoined)
    • outerJoin

      <VRight, VOut> KStream<K,VOut> outerJoin(KStream<K,VRight> otherStream, ValueJoinerWithKey<? super K,? super V,? super VRight,? extends VOut> joiner, JoinWindows windows, StreamJoined<K,V,VRight> streamJoined)
      See outerJoin(KStream, ValueJoiner, JoinWindows).

      Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning.

    • join

      <TableValue, VOut> KStream<K,VOut> join(KTable<K,TableValue> table, ValueJoiner<? super V,? super TableValue,? extends VOut> joiner)
      Join records of this stream with KTable's records using a non-windowed inner equi-join. The join is a primary key table lookup join with join attribute streamRecord.key == tableRecord.key. "Table lookup join" means that results are only computed if KStream records are processed. This is done by performing a lookup for matching records in the internal KTable state. In contrast, processing KTable input records will only update the internal KTable state and will not produce any result records.

      For each KStream record that finds a joining record in the KTable the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records. If you need read access to the join key, use join(KTable, ValueJoinerWithKey). If a KStream input record's key or value is null the input record will be dropped, and no join computation is triggered. If a KTable input record's key is null the input record will be dropped, and the table state won't be updated. KTable input records with null values are considered deletes (so-called tombstones) for the table.

      Example:

      KStream | KTable | state  | result
      <K1:A>  |        |        |
              | <K1:b> | <K1:b> |
      <K1:C>  |        | <K1:b> | <K1:ValueJoiner(C,b)>
      By default, KStream records are processed by performing a lookup for matching records in the current (i.e., processing time) internal KTable state. This default implementation does not handle out-of-order records in either input of the join well. See join(KTable, ValueJoiner, Joined) on how to configure a stream-table join to handle out-of-order data.
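      The lookup behavior described above can be modeled in plain Java as follows. This is a sketch of the semantics, not Kafka Streams code: the class and method names are hypothetical, the table state is a simple in-memory map, and records are processed in arrival (processing-time) order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a stream-table lookup join; this is not Kafka Streams code.
final class StreamTableJoinModel {
    private final Map<String, String> tableState = new HashMap<>();
    private final List<String> results = new ArrayList<>();

    // KTable input: only updates the state and never emits a result.
    void onTableRecord(final String key, final String value) {
        if (key == null) {
            return; // dropped, state is not updated
        }
        if (value == null) {
            tableState.remove(key); // tombstone: delete the key
        } else {
            tableState.put(key, value);
        }
    }

    // KStream input: looks up the current table state and may emit a result.
    void onStreamRecord(final String key, final String value) {
        if (key == null || value == null) {
            return; // dropped, no join computation
        }
        final String tableValue = tableState.get(key);
        if (tableValue != null) {
            results.add("<" + key + ":ValueJoiner(" + value + "," + tableValue + ")>");
        }
    }

    List<String> results() {
        return results;
    }

    // Replays the example, followed by a tombstone and one more stream record.
    static List<String> example() {
        final StreamTableJoinModel model = new StreamTableJoinModel();
        model.onStreamRecord("K1", "A"); // no table entry yet: no result
        model.onTableRecord("K1", "b");  // state becomes <K1:b>
        model.onStreamRecord("K1", "C"); // joins with b
        model.onTableRecord("K1", null); // tombstone removes K1
        model.onStreamRecord("K1", "D"); // no table entry anymore: no result
        return model.results();
    }
}
```

      Because the lookup always sees the state at arrival time, the result depends on how stream and table records interleave, which is why out-of-order input is not handled well by default.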

      KStream and KTable (or to be more precise, their underlying source topics) need to have the same number of partitions (cf. join(GlobalKTable, KeyValueMapper, ValueJoiner)). If this is not the case (and if no auto-repartitioning happens for the KStream, see further below), you would need to call repartition(Repartitioned) for this KStream before doing the join, specifying the same number of partitions via Repartitioned parameter as the given KTable. Furthermore, KStream and KTable need to be co-partitioned on the join key (i.e., use the same partitioner). Note: Kafka Streams cannot verify the used partitioner, so it is the user's responsibility to ensure that the same partitioner is used for both inputs of the join.
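      Why both the partition count and the partitioner must match can be seen in a small plain-Java sketch. The partitioner below is a deliberately simple stand-in (hash code modulo partition count) and is an assumption for illustration; it is not the partitioner Kafka uses by default. The class and method names are hypothetical.

```java
// Plain-Java sketch of the co-partitioning requirement; not Kafka Streams code.
final class CoPartitioningSketch {
    private CoPartitioningSketch() { }

    // Stand-in partitioner (assumption): hash code modulo the number of partitions.
    static int partitionFor(final Object key, final int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // A stream record can only find its table record if both end up in the same partition number.
    static boolean coLocated(final Object key, final int streamPartitions, final int tablePartitions) {
        return partitionFor(key, streamPartitions) == partitionFor(key, tablePartitions);
    }

    public static void main(final String[] args) {
        System.out.println(coLocated(5, 4, 4)); // same partition count on both sides
        System.out.println(coLocated(5, 4, 6)); // different partition counts
    }
}
```

      With the same partitioner and four partitions on both sides, key 5 maps to partition 1 for both inputs. With four stream partitions and six table partitions it maps to partitions 1 and 5, so the lookup would miss the table record.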

      If a key changing operator was used on this KStream before this operation (e.g., selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper) or process(ProcessorSupplier, String...)) Kafka Streams will automatically repartition the data of this KStream, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic such that data is correctly partitioned by the KTable's key.

      The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. The number of partitions for the repartition topic is determined based on the number of partitions of the KTable. Furthermore, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.

      You can retrieve all generated internal topic names via Topology.describe(). To explicitly set key/value serdes or to customize the name of the repartition topic, use join(KTable, ValueJoiner, Joined). For more control over the repartitioning, use repartition(Repartitioned) before join().

      Type Parameters:
      TableValue - the value type of the table
      VOut - the value type of the result stream
      Parameters:
      table - the KTable to be joined with this stream
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      Returns:
      A KStream that contains join-records, one for each matched stream record, with the corresponding key and a value computed by the given ValueJoiner.
    • join

      <TableValue, VOut> KStream<K,VOut> join(KTable<K,TableValue> table, ValueJoinerWithKey<? super K,? super V,? super TableValue,? extends VOut> joiner)
      See join(KTable, ValueJoiner).

      Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and incorrect results.

    • join

      <TableValue, VOut> KStream<K,VOut> join(KTable<K,TableValue> table, ValueJoiner<? super V,? super TableValue,? extends VOut> joiner, Joined<K,V,TableValue> joined)
      Join records of this stream with KTable's records using a non-windowed inner equi-join. In contrast to join(KTable, ValueJoiner), the additional Joined parameter allows you to specify a join grace-period to handle out-of-order data gracefully. The grace-period only takes effect if the KTable is backed by a VersionedKeyValueStore.

      For details about stream-table semantics, including co-partitioning requirements, (auto-)repartitioning, and more see join(KTable, ValueJoiner). If you specify a grace-period to handle out-of-order data, see further details below.

      To handle out-of-order records, the input KTable must use a VersionedKeyValueStore (specified via a Materialized parameter when the KTable is created), and a join grace-period must be specified. For this case, KStream records are buffered until the end of the grace period and the KTable lookup is performed with some delay. Given that the KTable state is versioned, the lookup can use "event time", allowing out-of-order KStream records to join to the right (older) version of a KTable record with the same key. Also, KTable out-of-order updates are handled correctly by the versioned state store. Note that using a join grace-period introduces the notion of late records, i.e., records with a timestamp smaller than the defined grace-period allows; these late records will be dropped, and no join computation is triggered. Using a versioned state store for the KTable also implies that the defined history retention provides a cut-off point, and late records will be dropped, not updating the KTable state.
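      The idea of an event-time lookup against versioned table state can be sketched in plain Java. This is a model only and does not use the VersionedKeyValueStore API: the class and method names are hypothetical, each key keeps a sorted map from timestamp to value, and tombstones, grace-period buffering, and history retention are left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of an event-time lookup on versioned state; not Kafka Streams code.
final class VersionedLookupModel {
    // key -> (timestamp from which the value is valid -> value)
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();

    // Table update; out-of-order updates simply land at their own timestamp.
    void put(final String key, final String value, final long timestampMs) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestampMs, value);
    }

    // Returns the value that was valid at the given timestamp, or null if there was none.
    String get(final String key, final long asOfTimestampMs) {
        final TreeMap<Long, String> history = versions.get(key);
        if (history == null) {
            return null;
        }
        final Map.Entry<Long, String> entry = history.floorEntry(asOfTimestampMs);
        return entry == null ? null : entry.getValue();
    }
}
```

      For example, after put("K1", "b1", 100) and put("K1", "b2", 200), a stream record with timestamp 150 that arrives late would still join against "b1" via get("K1", 150), whereas a lookup against only the latest state would return "b2".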

      If a join grace-period is specified, the KStream will be materialized in a local state store. For failure and recovery this store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<storeName>" is an internally generated name, and "-changelog" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe(). To customize the name of the changelog topic, use the Joined input parameter.

    • join

      <TableValue, VOut> KStream<K,VOut> join(KTable<K,TableValue> table, ValueJoinerWithKey<? super K,? super V,? super TableValue,? extends VOut> joiner, Joined<K,V,TableValue> joined)
      See join(KTable, ValueJoiner, Joined).

      Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and incorrect results.

    • leftJoin

      <VTable, VOut> KStream<K,VOut> leftJoin(KTable<K,VTable> table, ValueJoiner<? super V,? super VTable,? extends VOut> joiner)
      Join records of this stream with KTable's records using a non-windowed left equi-join. In contrast to an inner join, all records from this stream will produce an output record (more details below). The join is a primary key table lookup join with join attribute streamRecord.key == tableRecord.key. "Table lookup join" means that results are only computed if KStream records are processed. This is done by performing a lookup for matching records in the internal KTable state. In contrast, processing KTable input records will only update the internal KTable state and will not produce any result records.

      For each KStream record, regardless of whether it finds a joining record in the KTable, the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. If no KTable record with matching key was found during the lookup, ValueJoiner will be called with a null value for the table record. The key of the result record is the same as for both joining input records, or the KStream input record's key for a left-join result. If you need read access to the join key, use leftJoin(KTable, ValueJoinerWithKey). If a KStream input record's value is null the input record will be dropped, and no join computation is triggered. Note that null keys for KStream input records are supported (in contrast to an inner join), resulting in a left-join result. If a KTable input record's key is null the input record will be dropped, and the table state won't be updated. KTable input records with null values are considered deletes (so-called tombstones) for the table.

      Example:

      KStream | KTable | state  | result
      <K1:A>  |        |        | <K1:ValueJoiner(A,null)>
              | <K1:b> | <K1:b> |
      <K1:C>  |        | <K1:b> | <K1:ValueJoiner(C,b)>
      By default, KStream records are processed by performing a lookup for matching records in the current (i.e., processing time) internal KTable state. This default implementation does not handle out-of-order records in either input of the join well. See leftJoin(KTable, ValueJoiner, Joined) on how to configure a stream-table join to handle out-of-order data.

      For more details about co-partitioning requirements, (auto-)repartitioning, and more, see join(KTable, ValueJoiner).

      Returns:
      A KStream that contains join-records, one for each matched stream record plus one for each non-matching stream record, with the corresponding key and a value computed by the given ValueJoiner.
    • leftJoin

      <VTable, VOut> KStream<K,VOut> leftJoin(KTable<K,VTable> table, ValueJoinerWithKey<? super K,? super V,? super VTable,? extends VOut> joiner)
      See leftJoin(KTable, ValueJoiner).

      Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and incorrect results.

    • leftJoin

      <VTable, VOut> KStream<K,VOut> leftJoin(KTable<K,VTable> table, ValueJoiner<? super V,? super VTable,? extends VOut> joiner, Joined<K,V,VTable> joined)
      Join records of this stream with KTable's records using a non-windowed left equi-join. In contrast to leftJoin(KTable, ValueJoiner), the additional Joined parameter allows you to specify a join grace-period to handle out-of-order data gracefully. The grace-period only takes effect if the KTable is backed by a VersionedKeyValueStore.

      For details about left-stream-table-join semantics see leftJoin(KTable, ValueJoiner). For co-partitioning requirements, (auto-)repartitioning, and more see join(KTable, ValueJoiner). If you specify a grace-period to handle out-of-order data, see join(KTable, ValueJoiner, Joined).

    • leftJoin

      <VTable, VOut> KStream<K,VOut> leftJoin(KTable<K,VTable> table, ValueJoinerWithKey<? super K,? super V,? super VTable,? extends VOut> joiner, Joined<K,V,VTable> joined)
      See leftJoin(KTable, ValueJoiner, Joined).

      Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and incorrect results.

    • join

      <GlobalKey, GlobalValue, VOut> KStream<K,VOut> join(GlobalKTable<GlobalKey,GlobalValue> globalTable, KeyValueMapper<? super K,? super V,? extends GlobalKey> keySelector, ValueJoiner<? super V,? super GlobalValue,? extends VOut> joiner)
      Join records of this stream with GlobalKTable's records using a non-windowed inner equi-join. The join is a primary key table lookup join with join attribute keyValueMapper.map(streamRecord) == tableRecord.key. "Table lookup join" means that results are only computed if KStream records are processed. This is done by performing a lookup for matching records in the current (i.e., processing time) internal GlobalKTable state. In contrast, processing GlobalKTable input records will only update the internal GlobalKTable state and will not produce any result records.

      For each KStream record that finds a joining record in the GlobalKTable the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as the stream record's key. If you need read access to the KStream key, use join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey). If a KStream input record's value is null or if the provided keySelector returns null, the input record will be dropped, and no join computation is triggered. If a GlobalKTable input record's key is null the input record will be dropped, and the table state won't be updated. GlobalKTable input records with null values are considered deletes (so-called tombstones) for the table.

      Example, using the first value attribute as join key:

      KStream      | GlobalKTable | state   | result
      <K1:(GK1,A)> |              |         |
                   | <GK1:b>      | <GK1:b> |
      <K1:(GK1,C)> |              | <GK1:b> | <K1:ValueJoiner((GK1,C),b)>
      In contrast to join(KTable, ValueJoiner), there is no co-partitioning requirement between this KStream and the GlobalKTable. Also note that there are no ordering guarantees between the updates on the left and the right side of this join, since updates to the GlobalKTable are in no way synchronized. Therefore, the result of the join is inherently non-deterministic.
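      The role of the keySelector can be illustrated with a plain-Java model of the example, in which the stream value is a pair and its first attribute is used as the join key. This is a sketch, not Kafka Streams code: the class and method names are hypothetical, stream values are modeled as two-element string arrays, and the global table is a simple in-memory map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of a stream-globalTable lookup join; this is not Kafka Streams code.
final class GlobalTableJoinModel {
    private final Map<String, String> globalState = new HashMap<>();
    private final List<String> results = new ArrayList<>();
    private final BiFunction<String, String[], String> keySelector;

    GlobalTableJoinModel(final BiFunction<String, String[], String> keySelector) {
        this.keySelector = keySelector;
    }

    // GlobalKTable input: only updates the state.
    void onGlobalTableRecord(final String key, final String value) {
        if (key == null) {
            return; // dropped, state is not updated
        }
        if (value == null) {
            globalState.remove(key); // tombstone
        } else {
            globalState.put(key, value);
        }
    }

    // KStream input: the join key is derived from the stream record, not taken from its key.
    void onStreamRecord(final String key, final String[] value) {
        if (value == null) {
            return; // dropped
        }
        final String joinKey = keySelector.apply(key, value);
        if (joinKey == null) {
            return; // dropped
        }
        final String tableValue = globalState.get(joinKey);
        if (tableValue != null) {
            // The result keeps the stream record's key.
            results.add("<" + key + ":ValueJoiner((" + value[0] + "," + value[1] + ")," + tableValue + ")>");
        }
    }

    static List<String> example() {
        final GlobalTableJoinModel model = new GlobalTableJoinModel((k, v) -> v[0]);
        model.onStreamRecord("K1", new String[] {"GK1", "A"}); // no table entry yet
        model.onGlobalTableRecord("GK1", "b");
        model.onStreamRecord("K1", new String[] {"GK1", "C"}); // joins with b
        return model.results;
    }
}
```

      Since the join key comes from the keySelector and the lookup is local, the stream does not need to be partitioned by that key, which matches the absence of a co-partitioning requirement.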
      Type Parameters:
      GlobalKey - the key type of the global table
      GlobalValue - the value type of the global table
      VOut - the value type of the result stream
      Parameters:
      globalTable - the GlobalKTable to be joined with this stream
      keySelector - a KeyValueMapper that computes the join key for stream input records
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      Returns:
      A KStream that contains join-records, one for each matched stream record, with the corresponding key and a value computed by the given ValueJoiner.
    • join

      <GlobalKey, GlobalValue, VOut> KStream<K,VOut> join(GlobalKTable<GlobalKey,GlobalValue> globalTable, KeyValueMapper<? super K,? super V,? extends GlobalKey> keySelector, ValueJoinerWithKey<? super K,? super V,? super GlobalValue,? extends VOut> joiner)
      See join(GlobalKTable, KeyValueMapper, ValueJoiner).

      Note that the KStream key is read-only and must not be modified, as this can lead to corrupt partitioning and incorrect results.

    • join

      <GlobalKey, GlobalValue, VOut> KStream<K,VOut> join(GlobalKTable<GlobalKey,GlobalValue> globalTable, KeyValueMapper<? super K,? super V,? extends GlobalKey> keySelector, ValueJoiner<? super V,? super GlobalValue,? extends VOut> joiner, Named named)
      See join(GlobalKTable, KeyValueMapper, ValueJoiner).

      Takes an additional Named parameter that is used to name the processor in the topology.

    • join

      <GlobalKey, GlobalValue, VOut> KStream<K,VOut> join(GlobalKTable<GlobalKey,GlobalValue> globalTable, KeyValueMapper<? super K,? super V,? extends GlobalKey> keySelector, ValueJoinerWithKey<? super K,? super V,? super GlobalValue,? extends VOut> joiner, Named named)
      See join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey).

      Takes an additional Named parameter that is used to name the processor in the topology.

    • leftJoin

      <GlobalKey, GlobalValue, VOut> KStream<K,VOut> leftJoin(GlobalKTable<GlobalKey,GlobalValue> globalTable, KeyValueMapper<? super K,? super V,? extends GlobalKey> keySelector, ValueJoiner<? super V,? super GlobalValue,? extends VOut> joiner)
      Join records of this stream with GlobalKTable's records using a non-windowed left equi-join. In contrast to an inner join, all records from this stream will produce an output record (more details below). The join is a primary key table lookup join with join attribute keyValueMapper.map(streamRecord) == tableRecord.key. "Table lookup join" means that results are only computed if KStream records are processed. This is done by performing a lookup for matching records in the current (i.e., processing time) internal GlobalKTable state. In contrast, processing GlobalKTable input records will only update the internal GlobalKTable state and will not produce any result records.

      For each KStream record, regardless of whether it finds a joining record in the GlobalKTable, the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. If no GlobalKTable record with matching key was found during the lookup, ValueJoiner will be called with a null value for the global table record. The key of the result record is the same as the stream record's key. If you need read access to the KStream key, use leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey). If a KStream input record's value is null or if the provided keySelector returns null, the input record will be dropped, and no join computation is triggered. Note that null keys for KStream input records are supported (in contrast to an inner join), resulting in a left-join result. If a GlobalKTable input record's key is null the input record will be dropped, and the table state won't be updated. GlobalKTable input records with null values are considered deletes (so-called tombstones) for the table.

      Example, using the first value attribute as join key:

      KStream      | GlobalKTable | state   | result
      <K1:(GK1,A)> |              |         | <K1:ValueJoiner((GK1,A),null)>
                   | <GK1:b>      | <GK1:b> |
      <K1:(GK1,C)> |              | <GK1:b> | <K1:ValueJoiner((GK1,C),b)>
      In contrast to leftJoin(KTable, ValueJoiner), there is no co-partitioning requirement between this KStream and the GlobalKTable. Also note that there are no ordering guarantees between the updates on the left and the right side of this join, since updates to the GlobalKTable are in no way synchronized. Therefore, the result of the join is inherently non-deterministic.
      Type Parameters:
      GlobalKey - the key type of the global table
      GlobalValue - the value type of the global table
      VOut - the value type of the result stream
      Parameters:
      globalTable - the GlobalKTable to be joined with this stream
      keySelector - a KeyValueMapper that computes the join key for stream input records
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      Returns:
      A KStream that contains join-records, one for each matched stream record plus one for each non-matching stream record, with the corresponding key and a value computed by the given ValueJoiner.
    • leftJoin

      <GlobalKey, GlobalValue, VOut> KStream<K,VOut> leftJoin(GlobalKTable<GlobalKey,GlobalValue> globalTable, KeyValueMapper<? super K,? super V,? extends GlobalKey> keySelector, ValueJoinerWithKey<? super K,? super V,? super GlobalValue,? extends VOut> joiner)
      See leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner).

      Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and incorrect results.

    • leftJoin

      <GlobalKey, GlobalValue, VOut> KStream<K,VOut> leftJoin(GlobalKTable<GlobalKey,GlobalValue> globalTable, KeyValueMapper<? super K,? super V,? extends GlobalKey> keySelector, ValueJoiner<? super V,? super GlobalValue,? extends VOut> joiner, Named named)
      See leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner).

      Takes an additional Named parameter that is used to name the processor in the topology.

    • leftJoin

      <GlobalKey, GlobalValue, VOut> KStream<K,VOut> leftJoin(GlobalKTable<GlobalKey,GlobalValue> globalTable, KeyValueMapper<? super K,? super V,? extends GlobalKey> keySelector, ValueJoinerWithKey<? super K,? super V,? super GlobalValue,? extends VOut> joiner, Named named)
      See leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey).

      Takes an additional Named parameter that is used to name the processor in the topology.

    • process

      <KOut, VOut> KStream<KOut,VOut> process(ProcessorSupplier<? super K,? super V,? extends KOut,? extends VOut> processorSupplier, String... stateStoreNames)
      Process all records in this stream, one record at a time, by applying a Processor (provided by the given ProcessorSupplier) to each input record. The Processor can emit any number of result records via ProcessorContext.forward(Record) (possibly of a different key and/or value type).

      By default, the processor is stateless (similar to flatMap(KeyValueMapper, Named), however, it also has access to the record's timestamp and headers), but previously added state stores can be connected by providing their names as additional parameters, making the processor stateful. There are two different kinds of state stores that can be added to the underlying Topology:

      If the processorSupplier provides state stores via ConnectedStoreProvider.stores(), the corresponding StoreBuilders will be added to the topology and connected to this processor automatically, without the need to provide the store names as parameter to this method. Additionally, even if a processor is stateless, it can still access all global state stores (read-only). There is no need to connect global stores to processors.

      All state stores which are connected to a processor and all global stores, can be accessed via context.getStateStore(String) using the context provided via Processor#init():

      
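       import org.apache.kafka.streams.processor.api.Processor;
       import org.apache.kafka.streams.processor.api.ProcessorContext;
       import org.apache.kafka.streams.processor.api.Record;
       import org.apache.kafka.streams.state.KeyValueStore;
      
       public class MyProcessor implements Processor<String, Integer, String, Integer> {
           private ProcessorContext<String, Integer> context;
           private KeyValueStore<String, String> store;
      
           @Override
           public void init(final ProcessorContext<String, Integer> context) {
               this.context = context;
               // "myStore" must be connected to this processor or be a global store
               this.store = context.getStateStore("myStore");
           }
      
           @Override
           public void process(final Record<String, Integer> record) {
               // can access this.context and this.store
           }
       }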
       
      Furthermore, the provided ProcessorContext gives access to topology, runtime, and record metadata, and allows you to schedule punctuations and to request offset commits.

      In contrast to grouping/aggregation and joins, even if the processor is stateful and an upstream operation was key changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before process(). At the same time, this method is considered a key changing operation by itself, and might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. processValues(FixedKeyProcessorSupplier, String...)).

      Parameters:
      processorSupplier - the supplier used to obtain Processor instances
      stateStoreNames - the names of state stores that the processor should be able to access
    • process

      <KOut, VOut> KStream<KOut,VOut> process(ProcessorSupplier<? super K,? super V,? extends KOut,? extends VOut> processorSupplier, Named named, String... stateStoreNames)
      See process(ProcessorSupplier, String...).

      Takes an additional Named parameter that is used to name the processor in the topology.

    • processValues

      <VOut> KStream<K,VOut> processValues(FixedKeyProcessorSupplier<? super K,? super V,? extends VOut> processorSupplier, String... stateStoreNames)
      Process all records in this stream, one record at a time, by applying a FixedKeyProcessor (provided by the given FixedKeyProcessorSupplier) to each input record. This method is similar to process(ProcessorSupplier, String...), however the key of the input Record cannot be modified.

      Because the key cannot be modified, this method is not a key changing operation and preserves data co-location with respect to the key (cf. flatMapValues(ValueMapper)). Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream.

      However, because the key cannot be modified, some restrictions apply to a FixedKeyProcessor compared to a Processor: for example, forwarding result records from a Punctuator is not possible.

    • processValues

      <VOut> KStream<K,VOut> processValues(FixedKeyProcessorSupplier<? super K,? super V,? extends VOut> processorSupplier, Named named, String... stateStoreNames)
      See processValues(FixedKeyProcessorSupplier, String...).

      Takes an additional Named parameter that is used to name the processor in the topology.