K
- Type of keysV
- Type of values@InterfaceStability.Unstable
public interface KStream<K,V>
KStream
is an abstraction of a record stream of key-value pairs.
A KStream
is either defined from one or multiple Kafka topics that are consumed message by message or
the result of a KStream
transformation. A KTable
can also be converted into a KStream
.
A KStream
can be transformed record by record, joined with another KStream
or KTable
, or
can be aggregated into a KTable
.
KTable
Modifier and Type | Method and Description |
---|---|
KStream<K,V>[] |
branch(Predicate<K,V>... predicates)
Creates an array of
KStream from this stream by branching the elements in the original stream based on the supplied predicates. |
KStream<K,V> |
filter(Predicate<K,V> predicate)
Create a new instance of
KStream that consists of all elements of this stream which satisfy a predicate. |
KStream<K,V> |
filterNot(Predicate<K,V> predicate)
Create a new instance of
KStream that consists all elements of this stream which do not satisfy a predicate. |
<K1,V1> KStream<K1,V1> |
flatMap(KeyValueMapper<K,V,Iterable<KeyValue<K1,V1>>> mapper)
Create a new instance of
KStream by transforming each element in this stream into zero or more elements in the new stream. |
<V1> KStream<K,V1> |
flatMapValues(ValueMapper<V,Iterable<V1>> processor)
Create a new instance of
KStream by transforming the value of each element in this stream into zero or more values with the same key in the new stream. |
void |
foreach(ForeachAction<K,V> action)
Perform an action on each element of
KStream . |
<K1> KGroupedStream<K1,V> |
groupBy(KeyValueMapper<K,V,K1> selector)
Group the records of this
KStream using the provided KeyValueMapper and
default serializers and deserializers. |
<K1> KGroupedStream<K1,V> |
groupBy(KeyValueMapper<K,V,K1> selector,
Serde<K1> keySerde,
Serde<V> valSerde)
Group the records of this
KStream using the provided KeyValueMapper . |
KGroupedStream<K,V> |
groupByKey()
Group the records with the same key into a
KGroupedStream while preserving the
original values. |
KGroupedStream<K,V> |
groupByKey(Serde<K> keySerde,
Serde<V> valSerde)
Group the records with the same key into a
KGroupedStream while preserving the
original values. |
<V1,R> KStream<K,R> |
join(KStream<K,V1> otherStream,
ValueJoiner<V,V1,R> joiner,
JoinWindows windows)
Combine element values of this stream with another
KStream 's elements of the same key using windowed Inner Join
with default serializers and deserializers. |
<V1,R> KStream<K,R> |
join(KStream<K,V1> otherStream,
ValueJoiner<V,V1,R> joiner,
JoinWindows windows,
Serde<K> keySerde,
Serde<V> thisValueSerde,
Serde<V1> otherValueSerde)
Combine element values of this stream with another
KStream 's elements of the same key using windowed Inner Join. |
<V1,R> KStream<K,R> |
leftJoin(KStream<K,V1> otherStream,
ValueJoiner<V,V1,R> joiner,
JoinWindows windows)
Combine values of this stream with another
KStream 's elements of the same key using windowed Left Join
with default serializers and deserializers. |
<V1,R> KStream<K,R> |
leftJoin(KStream<K,V1> otherStream,
ValueJoiner<V,V1,R> joiner,
JoinWindows windows,
Serde<K> keySerde,
Serde<V> thisValSerde,
Serde<V1> otherValueSerde)
Combine values of this stream with another
KStream 's elements of the same key using windowed Left Join. |
<V1,V2> KStream<K,V2> |
leftJoin(KTable<K,V1> table,
ValueJoiner<V,V1,V2> joiner)
Combine values of this stream with
KTable 's elements of the same key using non-windowed Left Join. |
<V1,V2> KStream<K,V2> |
leftJoin(KTable<K,V1> table,
ValueJoiner<V,V1,V2> valueJoiner,
Serde<K> keySerde,
Serde<V> valSerde)
Combine values of this stream with
KTable 's elements of the same key using non-windowed Left Join. |
<K1,V1> KStream<K1,V1> |
map(KeyValueMapper<K,V,KeyValue<K1,V1>> mapper)
Create a new instance of
KStream by transforming each element in this stream into a different element in the new stream. |
<V1> KStream<K,V1> |
mapValues(ValueMapper<V,V1> mapper)
Create a new instance of
KStream by transforming the value of each element in this stream into a new value in the new stream. |
<V1,R> KStream<K,R> |
outerJoin(KStream<K,V1> otherStream,
ValueJoiner<V,V1,R> joiner,
JoinWindows windows)
Combine values of this stream with another
KStream 's elements of the same key using windowed Outer Join
with default serializers and deserializers. |
<V1,R> KStream<K,R> |
outerJoin(KStream<K,V1> otherStream,
ValueJoiner<V,V1,R> joiner,
JoinWindows windows,
Serde<K> keySerde,
Serde<V> thisValueSerde,
Serde<V1> otherValueSerde)
Combine values of this stream with another
KStream 's elements of the same key using windowed Outer Join. |
void |
print()
Print the elements of this stream to
System.out . |
void |
print(Serde<K> keySerde,
Serde<V> valSerde)
Print the elements of this stream to System.out.
|
void |
print(Serde<K> keySerde,
Serde<V> valSerde,
String streamName)
Print the elements of this stream to System.out
|
void |
print(String streamName)
Print the elements of this stream to
System.out . |
void |
process(ProcessorSupplier<K,V> processorSupplier,
String... stateStoreNames)
Process all elements in this stream, one element at a time, by applying a
Processor . |
<K1> KStream<K1,V> |
selectKey(KeyValueMapper<K,V,K1> mapper)
Create a new key from the current key and value.
|
KStream<K,V> |
through(Serde<K> keySerde,
Serde<V> valSerde,
StreamPartitioner<K,V> partitioner,
String topic)
Materialize this stream to a topic, also creates a new instance of
KStream from the topic
using a customizable StreamPartitioner to determine the distribution of records to partitions. |
KStream<K,V> |
through(Serde<K> keySerde,
Serde<V> valSerde,
String topic)
Materialize this stream to a topic, also creates a new instance of
KStream from the topic. |
KStream<K,V> |
through(StreamPartitioner<K,V> partitioner,
String topic)
Materialize this stream to a topic, also creates a new instance of
KStream from the topic
using default serializers and deserializers and a customizable StreamPartitioner to determine the distribution of records to partitions. |
KStream<K,V> |
through(String topic)
Materialize this stream to a topic, also creates a new instance of
KStream from the topic
using default serializers and deserializers and producer's DefaultPartitioner . |
void |
to(Serde<K> keySerde,
Serde<V> valSerde,
StreamPartitioner<K,V> partitioner,
String topic)
Materialize this stream to a topic using a customizable
StreamPartitioner to determine the distribution of records to partitions. |
void |
to(Serde<K> keySerde,
Serde<V> valSerde,
String topic)
Materialize this stream to a topic.
|
void |
to(StreamPartitioner<K,V> partitioner,
String topic)
Materialize this stream to a topic using default serializers specified in the config and a customizable
StreamPartitioner to determine the distribution of records to partitions. |
void |
to(String topic)
Materialize this stream to a topic using default serializers specified in the config
and producer's
DefaultPartitioner . |
<K1,V1> KStream<K1,V1> |
transform(TransformerSupplier<K,V,KeyValue<K1,V1>> transformerSupplier,
String... stateStoreNames)
Create a new
KStream instance by applying a Transformer to all elements in this stream, one element at a time. |
<R> KStream<K,R> |
transformValues(ValueTransformerSupplier<V,R> valueTransformerSupplier,
String... stateStoreNames)
Create a new
KStream instance by applying a ValueTransformer to all values in this stream, one element at a time. |
void |
writeAsText(String filePath)
Write the elements of this stream to a file at the given path.
|
void |
writeAsText(String filePath,
Serde<K> keySerde,
Serde<V> valSerde) |
void |
writeAsText(String filePath,
String streamName)
Write the elements of this stream to a file at the given path.
|
void |
writeAsText(String filePath,
String streamName,
Serde<K> keySerde,
Serde<V> valSerde) |
KStream<K,V> filter(Predicate<K,V> predicate)
KStream
that consists of all elements of this stream which satisfy a predicate.KStream<K,V> filterNot(Predicate<K,V> predicate)
KStream
that consists all elements of this stream which do not satisfy a predicate.<K1> KStream<K1,V> selectKey(KeyValueMapper<K,V,K1> mapper)
K1
- the new key type on the streammapper
- the instance of KeyValueMapper
KStream
that contains records with different key type and same value type<K1,V1> KStream<K1,V1> map(KeyValueMapper<K,V,KeyValue<K1,V1>> mapper)
KStream
by transforming each element in this stream into a different element in the new stream.K1
- the key type of the new streamV1
- the value type of the new streammapper
- the instance of KeyValueMapper
KStream
that contains records with new key and value type<V1> KStream<K,V1> mapValues(ValueMapper<V,V1> mapper)
KStream
by transforming the value of each element in this stream into a new value in the new stream.V1
- the value type of the new streammapper
- the instance of ValueMapper
KStream
that contains records with unmodified keys and new values of different typevoid print()
System.out
. This function
will use the generated name of the parent processor node to label the key/value pairs
printed out to the console.
Implementors will need to override toString for keys and values that are not of
type String, Integer etc to get meaningful information.void print(String streamName)
System.out
. This function
will use the given name to label the key/value printed out to the console.streamName
- the name used to label the key/value pairs printed out to the console
Implementors will need to override toString for keys and values that are not of
type String, Integer etc to get meaningful information.void print(Serde<K> keySerde, Serde<V> valSerde)
keySerde
- key serde used to send key-value pairs,
if not specified the default serde defined in the configs will be usedvalSerde
- value serde used to send key-value pairs,
if not specified the default serde defined in the configs will be used
Implementors will need to override toString for keys and values that are not of
type String, Integer etc to get meaningful information.void print(Serde<K> keySerde, Serde<V> valSerde, String streamName)
keySerde
- key serde used to send key-value pairs,
if not specified the default serde defined in the configs will be usedvalSerde
- value serde used to send key-value pairs,
if not specified the default serde defined in the configs will be usedstreamName
- the name used to label the key/value pairs printed out to the console
Implementors will need to override toString
for keys and values that are not of
type String
, Integer
etc. to get meaningful information.void writeAsText(String filePath)
void writeAsText(String filePath, String streamName)
void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde)
filePath
- name of file to write tokeySerde
- key serde used to send key-value pairs,
if not specified the default serde defined in the configs will be usedvalSerde
- value serde used to send key-value pairs,
if not specified the default serde defined in the configs will be used
Implementors will need to override toString
for keys and values that are not of
type String
, Integer
etc. to get meaningful information.void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde)
filePath
- name of file to write tostreamName
- the name used to label the key/value pairs printed out to the consolekeySerde
- key serde used to send key-value pairs,
if not specified the default serde defined in the configs will be usedvalSerde
- value serde used to send key-value pairs,
if not specified the default serde defined in the configs will be used
Implementors will need to override toString
for keys and values that are not of
type String
, Integer
etc. to get meaningful information.<K1,V1> KStream<K1,V1> flatMap(KeyValueMapper<K,V,Iterable<KeyValue<K1,V1>>> mapper)
KStream
by transforming each element in this stream into zero or more elements in the new stream.K1
- the key type of the new streamV1
- the value type of the new streammapper
- the instance of KeyValueMapper
KStream
that contains more or less records with new key and value type<V1> KStream<K,V1> flatMapValues(ValueMapper<V,Iterable<V1>> processor)
KStream
by transforming the value of each element in this stream into zero or more values with the same key in the new stream.V1
- the value type of the new streamprocessor
- the instance of ValueMapper
KStream
that contains more or less records with unmodified keys and new values of different typeKStream<K,V>[] branch(Predicate<K,V>... predicates)
KStream
from this stream by branching the elements in the original stream based on the supplied predicates.
Each element is evaluated against the supplied predicates, and predicates are evaluated in order. Each stream in the result array
corresponds position-wise (index) to the predicate in the supplied predicates. The branching happens on first-match: An element
in the original stream is assigned to the corresponding result stream for the first predicate that evaluates to true, and
assigned to this stream only. An element will be dropped if none of the predicates evaluate to true.KStream<K,V> through(String topic)
KStream
from the topic
using default serializers and deserializers and producer's DefaultPartitioner
.
This is equivalent to calling to(String)
and KStreamBuilder.stream(String...)
.void foreach(ForeachAction<K,V> action)
KStream
.
Note that this is a terminal operation that returns void.action
- an action to perform on each elementKStream<K,V> through(StreamPartitioner<K,V> partitioner, String topic)
KStream
from the topic
using default serializers and deserializers and a customizable StreamPartitioner
to determine the distribution of records to partitions.
This is equivalent to calling to(StreamPartitioner, String)
and KStreamBuilder.stream(String...)
.KStream<K,V> through(Serde<K> keySerde, Serde<V> valSerde, String topic)
KStream
from the topic.
If keySerde
provides a org.apache.kafka.streams.kstream.internals.WindowedSerializer
for the key org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner
is used
— otherwise producer's DefaultPartitioner
is used.
This is equivalent to calling to(Serde, Serde, String)
and
KStreamBuilder.stream(Serde, Serde, String...)
.keySerde
- key serde used to send key-value pairs,
if not specified the default key serde defined in the configuration will be usedvalSerde
- value serde used to send key-value pairs,
if not specified the default value serde defined in the configuration will be usedtopic
- the topic nameKStream
that contains the exact same records as this KStream
KStream<K,V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K,V> partitioner, String topic)
KStream
from the topic
using a customizable StreamPartitioner
to determine the distribution of records to partitions.
This is equivalent to calling to(Serde, Serde, StreamPartitioner, String)
and
KStreamBuilder.stream(Serde, Serde, String...)
.keySerde
- key serde used to send key-value pairs,
if not specified the default key serde defined in the configuration will be usedvalSerde
- value serde used to send key-value pairs,
if not specified the default value serde defined in the configuration will be usedpartitioner
- the function used to determine how records are distributed among partitions of the topic,
if not specified and keySerde
provides a org.apache.kafka.streams.kstream.internals.WindowedSerializer
for the key
org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner
will be used
— otherwise DefaultPartitioner
will be usedtopic
- the topic nameKStream
that contains the exact same records as this KStream
void to(String topic)
DefaultPartitioner
.topic
- the topic namevoid to(StreamPartitioner<K,V> partitioner, String topic)
StreamPartitioner
to determine the distribution of records to partitions.partitioner
- the function used to determine how records are distributed among partitions of the topic,
if not specified producer's DefaultPartitioner
will be usedtopic
- the topic namevoid to(Serde<K> keySerde, Serde<V> valSerde, String topic)
keySerde
provides a
org.apache.kafka.streams.kstream.internals.WindowedSerializer
for the key
org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner
is used
— otherwise producer's DefaultPartitioner
is used.keySerde
- key serde used to send key-value pairs,
if not specified the default serde defined in the configs will be usedvalSerde
- value serde used to send key-value pairs,
if not specified the default serde defined in the configs will be usedtopic
- the topic namevoid to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K,V> partitioner, String topic)
StreamPartitioner
to determine the distribution of records to partitions.keySerde
- key serde used to send key-value pairs,
if not specified the default serde defined in the configs will be usedvalSerde
- value serde used to send key-value pairs,
if not specified the default serde defined in the configs will be usedpartitioner
- the function used to determine how records are distributed among partitions of the topic,
if not specified and keySerde
provides a org.apache.kafka.streams.kstream.internals.WindowedSerializer
for the key
org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner
will be used
— otherwise DefaultPartitioner
will be usedtopic
- the topic name<K1,V1> KStream<K1,V1> transform(TransformerSupplier<K,V,KeyValue<K1,V1>> transformerSupplier, String... stateStoreNames)
KStream
instance by applying a Transformer
to all elements in this stream, one element at a time.transformerSupplier
- the instance of TransformerSupplier
that generates Transformer
stateStoreNames
- the names of the state store used by the processorKStream
with transformed key and value types<R> KStream<K,R> transformValues(ValueTransformerSupplier<V,R> valueTransformerSupplier, String... stateStoreNames)
KStream
instance by applying a ValueTransformer
to all values in this stream, one element at a time.valueTransformerSupplier
- the instance of ValueTransformerSupplier
that generates ValueTransformer
stateStoreNames
- the names of the state store used by the processorKStream
that contains records with unmodified keys and transformed values with type R
void process(ProcessorSupplier<K,V> processorSupplier, String... stateStoreNames)
Processor
.processorSupplier
- the supplier of ProcessorSupplier
that generates Processor
stateStoreNames
- the names of the state store used by the processor<V1,R> KStream<K,R> join(KStream<K,V1> otherStream, ValueJoiner<V,V1,R> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValueSerde, Serde<V1> otherValueSerde)
KStream
's elements of the same key using windowed Inner Join.
If a record key is null it will not included in the resulting KStream
Both of the joining KStream
s will be materialized in local state stores with auto-generated store names.
Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
in Kafka for each store for failure recovery, where "applicationID" is user-specified in the
StreamsConfig
.V1
- the value type of the other streamR
- the value type of the new streamotherStream
- the instance of KStream
joined with this streamjoiner
- the instance of ValueJoiner
windows
- the specification of the JoinWindows
keySerde
- key serdes for materializing both streams,
if not specified the default serdes defined in the configs will be usedthisValueSerde
- value serdes for materializing this stream,
if not specified the default serdes defined in the configs will be usedotherValueSerde
- value serdes for materializing the other stream,
if not specified the default serdes defined in the configs will be usedKStream
that contains join-records for each key and values computed by the given ValueJoiner
,
one for each matched record-pair with the same key and within the joining window intervals<V1,R> KStream<K,R> join(KStream<K,V1> otherStream, ValueJoiner<V,V1,R> joiner, JoinWindows windows)
KStream
's elements of the same key using windowed Inner Join
with default serializers and deserializers. If a record key is null it will not included in the resulting KStream
Both of the joining KStream
s will be materialized in local state stores with auto-generated store names.
Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
in Kafka for each store for failure recovery, where "applicationID" is user-specified in the
StreamsConfig
.V1
- the value type of the other streamR
- the value type of the new streamotherStream
- the instance of KStream
joined with this streamjoiner
- the instance of ValueJoiner
windows
- the specification of the JoinWindows
KStream
that contains join-records for each key and values computed by the given ValueJoiner
,
one for each matched record-pair with the same key and within the joining window intervals<V1,R> KStream<K,R> outerJoin(KStream<K,V1> otherStream, ValueJoiner<V,V1,R> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValueSerde, Serde<V1> otherValueSerde)
KStream
's elements of the same key using windowed Outer Join.
If a record key is null it will not included in the resulting KStream
Both of the joining KStream
s will be materialized in local state stores with an auto-generated
store name.
Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
in Kafka for each store for failure recovery, where "applicationID" is user-specified in the
StreamsConfig
.V1
- the value type of the other streamR
- the value type of the new streamotherStream
- the instance of KStream
joined with this streamjoiner
- the instance of ValueJoiner
windows
- the specification of the JoinWindows
keySerde
- key serdes for materializing both streams,
if not specified the default serdes defined in the configs will be usedthisValueSerde
- value serdes for materializing this stream,
if not specified the default serdes defined in the configs will be usedotherValueSerde
- value serdes for materializing the other stream,
if not specified the default serdes defined in the configs will be usedKStream
that contains join-records for each key and values computed by the given ValueJoiner
,
one for each matched record-pair with the same key and within the joining window intervals<V1,R> KStream<K,R> outerJoin(KStream<K,V1> otherStream, ValueJoiner<V,V1,R> joiner, JoinWindows windows)
KStream
's elements of the same key using windowed Outer Join
with default serializers and deserializers. If a record key is null it will not included in the resulting KStream
Both of the joining KStream
s will be materialized in local state stores with auto-generated
store names.
Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
in Kafka for each store for failure recovery, where "applicationID" is user-specified in the
StreamsConfig
.V1
- the value type of the other streamR
- the value type of the new streamotherStream
- the instance of KStream
joined with this streamjoiner
- the instance of ValueJoiner
windows
- the specification of the JoinWindows
KStream
that contains join-records for each key and values computed by the given ValueJoiner
,
one for each matched record-pair with the same key and within the joining window intervals<V1,R> KStream<K,R> leftJoin(KStream<K,V1> otherStream, ValueJoiner<V,V1,R> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValSerde, Serde<V1> otherValueSerde)
KStream
's elements of the same key using windowed Left Join.
If a record key is null it will not included in the resulting KStream
Both of the joining KStream
s will be materialized in local state stores with auto-generated
store names.
Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
in Kafka for each store for failure recovery, where "applicationID" is user-specified in the
StreamsConfig
.V1
- the value type of the other streamR
- the value type of the new streamotherStream
- the instance of KStream
joined with this streamjoiner
- the instance of ValueJoiner
windows
- the specification of the JoinWindows
keySerde
- key serdes for materializing the other stream,
if not specified the default serdes defined in the configs will be usedthisValSerde
- value serdes for materializing this stream,
if not specified the default serdes defined in the configs will be usedotherValueSerde
- value serdes for materializing the other stream,
if not specified the default serdes defined in the configs will be usedKStream
that contains join-records for each key and values computed by the given ValueJoiner
,
one for each matched record-pair with the same key and within the joining window intervals<V1,R> KStream<K,R> leftJoin(KStream<K,V1> otherStream, ValueJoiner<V,V1,R> joiner, JoinWindows windows)
KStream
's elements of the same key using windowed Left Join
with default serializers and deserializers. If a record key is null it will not included in the resulting KStream
Both of the joining KStream
s will be materialized in local state stores with auto-generated
store names.
Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
in Kafka for each store for failure recovery, where "applicationID" is user-specified in the
StreamsConfig
.V1
- the value type of the other streamR
- the value type of the new streamotherStream
- the instance of KStream
joined with this streamjoiner
- the instance of ValueJoiner
windows
- the specification of the JoinWindows
KStream
that contains join-records for each key and values computed by the given ValueJoiner
,
one for each matched record-pair with the same key and within the joining window intervals<V1,V2> KStream<K,V2> leftJoin(KTable<K,V1> table, ValueJoiner<V,V1,V2> joiner)
KTable
's elements of the same key using non-windowed Left Join.
If a record key is null it will not included in the resulting KStream
V1
- the value type of the tableV2
- the value type of the new streamtable
- the instance of KTable
joined with this streamjoiner
- the instance of ValueJoiner
KStream
that contains join-records for each key and values computed by the given ValueJoiner
,
one for each matched record-pair with the same key<V1,V2> KStream<K,V2> leftJoin(KTable<K,V1> table, ValueJoiner<V,V1,V2> valueJoiner, Serde<K> keySerde, Serde<V> valSerde)
KTable
's elements of the same key using non-windowed Left Join.
If a record key is null it will not included in the resulting KStream
V1
- the value type of the tableV2
- the value type of the new streamtable
- the instance of KTable
joined with this streamvalueJoiner
- the instance of ValueJoiner
keySerde
- key serdes for materializing this stream.
If not specified the default serdes defined in the configs will be usedvalSerde
- value serdes for materializing this stream,
if not specified the default serdes defined in the configs will be usedKStream
that contains join-records for each key and values computed by the given ValueJoiner
,
one for each matched record-pair with the same key and within the joining window intervals<K1> KGroupedStream<K1,V> groupBy(KeyValueMapper<K,V,K1> selector)
KStream
using the provided KeyValueMapper
and
default serializers and deserializers. If a record key is null it will not included in
the resulting KGroupedStream
K1
- the key type of the KGroupedStream
selector
- select the grouping key and value to be aggregatedKGroupedStream
that contains the grouped records of the original KStream
<K1> KGroupedStream<K1,V> groupBy(KeyValueMapper<K,V,K1> selector, Serde<K1> keySerde, Serde<V> valSerde)
KStream
using the provided KeyValueMapper
.
If a record key is null it will not included in the resulting KGroupedStream
K1
- the key type of the KGroupedStream
selector
- select the grouping key and value to be aggregatedkeySerde
- key serdes for materializing this stream,
if not specified the default serdes defined in the configs will be usedvalSerde
- value serdes for materializing this stream,
if not specified the default serdes defined in the configs will be usedKGroupedStream
that contains the grouped records of the original KStream
KGroupedStream<K,V> groupByKey()
KGroupedStream
while preserving the
original values. If a record key is null it will not included in the resulting
KGroupedStream
Default Serdes will be usedKGroupedStream
KGroupedStream<K,V> groupByKey(Serde<K> keySerde, Serde<V> valSerde)
KGroupedStream
while preserving the
original values. If a record key is null it will not included in the resulting
KGroupedStream
keySerde
- key serdes for materializing this stream,
if not specified the default serdes defined in the configs will be usedvalSerde
- value serdes for materializing this stream,
if not specified the default serdes defined in the configs will be usedKGroupedStream