K - Type of keys
V - Type of values
@InterfaceStability.Unstable
public interface KStream<K,V>
KStream is an abstraction of a record stream of key-value pairs.
 
 A KStream is either defined from one or multiple Kafka topics that are consumed message by message or
 the result of a KStream transformation. A KTable can also be converted into a KStream.
 
 A KStream can be transformed record by record, joined with another KStream or KTable, or
 can be aggregated into a KTable.
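To make "transformed record by record" concrete, the following is a minimal plain-Java sketch that models a finite record stream as a list of key-value pairs and applies the equivalent of a filter, a mapValues and a flatMapValues step. It does not use the Kafka Streams API; the class name `RecordStreamSketch` and the helpers `kv` and `wordsOfNonEmptyLines` are hypothetical names introduced only for this illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map.Entry;

// Plain-Java model of record-by-record processing; this is NOT the Kafka Streams API.
final class RecordStreamSketch {

    // Hypothetical helper that builds one key-value record.
    static Entry<String, String> kv(String key, String value) {
        return new SimpleEntry<>(key, value);
    }

    // Mimics filter -> mapValues -> flatMapValues on a finite list of records.
    static List<Entry<String, String>> wordsOfNonEmptyLines(List<Entry<String, String>> records) {
        List<Entry<String, String>> out = new ArrayList<>();
        for (Entry<String, String> record : records) {
            // filter: keep only records whose value is non-null and not blank
            if (record.getValue() == null || record.getValue().trim().isEmpty()) {
                continue;
            }
            // mapValues: the key is untouched, only the value changes
            String lower = record.getValue().toLowerCase();
            // flatMapValues: one input record yields zero or more records with the same key
            for (String word : lower.trim().split("\\s+")) {
                out.add(kv(record.getKey(), word));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Entry<String, String>> input = Arrays.asList(
                kv("k1", "Hello Kafka"),
                kv("k2", "   "),
                kv("k3", "Streams"));
        // Prints the surviving records as key=value pairs.
        System.out.println(wordsOfNonEmptyLines(input));
    }
}
```

Each input record is handled independently of all others, which is what distinguishes these stateless transformations from joins and aggregations.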
See Also: KTable

| Modifier and Type | Method and Description |
|---|---|
| KStream<K,V>[] | branch(Predicate<K,V>... predicates) Create an array of KStream from this stream by branching the elements in the original stream based on the supplied predicates. |
| KStream<K,V> | filter(Predicate<K,V> predicate) Create a new instance of KStream that consists of all elements of this stream which satisfy a predicate. |
| KStream<K,V> | filterNot(Predicate<K,V> predicate) Create a new instance of KStream that consists of all elements of this stream which do not satisfy a predicate. |
| <K1,V1> KStream<K1,V1> | flatMap(KeyValueMapper<K,V,Iterable<KeyValue<K1,V1>>> mapper) Create a new instance of KStream by transforming each element in this stream into zero or more elements in the new stream. |
| <V1> KStream<K,V1> | flatMapValues(ValueMapper<V,Iterable<V1>> processor) Create a new instance of KStream by transforming the value of each element in this stream into zero or more values with the same key in the new stream. |
| void | foreach(ForeachAction<K,V> action) Perform an action on each element of KStream. |
| <K1> KGroupedStream<K1,V> | groupBy(KeyValueMapper<K,V,K1> selector) Group the records of this KStream using the provided KeyValueMapper and default serializers and deserializers. |
| <K1> KGroupedStream<K1,V> | groupBy(KeyValueMapper<K,V,K1> selector, Serde<K1> keySerde, Serde<V> valSerde) Group the records of this KStream using the provided KeyValueMapper. |
| KGroupedStream<K,V> | groupByKey() Group the records with the same key into a KGroupedStream while preserving the original values. |
| KGroupedStream<K,V> | groupByKey(Serde<K> keySerde, Serde<V> valSerde) Group the records with the same key into a KGroupedStream while preserving the original values. |
| <V1,R> KStream<K,R> | join(KStream<K,V1> otherStream, ValueJoiner<V,V1,R> joiner, JoinWindows windows) Combine element values of this stream with another KStream's elements of the same key using windowed Inner Join with default serializers and deserializers. |
| <V1,R> KStream<K,R> | join(KStream<K,V1> otherStream, ValueJoiner<V,V1,R> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValueSerde, Serde<V1> otherValueSerde) Combine element values of this stream with another KStream's elements of the same key using windowed Inner Join. |
| <V1,R> KStream<K,R> | leftJoin(KStream<K,V1> otherStream, ValueJoiner<V,V1,R> joiner, JoinWindows windows) Combine values of this stream with another KStream's elements of the same key using windowed Left Join with default serializers and deserializers. |
| <V1,R> KStream<K,R> | leftJoin(KStream<K,V1> otherStream, ValueJoiner<V,V1,R> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValSerde, Serde<V1> otherValueSerde) Combine values of this stream with another KStream's elements of the same key using windowed Left Join. |
| <V1,V2> KStream<K,V2> | leftJoin(KTable<K,V1> table, ValueJoiner<V,V1,V2> joiner) Combine values of this stream with KTable's elements of the same key using non-windowed Left Join. |
| <V1,V2> KStream<K,V2> | leftJoin(KTable<K,V1> table, ValueJoiner<V,V1,V2> valueJoiner, Serde<K> keySerde, Serde<V> valSerde) Combine values of this stream with KTable's elements of the same key using non-windowed Left Join. |
| <K1,V1> KStream<K1,V1> | map(KeyValueMapper<K,V,KeyValue<K1,V1>> mapper) Create a new instance of KStream by transforming each element in this stream into a different element in the new stream. |
| <V1> KStream<K,V1> | mapValues(ValueMapper<V,V1> mapper) Create a new instance of KStream by transforming the value of each element in this stream into a new value in the new stream. |
| <V1,R> KStream<K,R> | outerJoin(KStream<K,V1> otherStream, ValueJoiner<V,V1,R> joiner, JoinWindows windows) Combine values of this stream with another KStream's elements of the same key using windowed Outer Join with default serializers and deserializers. |
| <V1,R> KStream<K,R> | outerJoin(KStream<K,V1> otherStream, ValueJoiner<V,V1,R> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValueSerde, Serde<V1> otherValueSerde) Combine values of this stream with another KStream's elements of the same key using windowed Outer Join. |
| void | print() Print the elements of this stream to System.out. |
| void | print(Serde<K> keySerde, Serde<V> valSerde) Print the elements of this stream to System.out. |
| void | print(Serde<K> keySerde, Serde<V> valSerde, String streamName) Print the elements of this stream to System.out. |
| void | print(String streamName) Print the elements of this stream to System.out. |
| void | process(ProcessorSupplier<K,V> processorSupplier, String... stateStoreNames) Process all elements in this stream, one element at a time, by applying a Processor. |
| <K1> KStream<K1,V> | selectKey(KeyValueMapper<K,V,K1> mapper) Create a new key from the current key and value. |
| KStream<K,V> | through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K,V> partitioner, String topic) Materialize this stream to a topic and create a new instance of KStream from the topic, using a customizable StreamPartitioner to determine the distribution of records to partitions. |
| KStream<K,V> | through(Serde<K> keySerde, Serde<V> valSerde, String topic) Materialize this stream to a topic and create a new instance of KStream from the topic. |
| KStream<K,V> | through(StreamPartitioner<K,V> partitioner, String topic) Materialize this stream to a topic and create a new instance of KStream from the topic, using default serializers and deserializers and a customizable StreamPartitioner to determine the distribution of records to partitions. |
| KStream<K,V> | through(String topic) Materialize this stream to a topic and create a new instance of KStream from the topic, using default serializers and deserializers and producer's DefaultPartitioner. |
| void | to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K,V> partitioner, String topic) Materialize this stream to a topic using a customizable StreamPartitioner to determine the distribution of records to partitions. |
| void | to(Serde<K> keySerde, Serde<V> valSerde, String topic) Materialize this stream to a topic. |
| void | to(StreamPartitioner<K,V> partitioner, String topic) Materialize this stream to a topic using default serializers specified in the config and a customizable StreamPartitioner to determine the distribution of records to partitions. |
| void | to(String topic) Materialize this stream to a topic using default serializers specified in the config and producer's DefaultPartitioner. |
| <K1,V1> KStream<K1,V1> | transform(TransformerSupplier<K,V,KeyValue<K1,V1>> transformerSupplier, String... stateStoreNames) Create a new KStream instance by applying a Transformer to all elements in this stream, one element at a time. |
| <R> KStream<K,R> | transformValues(ValueTransformerSupplier<V,R> valueTransformerSupplier, String... stateStoreNames) Create a new KStream instance by applying a ValueTransformer to all values in this stream, one element at a time. |
| void | writeAsText(String filePath) Write the elements of this stream to a file at the given path. |
| void | writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) Write the elements of this stream to a file at the given path. |
| void | writeAsText(String filePath, String streamName) Write the elements of this stream to a file at the given path. |
| void | writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde) Write the elements of this stream to a file at the given path. |
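Of the methods summarized above, branch has the least obvious semantics: predicates are evaluated in order, an element goes only to the result stream of the first predicate that matches, and an element that matches no predicate is dropped. The following is a minimal plain-Java sketch of that first-match rule over a finite list. It is not the Kafka Streams API; the class `BranchSketch` and its `branch` helper are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of first-match branching; this is NOT the Kafka Streams API.
final class BranchSketch {

    // Returns one output list per predicate, in the same order as the predicates.
    static <T> List<List<T>> branch(List<T> elements, List<Predicate<T>> predicates) {
        List<List<T>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            branches.add(new ArrayList<>());
        }
        for (T element : elements) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(element)) {
                    branches.get(i).add(element); // first matching predicate wins
                    break;                        // later predicates are not consulted
                }
            }
            // If no predicate matched, the element is dropped.
        }
        return branches;
    }

    public static void main(String[] args) {
        List<Predicate<Integer>> predicates = new ArrayList<>();
        predicates.add(n -> n % 2 == 0); // branch 0: even numbers
        predicates.add(n -> n > 10);     // branch 1: odd numbers above 10
        // 12 is even and above 10, but lands only in branch 0; 3 matches nothing.
        System.out.println(branch(Arrays.asList(4, 11, 12, 3), predicates));
    }
}
```

Because evaluation stops at the first match, the order of the predicates matters: placing the broader predicate first starves the ones after it.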
KStream<K,V> filter(Predicate<K,V> predicate)
Create a new instance of KStream that consists of all elements of this stream which satisfy a predicate.

KStream<K,V> filterNot(Predicate<K,V> predicate)
Create a new instance of KStream that consists of all elements of this stream which do not satisfy a predicate.

<K1> KStream<K1,V> selectKey(KeyValueMapper<K,V,K1> mapper)
Create a new key from the current key and value.
K1 - the new key type on the stream
mapper - the instance of KeyValueMapper
Returns: a KStream that contains records with different key type and same value type

<K1,V1> KStream<K1,V1> map(KeyValueMapper<K,V,KeyValue<K1,V1>> mapper)
Create a new instance of KStream by transforming each element in this stream into a different element in the new stream.
K1 - the key type of the new stream
V1 - the value type of the new stream
mapper - the instance of KeyValueMapper
Returns: a KStream that contains records with new key and value type

<V1> KStream<K,V1> mapValues(ValueMapper<V,V1> mapper)
Create a new instance of KStream by transforming the value of each element in this stream into a new value in the new stream.
V1 - the value type of the new stream
mapper - the instance of ValueMapper
Returns: a KStream that contains records with unmodified keys and new values of different type

void print()
Print the elements of this stream to System.out. This function will use the generated name of the parent processor node to label the key/value pairs printed out to the console. Implementors will need to override toString for keys and values that are not of type String, Integer etc. to get meaningful information.

void print(String streamName)
Print the elements of this stream to System.out. This function will use the given name to label the key/value pairs printed out to the console. Implementors will need to override toString for keys and values that are not of type String, Integer etc. to get meaningful information.
streamName - the name used to label the key/value pairs printed out to the console

void print(Serde<K> keySerde, Serde<V> valSerde)
Print the elements of this stream to System.out. Implementors will need to override toString for keys and values that are not of type String, Integer etc. to get meaningful information.
keySerde - key serde used to send key-value pairs; if not specified, the default serde defined in the configs will be used
valSerde - value serde used to send key-value pairs; if not specified, the default serde defined in the configs will be used

void print(Serde<K> keySerde, Serde<V> valSerde, String streamName)
Print the elements of this stream to System.out. Implementors will need to override toString for keys and values that are not of type String, Integer etc. to get meaningful information.
keySerde - key serde used to send key-value pairs; if not specified, the default serde defined in the configs will be used
valSerde - value serde used to send key-value pairs; if not specified, the default serde defined in the configs will be used
streamName - the name used to label the key/value pairs printed out to the console

void writeAsText(String filePath)
Write the elements of this stream to a file at the given path.

void writeAsText(String filePath, String streamName)
Write the elements of this stream to a file at the given path.

void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde)
Write the elements of this stream to a file at the given path. Implementors will need to override toString for keys and values that are not of type String, Integer etc. to get meaningful information.
filePath - name of file to write to
keySerde - key serde used to send key-value pairs; if not specified, the default serde defined in the configs will be used
valSerde - value serde used to send key-value pairs; if not specified, the default serde defined in the configs will be used

void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde)
Write the elements of this stream to a file at the given path. Implementors will need to override toString for keys and values that are not of type String, Integer etc. to get meaningful information.
filePath - name of file to write to
streamName - the name used to label the key/value pairs written to the file
keySerde - key serde used to send key-value pairs; if not specified, the default serde defined in the configs will be used
valSerde - value serde used to send key-value pairs; if not specified, the default serde defined in the configs will be used

<K1,V1> KStream<K1,V1> flatMap(KeyValueMapper<K,V,Iterable<KeyValue<K1,V1>>> mapper)
Create a new instance of KStream by transforming each element in this stream into zero or more elements in the new stream.
K1 - the key type of the new stream
V1 - the value type of the new stream
mapper - the instance of KeyValueMapper
Returns: a KStream that contains more or fewer records with new key and value type

<V1> KStream<K,V1> flatMapValues(ValueMapper<V,Iterable<V1>> processor)
Create a new instance of KStream by transforming the value of each element in this stream into zero or more values with the same key in the new stream.
V1 - the value type of the new stream
processor - the instance of ValueMapper
Returns: a KStream that contains more or fewer records with unmodified keys and new values of different type

KStream<K,V>[] branch(Predicate<K,V>... predicates)
Create an array of KStream from this stream by branching the elements in the original stream based on the supplied predicates. Each element is evaluated against the supplied predicates, and predicates are evaluated in order. Each stream in the result array corresponds position-wise (index) to the predicate in the supplied predicates. The branching happens on first match: an element in the original stream is assigned to the result stream of the first predicate that evaluates to true, and to that stream only. An element will be dropped if none of the predicates evaluate to true.

KStream<K,V> through(String topic)
Materialize this stream to a topic and create a new instance of KStream from the topic, using default serializers and deserializers and producer's DefaultPartitioner. This is equivalent to calling to(String) and KStreamBuilder.stream(String...).

void foreach(ForeachAction<K,V> action)
Perform an action on each element of KStream. Note that this is a terminal operation that returns void.
action - an action to perform on each element

KStream<K,V> through(StreamPartitioner<K,V> partitioner, String topic)
Materialize this stream to a topic and create a new instance of KStream from the topic, using default serializers and deserializers and a customizable StreamPartitioner to determine the distribution of records to partitions. This is equivalent to calling to(StreamPartitioner, String) and KStreamBuilder.stream(String...).

KStream<K,V> through(Serde<K> keySerde, Serde<V> valSerde, String topic)
Materialize this stream to a topic and create a new instance of KStream from the topic. If keySerde provides an org.apache.kafka.streams.kstream.internals.WindowedSerializer for the key, org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner is used; otherwise producer's DefaultPartitioner is used. This is equivalent to calling to(Serde, Serde, String) and KStreamBuilder.stream(Serde, Serde, String...).
keySerde - key serde used to send key-value pairs; if not specified, the default key serde defined in the configuration will be used
valSerde - value serde used to send key-value pairs; if not specified, the default value serde defined in the configuration will be used
topic - the topic name
Returns: a KStream that contains the exact same records as this KStream

KStream<K,V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K,V> partitioner, String topic)
Materialize this stream to a topic and create a new instance of KStream from the topic, using a customizable StreamPartitioner to determine the distribution of records to partitions. This is equivalent to calling to(Serde, Serde, StreamPartitioner, String) and KStreamBuilder.stream(Serde, Serde, String...).
keySerde - key serde used to send key-value pairs; if not specified, the default key serde defined in the configuration will be used
valSerde - value serde used to send key-value pairs; if not specified, the default value serde defined in the configuration will be used
partitioner - the function used to determine how records are distributed among partitions of the topic; if not specified and keySerde provides an org.apache.kafka.streams.kstream.internals.WindowedSerializer for the key, org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner will be used; otherwise DefaultPartitioner will be used
topic - the topic name
Returns: a KStream that contains the exact same records as this KStream

void to(String topic)
Materialize this stream to a topic using default serializers specified in the config and producer's DefaultPartitioner.
topic - the topic name

void to(StreamPartitioner<K,V> partitioner, String topic)
Materialize this stream to a topic using default serializers specified in the config and a customizable StreamPartitioner to determine the distribution of records to partitions.
partitioner - the function used to determine how records are distributed among partitions of the topic; if not specified, producer's DefaultPartitioner will be used
topic - the topic name

void to(Serde<K> keySerde, Serde<V> valSerde, String topic)
Materialize this stream to a topic. If keySerde provides an org.apache.kafka.streams.kstream.internals.WindowedSerializer for the key, org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner is used; otherwise producer's DefaultPartitioner is used.
keySerde - key serde used to send key-value pairs; if not specified, the default serde defined in the configs will be used
valSerde - value serde used to send key-value pairs; if not specified, the default serde defined in the configs will be used
topic - the topic name

void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K,V> partitioner, String topic)
Materialize this stream to a topic using a customizable StreamPartitioner to determine the distribution of records to partitions.
keySerde - key serde used to send key-value pairs; if not specified, the default serde defined in the configs will be used
valSerde - value serde used to send key-value pairs; if not specified, the default serde defined in the configs will be used
partitioner - the function used to determine how records are distributed among partitions of the topic; if not specified and keySerde provides an org.apache.kafka.streams.kstream.internals.WindowedSerializer for the key, org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner will be used; otherwise DefaultPartitioner will be used
topic - the topic name

<K1,V1> KStream<K1,V1> transform(TransformerSupplier<K,V,KeyValue<K1,V1>> transformerSupplier, String... stateStoreNames)
Create a new KStream instance by applying a Transformer to all elements in this stream, one element at a time.
transformerSupplier - the instance of TransformerSupplier that generates Transformer
stateStoreNames - the names of the state stores used by the processor
Returns: a KStream with transformed key and value types

<R> KStream<K,R> transformValues(ValueTransformerSupplier<V,R> valueTransformerSupplier, String... stateStoreNames)
Create a new KStream instance by applying a ValueTransformer to all values in this stream, one element at a time.
valueTransformerSupplier - the instance of ValueTransformerSupplier that generates ValueTransformer
stateStoreNames - the names of the state stores used by the processor
Returns: a KStream that contains records with unmodified keys and transformed values with type R

void process(ProcessorSupplier<K,V> processorSupplier, String... stateStoreNames)
Process all elements in this stream, one element at a time, by applying a Processor.
processorSupplier - the instance of ProcessorSupplier that generates Processor
stateStoreNames - the names of the state stores used by the processor

<V1,R> KStream<K,R> join(KStream<K,V1> otherStream, ValueJoiner<V,V1,R> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValueSerde, Serde<V1> otherValueSerde)
Combine element values of this stream with another KStream's elements of the same key using windowed Inner Join. If a record key is null it will not be included in the resulting KStream. Both of the joining KStreams will be materialized in local state stores with auto-generated store names. Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created in Kafka for each store for failure recovery, where "applicationId" is user-specified in the StreamsConfig.
V1 - the value type of the other stream
R - the value type of the new stream
otherStream - the instance of KStream joined with this stream
joiner - the instance of ValueJoiner
windows - the specification of the JoinWindows
keySerde - key serdes for materializing both streams; if not specified, the default serdes defined in the configs will be used
thisValueSerde - value serdes for materializing this stream; if not specified, the default serdes defined in the configs will be used
otherValueSerde - value serdes for materializing the other stream; if not specified, the default serdes defined in the configs will be used
Returns: a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key and within the joining window intervals

<V1,R> KStream<K,R> join(KStream<K,V1> otherStream, ValueJoiner<V,V1,R> joiner, JoinWindows windows)
Combine element values of this stream with another KStream's elements of the same key using windowed Inner Join with default serializers and deserializers. If a record key is null it will not be included in the resulting KStream. Both of the joining KStreams will be materialized in local state stores with auto-generated store names. Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created in Kafka for each store for failure recovery, where "applicationId" is user-specified in the StreamsConfig.
V1 - the value type of the other stream
R - the value type of the new stream
otherStream - the instance of KStream joined with this stream
joiner - the instance of ValueJoiner
windows - the specification of the JoinWindows
Returns: a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key and within the joining window intervals

<V1,R> KStream<K,R> outerJoin(KStream<K,V1> otherStream, ValueJoiner<V,V1,R> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValueSerde, Serde<V1> otherValueSerde)
Combine values of this stream with another KStream's elements of the same key using windowed Outer Join. If a record key is null it will not be included in the resulting KStream. Both of the joining KStreams will be materialized in local state stores with auto-generated store names. Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created in Kafka for each store for failure recovery, where "applicationId" is user-specified in the StreamsConfig.
V1 - the value type of the other stream
R - the value type of the new stream
otherStream - the instance of KStream joined with this stream
joiner - the instance of ValueJoiner
windows - the specification of the JoinWindows
keySerde - key serdes for materializing both streams; if not specified, the default serdes defined in the configs will be used
thisValueSerde - value serdes for materializing this stream; if not specified, the default serdes defined in the configs will be used
otherValueSerde - value serdes for materializing the other stream; if not specified, the default serdes defined in the configs will be used
Returns: a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key and within the joining window intervals

<V1,R> KStream<K,R> outerJoin(KStream<K,V1> otherStream, ValueJoiner<V,V1,R> joiner, JoinWindows windows)
Combine values of this stream with another KStream's elements of the same key using windowed Outer Join with default serializers and deserializers. If a record key is null it will not be included in the resulting KStream. Both of the joining KStreams will be materialized in local state stores with auto-generated store names. Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created in Kafka for each store for failure recovery, where "applicationId" is user-specified in the StreamsConfig.
V1 - the value type of the other stream
R - the value type of the new stream
otherStream - the instance of KStream joined with this stream
joiner - the instance of ValueJoiner
windows - the specification of the JoinWindows
Returns: a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key and within the joining window intervals

<V1,R> KStream<K,R> leftJoin(KStream<K,V1> otherStream, ValueJoiner<V,V1,R> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValSerde, Serde<V1> otherValueSerde)
Combine values of this stream with another KStream's elements of the same key using windowed Left Join. If a record key is null it will not be included in the resulting KStream. Both of the joining KStreams will be materialized in local state stores with auto-generated store names. Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created in Kafka for each store for failure recovery, where "applicationId" is user-specified in the StreamsConfig.
V1 - the value type of the other stream
R - the value type of the new stream
otherStream - the instance of KStream joined with this stream
joiner - the instance of ValueJoiner
windows - the specification of the JoinWindows
keySerde - key serdes for materializing the other stream; if not specified, the default serdes defined in the configs will be used
thisValSerde - value serdes for materializing this stream; if not specified, the default serdes defined in the configs will be used
otherValueSerde - value serdes for materializing the other stream; if not specified, the default serdes defined in the configs will be used
Returns: a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key and within the joining window intervals

<V1,R> KStream<K,R> leftJoin(KStream<K,V1> otherStream, ValueJoiner<V,V1,R> joiner, JoinWindows windows)
Combine values of this stream with another KStream's elements of the same key using windowed Left Join with default serializers and deserializers. If a record key is null it will not be included in the resulting KStream. Both of the joining KStreams will be materialized in local state stores with auto-generated store names. Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created in Kafka for each store for failure recovery, where "applicationId" is user-specified in the StreamsConfig.
V1 - the value type of the other stream
R - the value type of the new stream
otherStream - the instance of KStream joined with this stream
joiner - the instance of ValueJoiner
windows - the specification of the JoinWindows
Returns: a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key and within the joining window intervals

<V1,V2> KStream<K,V2> leftJoin(KTable<K,V1> table, ValueJoiner<V,V1,V2> joiner)
Combine values of this stream with KTable's elements of the same key using non-windowed Left Join. If a record key is null it will not be included in the resulting KStream.
V1 - the value type of the table
V2 - the value type of the new stream
table - the instance of KTable joined with this stream
joiner - the instance of ValueJoiner
Returns: a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key

<V1,V2> KStream<K,V2> leftJoin(KTable<K,V1> table, ValueJoiner<V,V1,V2> valueJoiner, Serde<K> keySerde, Serde<V> valSerde)
Combine values of this stream with KTable's elements of the same key using non-windowed Left Join. If a record key is null it will not be included in the resulting KStream.
V1 - the value type of the table
V2 - the value type of the new stream
table - the instance of KTable joined with this stream
valueJoiner - the instance of ValueJoiner
keySerde - key serdes for materializing this stream; if not specified, the default serdes defined in the configs will be used
valSerde - value serdes for materializing this stream; if not specified, the default serdes defined in the configs will be used
Returns: a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key

<K1> KGroupedStream<K1,V> groupBy(KeyValueMapper<K,V,K1> selector)
Group the records of this KStream using the provided KeyValueMapper and default serializers and deserializers. If a record key is null it will not be included in the resulting KGroupedStream.
K1 - the key type of the KGroupedStream
selector - selects the grouping key and value to be aggregated
Returns: a KGroupedStream that contains the grouped records of the original KStream

<K1> KGroupedStream<K1,V> groupBy(KeyValueMapper<K,V,K1> selector, Serde<K1> keySerde, Serde<V> valSerde)
Group the records of this KStream using the provided KeyValueMapper. If a record key is null it will not be included in the resulting KGroupedStream.
K1 - the key type of the KGroupedStream
selector - selects the grouping key and value to be aggregated
keySerde - key serdes for materializing this stream; if not specified, the default serdes defined in the configs will be used
valSerde - value serdes for materializing this stream; if not specified, the default serdes defined in the configs will be used
Returns: a KGroupedStream that contains the grouped records of the original KStream

KGroupedStream<K,V> groupByKey()
Group the records with the same key into a KGroupedStream while preserving the original values. If a record key is null it will not be included in the resulting KGroupedStream. Default Serdes will be used.
Returns: a KGroupedStream

KGroupedStream<K,V> groupByKey(Serde<K> keySerde, Serde<V> valSerde)
Group the records with the same key into a KGroupedStream while preserving the original values. If a record key is null it will not be included in the resulting KGroupedStream.
keySerde - key serdes for materializing this stream; if not specified, the default serdes defined in the configs will be used
valSerde - value serdes for materializing this stream; if not specified, the default serdes defined in the configs will be used
Returns: a KGroupedStream
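
The grouping rule stated for groupByKey (records with the same key are grouped, original values are preserved, and records with a null key are left out) can be modelled in plain Java as follows. This is only a sketch over a finite list, not the Kafka Streams API; the class `GroupByKeySketch` and its `groupByKey` helper are hypothetical names, and a real KGroupedStream is not a Map but the input to a later aggregation.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

// Plain-Java model of grouping by key; this is NOT the Kafka Streams API.
final class GroupByKeySketch {

    // Collects the values of all records that share a key, in arrival order.
    static <K, V> Map<K, List<V>> groupByKey(List<Entry<K, V>> records) {
        Map<K, List<V>> groups = new LinkedHashMap<>();
        for (Entry<K, V> record : records) {
            if (record.getKey() == null) {
                continue; // records with a null key are not included in the result
            }
            // Values are kept unchanged; only their grouping is decided by the key.
            groups.computeIfAbsent(record.getKey(), k -> new ArrayList<>())
                  .add(record.getValue());
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Entry<String, Integer>> input = new ArrayList<>();
        input.add(new SimpleEntry<>("a", 1));
        input.add(new SimpleEntry<>(null, 2)); // dropped: null key
        input.add(new SimpleEntry<>("b", 3));
        input.add(new SimpleEntry<>("a", 4));
        System.out.println(groupByKey(input));
    }
}
```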