Interface KStream<K,V>

Type Parameters:
K - Type of keys
V - Type of values

public interface KStream<K,V>
KStream is an abstraction of a record stream of KeyValue pairs, i.e., each record is an independent entity/event in the real world. For example, a user X might buy two items I1 and I2, and thus there might be two records <K:I1>, <K:I2> in the stream.

A KStream is either defined from one or more Kafka topics that are consumed message by message, or is the result of a KStream transformation. A KTable can also be converted into a KStream.

A KStream can be transformed record by record, joined with another KStream, KTable, or GlobalKTable, or aggregated into a KTable. The Kafka Streams DSL can be mixed and matched with the Processor API (PAPI) (cf. Topology) via process(...), transform(...), and transformValues(...).
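To make the record-stream semantics concrete, the plain-Java sketch below (not Kafka Streams code; it uses Java 16+ records, and the KV record and the asStream/asTable helpers are hypothetical) keeps the two purchases <K:I1> and <K:I2> as independent events, and contrasts this with a table-style view in which a later record replaces an earlier one with the same key, which is how a KTable interprets its input.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RecordStreamSketch {
    // Hypothetical stand-in for a Kafka record: an immutable key-value pair.
    record KV<K, V>(K key, V value) {}

    // Record-stream view: every record is an independent event, so all of them are kept.
    static <K, V> List<KV<K, V>> asStream(List<KV<K, V>> records) {
        return new ArrayList<>(records);
    }

    // Table view, for contrast: a later record overwrites an earlier one with the same key.
    static <K, V> Map<K, V> asTable(List<KV<K, V>> records) {
        Map<K, V> table = new LinkedHashMap<>();
        for (KV<K, V> r : records) {
            table.put(r.key(), r.value());
        }
        return table;
    }

    public static void main(String[] args) {
        // User X buys items I1 and I2: two independent purchase events with the same key.
        List<KV<String, String>> purchases = List.of(new KV<>("X", "I1"), new KV<>("X", "I2"));
        System.out.println(asStream(purchases).size()); // 2: both purchases are retained
        System.out.println(asTable(purchases));         // {X=I2}: only the latest value per key
    }
}
```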

  • Method Details

    • filter

      KStream<K,V> filter(Predicate<? super K,? super V> predicate)
      Create a new KStream that consists of all records of this stream which satisfy the given predicate. All records that do not satisfy the predicate are dropped. This is a stateless record-by-record operation.
      Parameters:
      predicate - a filter Predicate that is applied to each record
      Returns:
      a KStream that contains only those records that satisfy the given predicate
    • filter

      KStream<K,V> filter(Predicate<? super K,? super V> predicate, Named named)
      Create a new KStream that consists of all records of this stream which satisfy the given predicate. All records that do not satisfy the predicate are dropped. This is a stateless record-by-record operation.
      Parameters:
      predicate - a filter Predicate that is applied to each record
      named - a Named config used to name the processor in the topology
      Returns:
      a KStream that contains only those records that satisfy the given predicate
    • filterNot

      KStream<K,V> filterNot(Predicate<? super K,? super V> predicate)
      Create a new KStream that consists of all records of this stream which do not satisfy the given predicate. All records that do satisfy the predicate are dropped. This is a stateless record-by-record operation.
      Parameters:
      predicate - a filter Predicate that is applied to each record
      Returns:
      a KStream that contains only those records that do not satisfy the given predicate
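      Applied with the same predicate, filter and filterNot split a stream into two complementary parts: every record ends up in exactly one of the two results. The following sketch models both operations over an in-memory list; it is not the Kafka Streams implementation, and the KV record and static helpers are hypothetical, with java.util.function.BiPredicate standing in for Predicate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

public class FilterSketch {
    // Hypothetical stand-in for a Kafka record.
    record KV<K, V>(K key, V value) {}

    // Models filter: keeps only the records that satisfy the predicate, in input order.
    static <K, V> List<KV<K, V>> filter(List<KV<K, V>> in, BiPredicate<? super K, ? super V> predicate) {
        List<KV<K, V>> out = new ArrayList<>();
        for (KV<K, V> r : in) {
            if (predicate.test(r.key(), r.value())) {
                out.add(r);
            }
        }
        return out;
    }

    // Models filterNot: keeps only the records that do NOT satisfy the predicate.
    static <K, V> List<KV<K, V>> filterNot(List<KV<K, V>> in, BiPredicate<? super K, ? super V> predicate) {
        return filter(in, (k, v) -> !predicate.test(k, v));
    }

    public static void main(String[] args) {
        List<KV<String, Integer>> orders = List.of(new KV<>("a", 5), new KV<>("b", 50), new KV<>("c", 12));
        BiPredicate<String, Integer> large = (k, v) -> v >= 10;
        System.out.println(filter(orders, large));    // records b and c
        System.out.println(filterNot(orders, large)); // record a
    }
}
```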
    • filterNot

      KStream<K,V> filterNot(Predicate<? super K,? super V> predicate, Named named)
      Create a new KStream that consists of all records of this stream which do not satisfy the given predicate. All records that do satisfy the predicate are dropped. This is a stateless record-by-record operation.
      Parameters:
      predicate - a filter Predicate that is applied to each record
      named - a Named config used to name the processor in the topology
      Returns:
      a KStream that contains only those records that do not satisfy the given predicate
    • selectKey

      <KR> KStream<KR,V> selectKey(KeyValueMapper<? super K,? super V,? extends KR> mapper)
      Set a new key (possibly of a new type) for each input record. The provided KeyValueMapper is applied to each input record and computes a new key for it. Thus, an input record <K,V> can be transformed into an output record <K':V>. This is a stateless record-by-record operation.

      For example, you can use this transformation to set a key for a key-less input record <null,V> by extracting a key from the value within your KeyValueMapper. The example below computes the new key as the length of the value string.

      
       // builder is a StreamsBuilder; key-less records typically carry a null byte[] key
       KStream<byte[], String> keyLessStream = builder.stream("key-less-topic");
       KStream<Integer, String> keyedStream = keyLessStream.selectKey(new KeyValueMapper<byte[], String, Integer>() {
           @Override
           public Integer apply(byte[] key, String value) {
               return value.length();
           }
       });
       
      Setting a new key might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the resulting KStream.
      Type Parameters:
      KR - the new key type of the result stream
      Parameters:
      mapper - a KeyValueMapper that computes a new key for each record
      Returns:
      a KStream that contains records with new key (possibly of different type) and unmodified value
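      The redistribution mentioned above happens because a record's partition is derived from its key. The sketch below models selectKey over an in-memory list and compares each record's partition before and after re-keying. The KV record, the selectKey helper, and partitionFor are hypothetical; partitionFor uses hashCode for simplicity, whereas Kafka's default producer partitioner hashes the serialized key bytes with murmur2, but any deterministic key-to-partition mapping shows the same effect.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

public class SelectKeySketch {
    // Hypothetical stand-in for a Kafka record.
    record KV<K, V>(K key, V value) {}

    // Models selectKey: computes a new key per record and leaves the value untouched.
    static <K, V, KR> List<KV<KR, V>> selectKey(List<KV<K, V>> in,
                                               BiFunction<? super K, ? super V, ? extends KR> mapper) {
        List<KV<KR, V>> out = new ArrayList<>();
        for (KV<K, V> r : in) {
            out.add(new KV<>(mapper.apply(r.key(), r.value()), r.value()));
        }
        return out;
    }

    // Hypothetical, simplified partitioner: deterministic key -> partition mapping.
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key == null ? 0 : key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        List<KV<String, String>> in = List.of(new KV<>("k1", "apple"), new KV<>("k2", "kiwi"));
        List<KV<Integer, String>> rekeyed = selectKey(in, (k, v) -> v.length());
        for (int i = 0; i < in.size(); i++) {
            int before = partitionFor(in.get(i).key(), 4);
            int after = partitionFor(rekeyed.get(i).key(), 4);
            // When before != after, a downstream key-based operator needs the record moved.
            System.out.println(rekeyed.get(i) + " partition " + before + " -> " + after);
        }
    }
}
```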
    • selectKey

      <KR> KStream<KR,V> selectKey(KeyValueMapper<? super K,? super V,? extends KR> mapper, Named named)
      Set a new key (possibly of a new type) for each input record. The provided KeyValueMapper is applied to each input record and computes a new key for it. Thus, an input record <K,V> can be transformed into an output record <K':V>. This is a stateless record-by-record operation.

      For example, you can use this transformation to set a key for a key-less input record <null,V> by extracting a key from the value within your KeyValueMapper. The example below computes the new key as the length of the value string.

      
       // builder is a StreamsBuilder; key-less records typically carry a null byte[] key
       KStream<byte[], String> keyLessStream = builder.stream("key-less-topic");
       KStream<Integer, String> keyedStream = keyLessStream.selectKey(new KeyValueMapper<byte[], String, Integer>() {
           @Override
           public Integer apply(byte[] key, String value) {
               return value.length();
           }
       });
       
      Setting a new key might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the resulting KStream.
      Type Parameters:
      KR - the new key type of the result stream
      Parameters:
      mapper - a KeyValueMapper that computes a new key for each record
      named - a Named config used to name the processor in the topology
      Returns:
      a KStream that contains records with new key (possibly of different type) and unmodified value
    • map

      <KR, VR> KStream<KR,VR> map(KeyValueMapper<? super K,? super V,? extends KeyValue<? extends KR,? extends VR>> mapper)
      Transform each record of the input stream into a new record in the output stream (both key and value type can be altered arbitrarily). The provided KeyValueMapper is applied to each input record and computes a new output record. Thus, an input record <K,V> can be transformed into an output record <K':V'>. This is a stateless record-by-record operation (cf. transform(TransformerSupplier, String...) for stateful record transformation).

      The example below normalizes the String key to upper-case letters and counts the number of tokens in the value string.

      
       KStream<String, String> inputStream = builder.stream("topic");
       KStream<String, Integer> outputStream = inputStream.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() {
           @Override
           public KeyValue<String, Integer> apply(String key, String value) {
               // upper-case the key; count space-separated tokens in the value
               return new KeyValue<>(key.toUpperCase(), value.split(" ").length);
           }
       });
       
      The provided KeyValueMapper must return a KeyValue and must not return null.

      Mapping records might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the resulting KStream (cf. mapValues(ValueMapper)).

      Type Parameters:
      KR - the key type of the result stream
      VR - the value type of the result stream
      Parameters:
      mapper - a KeyValueMapper that computes a new output record
      Returns:
      a KStream that contains records with new key and value (possibly both of different type)
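      A minimal in-memory model of map, assuming a hypothetical KV record in place of KeyValue and java.util.function.BiFunction in place of KeyValueMapper: each input record produces exactly one output record, and a null result is rejected, mirroring the contract stated above. This illustrates the semantics only; it is not the Kafka Streams implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.function.BiFunction;

public class MapSketch {
    // Hypothetical stand-in for a Kafka record (plays the role of KeyValue).
    record KV<K, V>(K key, V value) {}

    // Models map: each input record yields exactly one output record; key and value may both change.
    static <K, V, KR, VR> List<KV<KR, VR>> map(List<KV<K, V>> in,
                                               BiFunction<? super K, ? super V, KV<KR, VR>> mapper) {
        List<KV<KR, VR>> out = new ArrayList<>();
        for (KV<K, V> r : in) {
            // Enforces the documented contract that the mapper must not return null.
            out.add(Objects.requireNonNull(mapper.apply(r.key(), r.value()), "mapper returned null"));
        }
        return out;
    }

    public static void main(String[] args) {
        List<KV<String, String>> in = List.of(new KV<>("user", "hello big world"));
        // Upper-case the key and count space-separated tokens in the value.
        List<KV<String, Integer>> out =
            map(in, (k, v) -> new KV<>(k.toUpperCase(Locale.ROOT), v.split(" ").length));
        System.out.println(out); // [KV[key=USER, value=3]]
    }
}
```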
    • map

      <KR, VR> KStream<KR,VR> map(KeyValueMapper<? super K,? super V,? extends KeyValue<? extends KR,? extends VR>> mapper, Named named)
      Transform each record of the input stream into a new record in the output stream (both key and value type can be altered arbitrarily). The provided KeyValueMapper is applied to each input record and computes a new output record. Thus, an input record <K,V> can be transformed into an output record <K':V'>. This is a stateless record-by-record operation (cf. transform(TransformerSupplier, String...) for stateful record transformation).

      The example below normalizes the String key to upper-case letters and counts the number of tokens in the value string.

      
       KStream<String, String> inputStream = builder.stream("topic");
       KStream<String, Integer> outputStream = inputStream.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() {
           @Override
           public KeyValue<String, Integer> apply(String key, String value) {
               // upper-case the key; count space-separated tokens in the value
               return new KeyValue<>(key.toUpperCase(), value.split(" ").length);
           }
       });
       
      The provided KeyValueMapper must return a KeyValue and must not return null.

      Mapping records might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the resulting KStream (cf. mapValues(ValueMapper)).

      Type Parameters:
      KR - the key type of the result stream
      VR - the value type of the result stream
      Parameters:
      mapper - a KeyValueMapper that computes a new output record
      named - a Named config used to name the processor in the topology
      Returns:
      a KStream that contains records with new key and value (possibly both of different type)
    • mapValues

      <VR> KStream<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper)
      Transform the value of each input record into a new value (possibly of a new type) of the output record. The provided ValueMapper is applied to each input record value and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation (cf. transformValues(ValueTransformerSupplier, String...) for stateful value transformation).

      The example below counts the number of tokens in the value string.

      
       KStream<String, String> inputStream = builder.stream("topic");
       KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapper<String, Integer>() {
           @Override
           public Integer apply(String value) {
               return value.split(" ").length;
           }
       });
       
      Setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the resulting KStream (cf. map(KeyValueMapper)).
      Type Parameters:
      VR - the value type of the result stream
      Parameters:
      mapper - a ValueMapper that computes a new output value
      Returns:
      a KStream that contains records with unmodified key and new values (possibly of different type)
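      By contrast with map, mapValues never touches the key. The sketch below (a hypothetical KV record and helper, with java.util.function.Function standing in for ValueMapper) copies each key through unchanged, which is why records stay co-located with their key and no repartitioning is needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class MapValuesSketch {
    // Hypothetical stand-in for a Kafka record.
    record KV<K, V>(K key, V value) {}

    // Models mapValues: only the value is transformed; the key is copied through unchanged.
    static <K, V, VR> List<KV<K, VR>> mapValues(List<KV<K, V>> in, Function<? super V, ? extends VR> mapper) {
        List<KV<K, VR>> out = new ArrayList<>();
        for (KV<K, V> r : in) {
            out.add(new KV<>(r.key(), mapper.apply(r.value())));
        }
        return out;
    }

    public static void main(String[] args) {
        List<KV<String, String>> in = List.of(new KV<>("a", "one two"), new KV<>("b", "three"));
        List<KV<String, Integer>> out = mapValues(in, v -> v.split(" ").length);
        // Every output key equals its input key, so records stay co-located with their key.
        System.out.println(out); // [KV[key=a, value=2], KV[key=b, value=1]]
    }
}
```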
    • mapValues

      <VR> KStream<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper, Named named)
      Transform the value of each input record into a new value (possibly of a new type) of the output record. The provided ValueMapper is applied to each input record value and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation (cf. transformValues(ValueTransformerSupplier, String...) for stateful value transformation).

      The example below counts the number of tokens in the value string.

      
       KStream<String, String> inputStream = builder.stream("topic");
       KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapper<String, Integer>() {
           @Override
           public Integer apply(String value) {
               return value.split(" ").length;
           }
       });
       
      Setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the resulting KStream (cf. map(KeyValueMapper)).
      Type Parameters:
      VR - the value type of the result stream
      Parameters:
      mapper - a ValueMapper that computes a new output value
      named - a Named config used to name the processor in the topology
      Returns:
      a KStream that contains records with unmodified key and new values (possibly of different type)
    • mapValues

      <VR> KStream<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper)
      Transform the value of each input record into a new value (possibly of a new type) of the output record. The provided ValueMapperWithKey is applied to the key and value of each input record and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation (cf. transformValues(ValueTransformerWithKeySupplier, String...) for stateful value transformation).

      The example below counts the number of tokens of key and value strings.

      
       KStream<String, String> inputStream = builder.stream("topic");
       KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapperWithKey<String, String, Integer>() {
           @Override
           public Integer apply(String readOnlyKey, String value) {
               return readOnlyKey.split(" ").length + value.split(" ").length;
           }
       });
       
      Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Because the key stays unchanged, setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the resulting KStream (cf. map(KeyValueMapper)).
      Type Parameters:
      VR - the value type of the result stream
      Parameters:
      mapper - a ValueMapperWithKey that computes a new output value
      Returns:
      a KStream that contains records with unmodified key and new values (possibly of different type)
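      The key-aware variant differs only in that the mapper also receives the key. In the hypothetical sketch below, java.util.function.BiFunction stands in for ValueMapperWithKey; the helper passes the key to the mapper for reading but always writes the original key to the output record, so the mapper cannot replace it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

public class MapValuesWithKeySketch {
    // Hypothetical stand-in for a Kafka record.
    record KV<K, V>(K key, V value) {}

    // Models mapValues(ValueMapperWithKey): the mapper may read the key, but the output
    // record always carries the original key.
    static <K, V, VR> List<KV<K, VR>> mapValues(List<KV<K, V>> in,
                                                BiFunction<? super K, ? super V, ? extends VR> mapper) {
        List<KV<K, VR>> out = new ArrayList<>();
        for (KV<K, V> r : in) {
            out.add(new KV<>(r.key(), mapper.apply(r.key(), r.value())));
        }
        return out;
    }

    public static void main(String[] args) {
        List<KV<String, String>> in = List.of(new KV<>("hello world", "a b c"));
        // Count tokens in both key and value, as in the example above.
        List<KV<String, Integer>> out = mapValues(in, (k, v) -> k.split(" ").length + v.split(" ").length);
        System.out.println(out); // [KV[key=hello world, value=5]]
    }
}
```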
    • mapValues

      <VR> KStream<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Named named)
      Transform the value of each input record into a new value (possibly of a new type) of the output record. The provided ValueMapperWithKey is applied to the key and value of each input record and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation (cf. transformValues(ValueTransformerWithKeySupplier, String...) for stateful value transformation).

      The example below counts the number of tokens of key and value strings.

      
       KStream<String, String> inputStream = builder.stream("topic");
       KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapperWithKey<String, String, Integer>() {
           @Override
           public Integer apply(String readOnlyKey, String value) {
               return readOnlyKey.split(" ").length + value.split(" ").length;
           }
       });
       
      Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Because the key stays unchanged, setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the resulting KStream (cf. map(KeyValueMapper)).
      Type Parameters:
      VR - the value type of the result stream
      Parameters:
      mapper - a ValueMapperWithKey that computes a new output value
      named - a Named config used to name the processor in the topology
      Returns:
      a KStream that contains records with unmodified key and new values (possibly of different type)
    • flatMap

      <KR, VR> KStream<KR,VR> flatMap(KeyValueMapper<? super K,? super V,? extends Iterable<? extends KeyValue<? extends KR,? extends VR>>> mapper)
      Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily). The provided KeyValueMapper is applied to each input record and computes zero or more output records. Thus, an input record <K,V> can be transformed into output records <K':V'>, <K'':V''>, .... This is a stateless record-by-record operation (cf. transform(TransformerSupplier, String...) for stateful record transformation).

      The example below splits input records <null:String> containing sentences as values into their words and emits a record <word:1> for each word.

      
       KStream<byte[], String> inputStream = builder.stream("topic");
       KStream<String, Integer> outputStream = inputStream.flatMap(
           new KeyValueMapper<byte[], String, Iterable<KeyValue<String, Integer>>>() {
               @Override
               public Iterable<KeyValue<String, Integer>> apply(byte[] key, String value) {
                   String[] tokens = value.split(" ");
                   List<KeyValue<String, Integer>> result = new ArrayList<>(tokens.length);

                   // emit one <word:1> record per token
                   for (String token : tokens) {
                       result.add(new KeyValue<>(token, 1));
                   }

                   return result;
               }
           });
       
      The provided KeyValueMapper must return an Iterable (e.g., any Collection type) and the return value must not be null.

      Flat-mapping records might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the resulting KStream (cf. flatMapValues(ValueMapper)).

      Type Parameters:
      KR - the key type of the result stream
      VR - the value type of the result stream
      Parameters:
      mapper - a KeyValueMapper that computes the new output records
      Returns:
      a KStream that contains more or fewer records than this stream, with new keys and values (possibly of different type)
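      The following in-memory sketch models flatMap with a hypothetical KV record and java.util.function.BiFunction in place of KeyValueMapper. Its word splitter skips empty tokens, so an empty sentence yields zero output records; in Java, "".split(" ") returns a one-element array containing the empty string, so without that check an empty value would produce a record with an empty key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;

public class FlatMapSketch {
    // Hypothetical stand-in for a Kafka record.
    record KV<K, V>(K key, V value) {}

    // Models flatMap: each input record yields zero or more records with new keys and values.
    static <K, V, KR, VR> List<KV<KR, VR>> flatMap(List<KV<K, V>> in,
            BiFunction<? super K, ? super V, ? extends Iterable<KV<KR, VR>>> mapper) {
        List<KV<KR, VR>> out = new ArrayList<>();
        for (KV<K, V> r : in) {
            // The documented contract forbids a null Iterable; an empty one is fine.
            Iterable<KV<KR, VR>> produced = Objects.requireNonNull(mapper.apply(r.key(), r.value()));
            for (KV<KR, VR> kv : produced) {
                out.add(kv);
            }
        }
        return out;
    }

    // Word splitter: emits <word:1> per non-empty token, so a blank value emits nothing.
    static List<KV<String, Integer>> words(String key, String value) {
        List<KV<String, Integer>> result = new ArrayList<>();
        for (String token : value.split(" ")) {
            if (!token.isEmpty()) {
                result.add(new KV<>(token, 1));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<KV<String, String>> in = List.of(new KV<>(null, "the quick fox"), new KV<>(null, ""));
        List<KV<String, Integer>> out = flatMap(in, FlatMapSketch::words);
        System.out.println(out.size()); // 3: one record per word, none for the empty sentence
    }
}
```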
    • flatMap

      <KR, VR> KStream<KR,VR> flatMap(KeyValueMapper<? super K,? super V,? extends Iterable<? extends KeyValue<? extends KR,? extends VR>>> mapper, Named named)
      Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily). The provided KeyValueMapper is applied to each input record and computes zero or more output records. Thus, an input record <K,V> can be transformed into output records <K':V'>, <K'':V''>, .... This is a stateless record-by-record operation (cf. transform(TransformerSupplier, String...) for stateful record transformation).

      The example below splits input records <null:String> containing sentences as values into their words and emits a record <word:1> for each word.

      
       KStream<byte[], String> inputStream = builder.stream("topic");
       KStream<String, Integer> outputStream = inputStream.flatMap(
           new KeyValueMapper<byte[], String, Iterable<KeyValue<String, Integer>>>() {
               @Override
               public Iterable<KeyValue<String, Integer>> apply(byte[] key, String value) {
                   String[] tokens = value.split(" ");
                   List<KeyValue<String, Integer>> result = new ArrayList<>(tokens.length);

                   // emit one <word:1> record per token
                   for (String token : tokens) {
                       result.add(new KeyValue<>(token, 1));
                   }

                   return result;
               }
           });
       
      The provided KeyValueMapper must return an Iterable (e.g., any Collection type) and the return value must not be null.

      Flat-mapping records might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the resulting KStream (cf. flatMapValues(ValueMapper)).

      Type Parameters:
      KR - the key type of the result stream
      VR - the value type of the result stream
      Parameters:
      mapper - a KeyValueMapper that computes the new output records
      named - a Named config used to name the processor in the topology
      Returns:
      a KStream that contains more or fewer records than this stream, with new keys and values (possibly of different type)
    • flatMapValues

      <VR> KStream<K,VR> flatMapValues(ValueMapper<? super V,? extends Iterable<? extends VR>> mapper)
      Create a new KStream by transforming the value of each input record into zero or more values, each emitted with the same (unmodified) key; the value type can be altered arbitrarily. The provided ValueMapper is applied to each input record and computes zero or more output values. Thus, an input record <K,V> can be transformed into output records <K:V'>, <K:V''>, .... This is a stateless record-by-record operation (cf. transformValues(ValueTransformerSupplier, String...) for stateful value transformation).

      The example below splits the sentence values of input records <null:String> into their words.

      
       KStream<byte[], String> inputStream = builder.stream("topic");
       KStream<byte[], String> outputStream = inputStream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
           @Override
           public Iterable<String> apply(String value) {
               return Arrays.asList(value.split(" "));
           }
       });
       
      The provided ValueMapper must return an Iterable (e.g., any Collection type) and the return value must not be null.

      Splitting a record into multiple records with the same key preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the resulting KStream (cf. flatMap(KeyValueMapper)).

      Type Parameters:
      VR - the value type of the result stream
      Parameters:
      mapper - a ValueMapper that computes the new output values
      Returns:
      a KStream that contains more or fewer records than this stream, with unmodified keys and new values (possibly of different type)
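      A minimal in-memory model of flatMapValues, assuming a hypothetical KV record and java.util.function.Function in place of ValueMapper: every value produced by the mapper is paired with the key of the record it came from, so all output records keep the input key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

public class FlatMapValuesSketch {
    // Hypothetical stand-in for a Kafka record.
    record KV<K, V>(K key, V value) {}

    // Models flatMapValues: each value becomes zero or more values, all emitted with the input key.
    static <K, V, VR> List<KV<K, VR>> flatMapValues(List<KV<K, V>> in,
            Function<? super V, ? extends Iterable<? extends VR>> mapper) {
        List<KV<K, VR>> out = new ArrayList<>();
        for (KV<K, V> r : in) {
            // A null Iterable violates the documented contract.
            Iterable<? extends VR> values = Objects.requireNonNull(mapper.apply(r.value()));
            for (VR v : values) {
                out.add(new KV<>(r.key(), v));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<KV<String, String>> in = List.of(new KV<>("doc1", "to be or not"));
        List<KV<String, String>> out = flatMapValues(in, v -> Arrays.asList(v.split(" ")));
        System.out.println(out.size()); // 4: one record per word, each still keyed by "doc1"
    }
}
```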
    • flatMapValues

      <VR> KStream<K,VR> flatMapValues(ValueMapper<? super V,? extends Iterable<? extends VR>> mapper, Named named)
      Create a new KStream by transforming the value of each input record into zero or more values, each emitted with the same (unmodified) key; the value type can be altered arbitrarily. The provided ValueMapper is applied to each input record and computes zero or more output values. Thus, an input record <K,V> can be transformed into output records <K:V'>, <K:V''>, .... This is a stateless record-by-record operation (cf. transformValues(ValueTransformerSupplier, String...) for stateful value transformation).

      The example below splits the sentence values of input records <null:String> into their words.

      
       KStream<byte[], String> inputStream = builder.stream("topic");
       KStream<byte[], String> outputStream = inputStream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
           @Override
           public Iterable<String> apply(String value) {
               return Arrays.asList(value.split(" "));
           }
       });
       
      The provided ValueMapper must return an Iterable (e.g., any Collection type) and the return value must not be null.

      Splitting a record into multiple records with the same key preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the resulting KStream (cf. flatMap(KeyValueMapper)).

      Type Parameters:
      VR - the value type of the result stream
      Parameters:
      mapper - a ValueMapper that computes the new output values
      named - a Named config used to name the processor in the topology
      Returns:
      a KStream that contains more or fewer records than this stream, with unmodified keys and new values (possibly of different type)
    • flatMapValues

      <VR> KStream<K,VR> flatMapValues(ValueMapperWithKey<? super K,? super V,? extends Iterable<? extends VR>> mapper)
      Create a new KStream by transforming the value of each input record into zero or more values, each emitted with the same (unmodified) key; the value type can be altered arbitrarily. The provided ValueMapperWithKey is applied to each input record and computes zero or more output values. Thus, an input record <K,V> can be transformed into output records <K:V'>, <K:V''>, .... This is a stateless record-by-record operation (cf. transformValues(ValueTransformerWithKeySupplier, String...) for stateful value transformation).

      The example below splits the sentence values of input records <Integer:String> into their words, but only for records with key 1; the values of all other records are forwarded unchanged.

      
       KStream<Integer, String> inputStream = builder.stream("topic");
       KStream<Integer, String> outputStream = inputStream.flatMapValues(new ValueMapperWithKey<Integer, String, Iterable<String>>() {
           @Override
           public Iterable<String> apply(Integer readOnlyKey, String value) {
               // split only records with key 1; the null check avoids unboxing a null key
               if (readOnlyKey != null && readOnlyKey == 1) {
                   return Arrays.asList(value.split(" "));
               } else {
                   return Arrays.asList(value);
               }
           }
       });
       
      The provided ValueMapperWithKey must return an Iterable (e.g., any Collection type) and the return value must not be null.

      Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Because the key stays unchanged, splitting a record into multiple records with the same key preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the resulting KStream (cf. flatMap(KeyValueMapper)).

      Type Parameters:
      VR - the value type of the result stream
      Parameters:
      mapper - a ValueMapperWithKey that computes the new output values
      Returns:
      a KStream that contains more or fewer records than this stream, with unmodified keys and new values (possibly of different type)
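      The key-aware variant can be modeled the same way, with java.util.function.BiFunction standing in for ValueMapperWithKey (the KV record and helper are hypothetical). The usage mirrors the example above: only records with key 1 are split into words, and a null check guards against unboxing a null Integer key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;

public class FlatMapValuesWithKeySketch {
    // Hypothetical stand-in for a Kafka record.
    record KV<K, V>(K key, V value) {}

    // Models flatMapValues(ValueMapperWithKey): the mapper reads the key; output keys stay unchanged.
    static <K, V, VR> List<KV<K, VR>> flatMapValues(List<KV<K, V>> in,
            BiFunction<? super K, ? super V, ? extends Iterable<? extends VR>> mapper) {
        List<KV<K, VR>> out = new ArrayList<>();
        for (KV<K, V> r : in) {
            Iterable<? extends VR> values = Objects.requireNonNull(mapper.apply(r.key(), r.value()));
            for (VR v : values) {
                out.add(new KV<>(r.key(), v));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<KV<Integer, String>> in =
            List.of(new KV<>(1, "split this sentence"), new KV<>(2, "keep this one"));
        // Split only records with key 1; pass other values through as a single-element list.
        List<KV<Integer, String>> out = flatMapValues(in,
            (k, v) -> k != null && k == 1 ? Arrays.asList(v.split(" ")) : List.of(v));
        System.out.println(out.size()); // 4: three words for key 1, one unchanged value for key 2
    }
}
```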
    • flatMapValues

      <VR> KStream<K,VR> flatMapValues(ValueMapperWithKey<? super K,? super V,? extends Iterable<? extends VR>> mapper, Named named)
      Create a new KStream by transforming the value of each input record into zero or more values, each emitted with the same (unmodified) key; the value type can be altered arbitrarily. The provided ValueMapperWithKey is applied to each input record and computes zero or more output values. Thus, an input record <K,V> can be transformed into output records <K:V'>, <K:V''>, .... This is a stateless record-by-record operation (cf. transformValues(ValueTransformerWithKeySupplier, String...) for stateful value transformation).

      The example below splits the sentence values of input records <Integer:String> into their words, but only for records with key 1; the values of all other records are forwarded unchanged.

      
       KStream<Integer, String> inputStream = builder.stream("topic");
       KStream<Integer, String> outputStream = inputStream.flatMapValues(new ValueMapperWithKey<Integer, String, Iterable<String>>() {
           @Override
           public Iterable<String> apply(Integer readOnlyKey, String value) {
               // split only records with key 1; the null check avoids unboxing a null key
               if (readOnlyKey != null && readOnlyKey == 1) {
                   return Arrays.asList(value.split(" "));
               } else {
                   return Arrays.asList(value);
               }
           }
       });
       
      The provided ValueMapperWithKey must return an Iterable (e.g., any Collection type) and the return value must not be null.

      Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Because the key stays unchanged, splitting a record into multiple records with the same key preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the resulting KStream (cf. flatMap(KeyValueMapper)).

      Type Parameters:
      VR - the value type of the result stream
      Parameters:
      mapper - a ValueMapperWithKey that computes the new output values
      named - a Named config used to name the processor in the topology
      Returns:
      a KStream that contains more or fewer records than this stream, with unmodified keys and new values (possibly of different type)
    • print

      void print(Printed<K,V> printed)
      Print the records of this KStream using the options provided by Printed. Note that this is mainly for debugging/testing purposes, and it will try to flush on each record print. It should not be used in production if performance is a concern.
      Parameters:
      printed - options for printing
    • foreach

      void foreach(ForeachAction<? super K,? super V> action)
      Perform an action on each record of this KStream. This is a stateless record-by-record operation (cf. process(ProcessorSupplier, String...)). Note that this is a terminal operation that returns void.
      Parameters:
      action - an action to perform on each record
    • foreach

      void foreach(ForeachAction<? super K,? super V> action, Named named)
      Perform an action on each record of this KStream. This is a stateless record-by-record operation (cf. process(ProcessorSupplier, String...)). Note that this is a terminal operation that returns void.
      Parameters:
      action - an action to perform on each record
      named - a Named config used to name the processor in the topology
    • peek

      KStream<K,V> peek(ForeachAction<? super K,? super V> action)
      Perform an action on each record of this KStream. This is a stateless record-by-record operation (cf. process(ProcessorSupplier, String...)).

      Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection) and returns an unchanged stream.

      Note that since this operation is stateless, it may execute multiple times for a single record in failure cases.

      Parameters:
      action - an action to perform on each record
      Returns:
      itself
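      The difference between foreach and peek lies only in what they return: foreach is terminal, while peek hands the unchanged records downstream. The in-memory sketch below illustrates this with a hypothetical KV record and java.util.function.BiConsumer in place of ForeachAction; it does not model the re-execution on failure that the note above warns about.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class PeekSketch {
    // Hypothetical stand-in for a Kafka record.
    record KV<K, V>(K key, V value) {}

    // Models foreach: a terminal operation, so nothing is returned for further chaining.
    static <K, V> void foreach(List<KV<K, V>> in, BiConsumer<? super K, ? super V> action) {
        for (KV<K, V> r : in) {
            action.accept(r.key(), r.value());
        }
    }

    // Models peek: runs the same per-record side effect, then passes the unchanged records on.
    static <K, V> List<KV<K, V>> peek(List<KV<K, V>> in, BiConsumer<? super K, ? super V> action) {
        foreach(in, action);
        return in;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<KV<String, Integer>> in = List.of(new KV<>("a", 1), new KV<>("b", 2));
        List<KV<String, Integer>> next = peek(in, (k, v) -> log.add(k + "=" + v));
        System.out.println(log);        // [a=1, b=2]
        System.out.println(next == in); // true: the same records continue downstream
    }
}
```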
    • peek

      KStream<K,V> peek(ForeachAction<? super K,? super V> action, Named named)
      Perform an action on each record of this KStream. This is a stateless record-by-record operation (cf. process(ProcessorSupplier, String...)).

      Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection) and returns an unchanged stream.

      Note that since this operation is stateless, it may execute multiple times for a single record in failure cases.

      Parameters:
      action - an action to perform on each record
      named - a Named config used to name the processor in the topology
      Returns:
      itself
    • branch

      @Deprecated KStream<K,V>[] branch(Predicate<? super K,? super V>... predicates)
      Deprecated.
      since 2.8. Use split() instead.
      Creates an array of KStream from this stream by branching the records in the original stream based on the supplied predicates. Each record is evaluated against the supplied predicates, and predicates are evaluated in order. Each stream in the result array corresponds position-wise (index) to the predicate in the supplied predicates. The branching happens on first-match: A record in the original stream is assigned to the corresponding result stream for the first predicate that evaluates to true, and is assigned to this stream only. A record will be dropped if none of the predicates evaluate to true. This is a stateless record-by-record operation.
      Parameters:
      predicates - the ordered list of Predicate instances
      Returns:
      multiple distinct substreams of this KStream
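      The first-match rule can be illustrated with a short in-memory sketch. The KV record and the branch helper are hypothetical, and the returned List of Lists stands in for the KStream array: a record that satisfies several predicates goes only to the branch of the first one, and a record that satisfies none is dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

public class BranchSketch {
    // Hypothetical stand-in for a Kafka record.
    record KV<K, V>(K key, V value) {}

    // Models the documented first-match rule: each record goes to the branch of the first
    // predicate that returns true, and to that branch only; unmatched records are dropped.
    @SafeVarargs
    static <K, V> List<List<KV<K, V>>> branch(List<KV<K, V>> in, BiPredicate<? super K, ? super V>... predicates) {
        List<List<KV<K, V>>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>());
        }
        for (KV<K, V> r : in) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(r.key(), r.value())) {
                    branches.get(i).add(r);
                    break; // first match wins
                }
            }
        }
        return branches;
    }

    public static void main(String[] args) {
        List<KV<String, Integer>> in = List.of(new KV<>("a", 5), new KV<>("b", 15), new KV<>("c", -1));
        BiPredicate<String, Integer> small = (k, v) -> v >= 0 && v < 10;
        BiPredicate<String, Integer> nonNegative = (k, v) -> v >= 0;
        // "a" satisfies both predicates but lands only in branch 0; "c" matches none and is dropped.
        System.out.println(branch(in, small, nonNegative));
    }
}
```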
    • branch

      @Deprecated KStream<K,V>[] branch(Named named, Predicate<? super K,? super V>... predicates)
      Deprecated.
      since 2.8. Use split(Named) instead.
      Creates an array of KStream from this stream by branching the records in the original stream based on the supplied predicates. Each record is evaluated against the supplied predicates, and predicates are evaluated in order. Each stream in the result array corresponds position-wise (index) to the predicate in the supplied predicates. The branching happens on first-match: A record in the original stream is assigned to the corresponding result stream for the first predicate that evaluates to true, and is assigned to this stream only. A record will be dropped if none of the predicates evaluate to true. This is a stateless record-by-record operation.
      Parameters:
      named - a Named config used to name the processor in the topology
      predicates - the ordered list of Predicate instances
      Returns:
      multiple distinct substreams of this KStream
    • split

      BranchedKStream<K,V> split()
      Split this stream into different branches. The returned BranchedKStream instance can be used for routing the records to different branches depending on evaluation against the supplied predicates.

      Note: Stream branching is a stateless record-by-record operation. See BranchedKStream for a detailed description and a usage example.

      Returns:
      BranchedKStream that provides methods for routing the records to different branches.
    • split

      BranchedKStream<K,V> split(Named named)
      Split this stream into different branches. The returned BranchedKStream instance can be used for routing the records to different branches depending on evaluation against the supplied predicates.

      Note: Stream branching is a stateless record-by-record operation. See BranchedKStream for a detailed description and a usage example.

      Parameters:
      named - a Named config used to name the processor in the topology and also to set the name prefix for the resulting branches (see BranchedKStream)
      Returns:
      BranchedKStream that provides methods for routing the records to different branches.
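      The first-match rule shared by split() and the deprecated branch(...) can be illustrated with a plain-Java model. The names are hypothetical and there is no Kafka dependency. Following the BranchedKStream documentation, the model assumes each branch name is the Named prefix concatenated with the Branched name. The commented DSL lines show the real calls this mirrors.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical plain-Java model of first-match branching; not part of Kafka Streams.
// Real DSL equivalent (see BranchedKStream):
//   Map<String, KStream<String, Integer>> branches = stream.split(Named.as("Branch-"))
//       .branch((k, v) -> v < 3, Branched.as("small"))
//       .branch((k, v) -> v < 8, Branched.as("medium"))
//       .defaultBranch(Branched.as("large"));
class SplitModel {
    // Predicates are tried in insertion order; a record goes to the first matching branch only.
    // Unmatched records go to the default branch, or are dropped when defaultBranch is null.
    static <K, V> Map<String, List<Map.Entry<K, V>>> split(List<Map.Entry<K, V>> records,
                                                           String prefix,
                                                           LinkedHashMap<String, BiPredicate<K, V>> branches,
                                                           String defaultBranch) {
        Map<String, List<Map.Entry<K, V>>> result = new LinkedHashMap<>();
        for (String name : branches.keySet()) {
            result.put(prefix + name, new ArrayList<>());
        }
        if (defaultBranch != null) {
            result.put(prefix + defaultBranch, new ArrayList<>());
        }
        for (Map.Entry<K, V> record : records) {
            String target = defaultBranch == null ? null : prefix + defaultBranch;
            for (Map.Entry<String, BiPredicate<K, V>> branch : branches.entrySet()) {
                if (branch.getValue().test(record.getKey(), record.getValue())) {
                    target = prefix + branch.getKey();
                    break; // first match wins; later predicates are not evaluated
                }
            }
            if (target != null) {
                result.get(target).add(record);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, BiPredicate<String, Integer>> branches = new LinkedHashMap<>();
        branches.put("small", (k, v) -> v < 3);
        branches.put("medium", (k, v) -> v < 8);
        List<Map.Entry<String, Integer>> records =
                List.of(Map.entry("a", 1), Map.entry("b", 5), Map.entry("c", 10));
        System.out.println(split(records, "Branch-", branches, "large"));
    }
}
```

      Note that the record ("a", 1) satisfies both predicates but lands only in "Branch-small", because evaluation stops at the first match.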
    • merge

      KStream<K,V> merge(KStream<K,V> stream)
      Merge this stream and the given stream into one larger stream.

      There is no ordering guarantee between records from this KStream and records from the provided KStream in the merged stream. Relative order is preserved within each input stream, though (i.e., records within one input stream are processed in order).

      Parameters:
      stream - a stream which is to be merged into this stream
      Returns:
      a merged stream containing all records from this and the provided KStream
    • merge

      KStream<K,V> merge(KStream<K,V> stream, Named named)
      Merge this stream and the given stream into one larger stream.

      There is no ordering guarantee between records from this KStream and records from the provided KStream in the merged stream. Relative order is preserved within each input stream, though (i.e., records within one input stream are processed in order).

      Parameters:
      stream - a stream which is to be merged into this stream
      named - a Named config used to name the processor in the topology
      Returns:
      a merged stream containing all records from this and the provided KStream
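      The merge ordering guarantee (arbitrary interleaving, per-input order preserved) can be stated precisely as an interleaving check. The sketch below is a hypothetical plain-Java helper, not Kafka Streams code. It accepts a merged sequence only if it keeps each input's relative order, which matches the outputs the paragraph above permits.

```java
import java.util.List;

// Hypothetical plain-Java model of the ordering guarantee of KStream#merge; not part of Kafka Streams.
// Real DSL equivalent: KStream<String, String> merged = left.merge(right);
class MergeOrderModel {
    // True if `merged` is some interleaving of `left` and `right` that keeps each input's order.
    static <T> boolean isValidMerge(List<T> left, List<T> right, List<T> merged) {
        int n = left.size();
        int m = right.size();
        if (merged.size() != n + m) {
            return false;
        }
        // ok[i][j]: the first i records of left and the first j of right can produce merged[0, i + j)
        boolean[][] ok = new boolean[n + 1][m + 1];
        ok[0][0] = true;
        for (int i = 0; i <= n; i++) {
            for (int j = 0; j <= m; j++) {
                if (i > 0 && ok[i - 1][j] && left.get(i - 1).equals(merged.get(i + j - 1))) {
                    ok[i][j] = true;
                }
                if (j > 0 && ok[i][j - 1] && right.get(j - 1).equals(merged.get(i + j - 1))) {
                    ok[i][j] = true;
                }
            }
        }
        return ok[n][m];
    }

    public static void main(String[] args) {
        List<String> left = List.of("a1", "a2");
        List<String> right = List.of("b1", "b2");
        System.out.println(isValidMerge(left, right, List.of("b1", "a1", "b2", "a2"))); // prints true
        System.out.println(isValidMerge(left, right, List.of("a2", "a1", "b1", "b2"))); // prints false
    }
}
```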
    • through

      @Deprecated KStream<K,V> through(String topic)
      Deprecated.
      since 2.6; use repartition() instead
      Materialize this stream to a topic and create a new KStream from the topic using default serializers, deserializers, and the producer's DefaultPartitioner. The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is started).

      This is similar to calling to(someTopicName) and StreamsBuilder#stream(someTopicName). Note that through() uses a hard-coded timestamp extractor and does not allow customizing it, to ensure correct timestamp propagation.

      Parameters:
      topic - the topic name
      Returns:
      a KStream that contains the exact same (and potentially repartitioned) records as this KStream
    • through

      @Deprecated KStream<K,V> through(String topic, Produced<K,V> produced)
      Deprecated.
      since 2.6; use repartition(Repartitioned) instead
      Materialize this stream to a topic and create a new KStream from the topic using the Produced instance for configuration of the key serde, value serde, and StreamPartitioner. The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is started).

      This is similar to calling to(someTopicName, Produced.with(keySerde, valueSerde)) and StreamsBuilder#stream(someTopicName, Consumed.with(keySerde, valueSerde)). Note that through() uses a hard-coded timestamp extractor and does not allow customizing it, to ensure correct timestamp propagation.

      Parameters:
      topic - the topic name
      produced - the options to use when producing to the topic
      Returns:
      a KStream that contains the exact same (and potentially repartitioned) records as this KStream
    • repartition

      KStream<K,V> repartition()
      Materialize this stream to an auto-generated repartition topic and create a new KStream from the auto-generated topic using default serializers, deserializers, and the producer's DefaultPartitioner. The number of partitions is determined based on the partition counts of the upstream topics.

      The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance. Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams. The topic will be named as "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      Returns:
      KStream that contains the exact same repartitioned records as this KStream.
    • repartition

      KStream<K,V> repartition(Repartitioned<K,V> repartitioned)
      Materialize this stream to an auto-generated repartition topic and create a new KStream from the auto-generated topic using key serde, value serde, StreamPartitioner, number of partitions, and topic name part as defined by Repartitioned.

      The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance. Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams. The topic will be named as "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is either provided via Repartitioned.as(String) or an internally generated name, and "-repartition" is a fixed suffix.

      Parameters:
      repartitioned - the Repartitioned instance used to specify Serdes, StreamPartitioner which determines how records are distributed among partitions of the topic, part of the topic name, and number of partitions for a repartition topic.
      Returns:
      a KStream that contains the exact same repartitioned records as this KStream.
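      A rough plain-Java model of what repartition(Repartitioned) arranges: records are redistributed by key so that equal keys end up in the same partition, and the internal topic name is built from the application id and the Repartitioned name as described above. The application id "my-app" and the name "by-user" are made up. The hashCode-based partitioning is an explicitly labeled stand-in for Kafka's real partitioner, which hashes the serialized key bytes with murmur2.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of repartition(); not part of Kafka Streams.
// Real DSL equivalent:
//   stream.repartition(Repartitioned.<String, Long>as("by-user").withNumberOfPartitions(6))
class RepartitionModel {
    // Internal topic name as documented: "${applicationId}-<name>-repartition".
    static String repartitionTopic(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    // ASSUMPTION: hashCode-modulo is a stand-in for the producer's DefaultPartitioner,
    // which really hashes the serialized key bytes with murmur2. Keys must be non-null here.
    static <K, V> List<List<Map.Entry<K, V>>> partition(List<Map.Entry<K, V>> records, int numPartitions) {
        List<List<Map.Entry<K, V>>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayList<>());
        }
        for (Map.Entry<K, V> record : records) {
            int p = (record.getKey().hashCode() & 0x7fffffff) % numPartitions;
            partitions.get(p).add(record); // equal keys always land in the same partition
        }
        return partitions;
    }

    public static void main(String[] args) {
        System.out.println(repartitionTopic("my-app", "by-user")); // prints my-app-by-user-repartition
        List<Map.Entry<String, Long>> records =
                List.of(Map.entry("u1", 1L), Map.entry("u2", 2L), Map.entry("u1", 3L));
        System.out.println(partition(records, 6));
    }
}
```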
    • to

      void to(String topic)
      Materialize this stream to a topic using the default serializers specified in the config and the producer's DefaultPartitioner. The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is started).
      Parameters:
      topic - the topic name
    • to

      void to(String topic, Produced<K,V> produced)
      Materialize this stream to a topic using the provided Produced instance. The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is started).
      Parameters:
      topic - the topic name
      produced - the options to use when producing to the topic
    • to

      void to(TopicNameExtractor<K,V> topicExtractor)
      Dynamically materialize this stream to topics using the default serializers specified in the config and the producer's DefaultPartitioner. The topic to which each record is sent is determined dynamically by the TopicNameExtractor.
      Parameters:
      topicExtractor - the extractor to determine the name of the Kafka topic to write to for each record
    • to

      void to(TopicNameExtractor<K,V> topicExtractor, Produced<K,V> produced)
      Dynamically materialize this stream to topics using the provided Produced instance. The topic to which each record is sent is determined dynamically by the TopicNameExtractor.
      Parameters:
      topicExtractor - the extractor to determine the name of the Kafka topic to write to for each record
      produced - the options to use when producing to the topic
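      Per-record topic selection can be sketched as a plain-Java routing function. TopicChooser is a hypothetical interface that mirrors the shape of TopicNameExtractor#extract(K, V, RecordContext) without the record context. The region-prefixed keys such as "eu-1" and the topic names such as "orders-eu" are invented for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of to(TopicNameExtractor); not part of Kafka Streams.
// Real DSL equivalent:
//   stream.to((key, value, recordContext) -> "orders-" + key.split("-")[0]);
class DynamicRoutingModel {
    // Mirrors the shape of TopicNameExtractor#extract(K, V, RecordContext), minus the context.
    interface TopicChooser<K, V> {
        String extract(K key, V value);
    }

    // Group records by the topic name the extractor computes for each of them.
    static <K, V> Map<String, List<Map.Entry<K, V>>> route(List<Map.Entry<K, V>> records,
                                                           TopicChooser<K, V> chooser) {
        Map<String, List<Map.Entry<K, V>>> byTopic = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : records) {
            String topic = chooser.extract(record.getKey(), record.getValue());
            byTopic.computeIfAbsent(topic, t -> new ArrayList<>()).add(record);
        }
        return byTopic;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> orders =
                List.of(Map.entry("eu-1", 10), Map.entry("us-7", 20), Map.entry("eu-2", 30));
        System.out.println(route(orders, (key, value) -> "orders-" + key.split("-")[0]));
    }
}
```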
    • toTable

      KTable<K,V> toTable()
      Convert this stream to a KTable.

      If a key changing operator was used before this operation (e.g., selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper) or transform(TransformerSupplier, String...)) an internal repartitioning topic will be created in Kafka. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      For this case, all data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting KTable is partitioned correctly on its key. Note that you cannot enable the StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG config for this case, because repartition topics are considered transient and do not allow recovering the result KTable in case of a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.

      Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of the stream, previously treated as a "fact/event", is now re-interpreted as an update (cf. KStream vs KTable).

      Returns:
      a KTable that contains the same records as this KStream
    • toTable

      KTable<K,V> toTable(Named named)
      Convert this stream to a KTable.

      If a key changing operator was used before this operation (e.g., selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper) or transform(TransformerSupplier, String...)) an internal repartitioning topic will be created in Kafka. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      For this case, all data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting KTable is partitioned correctly on its key. Note that you cannot enable the StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG config for this case, because repartition topics are considered transient and do not allow recovering the result KTable in case of a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.

      Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of the stream, previously treated as a "fact/event", is now re-interpreted as an update (cf. KStream vs KTable).

      Parameters:
      named - a Named config used to name the processor in the topology
      Returns:
      a KTable that contains the same records as this KStream
    • toTable

      KTable<K,V> toTable(Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Convert this stream to a KTable.

      If a key changing operator was used before this operation (e.g., selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper) or transform(TransformerSupplier, String...)) an internal repartitioning topic will be created in Kafka. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      For this case, all data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting KTable is partitioned correctly on its key. Note that you cannot enable the StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG config for this case, because repartition topics are considered transient and do not allow recovering the result KTable in case of a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.

      Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of the stream, previously treated as a "fact/event", is now re-interpreted as an update (cf. KStream vs KTable).

      Parameters:
      materialized - an instance of Materialized used to describe how the state store of the resulting table should be materialized.
      Returns:
      a KTable that contains the same records as this KStream
    • toTable

      KTable<K,V> toTable(Named named, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Convert this stream to a KTable.

      If a key changing operator was used before this operation (e.g., selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper) or transform(TransformerSupplier, String...)) an internal repartitioning topic will be created in Kafka. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      For this case, all data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting KTable is partitioned correctly on its key. Note that you cannot enable the StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG config for this case, because repartition topics are considered transient and do not allow recovering the result KTable in case of a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.

      Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of the stream, previously treated as a "fact/event", is now re-interpreted as an update (cf. KStream vs KTable).

      Parameters:
      named - a Named config used to name the processor in the topology
      materialized - an instance of Materialized used to describe how the state store of the resulting table should be materialized.
      Returns:
      a KTable that contains the same records as this KStream
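      The "re-interpretation as updates" described above can be pictured as replaying events into a map where a later value for a key replaces the earlier one. The sketch below is a hypothetical plain-Java model, not the Kafka Streams API. It deliberately leaves out null keys, null values (tombstones), and the repartitioning step. The store name "latest-store" in the DSL comment is made up.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the stream-to-table reinterpretation; not part of Kafka Streams.
// Real DSL equivalent: KTable<String, Integer> latest = stream.toTable(Materialized.as("latest-store"));
class ToTableModel {
    // Replay each event as an upsert: a later value for a key replaces the earlier one.
    // Null keys and null values (tombstones) are deliberately outside this sketch.
    static <K, V> Map<K, V> toTable(List<Map.Entry<K, V>> events) {
        Map<K, V> table = new LinkedHashMap<>();
        for (Map.Entry<K, V> event : events) {
            table.put(event.getKey(), event.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> events =
                List.of(Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3));
        System.out.println(toTable(events)); // prints {a=3, b=2}
    }
}
```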
    • groupBy

      <KR> KGroupedStream<KR,V> groupBy(KeyValueMapper<? super K,? super V,KR> keySelector)
      Group the records of this KStream on a new key that is selected using the provided KeyValueMapper, using default serializers and deserializers. KGroupedStream can be further grouped with other streams to form a CogroupedKStream. Grouping a stream on the record key is required before an aggregation operator can be applied to the data (cf. KGroupedStream). The KeyValueMapper selects a new key (which may or may not be of the same type) while preserving the original values. If the new record key is null, the record will not be included in the resulting KGroupedStream.

      Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a later operator depends on the newly selected key. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      All data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting KGroupedStream is partitioned on the new key.

      This operation is equivalent to calling selectKey(KeyValueMapper) followed by groupByKey(). If the key type is changed, it is recommended to use groupBy(KeyValueMapper, Grouped) instead.

      Type Parameters:
      KR - the key type of the result KGroupedStream
      Parameters:
      keySelector - a KeyValueMapper that computes a new key for grouping
      Returns:
      a KGroupedStream that contains the grouped records of the original KStream
    • groupBy

      <KR> KGroupedStream<KR,V> groupBy(KeyValueMapper<? super K,? super V,KR> keySelector, Grouped<KR,V> grouped)
      Group the records of this KStream on a new key that is selected using the provided KeyValueMapper and Serdes as specified by Grouped. KGroupedStream can be further grouped with other streams to form a CogroupedKStream. Grouping a stream on the record key is required before an aggregation operator can be applied to the data (cf. KGroupedStream). The KeyValueMapper selects a new key (which may or may not be of the same type) while preserving the original values. If the new record key is null the record will not be included in the resulting KGroupedStream.

      Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a later operator depends on the newly selected key. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is either provided via Grouped.as(String) or an internally generated name, and "-repartition" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      All data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting KGroupedStream is partitioned on the new key.

      This operation is equivalent to calling selectKey(KeyValueMapper) followed by groupByKey().

      Type Parameters:
      KR - the key type of the result KGroupedStream
      Parameters:
      keySelector - a KeyValueMapper that computes a new key for grouping
      grouped - the Grouped instance used to specify Serdes and part of the name for a repartition topic if repartitioning is required.
      Returns:
      a KGroupedStream that contains the grouped records of the original KStream
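      Re-keying with a KeyValueMapper and then aggregating can be modeled in plain Java as "compute a new key per record, skip null keys, count per key". This is a hypothetical sketch of groupBy(...) followed by count(), not the Kafka Streams API, and it omits the repartition topic entirely. The page-view data is invented.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical plain-Java model of groupBy(keySelector) followed by count(); not part of Kafka Streams.
// Real DSL equivalent:
//   KTable<String, Long> views =
//       stream.groupBy((userId, page) -> page, Grouped.with(Serdes.String(), Serdes.String())).count();
class GroupByModel {
    static <K, V, KR> Map<KR, Long> groupByAndCount(List<Map.Entry<K, V>> records,
                                                     BiFunction<K, V, KR> keySelector) {
        Map<KR, Long> counts = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : records) {
            KR newKey = keySelector.apply(record.getKey(), record.getValue());
            if (newKey == null) {
                continue; // a null grouping key means the record is not included
            }
            counts.merge(newKey, 1L, Long::sum); // the count() step: one more record for this new key
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> pageViews = List.of(
                Map.entry("u1", "home"), Map.entry("u2", "cart"),
                Map.entry("u3", "home"), Map.entry("u4", "unknown"));
        // Re-key each view by page; "unknown" maps to a null key, so that record is dropped.
        System.out.println(groupByAndCount(pageViews,
                (user, page) -> page.equals("unknown") ? null : page)); // prints {home=2, cart=1}
    }
}
```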
    • groupByKey

      KGroupedStream<K,V> groupByKey()
      Group the records by their current key into a KGroupedStream while preserving the original values and using default serializers and deserializers. KGroupedStream can be further grouped with other streams to form a CogroupedKStream. Grouping a stream on the record key is required before an aggregation operator can be applied to the data (cf. KGroupedStream). If a record key is null, the record will not be included in the resulting KGroupedStream.

      If a key changing operator was used before this operation (e.g., selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper) or transform(TransformerSupplier, String...)) an internal repartitioning topic may need to be created in Kafka if a later operator depends on the newly selected key. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      For this case, all data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting KGroupedStream is partitioned correctly on its key. If the last key changing operator changed the key type, it is recommended to use groupByKey(org.apache.kafka.streams.kstream.Grouped) instead.

      Returns:
      a KGroupedStream that contains the grouped records of the original KStream
      See Also:
    • groupByKey

      KGroupedStream<K,V> groupByKey(Grouped<K,V> grouped)
      Group the records by their current key into a KGroupedStream while preserving the original values and using the Serdes as defined by Grouped. KGroupedStream can be further grouped with other streams to form a CogroupedKStream. Grouping a stream on the record key is required before an aggregation operator can be applied to the data (cf. KGroupedStream). If a record key is null, the record will not be included in the resulting KGroupedStream.

      If a key changing operator was used before this operation (e.g., selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper) or transform(TransformerSupplier, String...)) an internal repartitioning topic may need to be created in Kafka if a later operator depends on the newly selected key. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is either provided via Grouped.as(String) or an internally generated name, and "-repartition" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      For this case, all data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting KGroupedStream is partitioned correctly on its key.

      Parameters:
      grouped - the Grouped instance used to specify Serdes and part of the name for a repartition topic if repartitioning is required.
      Returns:
      a KGroupedStream that contains the grouped records of the original KStream
      See Also:
    • join

      <VO, VR> KStream<K,VR> join(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows)
      Join records of this stream with another KStream's records using windowed inner equi join with default serializers and deserializers. The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.

      For each pair of records meeting both join predicates the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records. If an input record key or value is null the record will not be included in the join operation and thus no output record will be added to the resulting KStream.

      Example (assuming all input records belong to the correct windows):

      this      other     result
      <K1:A>
      <K2:B>    <K2:b>    <K2:ValueJoiner(B,b)>
                <K3:c>
      Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before doing the join and specify the "correct" number of partitions via Repartitioned parameter. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      Repartitioning can happen for one or both of the joining KStreams. For this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the join input KStream is partitioned correctly on its key.

      Both of the joining KStreams will be materialized in local state stores with auto-generated store names. For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<storeName>" is an internally generated name, and "-changelog" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      Type Parameters:
      VO - the value type of the other stream
      VR - the value type of the result stream
      Parameters:
      otherStream - the KStream to be joined with this stream
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      windows - the specification of the JoinWindows
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key and within the joining window intervals
      See Also:
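      The two join predicates (equal keys, timestamps close enough) can be modeled in plain Java by checking every pair of records. This hypothetical sketch uses String keys only and a symmetric window, as created by JoinWindows.ofTimeDifferenceWithNoGrace. It ignores grace periods, state stores, and out-of-order arrival. The timestamps in main are made up so that the run reproduces the example table above.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical plain-Java model of a windowed inner stream-stream join; not part of Kafka Streams.
// Real DSL equivalent:
//   left.join(right, (v, vo) -> v + "+" + vo, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(10)));
// Grace periods, state stores, and out-of-order arrival are not modeled.
class WindowedJoinModel {
    static final class Rec<V> {
        final String key;
        final V value;
        final long ts; // record timestamp in milliseconds
        Rec(String key, V value, long ts) { this.key = key; this.value = value; this.ts = ts; }
    }

    static <V, VO, VR> List<Map.Entry<String, VR>> join(List<Rec<V>> left, List<Rec<VO>> right,
                                                        long windowMs, BiFunction<V, VO, VR> joiner) {
        List<Map.Entry<String, VR>> out = new ArrayList<>();
        for (Rec<V> l : left) {
            if (l.key == null || l.value == null) {
                continue; // null key or value: the record takes no part in the join
            }
            for (Rec<VO> r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean closeInTime = Math.abs(l.ts - r.ts) <= windowMs;
                if (sameKey && r.value != null && closeInTime) {
                    // The result key is the shared join key; the joiner only computes the value.
                    out.add(new AbstractMap.SimpleEntry<>(l.key, joiner.apply(l.value, r.value)));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec<String>> thisStream = List.of(new Rec<>("K1", "A", 0L), new Rec<>("K2", "B", 10L));
        List<Rec<String>> otherStream = List.of(new Rec<>("K2", "b", 15L), new Rec<>("K3", "c", 20L));
        System.out.println(join(thisStream, otherStream, 10L, (v, vo) -> v + "+" + vo)); // prints [K2=B+b]
    }
}
```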
    • join

      <VO, VR> KStream<K,VR> join(KStream<K,VO> otherStream, ValueJoinerWithKey<? super K,? super V,? super VO,? extends VR> joiner, JoinWindows windows)
      Join records of this stream with another KStream's records using windowed inner equi join with default serializers and deserializers. The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.

      For each pair of records meeting both join predicates the provided ValueJoinerWithKey will be called to compute a value (with arbitrary type) for the result record. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. The key of the result record is the same as for both joining input records. If an input record key or value is null the record will not be included in the join operation and thus no output record will be added to the resulting KStream.

      Example (assuming all input records belong to the correct windows):

      this      other     result
      <K1:A>
      <K2:B>    <K2:b>    <K2:ValueJoinerWithKey(K2,B,b)>
                <K3:c>
      Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before doing the join and specify the "correct" number of partitions via Repartitioned parameter. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      Repartitioning can happen for one or both of the joining KStreams. For this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the join input KStream is partitioned correctly on its key.

      Both of the joining KStreams will be materialized in local state stores with auto-generated store names. For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<storeName>" is an internally generated name, and "-changelog" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      Type Parameters:
      VO - the value type of the other stream
      VR - the value type of the result stream
      Parameters:
      otherStream - the KStream to be joined with this stream
      joiner - a ValueJoinerWithKey that computes the join result for a pair of matching records
      windows - the specification of the JoinWindows
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoinerWithKey, one for each matched record-pair with the same key and within the joining window intervals
      See Also:
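      The key-aware overload differs only in what the joiner can see. The hypothetical model below passes the read-only key to the joiner, while the output key is always the join key itself. It also illustrates an asymmetric window, based on the before/after bounds described in the JoinWindows documentation rather than on this paragraph. KeyedJoiner and Event are made-up names, and KeyedJoiner only mirrors the shape of ValueJoinerWithKey.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of a key-aware windowed join with asymmetric bounds; not Kafka Streams.
// Real DSL equivalent:
//   left.join(right, (key, v, vo) -> key + ":" + v + "+" + vo,
//       JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(5)).before(Duration.ZERO));
class KeyAwareJoinModel {
    // Mirrors the shape of ValueJoinerWithKey#apply(readOnlyKey, value1, value2).
    interface KeyedJoiner<K, V, VO, VR> {
        VR apply(K readOnlyKey, V value, VO otherValue);
    }

    static final class Event<K, V> {
        final K key;
        final V value;
        final long ts;
        Event(K key, V value, long ts) { this.key = key; this.value = value; this.ts = ts; }
    }

    // A record of this stream at thisTs matches a record of the other stream at otherTs when
    // thisTs - beforeMs <= otherTs <= thisTs + afterMs.
    static boolean inWindow(long thisTs, long otherTs, long beforeMs, long afterMs) {
        return thisTs - beforeMs <= otherTs && otherTs <= thisTs + afterMs;
    }

    static <K, V, VO, VR> List<Map.Entry<K, VR>> join(List<Event<K, V>> left, List<Event<K, VO>> right,
                                                      long beforeMs, long afterMs,
                                                      KeyedJoiner<K, V, VO, VR> joiner) {
        List<Map.Entry<K, VR>> out = new ArrayList<>();
        for (Event<K, V> l : left) {
            if (l.key == null || l.value == null) {
                continue;
            }
            for (Event<K, VO> r : right) {
                if (r.value != null && l.key.equals(r.key) && inWindow(l.ts, r.ts, beforeMs, afterMs)) {
                    // The joiner may read the key, but the output key is always the join key itself.
                    out.add(new AbstractMap.SimpleEntry<>(l.key, joiner.apply(l.key, l.value, r.value)));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event<String, String>> thisStream = List.of(new Event<>("K2", "B", 10L));
        List<Event<String, String>> otherStream =
                List.of(new Event<>("K2", "b", 12L), new Event<>("K2", "x", 8L));
        // before = 0 ms, after = 5 ms: only "b" at 12 falls in [10, 15]; "x" at 8 is too early.
        System.out.println(join(thisStream, otherStream, 0L, 5L,
                (k, v, vo) -> k + ":" + v + "+" + vo)); // prints [K2=K2:B+b]
    }
}
```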
    • join

      <VO, VR> KStream<K,VR> join(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, StreamJoined<K,V,VO> streamJoined)
      Join records of this stream with another KStream's records using windowed inner equi join using the StreamJoined instance for configuration of the key serde, this stream's value serde, the other stream's value serde, and used state stores. The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.

      For each pair of records meeting both join predicates the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records. If an input record key or value is null the record will not be included in the join operation and thus no output record will be added to the resulting KStream.

      Example (assuming all input records belong to the correct windows):

      this      other     result
      <K1:A>
      <K2:B>    <K2:b>    <K2:ValueJoiner(B,b)>
                <K3:c>
      Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before doing the join and specify the "correct" number of partitions via Repartitioned parameter. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      Repartitioning can happen for one or both of the joining KStreams. For this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the join input KStream is partitioned correctly on its key.

      Both of the joining KStreams will be materialized in local state stores with auto-generated store names, unless a name is provided via the StreamJoined instance. For failure and recovery, each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<storeName>" is the auto-generated name if none is provided, and "-changelog" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().
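      A sketch of how the two naming patterns compose. The class and method names are hypothetical, and the `<name>`/`<storeName>` arguments are placeholders; because those parts are generated internally, the actual names should be read from Topology.describe() rather than predicted.

```java
final class InternalTopicNames {
    // Hypothetical helpers that apply the documented patterns. The <name> and <storeName>
    // parts are generated by Kafka Streams, so the arguments used here are placeholders.
    static String repartitionTopic(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }
}
```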

      Type Parameters:
      VO - the value type of the other stream
      VR - the value type of the result stream
      Parameters:
      otherStream - the KStream to be joined with this stream
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      windows - the specification of the JoinWindows
      streamJoined - a StreamJoined used to configure join stores
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key and within the joining window intervals
      See Also:
    • join

      <VO, VR> KStream<K,VR> join(KStream<K,VO> otherStream, ValueJoinerWithKey<? super K,? super V,? super VO,? extends VR> joiner, JoinWindows windows, StreamJoined<K,V,VO> streamJoined)
      Join records of this stream with another KStream's records using a windowed inner equi join, with the StreamJoined instance configuring the key serde, this stream's value serde, the other stream's value serde, and the state stores used. The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.

      For each pair of records meeting both join predicates the provided ValueJoinerWithKey will be called to compute a value (with arbitrary type) for the result record. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. The key of the result record is the same as for both joining input records. If an input record key or value is null the record will not be included in the join operation and thus no output record will be added to the resulting KStream.
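      The ValueJoinerWithKey contract can be sketched with a hypothetical stand-in interface, `KeyedJoiner`, whose `apply(readOnlyKey, value1, value2)` shape mirrors ValueJoinerWithKey. The helper `joinPair` is also hypothetical: it hands the key to the joiner for reading only and keeps it unchanged as the result key, matching the rule that the result key equals the input key.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in with the same shape as ValueJoinerWithKey#apply(readOnlyKey, value1, value2).
@FunctionalInterface
interface KeyedJoiner<K, V1, V2, R> {
    R apply(K readOnlyKey, V1 value1, V2 value2);
}

final class KeyedJoinerDemo {
    // Hypothetical helper: the joiner may read the key to build the result value, but the
    // result keeps the original key, as the documentation requires.
    static <K, V1, V2, R> Map.Entry<K, R> joinPair(K key, V1 value1, V2 value2,
                                                   KeyedJoiner<K, V1, V2, R> joiner) {
        Objects.requireNonNull(key, "records with a null key are not joined");
        return new AbstractMap.SimpleImmutableEntry<>(key, joiner.apply(key, value1, value2));
    }
}
```

      With an immutable key type such as String, the read-only rule is enforced by the type itself; the warning matters most for mutable key objects.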

      Example (assuming all input records belong to the correct windows):

      this        other       result
      <K1:A>
      <K2:B>      <K2:b>      <K2:ValueJoinerWithKey(K2,B,b)>
                  <K3:c>
      Both input streams (or, more precisely, their underlying source topics) need to have the same number of partitions. If this is not the case, you need to call repartition(Repartitioned) on one of the input streams before doing the join and specify the "correct" number of partitions via the Repartitioned parameter. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      Repartitioning can happen for one or both of the joining KStreams. In this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it and re-reading all records from it, such that the join input KStream is partitioned correctly on its key.

      Both of the joining KStreams will be materialized in local state stores with auto-generated store names, unless a name is provided via the StreamJoined instance. For failure and recovery, each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<storeName>" is the auto-generated name if none is provided, and "-changelog" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      Type Parameters:
      VO - the value type of the other stream
      VR - the value type of the result stream
      Parameters:
      otherStream - the KStream to be joined with this stream
      joiner - a ValueJoinerWithKey that computes the join result for a pair of matching records
      windows - the specification of the JoinWindows
      streamJoined - a StreamJoined used to configure join stores
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoinerWithKey, one for each matched record-pair with the same key and within the joining window intervals
      See Also:
    • leftJoin

      <VO, VR> KStream<K,VR> leftJoin(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows)
      Join records of this stream with another KStream's records using windowed left equi join with default serializers and deserializers. In contrast to inner-join, all records from this stream will produce at least one output record (cf. below). The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.

      For each pair of records meeting both join predicates the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records. Furthermore, for each input record of this KStream that does not satisfy the join predicate the provided ValueJoiner will be called with a null value for the other stream. If an input record key or value is null the record will not be included in the join operation and thus no output record will be added to the resulting KStream.
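      The left-join rule can be sketched as a simplified in-memory batch model: every joinable record of this stream yields one result per match, or exactly one result with a null other-value when nothing matches. `Rec`, `Out`, `LeftWindowJoinModel`, and the symmetric `windowMs` bound are assumptions for illustration. The model computes final results in one pass and says nothing about when Kafka Streams actually emits the null-padded result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical input and output records for the model.
record Rec<K, V>(K key, V value, long ts) {}
record Out<K, R>(K key, R value) {}

final class LeftWindowJoinModel {
    // Simplified batch model of a windowed left equi join with an assumed symmetric window
    // of windowMs: each joinable left ("this") record yields one result per matching right
    // record, or exactly one result with a null right value if none match.
    static <K, V1, V2, R> List<Out<K, R>> leftJoin(List<Rec<K, V1>> left,
                                                   List<Rec<K, V2>> right,
                                                   long windowMs,
                                                   BiFunction<V1, V2, R> joiner) {
        List<Out<K, R>> out = new ArrayList<>();
        for (Rec<K, V1> l : left) {
            if (l.key() == null || l.value() == null) continue; // not joined, no output
            boolean matched = false;
            for (Rec<K, V2> r : right) {
                if (r.key() == null || r.value() == null) continue;
                if (l.key().equals(r.key()) && Math.abs(l.ts() - r.ts()) <= windowMs) {
                    out.add(new Out<>(l.key(), joiner.apply(l.value(), r.value())));
                    matched = true;
                }
            }
            if (!matched) {
                // No partner inside the window: call the joiner with null for the other side.
                out.add(new Out<>(l.key(), joiner.apply(l.value(), null)));
            }
        }
        return out;
    }
}
```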

      Example (assuming all input records belong to the correct windows):

      this        other       result
      <K1:A>                  <K1:ValueJoiner(A,null)>
      <K2:B>      <K2:b>      <K2:ValueJoiner(B,b)>
                  <K3:c>
      Both input streams (or, more precisely, their underlying source topics) need to have the same number of partitions. If this is not the case, you need to call repartition(Repartitioned) on one of the input streams before doing the join and specify the "correct" number of partitions via the Repartitioned parameter. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      Repartitioning can happen for one or both of the joining KStreams. In this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it and re-reading all records from it, such that the join input KStream is partitioned correctly on its key.

      Both of the joining KStreams will be materialized in local state stores with auto-generated store names. For failure and recovery, each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<storeName>" is an internally generated name, and "-changelog" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      Type Parameters:
      VO - the value type of the other stream
      VR - the value type of the result stream
      Parameters:
      otherStream - the KStream to be joined with this stream
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      windows - the specification of the JoinWindows
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoiner: one for each matched record-pair with the same key within the joining window intervals, plus one for each non-matching record of this KStream
      See Also:
    • leftJoin

      <VO, VR> KStream<K,VR> leftJoin(KStream<K,VO> otherStream, ValueJoinerWithKey<? super K,? super V,? super VO,? extends VR> joiner, JoinWindows windows)
      Join records of this stream with another KStream's records using windowed left equi join with default serializers and deserializers. In contrast to inner-join, all records from this stream will produce at least one output record (cf. below). The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.
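      JoinWindows can also extend the window by different amounts before and after a record's timestamp (JoinWindows.before(Duration) and after(Duration)). The predicate below is a sketch of our reading of those bounds, assuming inclusive limits: a record of the other stream at otherTs joins a record of this stream at thisTs when thisTs - before <= otherTs <= thisTs + after. The class and method names are hypothetical.

```java
final class JoinWindowBounds {
    // Assumed reading of a join window with separate before/after sizes (bounds inclusive):
    // an other-stream record at otherTs joins a this-stream record at thisTs when
    // thisTs - beforeMs <= otherTs <= thisTs + afterMs.
    static boolean withinWindow(long thisTs, long otherTs, long beforeMs, long afterMs) {
        return otherTs >= thisTs - beforeMs && otherTs <= thisTs + afterMs;
    }
}
```

      With equal before and after sizes, the predicate reduces to |thisTs - otherTs| <= size, the symmetric case.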

      For each pair of records meeting both join predicates the provided ValueJoinerWithKey will be called to compute a value (with arbitrary type) for the result record. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. The key of the result record is the same as for both joining input records. Furthermore, for each input record of this KStream that does not satisfy the join predicate the provided ValueJoinerWithKey will be called with a null value for the other stream. If an input record key or value is null the record will not be included in the join operation and thus no output record will be added to the resulting KStream.

      Example (assuming all input records belong to the correct windows):

      this        other       result
      <K1:A>                  <K1:ValueJoinerWithKey(K1,A,null)>
      <K2:B>      <K2:b>      <K2:ValueJoinerWithKey(K2,B,b)>
                  <K3:c>
      Both input streams (or, more precisely, their underlying source topics) need to have the same number of partitions. If this is not the case, you need to call repartition(Repartitioned) on one of the input streams before doing the join and specify the "correct" number of partitions via the Repartitioned parameter. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      Repartitioning can happen for one or both of the joining KStreams. In this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it and re-reading all records from it, such that the join input KStream is partitioned correctly on its key.

      Both of the joining KStreams will be materialized in local state stores with auto-generated store names. For failure and recovery, each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<storeName>" is an internally generated name, and "-changelog" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      Type Parameters:
      VO - the value type of the other stream
      VR - the value type of the result stream
      Parameters:
      otherStream - the KStream to be joined with this stream
      joiner - a ValueJoinerWithKey that computes the join result for a pair of matching records
      windows - the specification of the JoinWindows
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoinerWithKey: one for each matched record-pair with the same key within the joining window intervals, plus one for each non-matching record of this KStream
      See Also:
    • leftJoin

      <VO, VR> KStream<K,VR> leftJoin(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, StreamJoined<K,V,VO> streamJoined)
      Join records of this stream with another KStream's records using a windowed left equi join, with the StreamJoined instance configuring the key serde, this stream's value serde, the other stream's value serde, and the state stores used. In contrast to inner-join, all records from this stream will produce at least one output record (cf. below). The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.

      For each pair of records meeting both join predicates the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records. Furthermore, for each input record of this KStream that does not satisfy the join predicate the provided ValueJoiner will be called with a null value for the other stream. If an input record key or value is null the record will not be included in the join operation and thus no output record will be added to the resulting KStream.
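      The null-handling rule amounts to a filter applied before any matching takes place. A minimal sketch, with a hypothetical `Rec` record and `JoinInputFilter` class:

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical record type for the sketch.
record Rec<K, V>(K key, V value) {}

final class JoinInputFilter {
    // Keeps only records that can take part in the join: a record whose key or value
    // is null is not included and produces no output record.
    static <K, V> List<Rec<K, V>> joinable(List<Rec<K, V>> records) {
        return records.stream()
                .filter(r -> r.key() != null && r.value() != null)
                .collect(Collectors.toList());
    }
}
```

      Note that this applies to the left join as well: a record of this stream with a null key or value does not produce the null-padded result either.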

      Example (assuming all input records belong to the correct windows):

      this        other       result
      <K1:A>                  <K1:ValueJoiner(A,null)>
      <K2:B>      <K2:b>      <K2:ValueJoiner(B,b)>
                  <K3:c>
      Both input streams (or, more precisely, their underlying source topics) need to have the same number of partitions. If this is not the case, you need to call repartition(Repartitioned) on one of the input streams before doing the join and specify the "correct" number of partitions via the Repartitioned parameter. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      Repartitioning can happen for one or both of the joining KStreams. In this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it and re-reading all records from it, such that the join input KStream is partitioned correctly on its key.

      Both of the joining KStreams will be materialized in local state stores with auto-generated store names, unless a name is provided via the StreamJoined instance. For failure and recovery, each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<storeName>" is the auto-generated name if none is provided, and "-changelog" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      Type Parameters:
      VO - the value type of the other stream
      VR - the value type of the result stream
      Parameters:
      otherStream - the KStream to be joined with this stream
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      windows - the specification of the JoinWindows
      streamJoined - a StreamJoined instance to configure serdes and state stores
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoiner: one for each matched record-pair with the same key within the joining window intervals, plus one for each non-matching record of this KStream
      See Also:
    • leftJoin

      <VO, VR> KStream<K,VR> leftJoin(KStream<K,VO> otherStream, ValueJoinerWithKey<? super K,? super V,? super VO,? extends VR> joiner, JoinWindows windows, StreamJoined<K,V,VO> streamJoined)
      Join records of this stream with another KStream's records using a windowed left equi join, with the StreamJoined instance configuring the key serde, this stream's value serde, the other stream's value serde, and the state stores used. In contrast to inner-join, all records from this stream will produce at least one output record (cf. below). The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.

      For each pair of records meeting both join predicates the provided ValueJoinerWithKey will be called to compute a value (with arbitrary type) for the result record. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. The key of the result record is the same as for both joining input records. Furthermore, for each input record of this KStream that does not satisfy the join predicate the provided ValueJoinerWithKey will be called with a null value for the other stream. If an input record key or value is null the record will not be included in the join operation and thus no output record will be added to the resulting KStream.

      Example (assuming all input records belong to the correct windows):

      this        other       result
      <K1:A>                  <K1:ValueJoinerWithKey(K1,A,null)>
      <K2:B>      <K2:b>      <K2:ValueJoinerWithKey(K2,B,b)>
                  <K3:c>
      Both input streams (or, more precisely, their underlying source topics) need to have the same number of partitions. If this is not the case, you need to call repartition(Repartitioned) on one of the input streams before doing the join and specify the "correct" number of partitions via the Repartitioned parameter. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      Repartitioning can happen for one or both of the joining KStreams. In this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it and re-reading all records from it, such that the join input KStream is partitioned correctly on its key.

      Both of the joining KStreams will be materialized in local state stores with auto-generated store names, unless a name is provided via the StreamJoined instance. For failure and recovery, each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<storeName>" is the auto-generated name if none is provided, and "-changelog" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      Type Parameters:
      VO - the value type of the other stream
      VR - the value type of the result stream
      Parameters:
      otherStream - the KStream to be joined with this stream
      joiner - a ValueJoinerWithKey that computes the join result for a pair of matching records
      windows - the specification of the JoinWindows
      streamJoined - a StreamJoined instance to configure serdes and state stores
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoinerWithKey: one for each matched record-pair with the same key within the joining window intervals, plus one for each non-matching record of this KStream
      See Also:
    • outerJoin

      <VO, VR> KStream<K,VR> outerJoin(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows)
      Join records of this stream with another KStream's records using windowed outer equi join with default serializers and deserializers. In contrast to inner-join or left-join, all records from both streams will produce at least one output record (cf. below). The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.

      For each pair of records meeting both join predicates the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records. Furthermore, for each input record of either KStream that does not satisfy the join predicate, the provided ValueJoiner will be called with a null value for the missing side: the other stream's value for an unmatched record of this stream, and this stream's value for an unmatched record of the other stream. If an input record key or value is null the record will not be included in the join operation and thus no output record will be added to the resulting KStream.
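      The outer-join rule can be sketched as a simplified in-memory batch model: matched pairs produce joined results, and unmatched records of either side are joined with null for the missing side. `Rec`, `Out`, `OuterWindowJoinModel`, and the symmetric `windowMs` bound are assumptions for illustration. Because the model looks at all records at once, it yields only final results; it does not reproduce order-dependent outputs such as the extra <K2:ValueJoiner(null,b)> in the example below.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical input and output records for the model.
record Rec<K, V>(K key, V value, long ts) {}
record Out<K, R>(K key, R value) {}

final class OuterWindowJoinModel {
    // Simplified batch model of a windowed outer equi join with an assumed symmetric window
    // of windowMs. It returns final results only, not order-dependent intermediate outputs.
    static <K, V1, V2, R> List<Out<K, R>> outerJoin(List<Rec<K, V1>> left,
                                                    List<Rec<K, V2>> right,
                                                    long windowMs,
                                                    BiFunction<V1, V2, R> joiner) {
        List<Out<K, R>> out = new ArrayList<>();
        boolean[] rightMatched = new boolean[right.size()];
        for (Rec<K, V1> l : left) {
            if (l.key() == null || l.value() == null) continue; // not joined, no output
            boolean matched = false;
            for (int i = 0; i < right.size(); i++) {
                Rec<K, V2> r = right.get(i);
                if (r.key() == null || r.value() == null) continue;
                if (l.key().equals(r.key()) && Math.abs(l.ts() - r.ts()) <= windowMs) {
                    out.add(new Out<>(l.key(), joiner.apply(l.value(), r.value())));
                    matched = true;
                    rightMatched[i] = true;
                }
            }
            // Unmatched left record: null for the other stream's value.
            if (!matched) out.add(new Out<>(l.key(), joiner.apply(l.value(), null)));
        }
        for (int i = 0; i < right.size(); i++) {
            Rec<K, V2> r = right.get(i);
            if (r.key() == null || r.value() == null || rightMatched[i]) continue;
            // Unmatched right record: null for this stream's value.
            out.add(new Out<>(r.key(), joiner.apply(null, r.value())));
        }
        return out;
    }
}
```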

      Example (assuming all input records belong to the correct windows):

      this        other       result
      <K1:A>                  <K1:ValueJoiner(A,null)>
      <K2:B>      <K2:b>      <K2:ValueJoiner(null,b)>
                              <K2:ValueJoiner(B,b)>
                  <K3:c>      <K3:ValueJoiner(null,c)>
      Both input streams (or, more precisely, their underlying source topics) need to have the same number of partitions. If this is not the case, you need to call repartition(Repartitioned) on one of the input streams before doing the join and specify the "correct" number of partitions via the Repartitioned parameter. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      Repartitioning can happen for one or both of the joining KStreams. In this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it and re-reading all records from it, such that the join input KStream is partitioned correctly on its key.

      Both of the joining KStreams will be materialized in local state stores with auto-generated store names. For failure and recovery, each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<storeName>" is an internally generated name, and "-changelog" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      Type Parameters:
      VO - the value type of the other stream
      VR - the value type of the result stream
      Parameters:
      otherStream - the KStream to be joined with this stream
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      windows - the specification of the JoinWindows
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoiner: one for each matched record-pair with the same key within the joining window intervals, plus one for each non-matching record of both KStreams
      See Also:
    • outerJoin

      <VO, VR> KStream<K,VR> outerJoin(KStream<K,VO> otherStream, ValueJoinerWithKey<? super K,? super V,? super VO,? extends VR> joiner, JoinWindows windows)
      Join records of this stream with another KStream's records using windowed outer equi join with default serializers and deserializers. In contrast to inner-join or left-join, all records from both streams will produce at least one output record (cf. below). The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.

      For each pair of records meeting both join predicates the provided ValueJoinerWithKey will be called to compute a value (with arbitrary type) for the result record. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. The key of the result record is the same as for both joining input records. Furthermore, for each input record of either KStream that does not satisfy the join predicate, the provided ValueJoinerWithKey will be called with a null value for the missing side: the other stream's value for an unmatched record of this stream, and this stream's value for an unmatched record of the other stream. If an input record key or value is null the record will not be included in the join operation and thus no output record will be added to the resulting KStream.
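      Because an outer join calls the joiner with null for whichever side has no match, the joiner has to tolerate a null in either value position (never both, since records with null values are not joined). A minimal sketch with a hypothetical `KeyedJoiner` interface shaped like ValueJoinerWithKey#apply(readOnlyKey, value1, value2); rendering a missing side as "-" is an arbitrary choice for illustration.

```java
import java.util.Optional;

// Hypothetical stand-in with the same shape as ValueJoinerWithKey#apply(readOnlyKey, value1, value2).
@FunctionalInterface
interface KeyedJoiner<K, V1, V2, R> {
    R apply(K readOnlyKey, V1 value1, V2 value2);
}

final class NullSafeOuterJoiner {
    // In an outer join one of the two values may be null, so neither is dereferenced
    // directly; a missing side is rendered as "-" (an arbitrary choice).
    static final KeyedJoiner<String, String, String, String> JOINER =
            (key, thisValue, otherValue) -> key + ":"
                    + Optional.ofNullable(thisValue).orElse("-") + "/"
                    + Optional.ofNullable(otherValue).orElse("-");
}
```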

      Example (assuming all input records belong to the correct windows):

      this        other       result
      <K1:A>                  <K1:ValueJoinerWithKey(K1,A,null)>
      <K2:B>      <K2:b>      <K2:ValueJoinerWithKey(K2,null,b)>
                              <K2:ValueJoinerWithKey(K2,B,b)>
                  <K3:c>      <K3:ValueJoinerWithKey(K3,null,c)>
      Both input streams (or, more precisely, their underlying source topics) need to have the same number of partitions. If this is not the case, you need to call repartition(Repartitioned) on one of the input streams before doing the join and specify the "correct" number of partitions via the Repartitioned parameter. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      Repartitioning can happen for one or both of the joining KStreams. In this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it and re-reading all records from it, such that the join input KStream is partitioned correctly on its key.

      Both of the joining KStreams will be materialized in local state stores with auto-generated store names. For failure and recovery, each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<storeName>" is an internally generated name, and "-changelog" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      Type Parameters:
      VO - the value type of the other stream
      VR - the value type of the result stream
      Parameters:
      otherStream - the KStream to be joined with this stream
      joiner - a ValueJoinerWithKey that computes the join result for a pair of matching records
      windows - the specification of the JoinWindows
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoinerWithKey: one for each matched record-pair with the same key within the joining window intervals, plus one for each non-matching record of both KStreams
      See Also:
    • outerJoin

      <VO, VR> KStream<K,VR> outerJoin(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, StreamJoined<K,V,VO> streamJoined)
      Join records of this stream with another KStream's records using a windowed outer equi join, with the StreamJoined instance configuring the key serde, this stream's value serde, the other stream's value serde, and the state stores used. In contrast to inner-join or left-join, all records from both streams will produce at least one output record (cf. below). The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.

      For each pair of records meeting both join predicates the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records. Furthermore, for each input record of either KStream that does not satisfy the join predicate, the provided ValueJoiner will be called with a null value for the missing side: the other stream's value for an unmatched record of this stream, and this stream's value for an unmatched record of the other stream. If an input record key or value is null the record will not be included in the join operation and thus no output record will be added to the resulting KStream.

      Example (assuming all input records belong to the correct windows):

      this        other       result
      <K1:A>                  <K1:ValueJoiner(A,null)>
      <K2:B>      <K2:b>      <K2:ValueJoiner(null,b)>
                              <K2:ValueJoiner(B,b)>
                  <K3:c>      <K3:ValueJoiner(null,c)>
      Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before doing the join and specify the "correct" number of partitions via Repartitioned parameter. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      Repartitioning can happen for one or both of the joining KStreams. In this case, all data of the affected stream will be redistributed through the repartitioning topic, by writing all records to it and re-reading them from it, so that the join input KStream is correctly partitioned on its key.

      Both of the joining KStreams will be materialized in local state stores with auto-generated store names, unless a name is provided via the StreamJoined instance. For fault tolerance and recovery, each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<storeName>" is an internally generated name, and "-changelog" is a fixed suffix.
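
      The repartition and changelog naming patterns above are plain string concatenations. The hypothetical helper below only spells them out. The <name> and <storeName> parts are generated internally, so treat Topology.describe() as the source of truth for the actual names.

```java
// Hypothetical helpers that spell out the documented internal-topic naming patterns.
final class InternalTopicNames {
    // "${applicationId}-<name>-repartition"
    static String repartitionTopic(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    // "${applicationId}-<storeName>-changelog"
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }
}
```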

      You can retrieve all generated internal topic names via Topology.describe().

      Type Parameters:
      VO - the value type of the other stream
      VR - the value type of the result stream
      Parameters:
      otherStream - the KStream to be joined with this stream
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      windows - the specification of the JoinWindows
      streamJoined - a StreamJoined instance to configure serdes and state stores
      Returns:
      a KStream that contains join-records for each key, with values computed by the given ValueJoiner: one for each matched record pair with the same key, plus one for each non-matching record of either KStream within the join window intervals
      See Also:
    • outerJoin

      <VO, VR> KStream<K,VR> outerJoin(KStream<K,VO> otherStream, ValueJoinerWithKey<? super K,? super V,? super VO,? extends VR> joiner, JoinWindows windows, StreamJoined<K,V,VO> streamJoined)
      Join records of this stream with another KStream's records using a windowed outer equi join, with the StreamJoined instance configuring the key serde, this stream's value serde, the other stream's value serde, and the state stores used. In contrast to inner-join or left-join, all records from both streams will produce at least one output record (cf. below). The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.

      For each pair of records meeting both join predicates, the provided ValueJoinerWithKey will be called to compute a value (of arbitrary type) for the result record. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. The key of the result record is the same as for both joining input records. Furthermore, for each input record of either KStream that does not find a join partner, the provided ValueJoinerWithKey will be called with null in place of the missing value: null as the other value for a record of this stream, and null as this value for a record of the other stream. If an input record's key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream.
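
      An outer join calls the joiner with null for whichever side has no partner. A ValueJoinerWithKey used here must therefore accept null for either value argument.

      The sketch below is self-contained plain Java. It declares a hypothetical KeyedJoiner interface with the same parameter order as ValueJoinerWithKey.apply(readOnlyKey, value1, value2). With Kafka Streams, you would pass an equivalent lambda to outerJoin. The joiner only reads the key, in line with the read-only note above.

```java
// Hypothetical stand-in with the same parameter order as ValueJoinerWithKey.apply.
@FunctionalInterface
interface KeyedJoiner<K, V1, V2, VR> {
    VR apply(K readOnlyKey, V1 value1, V2 value2);
}

final class NullSafeOuterJoiner {
    // Summarizes whichever values are present; either value may be null in an outer join.
    static final KeyedJoiner<String, Integer, Integer, String> SUMMARY =
        (key, thisValue, otherValue) -> {
            if (thisValue == null) return key + " only-other=" + otherValue;
            if (otherValue == null) return key + " only-this=" + thisValue;
            return key + " sum=" + (thisValue + otherValue);
        };
}
```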

      Example (assuming all input records belong to the correct windows):

      this      other     result
      <K1:A>              <K1:ValueJoinerWithKey(K1,A,null)>
      <K2:B>    <K2:b>    <K2:ValueJoinerWithKey(K2,null,b)>
                          <K2:ValueJoinerWithKey(K2,B,b)>
                <K3:c>    <K3:ValueJoinerWithKey(K3,null,c)>

      Both input streams (or, more precisely, their underlying source topics) need to have the same number of partitions. If this is not the case, you need to call repartition(Repartitioned) on one of the input streams before the join and specify the matching number of partitions via the Repartitioned parameter. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
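
      Co-partitioning matters because a record's partition is a deterministic function of its key and of the partition count. With equal partition counts and the same partitioner, a key maps to the same partition number in both topics; with different counts, it may not.

      The toy model below illustrates this with a hypothetical partitioner: String.hashCode() with the sign bit masked off. This is not Kafka's default partitioner, which hashes the serialized key bytes with murmur2.

```java
import java.util.List;

// Toy co-partitioning check; the partitioner is an assumption, not Kafka's default.
final class CoPartitioningModel {
    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the result is a valid, non-negative partition index.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // True if every key maps to the same partition number in both topics.
    static boolean coPartitioned(List<String> keys, int leftPartitions, int rightPartitions) {
        for (String key : keys) {
            if (partitionFor(key, leftPartitions) != partitionFor(key, rightPartitions)) {
                return false;
            }
        }
        return true;
    }
}
```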

      Repartitioning can happen for one or both of the joining KStreams. In this case, all data of the affected stream will be redistributed through the repartitioning topic, by writing all records to it and re-reading them from it, so that the join input KStream is correctly partitioned on its key.

      Both of the joining KStreams will be materialized in local state stores with auto-generated store names, unless a name is provided via the StreamJoined instance. For fault tolerance and recovery, each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<storeName>" is an internally generated name, and "-changelog" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      Type Parameters:
      VO - the value type of the other stream
      VR - the value type of the result stream
      Parameters:
      otherStream - the KStream to be joined with this stream
      joiner - a ValueJoinerWithKey that computes the join result for a pair of matching records
      windows - the specification of the JoinWindows
      streamJoined - a StreamJoined instance to configure serdes and state stores
      Returns:
      a KStream that contains join-records for each key, with values computed by the given ValueJoinerWithKey: one for each matched record pair with the same key, plus one for each non-matching record of either KStream within the join window intervals
      See Also:
    • join

      <VT, VR> KStream<K,VR> join(KTable<K,VT> table, ValueJoiner<? super V,? super VT,? extends VR> joiner)
      Join records of this stream with KTable's records using a non-windowed inner equi join with default serializers and deserializers. The join is a primary key table lookup join with join attribute stream.key == table.key. "Table lookup join" means that results are computed only when KStream records are processed. This is done by performing a lookup for matching records in the current (i.e., processing time) internal KTable state. In contrast, processing KTable input records will only update the internal KTable state and will not produce any result records.

      For each KStream record that finds a corresponding record in KTable, the provided ValueJoiner will be called to compute a value (of arbitrary type) for the result record. The key of the result record is the same as for both joining input records. If a KStream input record's key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream.
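
      The table-lookup behavior described above can be replayed over an interleaved event sequence: stream records trigger lookups, and table records only update state. This is a single-partition toy model in plain Java with hypothetical names. It processes events strictly in list order, ignores timestamps, and treats a null table value as a delete (tombstone).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Toy replay of a KStream-KTable inner lookup join on one partition.
final class LookupJoinModel {

    // One input event: either a stream record or a table update.
    static final class Event {
        final boolean fromTable;
        final String key;
        final String value;

        Event(boolean fromTable, String key, String value) {
            this.fromTable = fromTable;
            this.key = key;
            this.value = value;
        }
    }

    static List<String> innerJoin(List<Event> events, BiFunction<String, String, String> joiner) {
        Map<String, String> tableState = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            if (e.key == null) continue;                     // null keys never take part
            if (e.fromTable) {
                // Table input only updates state; a null value deletes the key.
                if (e.value == null) tableState.remove(e.key);
                else tableState.put(e.key, e.value);
                continue;
            }
            if (e.value == null) continue;                   // null stream values are dropped
            String tableValue = tableState.get(e.key);       // lookup in the current state
            if (tableValue != null) out.add(e.key + ":" + joiner.apply(e.value, tableValue));
        }
        return out;
    }
}
```

      Replaying the example that follows gives one output:
      - <K1:A> finds no table entry and produces nothing.
      - <K1:b> only updates the state.
      - <K1:C> joins with b.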

      Example:

      KStream   KTable    state     result
      <K1:A>
                <K1:b>    <K1:b>
      <K1:C>              <K1:b>    <K1:ValueJoiner(C,b)>

      Both inputs (or, more precisely, their underlying source topics) need to have the same number of partitions. If this is not the case, you need to call repartition(Repartitioned) on this KStream before the join, using the Repartitioned parameter to specify the same number of partitions as the given KTable has. Furthermore, both inputs need to be co-partitioned on the join key (i.e., use the same partitioner); a join with a GlobalKTable has no such requirement, cf. join(GlobalKTable, KeyValueMapper, ValueJoiner). If the co-partitioning requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      Repartitioning can happen only for this KStream, not for the provided KTable. In this case, all data of the stream will be redistributed through the repartitioning topic, by writing all records to it and re-reading them from it, so that the join input KStream is correctly partitioned on its key.

      Type Parameters:
      VT - the value type of the table
      VR - the value type of the result stream
      Parameters:
      table - the KTable to be joined with this stream
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key
      See Also:
    • join

      <VT, VR> KStream<K,VR> join(KTable<K,VT> table, ValueJoinerWithKey<? super K,? super V,? super VT,? extends VR> joiner)
      Join records of this stream with KTable's records using a non-windowed inner equi join with default serializers and deserializers. The join is a primary key table lookup join with join attribute stream.key == table.key. "Table lookup join" means that results are computed only when KStream records are processed. This is done by performing a lookup for matching records in the current (i.e., processing time) internal KTable state. In contrast, processing KTable input records will only update the internal KTable state and will not produce any result records.

      For each KStream record that finds a corresponding record in KTable, the provided ValueJoinerWithKey will be called to compute a value (of arbitrary type) for the result record. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. The key of the result record is the same as for both joining input records. If a KStream input record's key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream.
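
      With ValueJoinerWithKey, the joiner also receives the record key, and the output record keeps that same key. The sketch below processes one stream record at a time against a map standing in for the table state. KeyedJoiner and KeyedLookupJoin are hypothetical names chosen so that the code runs without Kafka Streams; KeyedJoiner mirrors the parameter order of ValueJoinerWithKey.apply.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in with the same parameter order as ValueJoinerWithKey.apply.
@FunctionalInterface
interface KeyedJoiner<K, V, VT, VR> {
    VR apply(K readOnlyKey, V value, VT tableValue);
}

final class KeyedLookupJoin {
    // Processes one stream record against the current table state and appends
    // (key, result) to out when the inner join produces output.
    static <K, V, VT, VR> void process(K key, V value, Map<K, VT> tableState,
                                       KeyedJoiner<K, V, VT, VR> joiner,
                                       List<Map.Entry<K, VR>> out) {
        if (key == null || value == null) return;   // record is not part of the join
        VT tableValue = tableState.get(key);
        if (tableValue == null) return;             // no match: inner join emits nothing
        // The result keeps the stream key; the joiner only reads it.
        out.add(new AbstractMap.SimpleImmutableEntry<>(key, joiner.apply(key, value, tableValue)));
    }
}
```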

      Example:

      KStream   KTable    state     result
      <K1:A>
                <K1:b>    <K1:b>
      <K1:C>              <K1:b>    <K1:ValueJoinerWithKey(K1,C,b)>

      Both inputs (or, more precisely, their underlying source topics) need to have the same number of partitions. If this is not the case, you need to call repartition(Repartitioned) on this KStream before the join, using the Repartitioned parameter to specify the same number of partitions as the given KTable has. Furthermore, both inputs need to be co-partitioned on the join key (i.e., use the same partitioner); a join with a GlobalKTable has no such requirement, cf. join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey). If the co-partitioning requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      Repartitioning can happen only for this KStream, not for the provided KTable. In this case, all data of the stream will be redistributed through the repartitioning topic, by writing all records to it and re-reading them from it, so that the join input KStream is correctly partitioned on its key.

      Type Parameters:
      VT - the value type of the table
      VR - the value type of the result stream
      Parameters:
      table - the KTable to be joined with this stream
      joiner - a ValueJoinerWithKey that computes the join result for a pair of matching records
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoinerWithKey, one for each matched record-pair with the same key
      See Also:
    • join

      <VT, VR> KStream<K,VR> join(KTable<K,VT> table, ValueJoiner<? super V,? super VT,? extends VR> joiner, Joined<K,V,VT> joined)
      Join records of this stream with KTable's records using a non-windowed inner equi join, with the serializers and deserializers defined by the given Joined instance. The join is a primary key table lookup join with join attribute stream.key == table.key. "Table lookup join" means that results are computed only when KStream records are processed. This is done by performing a lookup for matching records in the current (i.e., processing time) internal KTable state. In contrast, processing KTable input records will only update the internal KTable state and will not produce any result records.

      For each KStream record that finds a corresponding record in KTable, the provided ValueJoiner will be called to compute a value (of arbitrary type) for the result record. The key of the result record is the same as for both joining input records. If a KStream input record's key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream.

      Example:

      KStream   KTable    state     result
      <K1:A>
                <K1:b>    <K1:b>
      <K1:C>              <K1:b>    <K1:ValueJoiner(C,b)>

      Both inputs (or, more precisely, their underlying source topics) need to have the same number of partitions. If this is not the case, you need to call repartition(Repartitioned) on this KStream before the join, using the Repartitioned parameter to specify the same number of partitions as the given KTable has. Furthermore, both inputs need to be co-partitioned on the join key (i.e., use the same partitioner); a join with a GlobalKTable has no such requirement, cf. join(GlobalKTable, KeyValueMapper, ValueJoiner). If the co-partitioning requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      Repartitioning can happen only for this KStream, not for the provided KTable. In this case, all data of the stream will be redistributed through the repartitioning topic, by writing all records to it and re-reading them from it, so that the join input KStream is correctly partitioned on its key.
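
      Conceptually, repartitioning writes each record to an intermediate topic, partitioned by its current key, and reads it back. As a result, all records with the same key land in the same partition.

      The toy model below simulates only that redistribution, in memory. RepartitionModel is a hypothetical name. Its partitioner (String.hashCode() with the sign bit masked off) is an assumption, not Kafka's murmur2-based default.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Toy model of redistributing records through a repartition topic.
final class RepartitionModel {
    // Hypothetical partitioner; Kafka's default hashes the serialized key with murmur2.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // "Writes" every record to the partition chosen by its key and returns the
    // partitions as they would be "re-read", preserving per-partition order.
    static List<List<Map.Entry<String, String>>> repartition(
            List<Map.Entry<String, String>> records, int numPartitions) {
        List<List<Map.Entry<String, String>>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayList<>());
        }
        for (Map.Entry<String, String> rec : records) {
            if (rec.getKey() == null) continue; // null-key records are not joined anyway
            partitions.get(partitionFor(rec.getKey(), numPartitions)).add(rec);
        }
        return partitions;
    }
}
```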

      Type Parameters:
      VT - the value type of the table
      VR - the value type of the result stream
      Parameters:
      table - the KTable to be joined with this stream
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      joined - a Joined instance that defines the serdes to be used to serialize/deserialize inputs of the joined streams
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key
      See Also:
    • join

      <VT, VR> KStream<K,VR> join(KTable<K,VT> table, ValueJoinerWithKey<? super K,? super V,? super VT,? extends VR> joiner, Joined<K,V,VT> joined)
      Join records of this stream with KTable's records using a non-windowed inner equi join, with the serializers and deserializers defined by the given Joined instance. The join is a primary key table lookup join with join attribute stream.key == table.key. "Table lookup join" means that results are computed only when KStream records are processed. This is done by performing a lookup for matching records in the current (i.e., processing time) internal KTable state. In contrast, processing KTable input records will only update the internal KTable state and will not produce any result records.

      For each KStream record that finds a corresponding record in KTable, the provided ValueJoinerWithKey will be called to compute a value (of arbitrary type) for the result record. The key of the result record is the same as for both joining input records. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. If a KStream input record's key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream.

      Example:

      KStream   KTable    state     result
      <K1:A>
                <K1:b>    <K1:b>
      <K1:C>              <K1:b>    <K1:ValueJoinerWithKey(K1,C,b)>

      Both inputs (or, more precisely, their underlying source topics) need to have the same number of partitions. If this is not the case, you need to call repartition(Repartitioned) on this KStream before the join, using the Repartitioned parameter to specify the same number of partitions as the given KTable has. Furthermore, both inputs need to be co-partitioned on the join key (i.e., use the same partitioner); a join with a GlobalKTable has no such requirement, cf. join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey). If the co-partitioning requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      Repartitioning can happen only for this KStream, not for the provided KTable. In this case, all data of the stream will be redistributed through the repartitioning topic, by writing all records to it and re-reading them from it, so that the join input KStream is correctly partitioned on its key.

      Type Parameters:
      VT - the value type of the table
      VR - the value type of the result stream
      Parameters:
      table - the KTable to be joined with this stream
      joiner - a ValueJoinerWithKey that computes the join result for a pair of matching records
      joined - a Joined instance that defines the serdes to be used to serialize/deserialize inputs of the joined streams
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoinerWithKey, one for each matched record-pair with the same key
      See Also:
    • leftJoin

      <VT, VR> KStream<K,VR> leftJoin(KTable<K,VT> table, ValueJoiner<? super V,? super VT,? extends VR> joiner)
      Join records of this stream with KTable's records using a non-windowed left equi join with default serializers and deserializers. In contrast to inner-join, all records from this stream will produce an output record (cf. below). The join is a primary key table lookup join with join attribute stream.key == table.key. "Table lookup join" means that results are computed only when KStream records are processed. This is done by performing a lookup for matching records in the current (i.e., processing time) internal KTable state. In contrast, processing KTable input records will only update the internal KTable state and will not produce any result records.

      For each KStream record, whether or not it finds a corresponding record in KTable, the provided ValueJoiner will be called to compute a value (of arbitrary type) for the result record. If no KTable record was found during the lookup, a null value will be provided to the ValueJoiner. The key of the result record is the same as for both joining input records. If a KStream input record's key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream.
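
      The left-join variant differs from the inner lookup join only in that a lookup miss still calls the joiner, with null as the table value. The single-partition toy replay below encodes each event as a hypothetical {source, key, value} string triple. It is plain Java, processes events in list order, and treats a null table value as a delete.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Toy replay of a KStream-KTable left lookup join on a single partition.
final class LeftLookupJoinModel {
    // Each event is {source, key, value}, where source is "stream" or "table".
    static List<String> leftJoin(List<String[]> events, BiFunction<String, String, String> joiner) {
        Map<String, String> tableState = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String[] e : events) {
            String source = e[0], key = e[1], value = e[2];
            if (key == null) continue;                       // null keys never join
            if (source.equals("table")) {
                // Table input only updates state; a null value deletes the key.
                if (value == null) tableState.remove(key);
                else tableState.put(key, value);
            } else if (value != null) {
                // Left join: the joiner is always called; the table value is null on a miss.
                out.add(key + ":" + joiner.apply(value, tableState.get(key)));
            }
        }
        return out;
    }
}
```

      Replaying the three events of the example that follows yields one result per stream record: <K1:A> is joined with null and <K1:C> with b.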

      Example:

      KStream   KTable    state     result
      <K1:A>                        <K1:ValueJoiner(A,null)>
                <K1:b>    <K1:b>
      <K1:C>              <K1:b>    <K1:ValueJoiner(C,b)>

      Both inputs (or, more precisely, their underlying source topics) need to have the same number of partitions. If this is not the case, you need to call repartition(Repartitioned) on this KStream before the join, using the Repartitioned parameter to specify the same number of partitions as the given KTable has. Furthermore, both inputs need to be co-partitioned on the join key (i.e., use the same partitioner); a join with a GlobalKTable has no such requirement, cf. join(GlobalKTable, KeyValueMapper, ValueJoiner). If the co-partitioning requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      Repartitioning can happen only for this KStream, not for the provided KTable. In this case, all data of the stream will be redistributed through the repartitioning topic, by writing all records to it and re-reading them from it, so that the join input KStream is correctly partitioned on its key.

      Type Parameters:
      VT - the value type of the table
      VR - the value type of the result stream
      Parameters:
      table - the KTable to be joined with this stream
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoiner, one output for each input KStream record
      See Also:
    • leftJoin

      <VT, VR> KStream<K,VR> leftJoin(KTable<K,VT> table, ValueJoinerWithKey<? super K,? super V,? super VT,? extends VR> joiner)
      Join records of this stream with KTable's records using a non-windowed left equi join with default serializers and deserializers. In contrast to inner-join, all records from this stream will produce an output record (cf. below). The join is a primary key table lookup join with join attribute stream.key == table.key. "Table lookup join" means that results are computed only when KStream records are processed. This is done by performing a lookup for matching records in the current (i.e., processing time) internal KTable state. In contrast, processing KTable input records will only update the internal KTable state and will not produce any result records.

      For each KStream record, whether or not it finds a corresponding record in KTable, the provided ValueJoinerWithKey will be called to compute a value (of arbitrary type) for the result record. If no KTable record was found during the lookup, a null value will be provided to the ValueJoinerWithKey. The key of the result record is the same as for both joining input records. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. If a KStream input record's key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream.
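
      A left join passes null to the joiner on a lookup miss, so a key-aware joiner can use the key to build a fallback value. The sketch below assumes a hypothetical order-enrichment use case in which the stream key is a customer id. KeyedJoiner is a stand-in declared locally with the parameter order of ValueJoinerWithKey.apply, so the code runs without Kafka Streams.

```java
// Hypothetical stand-in with the same parameter order as ValueJoinerWithKey.apply.
@FunctionalInterface
interface KeyedJoiner<K, V, VT, VR> {
    VR apply(K readOnlyKey, V value, VT tableValue);
}

final class DefaultingLeftJoiner {
    // Enriches an order with a customer name; on a lookup miss (null table value)
    // it falls back to a description built from the read-only key.
    static final KeyedJoiner<String, String, String, String> ENRICH =
        (customerId, order, customerName) ->
            order + " for " + (customerName != null ? customerName : "unknown customer " + customerId);
}
```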

      Example:

      KStream   KTable    state     result
      <K1:A>                        <K1:ValueJoinerWithKey(K1,A,null)>
                <K1:b>    <K1:b>
      <K1:C>              <K1:b>    <K1:ValueJoinerWithKey(K1,C,b)>

      Both inputs (or, more precisely, their underlying source topics) need to have the same number of partitions. If this is not the case, you need to call repartition(Repartitioned) on this KStream before the join, using the Repartitioned parameter to specify the same number of partitions as the given KTable has. Furthermore, both inputs need to be co-partitioned on the join key (i.e., use the same partitioner); a join with a GlobalKTable has no such requirement, cf. join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey). If the co-partitioning requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      Repartitioning can happen only for this KStream, not for the provided KTable. In this case, all data of the stream will be redistributed through the repartitioning topic, by writing all records to it and re-reading them from it, so that the join input KStream is correctly partitioned on its key.

      Type Parameters:
      VT - the value type of the table
      VR - the value type of the result stream
      Parameters:
      table - the KTable to be joined with this stream
      joiner - a ValueJoinerWithKey that computes the join result for a pair of matching records
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoinerWithKey, one output for each input KStream record
      See Also:
    • leftJoin

      <VT, VR> KStream<K,VR> leftJoin(KTable<K,VT> table, ValueJoiner<? super V,? super VT,? extends VR> joiner, Joined<K,V,VT> joined)
      Join records of this stream with KTable's records using a non-windowed left equi join, with the serializers and deserializers defined by the given Joined instance. In contrast to inner-join, all records from this stream will produce an output record (cf. below). The join is a primary key table lookup join with join attribute stream.key == table.key. "Table lookup join" means that results are computed only when KStream records are processed. This is done by performing a lookup for matching records in the current (i.e., processing time) internal KTable state. In contrast, processing KTable input records will only update the internal KTable state and will not produce any result records.

      For each KStream record, whether or not it finds a corresponding record in KTable, the provided ValueJoiner will be called to compute a value (of arbitrary type) for the result record. If no KTable record was found during the lookup, a null value will be provided to the ValueJoiner. The key of the result record is the same as for both joining input records. If a KStream input record's key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream.
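
      In practice, join and leftJoin on a KTable differ in how many results they emit. The inner join emits only on a lookup hit, whereas the left join emits once for every stream record with a non-null key and value. The hypothetical counter below replays the same {source, key, value} event sequence under both semantics in plain Java, on a single partition and in list order.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy comparison of how many results inner and left lookup joins emit for the same input.
final class JoinOutputCounts {
    // Each event is {source, key, value}; returns {innerCount, leftCount}.
    static int[] count(List<String[]> events) {
        Map<String, String> tableState = new HashMap<>();
        int inner = 0, left = 0;
        for (String[] e : events) {
            String source = e[0], key = e[1], value = e[2];
            if (key == null) continue;
            if (source.equals("table")) {
                if (value == null) tableState.remove(key);
                else tableState.put(key, value);
            } else if (value != null) {
                left++;                                   // left join: one output per valid stream record
                if (tableState.containsKey(key)) inner++; // inner join: only on a lookup hit
            }
        }
        return new int[] {inner, left};
    }
}
```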

      Example:

      KStream   KTable    state     result
      <K1:A>                        <K1:ValueJoiner(A,null)>
                <K1:b>    <K1:b>
      <K1:C>              <K1:b>    <K1:ValueJoiner(C,b)>

      Both inputs (or, more precisely, their underlying source topics) need to have the same number of partitions. If this is not the case, you need to call repartition(Repartitioned) on this KStream before the join, using the Repartitioned parameter to specify the same number of partitions as the given KTable has. Furthermore, both inputs need to be co-partitioned on the join key (i.e., use the same partitioner); a join with a GlobalKTable has no such requirement, cf. join(GlobalKTable, KeyValueMapper, ValueJoiner). If the co-partitioning requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      Repartitioning can happen only for this KStream, not for the provided KTable. In this case, all data of the stream will be redistributed through the repartitioning topic, by writing all records to it and re-reading them from it, so that the join input KStream is correctly partitioned on its key.

      Type Parameters:
      VT - the value type of the table
      VR - the value type of the result stream
      Parameters:
      table - the KTable to be joined with this stream
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      joined - a Joined instance that defines the serdes to be used to serialize/deserialize inputs and outputs of the joined streams
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoiner, one output for each input KStream record
      See Also:
    • leftJoin

      <VT, VR> KStream<K,VR> leftJoin(KTable<K,VT> table, ValueJoinerWithKey<? super K,? super V,? super VT,? extends VR> joiner, Joined<K,V,VT> joined)
      Join records of this stream with KTable's records using a non-windowed left equi join, with the serializers and deserializers defined by the given Joined instance. In contrast to inner-join, all records from this stream will produce an output record (cf. below). The join is a primary key table lookup join with join attribute stream.key == table.key. "Table lookup join" means that results are computed only when KStream records are processed. This is done by performing a lookup for matching records in the current (i.e., processing time) internal KTable state. In contrast, processing KTable input records will only update the internal KTable state and will not produce any result records.

      For each KStream record, whether or not it finds a corresponding record in KTable, the provided ValueJoinerWithKey will be called to compute a value (of arbitrary type) for the result record. If no KTable record was found during the lookup, a null value will be provided to the ValueJoinerWithKey. The key of the result record is the same as for both joining input records. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. If a KStream input record's key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream.

      Example:

      KStream   KTable    state     result
      <K1:A>                        <K1:ValueJoinerWithKey(K1,A,null)>
                <K1:b>    <K1:b>
      <K1:C>              <K1:b>    <K1:ValueJoinerWithKey(K1,C,b)>

      Both inputs (or, more precisely, their underlying source topics) need to have the same number of partitions. If this is not the case, you need to call repartition(Repartitioned) on this KStream before the join, using the Repartitioned parameter to specify the same number of partitions as the given KTable has. Furthermore, both inputs need to be co-partitioned on the join key (i.e., use the same partitioner); a join with a GlobalKTable has no such requirement, cf. join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey). If the co-partitioning requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.

      You can retrieve all generated internal topic names via Topology.describe().

      Repartitioning can happen only for this KStream, not for the provided KTable. In this case, all data of the stream will be redistributed through the repartitioning topic, by writing all records to it and re-reading them from it, so that the join input KStream is correctly partitioned on its key.

      Type Parameters:
      VT - the value type of the table
      VR - the value type of the result stream
      Parameters:
      table - the KTable to be joined with this stream
      joiner - a ValueJoinerWithKey that computes the join result for a pair of matching records
      joined - a Joined instance that defines the serdes to be used to serialize/deserialize inputs and outputs of the joined streams
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoinerWithKey, one output for each input KStream record
      See Also:
    • join

      <GK, GV, RV> KStream<K,RV> join(GlobalKTable<GK,GV> globalTable, KeyValueMapper<? super K,? super V,? extends GK> keySelector, ValueJoiner<? super V,? super GV,? extends RV> joiner)
      Join records of this stream with GlobalKTable's records using a non-windowed inner equi join. The join is a primary key table lookup join with join attribute keySelector.apply(stream.key, stream.value) == table.key. "Table lookup join" means that results are only computed when KStream records are processed. This is done by performing a lookup for matching records in the current internal GlobalKTable state. In contrast, processing GlobalKTable input records will only update the internal GlobalKTable state and will not produce any result records.

      For each KStream record that finds a corresponding record in the GlobalKTable, the provided ValueJoiner will be called to compute a value (of arbitrary type) for the result record. The key of the result record is the same as the key of the KStream input record. If a KStream input record's key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream. If keySelector returns null, implying that no match exists, no output record will be added to the resulting KStream.
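      The inner-join rules above can be modeled in plain Java. In this hypothetical sketch (not the Kafka Streams API), a Map plays the role of the GlobalKTable state, keySelector derives the table key from each stream record (here, a customer id embedded in an order value), and the joiner is a BiFunction. Records with a null key or value, a null selected key, or no table match produce no output, and the output key is always the stream record's key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of a KStream-GlobalKTable inner join (not the Kafka Streams API).
class GlobalTableInnerJoinModel {

    // Minimal key/value pair that, unlike Map.entry, allows null keys and values.
    static final class Rec<K, V> {
        final K key;
        final V value;

        Rec(K key, V value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return key + "=" + value;
        }
    }

    static <K, V, GK, GV, RV> List<Rec<K, RV>> join(
            List<Rec<K, V>> stream,
            Map<GK, GV> globalTable,
            BiFunction<? super K, ? super V, ? extends GK> keySelector,
            BiFunction<? super V, ? super GV, ? extends RV> joiner) {
        List<Rec<K, RV>> out = new ArrayList<>();
        for (Rec<K, V> rec : stream) {
            if (rec.key == null || rec.value == null) continue; // dropped
            GK tableKey = keySelector.apply(rec.key, rec.value);
            if (tableKey == null) continue;                     // no match possible
            GV tableValue = globalTable.get(tableKey);
            if (tableValue == null) continue;                   // inner join: no match, no output
            // The result keeps the stream record's key.
            out.add(new Rec<>(rec.key, joiner.apply(rec.value, tableValue)));
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical data: order id -> "customerId:item"; global table: customer id -> name.
        Map<String, String> customers = Map.of("c1", "Alice");
        List<Rec<String, String>> orders = List.of(
                new Rec<>("o1", "c1:book"),
                new Rec<>("o2", "c9:pen"),   // no customer c9
                new Rec<>("o3", null));      // null value, dropped
        List<Rec<String, String>> joined = join(orders, customers,
                (orderId, order) -> order.split(":")[0],
                (order, name) -> name + " bought " + order.split(":")[1]);
        System.out.println(joined); // prints [o1=Alice bought book]
    }
}
```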

      Type Parameters:
      GK - the key type of GlobalKTable
      GV - the value type of the GlobalKTable
      RV - the value type of the resulting KStream
      Parameters:
      globalTable - the GlobalKTable to be joined with this stream
      keySelector - instance of KeyValueMapper used to map from the (key, value) of this stream to the key of the GlobalKTable
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoiner, at most one output for each input KStream record
      See Also:
    • join

      <GK, GV, RV> KStream<K,RV> join(GlobalKTable<GK,GV> globalTable, KeyValueMapper<? super K,? super V,? extends GK> keySelector, ValueJoinerWithKey<? super K,? super V,? super GV,? extends RV> joiner)
      Join records of this stream with GlobalKTable's records using a non-windowed inner equi join. The join is a primary key table lookup join with join attribute keySelector.apply(stream.key, stream.value) == table.key. "Table lookup join" means that results are only computed when KStream records are processed. This is done by performing a lookup for matching records in the current internal GlobalKTable state. In contrast, processing GlobalKTable input records will only update the internal GlobalKTable state and will not produce any result records.

      For each KStream record that finds a corresponding record in the GlobalKTable, the provided ValueJoinerWithKey will be called to compute a value (of arbitrary type) for the result record. The key of the result record is the same as the key of the KStream input record. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. If a KStream input record's key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream. If keySelector returns null, implying that no match exists, no output record will be added to the resulting KStream.
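      Passing a ValueJoinerWithKey only changes what the joiner sees. The plain-Java sketch below (hypothetical names, not the Kafka Streams API, null-value handling omitted for brevity) hands the stream key to the joiner for reading, while the output key is still the stream record's own key. Java String keys are immutable, so the read-only rule holds trivially here; with a mutable key type the joiner would have to respect it by convention.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of a KStream-GlobalKTable inner join with a key-aware joiner
// (not the Kafka Streams API).
class KeyedGlobalJoinModel {

    // Hypothetical interface mirroring the argument order of ValueJoinerWithKey#apply.
    interface KeyedJoiner<K, V, GV, RV> {
        RV apply(K readOnlyKey, V streamValue, GV tableValue);
    }

    static List<String> join(List<Map.Entry<String, String>> stream,
                             Map<String, String> globalTable,
                             BiFunction<String, String, String> keySelector,
                             KeyedJoiner<String, String, String, String> joiner) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> rec : stream) {
            String tableKey = keySelector.apply(rec.getKey(), rec.getValue());
            if (tableKey == null) continue;               // selector found no table key
            String tableValue = globalTable.get(tableKey);
            if (tableValue != null) {
                // The joiner reads the key; the output key is the unchanged stream key.
                out.add(rec.getKey() + "=" + joiner.apply(rec.getKey(), rec.getValue(), tableValue));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical data: user id -> country code; global table: code -> country name.
        List<Map.Entry<String, String>> visits = List.of(Map.entry("u1", "DE"), Map.entry("u2", "XX"));
        Map<String, String> countries = Map.of("DE", "Germany");
        System.out.println(join(visits, countries, (user, code) -> code,
                (user, code, country) -> user + " from " + country));
        // prints [u1=u1 from Germany]
    }
}
```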

      Type Parameters:
      GK - the key type of GlobalKTable
      GV - the value type of the GlobalKTable
      RV - the value type of the resulting KStream
      Parameters:
      globalTable - the GlobalKTable to be joined with this stream
      keySelector - instance of KeyValueMapper used to map from the (key, value) of this stream to the key of the GlobalKTable
      joiner - a ValueJoinerWithKey that computes the join result for a pair of matching records
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoinerWithKey, at most one output for each input KStream record
      See Also:
    • join

      <GK, GV, RV> KStream<K,RV> join(GlobalKTable<GK,GV> globalTable, KeyValueMapper<? super K,? super V,? extends GK> keySelector, ValueJoiner<? super V,? super GV,? extends RV> joiner, Named named)
      Join records of this stream with GlobalKTable's records using a non-windowed inner equi join. The join is a primary key table lookup join with join attribute keySelector.apply(stream.key, stream.value) == table.key. "Table lookup join" means that results are only computed when KStream records are processed. This is done by performing a lookup for matching records in the current internal GlobalKTable state. In contrast, processing GlobalKTable input records will only update the internal GlobalKTable state and will not produce any result records.

      For each KStream record that finds a corresponding record in the GlobalKTable, the provided ValueJoiner will be called to compute a value (of arbitrary type) for the result record. The key of the result record is the same as the key of the KStream input record. If a KStream input record's key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream. If keySelector returns null, implying that no match exists, no output record will be added to the resulting KStream.

      Type Parameters:
      GK - the key type of GlobalKTable
      GV - the value type of the GlobalKTable
      RV - the value type of the resulting KStream
      Parameters:
      globalTable - the GlobalKTable to be joined with this stream
      keySelector - instance of KeyValueMapper used to map from the (key, value) of this stream to the key of the GlobalKTable
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      named - a Named config used to name the processor in the topology
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoiner, at most one output for each input KStream record
      See Also:
    • join

      <GK, GV, RV> KStream<K,RV> join(GlobalKTable<GK,GV> globalTable, KeyValueMapper<? super K,? super V,? extends GK> keySelector, ValueJoinerWithKey<? super K,? super V,? super GV,? extends RV> joiner, Named named)
      Join records of this stream with GlobalKTable's records using a non-windowed inner equi join. The join is a primary key table lookup join with join attribute keySelector.apply(stream.key, stream.value) == table.key. "Table lookup join" means that results are only computed when KStream records are processed. This is done by performing a lookup for matching records in the current internal GlobalKTable state. In contrast, processing GlobalKTable input records will only update the internal GlobalKTable state and will not produce any result records.

      For each KStream record that finds a corresponding record in the GlobalKTable, the provided ValueJoinerWithKey will be called to compute a value (of arbitrary type) for the result record. The key of the result record is the same as the key of the KStream input record. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. If a KStream input record's key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream. If keySelector returns null, implying that no match exists, no output record will be added to the resulting KStream.

      Type Parameters:
      GK - the key type of GlobalKTable
      GV - the value type of the GlobalKTable
      RV - the value type of the resulting KStream
      Parameters:
      globalTable - the GlobalKTable to be joined with this stream
      keySelector - instance of KeyValueMapper used to map from the (key, value) of this stream to the key of the GlobalKTable
      joiner - a ValueJoinerWithKey that computes the join result for a pair of matching records
      named - a Named config used to name the processor in the topology
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoinerWithKey, at most one output for each input KStream record
      See Also:
    • leftJoin

      <GK, GV, RV> KStream<K,RV> leftJoin(GlobalKTable<GK,GV> globalTable, KeyValueMapper<? super K,? super V,? extends GK> keySelector, ValueJoiner<? super V,? super GV,? extends RV> valueJoiner)
      Join records of this stream with GlobalKTable's records using a non-windowed left equi join. In contrast to an inner join, all records from this stream will produce an output record (cf. below). The join is a primary key table lookup join with join attribute keySelector.apply(stream.key, stream.value) == table.key. "Table lookup join" means that results are only computed when KStream records are processed. This is done by performing a lookup for matching records in the current internal GlobalKTable state. In contrast, processing GlobalKTable input records will only update the internal GlobalKTable state and will not produce any result records.

      For each KStream record, whether or not it finds a corresponding record in the GlobalKTable, the provided ValueJoiner will be called to compute a value (of arbitrary type) for the result record. The key of the result record is the same as the key of the KStream input record. If a KStream input record's key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream. If keySelector returns null (implying that no match exists) or no GlobalKTable record is found during lookup, a null value will be provided to ValueJoiner.
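      The left-join variant differs only in what happens on a miss. In this plain-Java model (hypothetical names, not Kafka Streams code; stream records are {key, value} String arrays so that nulls are allowed), every record with a non-null key and value produces exactly one output, and the joiner receives null as the table value both when keySelector returns null and when the lookup finds nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of a KStream-GlobalKTable left join (not the Kafka Streams API).
class GlobalTableLeftJoinModel {

    static List<String> leftJoin(List<String[]> stream,  // each element: {key, value}
                                 Map<String, String> globalTable,
                                 BiFunction<String, String, String> keySelector,
                                 BiFunction<String, String, String> valueJoiner) {
        List<String> out = new ArrayList<>();
        for (String[] rec : stream) {
            String key = rec[0];
            String value = rec[1];
            if (key == null || value == null) continue; // still dropped, even in a left join
            String tableKey = keySelector.apply(key, value);
            // Null selected key or missing table entry: the joiner sees a null table value.
            String tableValue = tableKey == null ? null : globalTable.get(tableKey);
            out.add(key + "=" + valueJoiner.apply(value, tableValue));
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical data: order id -> customer id; global table: customer id -> name.
        Map<String, String> customers = Map.of("c1", "Alice");
        List<String[]> orders = List.of(
                new String[] {"o1", "c1"},    // match
                new String[] {"o2", "c9"},    // no table entry
                new String[] {"o3", "none"},  // selector returns null
                new String[] {"o4", null});   // null value: no output
        List<String> out = leftJoin(orders, customers,
                (orderId, customerId) -> customerId.equals("none") ? null : customerId,
                (customerId, name) -> customerId + "/" + name);
        System.out.println(out); // prints [o1=c1/Alice, o2=c9/null, o3=none/null]
    }
}
```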

      Type Parameters:
      GK - the key type of GlobalKTable
      GV - the value type of the GlobalKTable
      RV - the value type of the resulting KStream
      Parameters:
      globalTable - the GlobalKTable to be joined with this stream
      keySelector - instance of KeyValueMapper used to map from the (key, value) of this stream to the key of the GlobalKTable
      valueJoiner - a ValueJoiner that computes the join result for a pair of matching records
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoiner, one output for each input KStream record
      See Also:
    • leftJoin

      <GK, GV, RV> KStream<K,RV> leftJoin(GlobalKTable<GK,GV> globalTable, KeyValueMapper<? super K,? super V,? extends GK> keySelector, ValueJoinerWithKey<? super K,? super V,? super GV,? extends RV> valueJoiner)
      Join records of this stream with GlobalKTable's records using a non-windowed left equi join. In contrast to an inner join, all records from this stream will produce an output record (cf. below). The join is a primary key table lookup join with join attribute keySelector.apply(stream.key, stream.value) == table.key. "Table lookup join" means that results are only computed when KStream records are processed. This is done by performing a lookup for matching records in the current internal GlobalKTable state. In contrast, processing GlobalKTable input records will only update the internal GlobalKTable state and will not produce any result records.

      For each KStream record, whether or not it finds a corresponding record in the GlobalKTable, the provided ValueJoinerWithKey will be called to compute a value (of arbitrary type) for the result record. The key of the result record is the same as the key of the KStream input record. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. If a KStream input record's key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream. If keySelector returns null (implying that no match exists) or no GlobalKTable record is found during lookup, a null value will be provided to ValueJoinerWithKey.

      Type Parameters:
      GK - the key type of GlobalKTable
      GV - the value type of the GlobalKTable
      RV - the value type of the resulting KStream
      Parameters:
      globalTable - the GlobalKTable to be joined with this stream
      keySelector - instance of KeyValueMapper used to map from the (key, value) of this stream to the key of the GlobalKTable
      valueJoiner - a ValueJoinerWithKey that computes the join result for a pair of matching records
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoinerWithKey, one output for each input KStream record
      See Also:
    • leftJoin

      <GK, GV, RV> KStream<K,RV> leftJoin(GlobalKTable<GK,GV> globalTable, KeyValueMapper<? super K,? super V,? extends GK> keySelector, ValueJoiner<? super V,? super GV,? extends RV> valueJoiner, Named named)
      Join records of this stream with GlobalKTable's records using a non-windowed left equi join. In contrast to an inner join, all records from this stream will produce an output record (cf. below). The join is a primary key table lookup join with join attribute keySelector.apply(stream.key, stream.value) == table.key. "Table lookup join" means that results are only computed when KStream records are processed. This is done by performing a lookup for matching records in the current internal GlobalKTable state. In contrast, processing GlobalKTable input records will only update the internal GlobalKTable state and will not produce any result records.

      For each KStream record, whether or not it finds a corresponding record in the GlobalKTable, the provided ValueJoiner will be called to compute a value (of arbitrary type) for the result record. The key of the result record is the same as the key of the KStream input record. If a KStream input record's key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream. If keySelector returns null (implying that no match exists) or no GlobalKTable record is found during lookup, a null value will be provided to ValueJoiner.

      Type Parameters:
      GK - the key type of GlobalKTable
      GV - the value type of the GlobalKTable
      RV - the value type of the resulting KStream
      Parameters:
      globalTable - the GlobalKTable to be joined with this stream
      keySelector - instance of KeyValueMapper used to map from the (key, value) of this stream to the key of the GlobalKTable
      valueJoiner - a ValueJoiner that computes the join result for a pair of matching records
      named - a Named config used to name the processor in the topology
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoiner, one output for each input KStream record
      See Also:
    • leftJoin

      <GK, GV, RV> KStream<K,RV> leftJoin(GlobalKTable<GK,GV> globalTable, KeyValueMapper<? super K,? super V,? extends GK> keySelector, ValueJoinerWithKey<? super K,? super V,? super GV,? extends RV> valueJoiner, Named named)
      Join records of this stream with GlobalKTable's records using a non-windowed left equi join. In contrast to an inner join, all records from this stream will produce an output record (cf. below). The join is a primary key table lookup join with join attribute keySelector.apply(stream.key, stream.value) == table.key. "Table lookup join" means that results are only computed when KStream records are processed. This is done by performing a lookup for matching records in the current internal GlobalKTable state. In contrast, processing GlobalKTable input records will only update the internal GlobalKTable state and will not produce any result records.

      For each KStream record, whether or not it finds a corresponding record in the GlobalKTable, the provided ValueJoinerWithKey will be called to compute a value (of arbitrary type) for the result record. The key of the result record is the same as the key of the KStream input record. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. If a KStream input record's key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream. If keySelector returns null (implying that no match exists) or no GlobalKTable record is found during lookup, a null value will be provided to ValueJoinerWithKey.

      Type Parameters:
      GK - the key type of GlobalKTable
      GV - the value type of the GlobalKTable
      RV - the value type of the resulting KStream
      Parameters:
      globalTable - the GlobalKTable to be joined with this stream
      keySelector - instance of KeyValueMapper used to map from the (key, value) of this stream to the key of the GlobalKTable
      valueJoiner - a ValueJoinerWithKey that computes the join result for a pair of matching records
      named - a Named config used to name the processor in the topology
      Returns:
      a KStream that contains join-records for each key and values computed by the given ValueJoinerWithKey, one output for each input KStream record
      See Also:
    • transform

      <K1, V1> KStream<K1,V1> transform(TransformerSupplier<? super K,? super V,KeyValue<K1,V1>> transformerSupplier, String... stateStoreNames)
      Transform each record of the input stream into zero or one record in the output stream (both key and value type can be altered arbitrarily). A Transformer (provided by the given TransformerSupplier) is applied to each input record and returns zero or one output record. Thus, an input record <K,V> can be transformed into an output record <K':V'>. Attaching a state store makes this a stateful record-by-record operation (cf. map()). If you choose not to attach one, this operation is similar to the stateless map() but allows access to the ProcessorContext and record metadata. This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. Furthermore, via Punctuator#punctuate(), the processing progress can be observed and additional periodic actions can be performed.

      In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).

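      The first strategy is to manually add the StoreBuilders to the topology (e.g., via StreamsBuilder.addStateStore(StoreBuilder), as in the snippet below, or Topology.addStateStore(StoreBuilder, String...)) and to specify the store names via stateStoreNames so that they will be connected to the transformer.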

      
       // create store
       StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
               Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
                       Serdes.String(),
                       Serdes.String());
       // add store
       builder.addStateStore(keyValueStoreBuilder);
      
       // connect the store by name; the lambda creates a new MyTransformer on every get() call
       KStream<String, String> outputStream =
               inputStream.transform(() -> new MyTransformer(), "myTransformState");
       
      The second strategy is for the given TransformerSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.
      
       class MyTransformerSupplier implements TransformerSupplier<String, String, KeyValue<String, String>> {
           // supply a new transformer instance on every call
           @Override
           public Transformer<String, String, KeyValue<String, String>> get() {
               return new MyTransformer();
           }
      
           // provide store(s) that will be added and connected to the associated transformer
           // the store name from the builder ("myTransformState") is used to access the store later via the ProcessorContext
           @Override
           public Set<StoreBuilder<?>> stores() {
               StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
                         Serdes.String(),
                         Serdes.String());
               return Collections.singleton(keyValueStoreBuilder);
           }
       }
      
       ...
      
       KStream<String, String> outputStream = inputStream.transform(new MyTransformerSupplier());
       

      With either strategy, the state store is obtained within the Transformer via the ProcessorContext. To trigger periodic actions via punctuate(), a punctuation must be registered via ProcessorContext#schedule(). The Transformer must return a KeyValue from transform(). The return value of Transformer#transform() may be null, in which case no record is emitted.

      
       class MyTransformer implements Transformer<String, String, KeyValue<String, String>> {
           private ProcessorContext context;
           private StateStore state;
      
           @Override
           public void init(ProcessorContext context) {
               this.context = context;
               this.state = context.getStateStore("myTransformState");
               // punctuate each second; the punctuator can access this.state
               context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                   // periodic action
               });
           }
      
           @Override
           public KeyValue<String, String> transform(String key, String value) {
               // can access this.state
               return new KeyValue<>(key, value); // emit a single record via return; may also be null
           }
      
           @Override
           public void close() {
               // can access this.state
           }
       }
       
      Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before transform().
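      The Transformer contract described above (state set up once, at most one key-value pair returned per input, null meaning "emit nothing", key and value types free to change) can be exercised without a Kafka cluster using a plain-Java stand-in. The SimpleTransformer interface, the HashMap used in place of a KeyValueStore, and the TransformDriver loop are hypothetical simplifications, not the Kafka Streams Transformer API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Transformer<K, V, KeyValue<K1, V1>>: returns one pair or null.
interface SimpleTransformer<K, V, K1, V1> {
    Map.Entry<K1, V1> transform(K key, V value);
}

// Counts records per key in a local "store" and emits only every second occurrence.
class EveryOtherCounter implements SimpleTransformer<String, String, String, Long> {
    private final Map<String, Long> store = new HashMap<>(); // stands in for a KeyValueStore

    @Override
    public Map.Entry<String, Long> transform(String key, String value) {
        long count = store.merge(key, 1L, Long::sum);
        // New key (upper-cased) and new value type (Long); null means no output record.
        return count % 2 == 0 ? new SimpleEntry<String, Long>(key.toUpperCase(), count) : null;
    }
}

class TransformDriver {
    // Applies the transformer record by record and collects the non-null results.
    static <K, V, K1, V1> List<Map.Entry<K1, V1>> run(SimpleTransformer<K, V, K1, V1> t,
                                                      List<Map.Entry<K, V>> input) {
        List<Map.Entry<K1, V1>> out = new ArrayList<>();
        for (Map.Entry<K, V> rec : input) {
            Map.Entry<K1, V1> result = t.transform(rec.getKey(), rec.getValue());
            if (result != null) out.add(result);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input =
                List.of(Map.entry("a", "x"), Map.entry("b", "y"), Map.entry("a", "z"));
        System.out.println(run(new EveryOtherCounter(), input)); // prints [A=2]
    }
}
```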

      Transforming records might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. transformValues()).

      Note that it is possible to emit multiple records for each input record by using context#forward() in Transformer#transform() and Punctuator#punctuate(). Be aware that a mismatch between the types of the emitted records and the type of the stream would only be detected at runtime. To ensure compile-time type safety, context#forward() should not be used in Transformer#transform() and Punctuator#punctuate(). If multiple records need to be emitted for each input record in Transformer#transform(), it is recommended to use flatTransform(). The supplier should always generate a new instance each time TransformerSupplier.get() is called. Creating a single Transformer object and returning the same object reference in TransformerSupplier.get() would be a violation of the supplier pattern and will lead to runtime exceptions.
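      The supplier rule can be illustrated in plain Java, using java.util.function.Supplier as a stand-in for TransformerSupplier (an assumption for illustration only) and a hypothetical CountingTransformer that keeps per-instance state. Each stream task is meant to get its own transformer; the text notes that Kafka Streams reports a shared instance as a runtime exception, and this model only shows the underlying problem, state leaking between tasks.

```java
import java.util.function.Supplier;

// Hypothetical stateful transformer: each instance keeps its own counter.
class CountingTransformer {
    private long seen = 0;

    long transform(String key, String value) {
        return ++seen;
    }
}

class SupplierPatternDemo {
    // Correct: a new instance per get() call, like a constructor reference.
    static final Supplier<CountingTransformer> GOOD = CountingTransformer::new;

    // Wrong: the same object is returned every time, so all callers share one state.
    private static final CountingTransformer SHARED = new CountingTransformer();
    static final Supplier<CountingTransformer> BAD = () -> SHARED;

    public static void main(String[] args) {
        // Simulate two stream tasks, each obtaining "its own" transformer.
        CountingTransformer task1 = GOOD.get();
        CountingTransformer task2 = GOOD.get();
        task1.transform("k", "v");
        System.out.println(task2.transform("k", "v")); // prints 1: independent state

        CountingTransformer shared1 = BAD.get();
        CountingTransformer shared2 = BAD.get();
        shared1.transform("k", "v");
        System.out.println(shared2.transform("k", "v")); // prints 2: state leaked between tasks
    }
}
```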

      Type Parameters:
      K1 - the key type of the new stream
      V1 - the value type of the new stream
      Parameters:
      transformerSupplier - an instance of TransformerSupplier that generates a newly constructed Transformer
      stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
      Returns:
      a KStream that contains more or fewer records with new key and value (possibly of different type)
      See Also:
    • transform

      <K1, V1> KStream<K1,V1> transform(TransformerSupplier<? super K,? super V,KeyValue<K1,V1>> transformerSupplier, Named named, String... stateStoreNames)
      Transform each record of the input stream into zero or one record in the output stream (both key and value type can be altered arbitrarily). A Transformer (provided by the given TransformerSupplier) is applied to each input record and returns zero or one output record. Thus, an input record <K,V> can be transformed into an output record <K':V'>. Attaching a state store makes this a stateful record-by-record operation (cf. map()). If you choose not to attach one, this operation is similar to the stateless map() but allows access to the ProcessorContext and record metadata. This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. Furthermore, via Punctuator#punctuate(), the processing progress can be observed and additional periodic actions can be performed.

      In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).

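      The first strategy is to manually add the StoreBuilders to the topology (e.g., via StreamsBuilder.addStateStore(StoreBuilder), as in the snippet below, or Topology.addStateStore(StoreBuilder, String...)) and to specify the store names via stateStoreNames so that they will be connected to the transformer.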

      
       // create store
       StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
               Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
                       Serdes.String(),
                       Serdes.String());
       // add store
       builder.addStateStore(keyValueStoreBuilder);
      
       // connect the store by name; the lambda creates a new MyTransformer on every get() call
       KStream<String, String> outputStream =
               inputStream.transform(() -> new MyTransformer(), "myTransformState");
       
      The second strategy is for the given TransformerSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.
      
       class MyTransformerSupplier implements TransformerSupplier<String, String, KeyValue<String, String>> {
           // supply a new transformer instance on every call
           @Override
           public Transformer<String, String, KeyValue<String, String>> get() {
               return new MyTransformer();
           }
      
           // provide store(s) that will be added and connected to the associated transformer
           // the store name from the builder ("myTransformState") is used to access the store later via the ProcessorContext
           @Override
           public Set<StoreBuilder<?>> stores() {
               StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
                         Serdes.String(),
                         Serdes.String());
               return Collections.singleton(keyValueStoreBuilder);
           }
       }
      
       ...
      
       KStream<String, String> outputStream = inputStream.transform(new MyTransformerSupplier());
       

      With either strategy, the state store is obtained within the Transformer via the ProcessorContext. To trigger periodic actions via punctuate(), a punctuation must be registered via ProcessorContext#schedule(). The Transformer must return a KeyValue from transform(). The return value of Transformer#transform() may be null, in which case no record is emitted.

      
       class MyTransformer implements Transformer<String, String, KeyValue<String, String>> {
           private ProcessorContext context;
           private StateStore state;
      
           @Override
           public void init(ProcessorContext context) {
               this.context = context;
               this.state = context.getStateStore("myTransformState");
               // punctuate each second; the punctuator can access this.state
               context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                   // periodic action
               });
           }
      
           @Override
           public KeyValue<String, String> transform(String key, String value) {
               // can access this.state
               return new KeyValue<>(key, value); // emit a single record via return; may also be null
           }
      
           @Override
           public void close() {
               // can access this.state
           }
       }
       
      Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before transform().

      Transforming records might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. transformValues()).

      Note that it is possible to emit multiple records for each input record by using context#forward() in Transformer#transform() and Punctuator#punctuate(). Be aware that a mismatch between the types of the emitted records and the type of the stream would only be detected at runtime. To ensure compile-time type safety, context#forward() should not be used in Transformer#transform() and Punctuator#punctuate(). If multiple records need to be emitted for each input record in Transformer#transform(), it is recommended to use flatTransform(). The supplier should always generate a new instance each time TransformerSupplier.get() is called. Creating a single Transformer object and returning the same object reference in TransformerSupplier.get() would be a violation of the supplier pattern and will lead to runtime exceptions.

      Type Parameters:
      K1 - the key type of the new stream
      V1 - the value type of the new stream
      Parameters:
      transformerSupplier - an instance of TransformerSupplier that generates a newly constructed Transformer
      named - a Named config used to name the processor in the topology
      stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
      Returns:
      a KStream that contains more or fewer records with new key and value (possibly of different type)
      See Also:
    • flatTransform

      <K1, V1> KStream<K1,V1> flatTransform(TransformerSupplier<? super K,? super V,Iterable<KeyValue<K1,V1>>> transformerSupplier, String... stateStoreNames)
      Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily). A Transformer (provided by the given TransformerSupplier) is applied to each input record and returns zero or more output records. Thus, an input record <K,V> can be transformed into output records <K':V'>, <K'':V''>, ... Attaching a state store makes this a stateful record-by-record operation (cf. flatMap()). If you choose not to attach one, this operation is similar to the stateless flatMap() but allows access to the ProcessorContext and record metadata. Furthermore, via Punctuator#punctuate(), the processing progress can be observed and additional periodic actions can be performed.
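      The "zero or more records per input" behaviour can be modeled in plain Java. FlatTransformer, SPLIT_WORDS, and flatRun are hypothetical stand-ins, not Kafka Streams types; the example splits a sentence value into one record per word, keyed by the word, and treats a null return like an empty Iterable, matching the rule this section states for Transformer#transform() in flatTransform().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Transformer<K, V, Iterable<KeyValue<K1, V1>>>.
interface FlatTransformer<K, V, K1, V1> {
    Iterable<Map.Entry<K1, V1>> transform(K key, V value);
}

class FlatTransformModel {
    // Emits one (word, originalKey) record per word; returns null for blank values.
    static final FlatTransformer<String, String, String, String> SPLIT_WORDS = (key, value) -> {
        if (value.trim().isEmpty()) return null; // treated the same as an empty Iterable
        List<Map.Entry<String, String>> result = new ArrayList<>();
        for (String word : value.trim().split("\\s+")) {
            result.add(Map.entry(word, key));
        }
        return result;
    };

    // Flattens the transformer's output for each input record, skipping null results.
    static <K, V, K1, V1> List<Map.Entry<K1, V1>> flatRun(FlatTransformer<K, V, K1, V1> t,
                                                          List<Map.Entry<K, V>> input) {
        List<Map.Entry<K1, V1>> out = new ArrayList<>();
        for (Map.Entry<K, V> rec : input) {
            Iterable<Map.Entry<K1, V1>> results = t.transform(rec.getKey(), rec.getValue());
            if (results == null) continue; // zero output records
            for (Map.Entry<K1, V1> r : results) out.add(r);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input =
                List.of(Map.entry("doc1", "hello kafka"), Map.entry("doc2", "  "));
        System.out.println(flatRun(SPLIT_WORDS, input)); // prints [hello=doc1, kafka=doc1]
    }
}
```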

      In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).

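      The first strategy is to manually add the StoreBuilders to the topology (e.g., via StreamsBuilder.addStateStore(StoreBuilder), as in the snippet below, or Topology.addStateStore(StoreBuilder, String...)) and to specify the store names via stateStoreNames so that they will be connected to the transformer.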

      
       // create store
       StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
               Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
                       Serdes.String(),
                       Serdes.String());
       // add store
       builder.addStateStore(keyValueStoreBuilder);
      
       // connect the store by name; the lambda creates a new MyTransformer on every get() call
       KStream<String, String> outputStream =
               inputStream.flatTransform(() -> new MyTransformer(), "myTransformState");
       
      The second strategy is for the given TransformerSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.
      
       class MyTransformerSupplier implements TransformerSupplier {
           // supply transformer (interface methods must be implemented as public)
           public Transformer get() {
               return new MyTransformer();
           }
      
           // provide store(s) that will be added and connected to the associated transformer
           // the store name from the builder ("myTransformState") is used to access the store later via the ProcessorContext
           public Set<StoreBuilder<?>> stores() {
               StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
                         Serdes.String(),
                         Serdes.String());
               return Collections.singleton(keyValueStoreBuilder);
           }
       }
      
       ...
      
       KStream outputStream = inputStream.flatTransform(new MyTransformerSupplier());
       

      With either strategy, within the Transformer, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered. The Transformer must return an Iterable type (e.g., any Collection type) in transform(). A null return value from Transformer#transform() is equivalent to returning an empty Iterable, i.e., no records are emitted.

      
       class MyTransformer implements Transformer {
           private ProcessorContext context;
           private StateStore state;
      
           public void init(ProcessorContext context) {
               this.context = context;
               this.state = context.getStateStore("myTransformState");
               // punctuate each second; the punctuator can access this.state
               context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                   // periodic action
               });
           }
      
           public Iterable<KeyValue> transform(K key, V value) {
               // can access this.state
               List<KeyValue> result = new ArrayList<>();
               for (int i = 0; i < 3; i++) {
                   result.add(new KeyValue(key, value));
               }
               return result; // emits a list of key-value pairs via return
           }
      
           public void close() {
               // can access this.state
           }
       }
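The return-value contract described above (one Iterable per input record, with null treated as empty) can be illustrated without a running Kafka Streams application. The following is a minimal plain-Java model, not Kafka code. The hypothetical FlatTransformModel.apply loop stands in for the framework, Map.Entry stands in for KeyValue, and the hypothetical splitWords function plays the role of Transformer#transform().

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of the flatTransform() contract (no Kafka dependency).
// Map.Entry stands in for Kafka's KeyValue; this loop is a hypothetical stand-in for the framework.
final class FlatTransformModel {

    // Applies the per-record function to every input record and concatenates the results in order.
    // A null Iterable is treated like an empty one: the input record produces no output.
    static <K, V, K1, V1> List<Map.Entry<K1, V1>> apply(
            List<? extends Map.Entry<K, V>> input,
            BiFunction<K, V, Iterable<Map.Entry<K1, V1>>> transform) {
        List<Map.Entry<K1, V1>> output = new ArrayList<>();
        for (Map.Entry<K, V> record : input) {
            Iterable<Map.Entry<K1, V1>> results = transform.apply(record.getKey(), record.getValue());
            if (results == null) {
                continue; // equivalent to returning an empty Iterable
            }
            for (Map.Entry<K1, V1> out : results) {
                output.add(out);
            }
        }
        return output;
    }

    // Hypothetical transform: split a sentence into words, re-keyed by the word itself,
    // with the word length as the new value. Blank sentences return null (no output).
    static Iterable<Map.Entry<String, Integer>> splitWords(String key, String sentence) {
        if (sentence == null || sentence.trim().isEmpty()) {
            return null;
        }
        List<Map.Entry<String, Integer>> result = new ArrayList<>();
        for (String word : sentence.trim().split("\\s+")) {
            result.add(new SimpleEntry<>(word, word.length()));
        }
        return result;
    }
}
```

For example, applying splitWords to the records <k1:"hello kafka streams"> and <k2:"   "> yields three records, <hello:5>, <kafka:5>, and <streams:7>, and nothing for k2. Both key and value types change, as flatTransform() allows.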
       
      Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before flatTransform().

      Transforming records might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. transformValues()).

      Note that it is possible to emit records by using context#forward() in Transformer#transform() and Punctuator#punctuate(). Be aware that a mismatch between the types of the emitted records and the type of the stream would only be detected at runtime. To ensure type safety at compile time, context#forward() should not be used in Transformer#transform() and Punctuator#punctuate(). The supplier should always generate a new instance each time TransformerSupplier.get() is called. Creating a single Transformer object and returning the same object reference from TransformerSupplier.get() violates the supplier pattern and leads to runtime exceptions.
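The supplier requirement can be illustrated with a small plain-Java sketch. It has no Kafka dependency: java.util.function.Supplier stands in for TransformerSupplier, and CountingTransformer is a hypothetical stateful transformer. Kafka Streams obtains a separate transformer for each stream task through get(). A supplier that hands out one shared instance therefore lets state leak between tasks that are meant to be independent.

```java
import java.util.function.Supplier;

// Plain-Java illustration of the supplier pattern (no Kafka dependency).
// Supplier<T> stands in for TransformerSupplier; CountingTransformer is hypothetical.
final class SupplierPatternDemo {

    // A stateful "transformer" that counts how many records this instance has seen.
    static final class CountingTransformer {
        private long seen = 0;

        long transform(String value) {
            seen++;
            return seen;
        }
    }

    // Correct: every call to get() constructs a fresh instance.
    static Supplier<CountingTransformer> correctSupplier() {
        return CountingTransformer::new;
    }

    // Incorrect: every call to get() returns the same shared instance,
    // so state leaks between tasks that are supposed to be independent.
    static Supplier<CountingTransformer> brokenSupplier() {
        CountingTransformer shared = new CountingTransformer();
        return () -> shared;
    }

    // Simulates two tasks that each obtain a transformer and process two records;
    // returns the count reported by the second task after its second record.
    static long secondTaskCount(Supplier<CountingTransformer> supplier) {
        CountingTransformer task1 = supplier.get();
        CountingTransformer task2 = supplier.get();
        task1.transform("a");
        task1.transform("b");
        task2.transform("c");
        return task2.transform("d");
    }
}
```

With correctSupplier(), secondTaskCount returns 2. With brokenSupplier(), it returns 4, because the second task also counts the first task's records. In Kafka Streams itself, as noted above, this misuse leads to runtime exceptions.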

      Type Parameters:
      K1 - the key type of the new stream
      V1 - the value type of the new stream
      Parameters:
      transformerSupplier - an instance of TransformerSupplier that generates a newly constructed Transformer
      stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
      Returns:
      a KStream that contains zero or more records per input record, with new keys and values (possibly of different types)
      See Also:
    • flatTransform

      <K1, V1> KStream<K1,V1> flatTransform(TransformerSupplier<? super K,? super V,Iterable<KeyValue<K1,V1>>> transformerSupplier, Named named, String... stateStoreNames)
      Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily). A Transformer (provided by the given TransformerSupplier) is applied to each input record and returns zero or more output records. Thus, an input record <K,V> can be transformed into output records <K':V'>, <K'':V''>, .... Attaching a state store makes this a stateful record-by-record operation (cf. flatMap()). If you choose not to attach one, this operation is similar to the stateless flatMap() but allows access to the ProcessorContext and record metadata. Furthermore, via Punctuator#punctuate() the processing progress can be observed and additional periodic actions can be performed.

      In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).

      The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the transformer.

      
       // create store
       StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
               Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
                       Serdes.String(),
                       Serdes.String());
       // add store
       builder.addStateStore(keyValueStoreBuilder);
      
       KStream outputStream = inputStream.flatTransform(new TransformerSupplier() {
           public Transformer get() {
               return new MyTransformer();
           }
       }, "myTransformState");
       
      The second strategy is for the given TransformerSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.
      
       class MyTransformerSupplier implements TransformerSupplier {
           // supply transformer (interface methods must be implemented as public)
           public Transformer get() {
               return new MyTransformer();
           }
      
           // provide store(s) that will be added and connected to the associated transformer
           // the store name from the builder ("myTransformState") is used to access the store later via the ProcessorContext
           public Set<StoreBuilder<?>> stores() {
               StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
                         Serdes.String(),
                         Serdes.String());
               return Collections.singleton(keyValueStoreBuilder);
           }
       }
      
       ...
      
       KStream outputStream = inputStream.flatTransform(new MyTransformerSupplier());
       

      With either strategy, within the Transformer, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered. The Transformer must return an Iterable type (e.g., any Collection type) in transform(). A null return value from Transformer#transform() is equivalent to returning an empty Iterable, i.e., no records are emitted.

      
       class MyTransformer implements Transformer {
           private ProcessorContext context;
           private StateStore state;
      
           public void init(ProcessorContext context) {
               this.context = context;
               this.state = context.getStateStore("myTransformState");
               // punctuate each second; the punctuator can access this.state
               context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                   // periodic action
               });
           }
      
           public Iterable<KeyValue> transform(K key, V value) {
               // can access this.state
               List<KeyValue> result = new ArrayList<>();
               for (int i = 0; i < 3; i++) {
                   result.add(new KeyValue(key, value));
               }
               return result; // emits a list of key-value pairs via return
           }
      
           public void close() {
               // can access this.state
           }
       }
       
      Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before flatTransform().

      Transforming records might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. transformValues()).
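The reason a key-changing transform can force data redistribution can be sketched with a simplified partition model. This is an assumption for illustration only. The hypothetical partitionFor uses Math.floorMod(key.hashCode(), numPartitions), whereas Kafka's default partitioner hashes the serialized key bytes with murmur2. The effect shown is the same: after its key changes, a record still sits on the partition chosen for its old key, which may not be the partition its new key maps to.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Simplified partitioning model (assumption): Kafka's default partitioner hashes the
// serialized key with murmur2; String.hashCode() is used here only to keep the sketch small.
final class RepartitionModel {

    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Counts records whose new key maps to a different partition than the one the record
    // currently sits on (the old key's partition). A non-zero result means a downstream
    // key-based operator could see records for one new key spread across partitions
    // unless the stream is repartitioned first.
    static int recordsOnWrongPartition(List<String> keys, UnaryOperator<String> keyMapper, int numPartitions) {
        int moved = 0;
        for (String oldKey : keys) {
            String newKey = keyMapper.apply(oldKey);
            if (partitionFor(oldKey, numPartitions) != partitionFor(newKey, numPartitions)) {
                moved++;
            }
        }
        return moved;
    }
}
```

With three partitions, the identity mapping moves no records, while upper-casing the keys "a" and "b" moves both ("a" hashes to 97 and "A" to 65, which fall on partitions 1 and 2).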

      Note that it is possible to emit records by using context#forward() in Transformer#transform() and Punctuator#punctuate(). Be aware that a mismatch between the types of the emitted records and the type of the stream would only be detected at runtime. To ensure type safety at compile time, context#forward() should not be used in Transformer#transform() and Punctuator#punctuate(). The supplier should always generate a new instance each time TransformerSupplier.get() is called. Creating a single Transformer object and returning the same object reference from TransformerSupplier.get() violates the supplier pattern and leads to runtime exceptions.

      Type Parameters:
      K1 - the key type of the new stream
      V1 - the value type of the new stream
      Parameters:
      transformerSupplier - an instance of TransformerSupplier that generates a newly constructed Transformer
      named - a Named config used to name the processor in the topology
      stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
      Returns:
      a KStream that contains zero or more records per input record, with new keys and values (possibly of different types)
      See Also:
    • transformValues

      <VR> KStream<K,VR> transformValues(ValueTransformerSupplier<? super V,? extends VR> valueTransformerSupplier, String... stateStoreNames)
      Transform the value of each input record into a new value (with possibly a new type) of the output record. A ValueTransformer (provided by the given ValueTransformerSupplier) is applied to each input record value and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K:V'>. Attaching a state store makes this a stateful record-by-record operation (cf. mapValues()). If you choose not to attach one, this operation is similar to the stateless mapValues() but allows access to the ProcessorContext and record metadata. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed.

      In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).

      The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the transformer.

      
       // create store
       StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
               Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                       Serdes.String(),
                       Serdes.String());
       // add store
       builder.addStateStore(keyValueStoreBuilder);
      
       KStream outputStream = inputStream.transformValues(new ValueTransformerSupplier() {
           public ValueTransformer get() {
               return new MyValueTransformer();
           }
       }, "myValueTransformState");
       
      The second strategy is for the given ValueTransformerSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.
      
       class MyValueTransformerSupplier implements ValueTransformerSupplier {
           // supply transformer (interface methods must be implemented as public)
           public ValueTransformer get() {
               return new MyValueTransformer();
           }
      
           // provide store(s) that will be added and connected to the associated transformer
           // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
           public Set<StoreBuilder<?>> stores() {
               StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                         Serdes.String(),
                         Serdes.String());
               return Collections.singleton(keyValueStoreBuilder);
           }
       }
      
       ...
      
       KStream outputStream = inputStream.transformValues(new MyValueTransformerSupplier());
       

      With either strategy, within the ValueTransformer, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered. The ValueTransformer must return the new value in transform(). In contrast to transform(), no additional KeyValue pairs can be emitted via ProcessorContext.forward(). A StreamsException is thrown if the ValueTransformer tries to emit a KeyValue pair.

      
       class MyValueTransformer implements ValueTransformer {
           private StateStore state;
      
           public void init(ProcessorContext context) {
               this.state = context.getStateStore("myValueTransformState");
               // punctuate each second; the punctuator can access this.state
               context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                   // periodic action
               });
           }
      
           public NewValueType transform(V value) {
               // can access this.state
               return new NewValueType(); // or null
           }
      
           public void close() {
               // can access this.state
           }
       }
       
      Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before transformValues().

      Setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. transform(TransformerSupplier, String...)).
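The co-location argument can be checked with a small plain-Java model. It makes one assumption: Math.floorMod(key.hashCode(), n) replaces Kafka's murmur2-based default partitioner. The hypothetical ValueOnlyModel.transformValues applies a value function and copies every key unchanged, so each output record maps to the same partition as its input record.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of a value-only transformation (no Kafka dependency).
final class ValueOnlyModel {

    // Simplified partitioner (assumption; not Kafka's murmur2-based default partitioner).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Applies valueMapper to each value; the key of every record is copied unchanged.
    static <V, VR> List<Map.Entry<String, VR>> transformValues(
            List<Map.Entry<String, V>> input, Function<V, VR> valueMapper) {
        List<Map.Entry<String, VR>> output = new ArrayList<>();
        for (Map.Entry<String, V> record : input) {
            output.add(new SimpleEntry<>(record.getKey(), valueMapper.apply(record.getValue())));
        }
        return output;
    }

    // True if every output record maps to the same partition as the input record at the same index.
    static <V, VR> boolean coLocated(List<Map.Entry<String, V>> input,
                                     List<Map.Entry<String, VR>> output, int numPartitions) {
        if (input.size() != output.size()) {
            return false;
        }
        for (int i = 0; i < input.size(); i++) {
            if (partitionFor(input.get(i).getKey(), numPartitions)
                    != partitionFor(output.get(i).getKey(), numPartitions)) {
                return false;
            }
        }
        return true;
    }
}
```

Parsing the string values "3" and "7" into integers changes the value type but leaves every record on its original partition, which is why no redistribution is needed downstream.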

      Type Parameters:
      VR - the value type of the result stream
      Parameters:
      valueTransformerSupplier - an instance of ValueTransformerSupplier that generates a newly constructed ValueTransformer. The supplier should always generate a new instance. Creating a single ValueTransformer object and returning the same object reference from ValueTransformerSupplier.get() violates the supplier pattern and leads to runtime exceptions.
      stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
      Returns:
      a KStream that contains records with unmodified key and new values (possibly of different type)
      See Also:
    • transformValues

      <VR> KStream<K,VR> transformValues(ValueTransformerSupplier<? super V,? extends VR> valueTransformerSupplier, Named named, String... stateStoreNames)
      Transform the value of each input record into a new value (with possibly a new type) of the output record. A ValueTransformer (provided by the given ValueTransformerSupplier) is applied to each input record value and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K:V'>. Attaching a state store makes this a stateful record-by-record operation (cf. mapValues()). If you choose not to attach one, this operation is similar to the stateless mapValues() but allows access to the ProcessorContext and record metadata. This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed.

      In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).

      The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the transformer.

      
       // create store
       StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
               Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                       Serdes.String(),
                       Serdes.String());
       // add store
       builder.addStateStore(keyValueStoreBuilder);
      
       KStream outputStream = inputStream.transformValues(new ValueTransformerSupplier() {
           public ValueTransformer get() {
               return new MyValueTransformer();
           }
       }, "myValueTransformState");
       
      The second strategy is for the given ValueTransformerSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.
      
       class MyValueTransformerSupplier implements ValueTransformerSupplier {
           // supply transformer (interface methods must be implemented as public)
           public ValueTransformer get() {
               return new MyValueTransformer();
           }
      
           // provide store(s) that will be added and connected to the associated transformer
           // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
           public Set<StoreBuilder<?>> stores() {
               StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                         Serdes.String(),
                         Serdes.String());
               return Collections.singleton(keyValueStoreBuilder);
           }
       }
      
       ...
      
       KStream outputStream = inputStream.transformValues(new MyValueTransformerSupplier());
       

      With either strategy, within the ValueTransformer, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered. The ValueTransformer must return the new value in transform(). In contrast to transform(), no additional KeyValue pairs can be emitted via ProcessorContext.forward(). A StreamsException is thrown if the ValueTransformer tries to emit a KeyValue pair.

      
       class MyValueTransformer implements ValueTransformer {
           private StateStore state;
      
           public void init(ProcessorContext context) {
               this.state = context.getStateStore("myValueTransformState");
               // punctuate each second; the punctuator can access this.state
               context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                   // periodic action
               });
           }
      
           public NewValueType transform(V value) {
               // can access this.state
               return new NewValueType(); // or null
           }
      
           public void close() {
               // can access this.state
           }
       }
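Attaching a store, as in the MyValueTransformer sketch above, makes the value transformation stateful. The following plain-Java model has no Kafka dependency. It uses a HashMap in place of the KeyValueStore that would be obtained via ProcessorContext#getStateStore(), and RunningCountTransformer is hypothetical. A ValueTransformer does not see the record key, so the model keys its state by the value. It returns, as the new value, how many times that value has been seen by this instance.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of a stateful value transformer (no Kafka dependency).
// The HashMap stands in for a KeyValueStore obtained via ProcessorContext#getStateStore.
final class RunningCountTransformer {
    private Map<String, Long> store;

    // Mirrors init(): in Kafka the store would be looked up by name here.
    void init() {
        store = new HashMap<>();
    }

    // Returns how many times this value has been seen by this instance, including this record.
    Long transform(String value) {
        return store.merge(value, 1L, Long::sum);
    }

    // Mirrors close(): a real state store is managed by the library, not closed by the transformer.
    void close() {
        store = null;
    }
}
```

Calling transform("x"), transform("y"), and transform("x") on one initialized instance returns 1, 1, and 2. In a real application each task has its own transformer instance and its own copy of the store, so such counts are tracked per task.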
       
      Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before transformValues().

      Setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. transform(TransformerSupplier, String...)).

      Type Parameters:
      VR - the value type of the result stream
      Parameters:
      valueTransformerSupplier - an instance of ValueTransformerSupplier that generates a newly constructed ValueTransformer. The supplier should always generate a new instance. Creating a single ValueTransformer object and returning the same object reference from ValueTransformerSupplier.get() violates the supplier pattern and leads to runtime exceptions.
      named - a Named config used to name the processor in the topology
      stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
      Returns:
      a KStream that contains records with unmodified key and new values (possibly of different type)
      See Also:
    • transformValues

      <VR> KStream<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> valueTransformerSupplier, String... stateStoreNames)
      Transform the value of each input record into a new value (with possibly a new type) of the output record. A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K:V'>. Attaching a state store makes this a stateful record-by-record operation (cf. mapValues()). If you choose not to attach one, this operation is similar to the stateless mapValues() but allows access to the ProcessorContext and record metadata. This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed.

      In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).

      The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the transformer.

      
       // create store
       StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
               Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                       Serdes.String(),
                       Serdes.String());
       // add store
       builder.addStateStore(keyValueStoreBuilder);
      
       KStream outputStream = inputStream.transformValues(new ValueTransformerWithKeySupplier() {
           public ValueTransformerWithKey get() {
               return new MyValueTransformerWithKey();
           }
       }, "myValueTransformState");
       
      The second strategy is for the given ValueTransformerWithKeySupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.
      
       class MyValueTransformerWithKeySupplier implements ValueTransformerWithKeySupplier {
           // supply transformer (interface methods must be implemented as public)
           public ValueTransformerWithKey get() {
               return new MyValueTransformerWithKey();
           }
      
           // provide store(s) that will be added and connected to the associated transformer
           // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
           public Set<StoreBuilder<?>> stores() {
               StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                         Serdes.String(),
                         Serdes.String());
               return Collections.singleton(keyValueStoreBuilder);
           }
       }
      
       ...
      
       KStream outputStream = inputStream.transformValues(new MyValueTransformerWithKeySupplier());
       

      With either strategy, within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered. The ValueTransformerWithKey must return the new value in transform(). In contrast to transform() and flatTransform(), no additional KeyValue pairs can be emitted via ProcessorContext.forward(). A StreamsException is thrown if the ValueTransformerWithKey tries to emit a KeyValue pair.

      
       class MyValueTransformerWithKey implements ValueTransformerWithKey {
           private StateStore state;
      
           public void init(ProcessorContext context) {
               this.state = context.getStateStore("myValueTransformState");
               // punctuate each second; the punctuator can access this.state
               context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                   // periodic action
               });
           }
      
           public NewValueType transform(K readOnlyKey, V value) {
               // can access this.state and use read-only key
               return new NewValueType(readOnlyKey); // or null
           }
      
           public void close() {
               // can access this.state
           }
       }
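The read-only key can be illustrated with a plain-Java model that has no Kafka dependency. Here a BiFunction stands in for ValueTransformerWithKey#transform(readOnlyKey, value). It may read the key to compute the new value, but the hypothetical transformValuesWithKey helper always copies the original key into the output. The key therefore cannot be changed through this path.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of transformValues() with a key-aware transformer (no Kafka dependency).
final class ValueWithKeyModel {

    // The transformer sees the key but can only return a new value; the output key is
    // always the input key, mirroring the read-only contract described above.
    static <K, V, VR> List<Map.Entry<K, VR>> transformValuesWithKey(
            List<Map.Entry<K, V>> input, BiFunction<K, V, VR> transformer) {
        List<Map.Entry<K, VR>> output = new ArrayList<>();
        for (Map.Entry<K, V> record : input) {
            K readOnlyKey = record.getKey();
            output.add(new SimpleEntry<>(readOnlyKey, transformer.apply(readOnlyKey, record.getValue())));
        }
        return output;
    }
}
```

For example, the transformer (k, v) -> k + "=" + v turns <sensor-1:20> into <sensor-1:"sensor-1=20">. The value type changes from Integer to String while the key stays exactly as it was.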
       
      Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before transformValues().

      Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Because the key is left unchanged, setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. transform(TransformerSupplier, String...)).

      Type Parameters:
      VR - the value type of the result stream
      Parameters:
      valueTransformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a newly constructed ValueTransformerWithKey. The supplier should always generate a new instance. Creating a single ValueTransformerWithKey object and returning the same object reference from ValueTransformerWithKeySupplier.get() violates the supplier pattern and leads to runtime exceptions.
      stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
      Returns:
      a KStream that contains records with unmodified key and new values (possibly of different type)
      See Also:
    • transformValues

      <VR> KStream<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> valueTransformerSupplier, Named named, String... stateStoreNames)
      Transform the value of each input record into a new value (with possibly a new type) of the output record. A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K:V'>. Attaching a state store makes this a stateful record-by-record operation (cf. mapValues()). If you choose not to attach one, this operation is similar to the stateless mapValues() but allows access to the ProcessorContext and record metadata. This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed.

      In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).

      The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the transformer.

      
       // create store
       StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
               Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                       Serdes.String(),
                       Serdes.String());
       // add store
       builder.addStateStore(keyValueStoreBuilder);
      
       KStream outputStream = inputStream.transformValues(new ValueTransformerWithKeySupplier() {
           public ValueTransformerWithKey get() {
               return new MyValueTransformerWithKey();
           }
       }, "myValueTransformState");
       
      The second strategy is for the given ValueTransformerWithKeySupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.
      
       class MyValueTransformerWithKeySupplier implements ValueTransformerWithKeySupplier {
           // supply transformer (interface methods must be implemented as public)
           public ValueTransformerWithKey get() {
               return new MyValueTransformerWithKey();
           }
      
           // provide store(s) that will be added and connected to the associated transformer
           // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
           public Set<StoreBuilder<?>> stores() {
               StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                         Serdes.String(),
                         Serdes.String());
               return Collections.singleton(keyValueStoreBuilder);
           }
       }
      
       ...
      
       KStream outputStream = inputStream.transformValues(new MyValueTransformerWithKeySupplier());
       

      With either strategy, within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered. The ValueTransformerWithKey must return the new value in transform(). In contrast to transform() and flatTransform(), no additional KeyValue pairs can be emitted via ProcessorContext.forward(). A StreamsException is thrown if the ValueTransformerWithKey tries to emit a KeyValue pair.

      
       class MyValueTransformerWithKey implements ValueTransformerWithKey {
           private StateStore state;
      
           public void init(ProcessorContext context) {
               this.state = context.getStateStore("myValueTransformState");
               // punctuate each second; the punctuator can access this.state
               context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                   // periodic action
               });
           }
      
           public NewValueType transform(K readOnlyKey, V value) {
               // can access this.state and use read-only key
               return new NewValueType(readOnlyKey); // or null
           }
      
           public void close() {
               // can access this.state
           }
       }
       
      Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before transformValues().

      Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Because the key is left unchanged, setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. transform(TransformerSupplier, String...)).

      Type Parameters:
      VR - the value type of the result stream
      Parameters:
      valueTransformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a newly constructed ValueTransformerWithKey. The supplier should always generate a new instance. Creating a single ValueTransformerWithKey object and returning the same object reference in ValueTransformerWithKeySupplier.get() is a violation of the supplier pattern and leads to runtime exceptions.
      named - a Named config used to name the processor in the topology
      stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
      Returns:
      a KStream that contains records with unmodified key and new values (possibly of different type)
      See Also:
    • flatTransformValues

      <VR> KStream<K,VR> flatTransformValues(ValueTransformerSupplier<? super V,Iterable<VR>> valueTransformerSupplier, String... stateStoreNames)
      Transform the value of each input record into zero or more new values (possibly of a new type) and emit, for each new value, a record with the same key as the input record and that new value. A ValueTransformer (provided by the given ValueTransformerSupplier) is applied to each input record value and computes zero or more new values. Thus, an input record <K,V> can be transformed into output records <K:V'>, <K:V''>, .... Attaching a state store makes this a stateful record-by-record operation (cf. flatMapValues()). If you choose not to attach one, this operation is similar to the stateless flatMapValues() but allows access to the ProcessorContext and record metadata. This essentially mixes the Processor API into the DSL and provides all the functionality of the PAPI. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed.
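      The fan-out from one input record to zero or more output records can be modeled in a few lines of plain Java. This is a hypothetical sketch, not the Kafka Streams implementation: `Rec`, `flatTransformValues`, and `words` are illustrative names, and the transformer is a plain `Function` rather than a `ValueTransformer`. Each element of the returned `Iterable` becomes one output record with the input key, while `null` and an empty `Iterable` both produce nothing, matching the rule stated for `ValueTransformer#transform()` in this method's description.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of the fan-out rule, NOT the Kafka Streams implementation.
// Each element of the Iterable returned for a value becomes one output record
// that carries the input key; null or an empty Iterable yields no records.
final class FlatTransformValuesModel {
    // Hypothetical key/value record type.
    record Rec<K, V>(K key, V value) {}

    static <K, V, VR> List<Rec<K, VR>> flatTransformValues(
            List<Rec<K, V>> input, Function<V, Iterable<VR>> transformer) {
        List<Rec<K, VR>> out = new ArrayList<>();
        for (Rec<K, V> r : input) {
            Iterable<VR> values = transformer.apply(r.value());
            if (values == null) {
                continue; // treated like an empty Iterable: nothing is emitted
            }
            for (VR v : values) {
                out.add(new Rec<>(r.key(), v)); // key copied unchanged
            }
        }
        return out;
    }

    // Example transformer: split a sentence into words; blank -> empty list, null -> null.
    static Iterable<String> words(String sentence) {
        if (sentence == null) {
            return null;
        }
        if (sentence.isBlank()) {
            return List.of();
        }
        return List.of(sentence.trim().split("\\s+"));
    }

    public static void main(String[] args) {
        List<Rec<String, String>> in = List.of(
                new Rec<>("k1", "a b c"), new Rec<>("k2", " "), new Rec<>("k3", null));
        List<Rec<String, String>> out = flatTransformValues(in, FlatTransformValuesModel::words);
        // Only k1 contributes output: three records, all keyed "k1".
        out.forEach(System.out::println);
    }
}
```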

      In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).

      The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the transformer.

      
       // create store
       StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
               Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                       Serdes.String(),
                       Serdes.String());
       // add store
       builder.addStateStore(keyValueStoreBuilder);
      
       KStream outputStream = inputStream.flatTransformValues(new ValueTransformerSupplier() {
           public ValueTransformer get() {
               return new MyValueTransformer();
           }
       }, "myValueTransformState");
       
      The second strategy is for the given ValueTransformerSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.
      
       class MyValueTransformerSupplier implements ValueTransformerSupplier {
           // supply transformer
           ValueTransformer get() {
               return new MyValueTransformer();
           }
      
           // provide store(s) that will be added and connected to the associated transformer
           // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
           Set<StoreBuilder> stores() {
               StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                         Serdes.String(),
                         Serdes.String());
               return Collections.singleton(keyValueStoreBuilder);
           }
       }
      
       ...
      
       KStream outputStream = inputStream.flatTransformValues(new MyValueTransformerSupplier());
       

      With either strategy, within the ValueTransformer, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered. The ValueTransformer must return an Iterable type (e.g., any Collection type) in transform(). If the return value of ValueTransformer#transform() is an empty Iterable or null, no records are emitted. In contrast to transform() and flatTransform(), no additional KeyValue pairs can be emitted via ProcessorContext.forward(). A StreamsException is thrown if the ValueTransformer tries to emit a KeyValue pair.

      
       class MyValueTransformer implements ValueTransformer {
           private StateStore state;
      
           void init(ProcessorContext context) {
               this.state = context.getStateStore("myValueTransformState");
               // punctuate each second, can access this.state
               context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> { /* periodic action */ });
           }
      
           Iterable<NewValueType> transform(V value) {
               // can access this.state
               List<NewValueType> result = new ArrayList<>();
               for (int i = 0; i < 3; i++) {
                   result.add(new NewValueType(value));
               }
               return result; // values
           }
      
           void close() {
               // can access this.state
           }
       }
       
      Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before flatTransformValues().

      Setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream. (cf. flatTransform())

      Type Parameters:
      VR - the value type of the result stream
      Parameters:
      valueTransformerSupplier - an instance of ValueTransformerSupplier that generates a newly constructed ValueTransformer. The supplier should always generate a new instance. Creating a single ValueTransformer object and returning the same object reference in ValueTransformerSupplier.get() is a violation of the supplier pattern and leads to runtime exceptions.
      stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
      Returns:
      a KStream that contains zero or more records per input record, each with an unmodified key and a new value (possibly of a different type)
      See Also:
    • flatTransformValues

      <VR> KStream<K,VR> flatTransformValues(ValueTransformerSupplier<? super V,Iterable<VR>> valueTransformerSupplier, Named named, String... stateStoreNames)
      Transform the value of each input record into zero or more new values (possibly of a new type) and emit, for each new value, a record with the same key as the input record and that new value. A ValueTransformer (provided by the given ValueTransformerSupplier) is applied to each input record value and computes zero or more new values. Thus, an input record <K,V> can be transformed into output records <K:V'>, <K:V''>, .... Attaching a state store makes this a stateful record-by-record operation (cf. flatMapValues()). If you choose not to attach one, this operation is similar to the stateless flatMapValues() but allows access to the ProcessorContext and record metadata. This essentially mixes the Processor API into the DSL and provides all the functionality of the PAPI. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed.

      In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).

      The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the transformer.

      
       // create store
       StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
               Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                       Serdes.String(),
                       Serdes.String());
       // add store
       builder.addStateStore(keyValueStoreBuilder);
      
       KStream outputStream = inputStream.flatTransformValues(new ValueTransformerSupplier() {
           public ValueTransformer get() {
               return new MyValueTransformer();
           }
       }, "myValueTransformState");
       
      The second strategy is for the given ValueTransformerSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.
      
       class MyValueTransformerSupplier implements ValueTransformerSupplier {
           // supply transformer
           ValueTransformer get() {
               return new MyValueTransformer();
           }
      
           // provide store(s) that will be added and connected to the associated transformer
           // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
           Set<StoreBuilder> stores() {
               StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                         Serdes.String(),
                         Serdes.String());
               return Collections.singleton(keyValueStoreBuilder);
           }
       }
      
       ...
      
       KStream outputStream = inputStream.flatTransformValues(new MyValueTransformerSupplier());
       

      With either strategy, within the ValueTransformer, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered. The ValueTransformer must return an Iterable type (e.g., any Collection type) in transform(). If the return value of ValueTransformer#transform() is an empty Iterable or null, no records are emitted. In contrast to transform() and flatTransform(), no additional KeyValue pairs can be emitted via ProcessorContext.forward(). A StreamsException is thrown if the ValueTransformer tries to emit a KeyValue pair.

      
       class MyValueTransformer implements ValueTransformer {
           private StateStore state;
      
           void init(ProcessorContext context) {
               this.state = context.getStateStore("myValueTransformState");
               // punctuate each second, can access this.state
               context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> { /* periodic action */ });
           }
      
           Iterable<NewValueType> transform(V value) {
               // can access this.state
               List<NewValueType> result = new ArrayList<>();
               for (int i = 0; i < 3; i++) {
                   result.add(new NewValueType(value));
               }
               return result; // values
           }
      
           void close() {
               // can access this.state
           }
       }
       
      Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before flatTransformValues().

      Setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream. (cf. flatTransform())

      Type Parameters:
      VR - the value type of the result stream
      Parameters:
      valueTransformerSupplier - an instance of ValueTransformerSupplier that generates a newly constructed ValueTransformer. The supplier should always generate a new instance. Creating a single ValueTransformer object and returning the same object reference in ValueTransformerSupplier.get() is a violation of the supplier pattern and leads to runtime exceptions.
      named - a Named config used to name the processor in the topology
      stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
      Returns:
      a KStream that contains zero or more records per input record, each with an unmodified key and a new value (possibly of a different type)
      See Also:
    • flatTransformValues

      <VR> KStream<K,VR> flatTransformValues(ValueTransformerWithKeySupplier<? super K,? super V,Iterable<VR>> valueTransformerSupplier, String... stateStoreNames)
      Transform the value of each input record into zero or more new values (possibly of a new type) and emit, for each new value, a record with the same key as the input record and that new value. A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes zero or more new values. Thus, an input record <K,V> can be transformed into output records <K:V'>, <K:V''>, .... Attaching a state store makes this a stateful record-by-record operation (cf. flatMapValues()). If you choose not to attach one, this operation is similar to the stateless flatMapValues() but allows access to the ProcessorContext and record metadata. This essentially mixes the Processor API into the DSL and provides all the functionality of the PAPI. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed.
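      To make "stateful record-by-record operation" concrete, here is a plain-Java sketch of a key-aware value transformer that emits a value only when it differs from the previous value seen for the same key. A `HashMap` stands in for the `KeyValueStore` that a real `ValueTransformerWithKey` would obtain through `ProcessorContext.getStateStore(...)`; the class name `ChangeOnlyTransformer` and its change-detection logic are hypothetical. Returning an empty list corresponds to emitting no record for that input.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, plain-Java sketch (not a Kafka Streams class). The HashMap is a
// stand-in for a KeyValueStore; values are assumed to be non-null.
final class ChangeOnlyTransformer {
    private final Map<String, String> lastValueByKey = new HashMap<>();

    // Uses the read-only key to look up state; returns the value if it changed
    // for this key, otherwise an empty list (which means "emit nothing").
    Iterable<String> transform(String readOnlyKey, String value) {
        String previous = lastValueByKey.put(readOnlyKey, value);
        if (value.equals(previous)) {
            return List.of();
        }
        return List.of(value);
    }

    public static void main(String[] args) {
        ChangeOnlyTransformer t = new ChangeOnlyTransformer();
        String[][] input = {{"sensor-1", "ON"}, {"sensor-1", "ON"},
                            {"sensor-2", "ON"}, {"sensor-1", "OFF"}};
        for (String[] kv : input) {
            for (String out : t.transform(kv[0], kv[1])) {
                System.out.println(kv[0] + " -> " + out);
            }
        }
        // Prints: sensor-1 -> ON, sensor-2 -> ON, sensor-1 -> OFF (one per line)
    }
}
```

      Unlike a real state store, the `HashMap` is neither persisted nor backed by a changelog topic, so its contents would be lost on failure; that gap is exactly what attaching a store addresses.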

      In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).

      The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the transformer.

      
       // create store
       StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
               Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                       Serdes.String(),
                       Serdes.String());
       // add store
       builder.addStateStore(keyValueStoreBuilder);
      
       KStream outputStream = inputStream.flatTransformValues(new ValueTransformerWithKeySupplier() {
           public ValueTransformerWithKey get() {
               return new MyValueTransformerWithKey();
           }
       }, "myValueTransformState");
       
      The second strategy is for the given ValueTransformerWithKeySupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.
      
       class MyValueTransformerWithKeySupplier implements ValueTransformerWithKeySupplier {
           // supply transformer
           ValueTransformerWithKey get() {
               return new MyValueTransformerWithKey();
           }
      
           // provide store(s) that will be added and connected to the associated transformer
           // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
           Set<StoreBuilder> stores() {
               StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                         Serdes.String(),
                         Serdes.String());
               return Collections.singleton(keyValueStoreBuilder);
           }
       }
      
       ...
      
       KStream outputStream = inputStream.flatTransformValues(new MyValueTransformerWithKeySupplier());
       

      With either strategy, within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered. The ValueTransformerWithKey must return an Iterable type (e.g., any Collection type) in transform(). If the return value of ValueTransformerWithKey#transform() is an empty Iterable or null, no records are emitted. In contrast to transform() and flatTransform(), no additional KeyValue pairs can be emitted via ProcessorContext.forward(). A StreamsException is thrown if the ValueTransformerWithKey tries to emit a KeyValue pair.

      
       class MyValueTransformerWithKey implements ValueTransformerWithKey {
           private StateStore state;
      
           void init(ProcessorContext context) {
               this.state = context.getStateStore("myValueTransformState");
               // punctuate each second, can access this.state
               context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> { /* periodic action */ });
           }
      
           Iterable<NewValueType> transform(K readOnlyKey, V value) {
               // can access this.state and use read-only key
               List<NewValueType> result = new ArrayList<>();
               for (int i = 0; i < 3; i++) {
                   result.add(new NewValueType(readOnlyKey));
               }
               return result; // values
           }
      
           void close() {
               // can access this.state
           }
       }
       
      Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before flatTransformValues().

      Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Because only the value is replaced, data co-location with respect to the key is preserved. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream. (cf. flatTransform())

      Type Parameters:
      VR - the value type of the result stream
      Parameters:
      valueTransformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a newly constructed ValueTransformerWithKey. The supplier should always generate a new instance. Creating a single ValueTransformerWithKey object and returning the same object reference in ValueTransformerWithKeySupplier.get() is a violation of the supplier pattern and leads to runtime exceptions.
      stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
      Returns:
      a KStream that contains zero or more records per input record, each with an unmodified key and a new value (possibly of a different type)
      See Also:
    • flatTransformValues

      <VR> KStream<K,VR> flatTransformValues(ValueTransformerWithKeySupplier<? super K,? super V,Iterable<VR>> valueTransformerSupplier, Named named, String... stateStoreNames)
      Transform the value of each input record into zero or more new values (possibly of a new type) and emit, for each new value, a record with the same key as the input record and that new value. A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes zero or more new values. Thus, an input record <K,V> can be transformed into output records <K:V'>, <K:V''>, .... Attaching a state store makes this a stateful record-by-record operation (cf. flatMapValues()). If you choose not to attach one, this operation is similar to the stateless flatMapValues() but allows access to the ProcessorContext and record metadata. This essentially mixes the Processor API into the DSL and provides all the functionality of the PAPI. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed.

      In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).

      The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the transformer.

      
       // create store
       StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
               Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                       Serdes.String(),
                       Serdes.String());
       // add store
       builder.addStateStore(keyValueStoreBuilder);
      
       KStream outputStream = inputStream.flatTransformValues(new ValueTransformerWithKeySupplier() {
           public ValueTransformerWithKey get() {
               return new MyValueTransformerWithKey();
           }
       }, "myValueTransformState");
       
      The second strategy is for the given ValueTransformerWithKeySupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.
      
       class MyValueTransformerWithKeySupplier implements ValueTransformerWithKeySupplier {
           // supply transformer
           ValueTransformerWithKey get() {
               return new MyValueTransformerWithKey();
           }
      
           // provide store(s) that will be added and connected to the associated transformer
           // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
           Set<StoreBuilder> stores() {
               StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                         Serdes.String(),
                         Serdes.String());
               return Collections.singleton(keyValueStoreBuilder);
           }
       }
      
       ...
      
       KStream outputStream = inputStream.flatTransformValues(new MyValueTransformerWithKeySupplier());
       

      With either strategy, within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered. The ValueTransformerWithKey must return an Iterable type (e.g., any Collection type) in transform(). If the return value of ValueTransformerWithKey#transform() is an empty Iterable or null, no records are emitted. In contrast to transform() and flatTransform(), no additional KeyValue pairs can be emitted via ProcessorContext.forward(). A StreamsException is thrown if the ValueTransformerWithKey tries to emit a KeyValue pair.

      
       class MyValueTransformerWithKey implements ValueTransformerWithKey {
           private StateStore state;
      
           void init(ProcessorContext context) {
               this.state = context.getStateStore("myValueTransformState");
               // punctuate each second, can access this.state
               context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> { /* periodic action */ });
           }
      
           Iterable<NewValueType> transform(K readOnlyKey, V value) {
               // can access this.state and use read-only key
               List<NewValueType> result = new ArrayList<>();
               for (int i = 0; i < 3; i++) {
                   result.add(new NewValueType(readOnlyKey));
               }
               return result; // values
           }
      
           void close() {
               // can access this.state
           }
       }
       
      Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before flatTransformValues().

      Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Because only the value is replaced, data co-location with respect to the key is preserved. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream. (cf. flatTransform())

      Type Parameters:
      VR - the value type of the result stream
      Parameters:
      valueTransformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a newly constructed ValueTransformerWithKey. The supplier should always generate a new instance. Creating a single ValueTransformerWithKey object and returning the same object reference in ValueTransformerWithKeySupplier.get() is a violation of the supplier pattern and leads to runtime exceptions.
      named - a Named config used to name the processor in the topology
      stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
      Returns:
      a KStream that contains zero or more records per input record, each with an unmodified key and a new value (possibly of a different type)
      See Also:
    • process

      @Deprecated void process(ProcessorSupplier<? super K,? super V> processorSupplier, String... stateStoreNames)
      Process all records in this stream, one record at a time, by applying a Processor (provided by the given ProcessorSupplier). Attaching a state store makes this a stateful record-by-record operation (cf. foreach(ForeachAction)). If you choose not to attach one, this operation is similar to the stateless foreach(ForeachAction) but allows access to the ProcessorContext and record metadata. This essentially mixes the Processor API into the DSL and provides all the functionality of the PAPI. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed. Note that this is a terminal operation that returns void.
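      This method's parameter description requires the ProcessorSupplier to return a new Processor on every call. The plain-Java sketch below motivates that rule under a simplifying assumption: the runtime calls `get()` roughly once per stream task, and each task must own its processor's state. `CountingProcessor` and `runTwoTasks` are hypothetical names, and this is not Kafka Streams code. With a fresh instance per call each task counts only its own records; with a shared instance the counts are mixed.

```java
import java.util.function.Supplier;

// Hypothetical plain-Java illustration of the supplier pattern; not Kafka code.
final class SupplierPatternDemo {
    // Stand-in for a stateful processor that counts the records it sees.
    static final class CountingProcessor {
        private long count;

        void process(String key, String value) {
            count++;
        }

        long count() {
            return count;
        }
    }

    // Simulates two tasks, each asking the supplier for "its own" processor
    // and then processing a different number of records.
    static long[] runTwoTasks(Supplier<CountingProcessor> supplier) {
        CountingProcessor task0 = supplier.get();
        CountingProcessor task1 = supplier.get();
        for (int i = 0; i < 3; i++) task0.process("k" + i, "v");
        for (int i = 0; i < 2; i++) task1.process("k" + i, "v");
        return new long[] {task0.count(), task1.count()};
    }

    public static void main(String[] args) {
        // Correct: a fresh instance per get() call.
        long[] fresh = runTwoTasks(CountingProcessor::new);

        // Incorrect: the same instance is handed to both tasks.
        CountingProcessor shared = new CountingProcessor();
        long[] mixed = runTwoTasks(() -> shared);

        System.out.println("fresh: " + fresh[0] + ", " + fresh[1]);  // fresh: 3, 2
        System.out.println("shared: " + mixed[0] + ", " + mixed[1]); // shared: 5, 5
    }
}
```

      The documentation states that violating the pattern leads to runtime exceptions; the model only shows the state mixing that motivates the rule.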

      In order for the processor to use state stores, the stores must be added to the topology and connected to the processor using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).

      The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the processor.

      
       // create store
       StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
               Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
                       Serdes.String(),
                       Serdes.String());
       // add store
       builder.addStateStore(keyValueStoreBuilder);
      
       inputStream.process(new ProcessorSupplier() {
           public Processor get() {
               return new MyProcessor();
           }
       }, "myProcessorState");
       
      The second strategy is for the given ProcessorSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the processor.
      
       class MyProcessorSupplier implements ProcessorSupplier {
           // supply processor
           Processor get() {
               return new MyProcessor();
           }
      
           // provide store(s) that will be added and connected to the associated processor
           // the store name from the builder ("myProcessorState") is used to access the store later via the ProcessorContext
           Set<StoreBuilder> stores() {
               StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
                         Serdes.String(),
                         Serdes.String());
               return Collections.singleton(keyValueStoreBuilder);
           }
       }
      
       ...
      
       inputStream.process(new MyProcessorSupplier());
       

      With either strategy, within the Processor, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.

      
       class MyProcessor implements Processor {
           private StateStore state;
      
           void init(ProcessorContext context) {
               this.state = context.getStateStore("myProcessorState");
               // punctuate each second, can access this.state
               context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> { /* periodic action */ });
           }
      
           void process(K key, V value) {
               // can access this.state
           }
      
           void close() {
               // can access this.state
           }
       }
       
      Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before process().
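      The reason a key-changing upstream step needs an explicit repartition() can be shown with a plain-Java model. Partitions are plain lists, each partition is aggregated independently (as a task only sees the partitions assigned to it), and routing uses `String.hashCode()` modulo the partition count as an assumed stand-in for Kafka's partitioner. `Rec`, `countPerPartition`, and `repartition` are hypothetical names, not Kafka APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of why a key change requires repartitioning before a
// key-based operation. Not Kafka Streams code; the partitioner is assumed.
final class RepartitionModel {
    record Rec(String key, String value) {}

    static int partitionFor(String key, int n) {
        return Math.floorMod(key.hashCode(), n);
    }

    // Counts records per key separately inside each partition, the way a
    // per-task aggregation only sees its own partition.
    static List<Map<String, Integer>> countPerPartition(List<List<Rec>> partitions) {
        List<Map<String, Integer>> result = new ArrayList<>();
        for (List<Rec> p : partitions) {
            Map<String, Integer> counts = new HashMap<>();
            for (Rec r : p) counts.merge(r.key(), 1, Integer::sum);
            result.add(counts);
        }
        return result;
    }

    // Models repartition(): re-routes every record by its current key.
    static List<List<Rec>> repartition(List<List<Rec>> partitions, int n) {
        List<List<Rec>> out = new ArrayList<>();
        for (int i = 0; i < n; i++) out.add(new ArrayList<>());
        for (List<Rec> p : partitions) {
            for (Rec r : p) out.get(partitionFor(r.key(), n)).add(r);
        }
        return out;
    }

    public static void main(String[] args) {
        // After a key change (e.g. user id -> country), both "DE" records still
        // sit in the partitions that were chosen for their old keys.
        List<List<Rec>> afterKeyChange = List.of(
                List.of(new Rec("DE", "order-1")),
                List.of(new Rec("DE", "order-2")));
        System.out.println(countPerPartition(afterKeyChange));                 // [{DE=1}, {DE=1}]
        System.out.println(countPerPartition(repartition(afterKeyChange, 2))); // [{}, {DE=2}]
    }
}
```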
      Parameters:
      processorSupplier - an instance of ProcessorSupplier that generates a newly constructed Processor. The supplier should always generate a new instance. Creating a single Processor object and returning the same object reference in ProcessorSupplier.get() is a violation of the supplier pattern and leads to runtime exceptions.
      stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
      See Also:
    • process

      void process(ProcessorSupplier<? super K,? super V,Void,Void> processorSupplier, String... stateStoreNames)
      Process all records in this stream, one record at a time, by applying a Processor (provided by the given ProcessorSupplier). Attaching a state store makes this a stateful record-by-record operation (cf. foreach(ForeachAction)). If you choose not to attach one, this operation is similar to the stateless foreach(ForeachAction) but allows access to the ProcessorContext and record metadata. This essentially mixes the Processor API into the DSL and provides all the functionality of the PAPI. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed. Note that this is a terminal operation that returns void.
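      Periodic actions via punctuation can be pictured with a simplified, hypothetical model: a callback registered once (as init() would do with context.schedule(...)) fires whenever simulated wall-clock time crosses an interval boundary. `PunctuationModel` and `advanceTo` are illustrative names, not Kafka APIs. The model fires once for every elapsed interval, which may differ from Kafka Streams' actual behavior when several intervals pass at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Simplified, hypothetical model of a wall-clock punctuation schedule.
// It does not reproduce Kafka Streams' exact scheduling rules.
final class PunctuationModel {
    private final long intervalMs;
    private final LongConsumer punctuator;
    private long nextDueMs;

    PunctuationModel(long startMs, long intervalMs, LongConsumer punctuator) {
        this.intervalMs = intervalMs;
        this.punctuator = punctuator;
        this.nextDueMs = startMs + intervalMs;
    }

    // Called by the simulated processing loop; fires the callback for each
    // interval boundary that the new time has reached.
    void advanceTo(long nowMs) {
        while (nowMs >= nextDueMs) {
            punctuator.accept(nextDueMs);
            nextDueMs += intervalMs;
        }
    }

    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        long[] countSinceLastPunctuate = {0};
        // "init()": register a punctuation every 1000 ms that reports and resets a counter.
        PunctuationModel schedule = new PunctuationModel(0, 1000, ts -> {
            emitted.add("t=" + ts + " count=" + countSinceLastPunctuate[0]);
            countSinceLastPunctuate[0] = 0;
        });
        // "process()": records arriving at these simulated wall-clock times.
        for (long t : new long[] {100, 400, 900, 1200, 2500}) {
            schedule.advanceTo(t);
            countSinceLastPunctuate[0]++;
        }
        emitted.forEach(System.out::println); // t=1000 count=3, then t=2000 count=1
    }
}
```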

      In order for the processor to use state stores, the stores must be added to the topology and connected to the processor using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).

      The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the processor.

      
       // create store
       StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
               Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
                       Serdes.String(),
                       Serdes.String());
       // add store
       builder.addStateStore(keyValueStoreBuilder);
      
       inputStream.process(new ProcessorSupplier() {
           public Processor get() {
               return new MyProcessor();
           }
       }, "myProcessorState");
       
      The second strategy is for the given ProcessorSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the processor.
      
       class MyProcessorSupplier implements ProcessorSupplier<String, String, Void, Void> {
           // supply a new processor instance on every call
           @Override
           public Processor<String, String, Void, Void> get() {
               return new MyProcessor();
           }

           // provide store(s) that will be added and connected to the associated processor
           // the store name from the builder ("myProcessorState") is used to access the store later via the ProcessorContext
           @Override
           public Set<StoreBuilder<?>> stores() {
               StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                       Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
                               Serdes.String(),
                               Serdes.String());
               return Collections.singleton(keyValueStoreBuilder);
           }
       }

       ...

       inputStream.process(new MyProcessorSupplier());
       

      With either strategy, the state store is obtained within the Processor via the ProcessorContext. To trigger periodic actions via punctuate(), a punctuation must be registered with ProcessorContext.schedule(Duration, PunctuationType, Punctuator).

      
       class MyProcessor implements Processor<String, String, Void, Void> {
           private KeyValueStore<String, String> state;

           @Override
           public void init(ProcessorContext<Void, Void> context) {
               this.state = context.getStateStore("myProcessorState");
               // punctuate each second, can access this.state
               context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                   // periodic action, can access this.state
               });
           }

           @Override
           public void process(Record<String, String> record) {
               // can access this.state
           }

           @Override
           public void close() {
               // can access this.state
           }
       }
       
      Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before process().
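The sketch below shows where repartition() goes when a key-changing operation precedes a stateful process() call, using the first strategy (a manually added store connected by name). It assumes Kafka Streams 3.x with kafka-streams-test-utils; the topic "purchases", the store name, and CountingProcessor are hypothetical, and an in-memory store replaces the persistent one so the test leaves nothing on disk.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

class RepartitionThenProcessExample {
    static final String STORE = "purchasesPerItem";

    // Hypothetical processor: counts records per key in the connected store.
    static class CountingProcessor implements Processor<String, String, Void, Void> {
        private KeyValueStore<String, Long> counts;

        @Override
        public void init(ProcessorContext<Void, Void> context) {
            counts = context.getStateStore(STORE);
        }

        @Override
        public void process(Record<String, String> record) {
            Long current = counts.get(record.key());
            counts.put(record.key(), current == null ? 1L : current + 1L);
        }
    }

    static Map<String, Long> run() {
        StreamsBuilder builder = new StreamsBuilder();
        // first strategy: add the store manually, then connect it by name in process()
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore(STORE), Serdes.String(), Serdes.Long()));

        ProcessorSupplier<String, String, Void, Void> supplier = CountingProcessor::new;
        builder.stream("purchases", Consumed.with(Serdes.String(), Serdes.String()))
                // key-changing: re-key each purchase from user to item
                .selectKey((user, item) -> item)
                // process() never repartitions automatically, so do it explicitly
                .repartition(Repartitioned.with(Serdes.String(), Serdes.String()))
                .process(supplier, STORE);

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "repartition-then-process-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        Map<String, Long> result = new HashMap<>();
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> purchases = driver.createInputTopic(
                    "purchases", new StringSerializer(), new StringSerializer());
            purchases.pipeInput("user-x", "I1");
            purchases.pipeInput("user-x", "I2");
            purchases.pipeInput("user-y", "I1");

            KeyValueStore<String, Long> store = driver.getKeyValueStore(STORE);
            result.put("I1", store.get("I1"));
            result.put("I2", store.get("I2"));
        }
        return result;
    }
}
```

Without the repartition() step, records for the same item could land in different tasks on a multi-partition topic, and each task would keep its own partial count.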
      Parameters:
      processorSupplier - an instance of ProcessorSupplier that generates a newly constructed Processor. The supplier should always generate a new instance. Creating a single Processor object and returning the same object reference in ProcessorSupplier.get() is a violation of the supplier pattern and leads to runtime exceptions.
      stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
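The supplier contract in the processorSupplier parameter can be illustrated with two suppliers side by side. This is a minimal sketch assuming the org.apache.kafka.streams.processor.api types from Kafka Streams 3.x; CountingProcessor, GOOD, SHARED, and BAD are hypothetical names.

```java
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

class SupplierContractExample {
    // Hypothetical processor with per-instance state: a shared instance would
    // mix the counts of every task it is handed to.
    static class CountingProcessor implements Processor<String, String, Void, Void> {
        long seen = 0;

        @Override
        public void process(Record<String, String> record) {
            seen++;
        }
    }

    // Follows the supplier pattern: each get() constructs a new Processor.
    static final ProcessorSupplier<String, String, Void, Void> GOOD = CountingProcessor::new;

    // Violates the supplier pattern: get() always returns the same object reference.
    static final CountingProcessor SHARED = new CountingProcessor();
    static final ProcessorSupplier<String, String, Void, Void> BAD = () -> SHARED;
}
```

A constructor reference such as CountingProcessor::new is a compact way to satisfy the contract, since it creates a fresh object on every call.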
      See Also:
    • process

      @Deprecated void process(ProcessorSupplier<? super K,? super V> processorSupplier, Named named, String... stateStoreNames)
      Process all records in this stream, one record at a time, by applying a Processor (provided by the given ProcessorSupplier). Attaching a state store makes this a stateful record-by-record operation (cf. foreach(ForeachAction)). If you choose not to attach one, this operation is similar to the stateless foreach(ForeachAction) but allows access to the ProcessorContext and record metadata. This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed. Note that this is a terminal operation that returns void.

      In order for the processor to use state stores, the stores must be added to the topology and connected to the processor using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).

      The first strategy is to manually add the StoreBuilders via StreamsBuilder.addStateStore(StoreBuilder) (or Topology.addStateStore(StoreBuilder, String...) when building a Topology directly), and to specify the store names via stateStoreNames so they will be connected to the processor.

      
       // create store
       StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
               Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
                       Serdes.String(),
                       Serdes.String());
       // add store
       builder.addStateStore(keyValueStoreBuilder);

       // process() is terminal and returns void; name the processor and connect the store by name
       inputStream.process(new ProcessorSupplier<String, String>() {
           @Override
           public Processor<String, String> get() {
               return new MyProcessor(); // a new instance on every call
           }
       }, Named.as("myProcessor"), "myProcessorState");
       
      The second strategy is for the given ProcessorSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the processor.
      
       class MyProcessorSupplier implements ProcessorSupplier<String, String> {
           // supply a new processor instance on every call
           @Override
           public Processor<String, String> get() {
               return new MyProcessor();
           }

           // provide store(s) that will be added and connected to the associated processor
           // the store name from the builder ("myProcessorState") is used to access the store later via the ProcessorContext
           @Override
           public Set<StoreBuilder<?>> stores() {
               StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                       Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
                               Serdes.String(),
                               Serdes.String());
               return Collections.singleton(keyValueStoreBuilder);
           }
       }

       ...

       inputStream.process(new MyProcessorSupplier(), Named.as("myProcessor"));
       

      With either strategy, the state store is obtained within the Processor via the ProcessorContext. To trigger periodic actions via punctuate(), a punctuation must be registered with ProcessorContext.schedule(Duration, PunctuationType, Punctuator).

      
       class MyProcessor implements Processor<String, String> {
           private KeyValueStore<String, String> state;

           @Override
           @SuppressWarnings("unchecked")
           public void init(ProcessorContext context) {
               this.state = (KeyValueStore<String, String>) context.getStateStore("myProcessorState");
               // punctuate each second, can access this.state
               context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                   // periodic action, can access this.state
               });
           }

           @Override
           public void process(String key, String value) {
               // can access this.state
           }

           @Override
           public void close() {
               // can access this.state
           }
       }
       
      Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before process().
      Parameters:
      processorSupplier - an instance of ProcessorSupplier that generates a newly constructed Processor. The supplier should always generate a new instance. Creating a single Processor object and returning the same object reference in ProcessorSupplier.get() is a violation of the supplier pattern and leads to runtime exceptions.
      named - a Named config used to name the processor in the topology
      stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
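Because this overload is deprecated, a processor written against it typically needs to move to the org.apache.kafka.streams.processor.api types. The sketch below puts both forms side by side, assuming Kafka Streams 3.x, where the deprecated org.apache.kafka.streams.processor.Processor still exists. LegacyUppercase, Uppercase, wire(), and the processor name are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

class MigrationExample {
    // Before: the deprecated org.apache.kafka.streams.processor.Processor,
    // as accepted by this deprecated process(..., Named, String...) overload.
    @SuppressWarnings("deprecation")
    static class LegacyUppercase implements org.apache.kafka.streams.processor.Processor<String, String> {
        final List<String> out = new ArrayList<>();

        @Override
        public void init(org.apache.kafka.streams.processor.ProcessorContext context) { }

        @Override
        public void process(String key, String value) {
            out.add(key + "=" + value.toUpperCase());
        }

        @Override
        public void close() { }
    }

    // After: org.apache.kafka.streams.processor.api.Processor with output types Void, Void.
    static class Uppercase implements Processor<String, String, Void, Void> {
        final List<String> out = new ArrayList<>();

        @Override
        public void process(Record<String, String> record) {
            // key and value now come from the Record, which also carries timestamp and headers
            out.add(record.key() + "=" + record.value().toUpperCase());
        }
    }

    // Wiring the migrated processor through the non-deprecated overload, keeping a stable name.
    static void wire(KStream<String, String> stream) {
        ProcessorSupplier<String, String, Void, Void> supplier = Uppercase::new;
        stream.process(supplier, Named.as("uppercase-processor"));
    }
}
```

The per-record logic stays the same; what changes is the input signature (a single Record instead of key and value) and the generic output types on the supplier.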
      See Also:
    • process

      void process(ProcessorSupplier<? super K,? super V,Void,Void> processorSupplier, Named named, String... stateStoreNames)
      Process all records in this stream, one record at a time, by applying a Processor (provided by the given ProcessorSupplier). Attaching a state store makes this a stateful record-by-record operation (cf. foreach(ForeachAction)). If you choose not to attach one, this operation is similar to the stateless foreach(ForeachAction) but allows access to the ProcessorContext and record metadata. This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed. Note that this is a terminal operation that returns void.
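The punctuation mechanism mentioned above can be exercised without a broker by advancing the mocked wall clock of TopologyTestDriver. This is a minimal sketch assuming Kafka Streams 3.x and kafka-streams-test-utils; PeriodicCountProcessor, the topic, and the processor name "periodic-count" are hypothetical.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

class PunctuationExample {
    // Hypothetical processor: counts records and, once per second of wall-clock time,
    // reports how many arrived since the previous punctuation.
    static class PeriodicCountProcessor implements Processor<String, String, Void, Void> {
        private final List<Long> reports; // shared list, only for this single-threaded test
        private long sinceLastReport = 0;

        PeriodicCountProcessor(List<Long> reports) {
            this.reports = reports;
        }

        @Override
        public void init(ProcessorContext<Void, Void> context) {
            context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                reports.add(sinceLastReport);
                sinceLastReport = 0;
            });
        }

        @Override
        public void process(Record<String, String> record) {
            sinceLastReport++;
        }
    }

    static List<Long> run() {
        List<Long> reports = new ArrayList<>();
        StreamsBuilder builder = new StreamsBuilder();
        ProcessorSupplier<String, String, Void, Void> supplier = () -> new PeriodicCountProcessor(reports);
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
                .process(supplier, Named.as("periodic-count"));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "punctuation-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                    "input-topic", new StringSerializer(), new StringSerializer());
            input.pipeInput("k", "a");
            input.pipeInput("k", "b");
            input.pipeInput("k", "c");
            // move the driver's mocked wall clock past one 1-second interval
            driver.advanceWallClockTime(Duration.ofMillis(1500));
        }
        return reports;
    }
}
```

WALL_CLOCK_TIME punctuations fire based on system time even when no records arrive; STREAM_TIME would instead only advance as record timestamps advance.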

      In order for the processor to use state stores, the stores must be added to the topology and connected to the processor using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).

      The first strategy is to manually add the StoreBuilders via StreamsBuilder.addStateStore(StoreBuilder) (or Topology.addStateStore(StoreBuilder, String...) when building a Topology directly), and to specify the store names via stateStoreNames so they will be connected to the processor.

      
       // create store
       StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
               Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
                       Serdes.String(),
                       Serdes.String());
       // add store
       builder.addStateStore(keyValueStoreBuilder);

       // process() is terminal and returns void; name the processor and connect the store by name
       inputStream.process(new ProcessorSupplier<String, String, Void, Void>() {
           @Override
           public Processor<String, String, Void, Void> get() {
               return new MyProcessor(); // a new instance on every call
           }
       }, Named.as("myProcessor"), "myProcessorState");
       
      The second strategy is for the given ProcessorSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the processor.
      
       class MyProcessorSupplier implements ProcessorSupplier<String, String, Void, Void> {
           // supply a new processor instance on every call
           @Override
           public Processor<String, String, Void, Void> get() {
               return new MyProcessor();
           }

           // provide store(s) that will be added and connected to the associated processor
           // the store name from the builder ("myProcessorState") is used to access the store later via the ProcessorContext
           @Override
           public Set<StoreBuilder<?>> stores() {
               StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                       Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
                               Serdes.String(),
                               Serdes.String());
               return Collections.singleton(keyValueStoreBuilder);
           }
       }

       ...

       inputStream.process(new MyProcessorSupplier(), Named.as("myProcessor"));
       

      With either strategy, the state store is obtained within the Processor via the ProcessorContext. To trigger periodic actions via punctuate(), a punctuation must be registered with ProcessorContext.schedule(Duration, PunctuationType, Punctuator).

      
       class MyProcessor implements Processor<String, String, Void, Void> {
           private KeyValueStore<String, String> state;

           @Override
           public void init(ProcessorContext<Void, Void> context) {
               this.state = context.getStateStore("myProcessorState");
               // punctuate each second, can access this.state
               context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                   // periodic action, can access this.state
               });
           }

           @Override
           public void process(Record<String, String> record) {
               // can access this.state
           }

           @Override
           public void close() {
               // can access this.state
           }
       }
       
      Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before process().
      Parameters:
      processorSupplier - an instance of ProcessorSupplier that generates a newly constructed Processor. The supplier should always generate a new instance. Creating a single Processor object and returning the same object reference in ProcessorSupplier.get() is a violation of the supplier pattern and leads to runtime exceptions.
      named - a Named config used to name the processor in the topology
      stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
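The named parameter determines the processor's name in the topology, which becomes visible in Topology.describe(). The sketch below combines it with the second strategy (stores supplied by ConnectedStoreProvider.stores(), so no stateStoreNames are passed). It assumes Kafka Streams 3.x; LastValueProcessor, LastValueSupplier, and the name "last-value-processor" are hypothetical, and an in-memory store stands in for the persistent one.

```java
import java.util.Collections;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

class NamedProcessorExample {
    static final String STORE = "myProcessorState";

    // Hypothetical processor: remembers the latest value per key.
    static class LastValueProcessor implements Processor<String, String, Void, Void> {
        private KeyValueStore<String, String> state;

        @Override
        public void init(ProcessorContext<Void, Void> context) {
            state = context.getStateStore(STORE);
        }

        @Override
        public void process(Record<String, String> record) {
            state.put(record.key(), record.value());
        }
    }

    // Second strategy: the supplier declares its own store via ConnectedStoreProvider.stores().
    static class LastValueSupplier implements ProcessorSupplier<String, String, Void, Void> {
        @Override
        public Processor<String, String, Void, Void> get() {
            return new LastValueProcessor();
        }

        @Override
        public Set<StoreBuilder<?>> stores() {
            StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(
                    Stores.inMemoryKeyValueStore(STORE), Serdes.String(), Serdes.String());
            return Collections.singleton(storeBuilder);
        }
    }

    // Builds the topology and returns its textual description.
    static String describe() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
                .process(new LastValueSupplier(), Named.as("last-value-processor"));
        return builder.build().describe().toString();
    }
}
```

An explicit name keeps the processor's identity stable in the topology description even if operators are later added upstream, whereas generated names depend on operator order.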
      See Also: