Interface KStream<K,V>
- Type Parameters:
K - the key type of this stream
V - the value type of this stream
KStream is an abstraction of a record stream of key-value pairs, i.e., each record is an independent entity/event in the real world. For example, a user X might buy two items I1 and I2, and thus there might be two records <K:I1>, <K:I2> in the stream.
A KStream is either defined from one or multiple Kafka topics that are consumed message by message or the result of a KStream transformation. A KTable can also be directly converted into a KStream.
A KStream can be transformed record by record, joined with another KStream, KTable, GlobalKTable, or can be aggregated into a KTable. A KStream can also be directly converted into a KTable.
Kafka Streams DSL can be mixed-and-matched with the Processor API (PAPI) (cf. Topology) via process(...) and processValues(...).
-
Method Summary
Modifier and Type | Method | Description
Create a new KStream that consists of all records of this stream which satisfy the given predicate.
See filter(Predicate).
Create a new KStream that consists of all records of this stream which do not satisfy the given predicate.
See filterNot(Predicate)
.<KOut,
VOut>
KStream<KOut,VOut> flatMap
(KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KOut, ? extends VOut>>> mapper) Create a newKStream
that consists of zero or more records for each record in this stream.<KR,
VOut> KStream<KR, VOut> flatMap
(KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VOut>>> mapper, Named named) flatMapValues
(ValueMapper<? super V, ? extends Iterable<? extends VOut>> mapper) Create a newKStream
that consists of zero or more records with modified value for each record in this stream.flatMapValues
(ValueMapper<? super V, ? extends Iterable<? extends VOut>> mapper, Named named) flatMapValues
(ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VOut>> mapper) flatMapValues
(ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VOut>> mapper, Named named) void
foreach
(ForeachAction<? super K, ? super V> action) Perform an action on each record of thisKStream
.void
foreach
(ForeachAction<? super K, ? super V> action, Named named) <KOut> KGroupedStream<KOut,
V> groupBy
(KeyValueMapper<? super K, ? super V, KOut> keySelector) Group the records of thisKStream
on a new key (in contrast togroupByKey()
).<KOut> KGroupedStream<KOut,
V> Group the records by their current key into aKGroupedStream
while preserving the original values.groupByKey
(Grouped<K, V> grouped) SeegroupByKey()
.join
(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner) Join records of this stream withGlobalKTable
's records using non-windowed inner equi-join.join
(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner, Named named) join
(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner) join
(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner, Named named) join
(KStream<K, VRight> rightStream, ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows) Join records of this (left) stream with another (right)KStream
's records using a windowed inner equi-join.join
(KStream<K, VRight> rightStream, ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows, StreamJoined<K, V, VRight> streamJoined) join
(KStream<K, VRight> rightStream, ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows) join
(KStream<K, VRight> rightStream, ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows, StreamJoined<K, V, VRight> streamJoined) join
(KTable<K, TableValue> table, ValueJoiner<? super V, ? super TableValue, ? extends VOut> joiner) Join records of this stream withKTable
's records using non-windowed inner equi-join.join
(KTable<K, TableValue> table, ValueJoiner<? super V, ? super TableValue, ? extends VOut> joiner, Joined<K, V, TableValue> joined) Join records of this stream withKTable
's records using non-windowed inner equi-join.join
(KTable<K, TableValue> table, ValueJoinerWithKey<? super K, ? super V, ? super TableValue, ? extends VOut> joiner) join
(KTable<K, TableValue> table, ValueJoinerWithKey<? super K, ? super V, ? super TableValue, ? extends VOut> joiner, Joined<K, V, TableValue> joined) leftJoin
(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner) Join records of this stream withGlobalKTable
's records using non-windowed left equi-join.leftJoin
(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner, Named named) leftJoin
(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner) leftJoin
(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner, Named named) leftJoin
(KStream<K, VRight> rightStream, ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows) Join records of this (left) stream with another (right)KStream
's records using a windowed left equi-join.leftJoin
(KStream<K, VRight> rightStream, ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows, StreamJoined<K, V, VRight> streamJoined) leftJoin
(KStream<K, VRight> rightStream, ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows) leftJoin
(KStream<K, VRight> rightStream, ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows, StreamJoined<K, V, VRight> streamJoined) leftJoin
(KTable<K, VTable> table, ValueJoiner<? super V, ? super VTable, ? extends VOut> joiner) Join records of this stream withKTable
's records using non-windowed left equi-join.leftJoin
(KTable<K, VTable> table, ValueJoiner<? super V, ? super VTable, ? extends VOut> joiner, Joined<K, V, VTable> joined) Join records of this stream withKTable
's records using non-windowed left equi-join.leftJoin
(KTable<K, VTable> table, ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner) leftJoin
(KTable<K, VTable> table, ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner, Joined<K, V, VTable> joined) <KOut,
VOut>
KStream<KOut,VOut> map
(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KOut, ? extends VOut>> mapper) Create a newKStream
that consists of a modified record for each record in this stream.<KOut,
VOut>
KStream<KOut,VOut> map
(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KOut, ? extends VOut>> mapper, Named named) Seemap(KeyValueMapper)
.mapValues
(ValueMapper<? super V, ? extends VOut> mapper) Create a newKStream
that consists of all records of this stream but with a modified value.mapValues
(ValueMapper<? super V, ? extends VOut> mapper, Named named) mapValues
(ValueMapperWithKey<? super K, ? super V, ? extends VOut> mapper) mapValues
(ValueMapperWithKey<? super K, ? super V, ? extends VOut> mapper, Named named) Merge thisKStream
and the givenKStream
.Seemerge(KStream)
.outerJoin
(KStream<K, VRight> otherStream, ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows) Join records of this (left) stream with another (right)KStream
's records using a windowed outer equi-join.outerJoin
(KStream<K, VRight> otherStream, ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows, StreamJoined<K, V, VRight> streamJoined) outerJoin
(KStream<K, VRight> otherStream, ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows) outerJoin
(KStream<K, VRight> otherStream, ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows, StreamJoined<K, V, VRight> streamJoined) peek
(ForeachAction<? super K, ? super V> action) Perform an action on each record of thisKStream
.peek
(ForeachAction<? super K, ? super V> action, Named named) Seepeek(ForeachAction)
.void
Print the records of thisKStream
using the options provided byPrinted
.<KOut,
VOut>
KStream<KOut,VOut> process
(ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier, String... stateStoreNames) Process all records in this stream, one record at a time, by applying aProcessor
(provided by the givenProcessorSupplier
) to each input record.<KOut,
VOut>
KStream<KOut,VOut> process
(ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier, Named named, String... stateStoreNames) processValues
(FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier, String... stateStoreNames) Process all records in this stream, one record at a time, by applying aFixedKeyProcessor
(provided by the givenFixedKeyProcessorSupplier
) to each input record.processValues
(FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier, Named named, String... stateStoreNames) Materialize this stream to an auto-generated repartition topic and create a newKStream
from the auto-generated topic.repartition
(Repartitioned<K, V> repartitioned) Seerepartition()
.selectKey
(KeyValueMapper<? super K, ? super V, ? extends KOut> mapper) Create a newKStream
that consists of all records of this stream but with a modified key.selectKey
(KeyValueMapper<? super K, ? super V, ? extends KOut> mapper, Named named) split()
Split thisKStream
into different branches.Seesplit()
.void
Materialize this stream to a topic.void
Seeto(String)
.void
to
(TopicNameExtractor<K, V> topicExtractor) Materialize the record of this stream to different topics.void
toTable()
Convert this stream to aKTable
.toTable
(Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized) SeetoTable()
.SeetoTable()
.toTable
(Named named, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized) SeetoTable()
.
-
Method Details
-
filter
Create a new KStream that consists of all records of this stream which satisfy the given predicate. All records that do not satisfy the predicate are dropped. This is a stateless record-by-record operation (cf. processValues(FixedKeyProcessorSupplier, String...) for stateful record processing or if you need access to the record's timestamp, headers, or other metadata).
- Parameters:
predicate - a filter Predicate that is applied to each record
- Returns:
A KStream that contains only those records that satisfy the given predicate.
- See Also:
-
filter
Seefilter(Predicate)
.Takes an additional
Named
parameter that is used to name the processor in the topology. -
filterNot
Create a new KStream that consists of all records of this stream which do not satisfy the given predicate. All records that do satisfy the predicate are dropped. This is a stateless record-by-record operation (cf. processValues(FixedKeyProcessorSupplier, String...) for stateful record processing or if you need access to the record's timestamp, headers, or other metadata).
- Parameters:
predicate - a filter Predicate that is applied to each record
- Returns:
A KStream that contains only those records that do not satisfy the given predicate.
- See Also:
-
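The relationship between filter and filterNot can be sketched with a small plain-Java model. This is not Kafka Streams code: the class FilterModel, its methods, and the use of Map.Entry and BiPredicate in place of stream records and Predicate are assumptions made only for illustration. It shows that, for the same predicate, every record ends up in exactly one of the two results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical plain-Java model of filter/filterNot; not part of Kafka Streams.
public class FilterModel {

    // Keeps the records that satisfy the predicate; all others are dropped.
    static <K, V> List<Map.Entry<K, V>> filter(List<Map.Entry<K, V>> records,
                                               BiPredicate<? super K, ? super V> predicate) {
        List<Map.Entry<K, V>> kept = new ArrayList<>();
        for (Map.Entry<K, V> rec : records) {
            if (predicate.test(rec.getKey(), rec.getValue())) {
                kept.add(rec);
            }
        }
        return kept;
    }

    // Keeps the records that do NOT satisfy the predicate.
    static <K, V> List<Map.Entry<K, V>> filterNot(List<Map.Entry<K, V>> records,
                                                  BiPredicate<? super K, ? super V> predicate) {
        return filter(records, (key, value) -> !predicate.test(key, value));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records =
                List.of(Map.entry("a", 1), Map.entry("b", 5), Map.entry("c", 3));
        System.out.println(filter(records, (key, value) -> value > 2));    // [b=5, c=3]
        System.out.println(filterNot(records, (key, value) -> value > 2)); // [a=1]
    }
}
```

Both calls evaluate each record independently of all others, which is what makes the operation stateless.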
filterNot
SeefilterNot(Predicate)
.Takes an additional
Named
parameter that is used to name the processor in the topology. -
selectKey
Create a newKStream
that consists of all records of this stream but with a modified key. The providedKeyValueMapper
is applied to each input record and computes a new key (possibly of a different type) for it. Thus, an input record<K,V>
can be transformed into an output record<K':V>
. This is a stateless record-by-record operation (cf.process(ProcessorSupplier, String...)
for stateful record processing or if you need access to the record's timestamp, headers, or other metadata).
For example, you can use this transformation to set a key for a key-less input record <null,V> by extracting a key from the value within your KeyValueMapper. The example below computes the new key as the length of the value string.

    KStream<Byte[], String> keyLessStream = builder.stream("key-less-topic");
    KStream<Integer, String> keyedStream = keyLessStream.selectKey(new KeyValueMapper<Byte[], String, Integer>() {
        @Override
        public Integer apply(Byte[] key, String value) {
            // the new key is the length of the value string
            return value.length();
        }
    });

Setting a new key might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream.
- Type Parameters:
KOut
- the new key type of the resultKStream
- Parameters:
mapper
- aKeyValueMapper
that computes a new key for each input record- Returns:
- A
KStream
that contains records with new key (possibly of a different type) and unmodified value. - See Also:
-
selectKey
<KOut> KStream<KOut,V> selectKey(KeyValueMapper<? super K, ? super V, ? extends KOut> mapper, Named named) SeeselectKey(KeyValueMapper)
.Takes an additional
Named
parameter that is used to name the processor in the topology. -
mapValues
Create a newKStream
that consists of all records of this stream but with a modified value. The providedValueMapper
is applied to each input record value and computes a new value (possibly of a different type) for it. Thus, an input record<K,V>
can be transformed into an output record<K:V'>
. If you need read access to the input record key, usemapValues(ValueMapperWithKey)
. This is a stateless record-by-record operation (cf.processValues(FixedKeyProcessorSupplier, String...)
for stateful value processing or if you need access to the record's timestamp, headers, or other metadata).
The example below counts the number of tokens in the value string.

    KStream<String, String> inputStream = builder.stream("topic");
    KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapper<String, Integer>() {
        @Override
        public Integer apply(String value) {
            // number of space-separated tokens in the value
            return value.split(" ").length;
        }
    });

Setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. map(KeyValueMapper)).
- Type Parameters:
VOut
- the value type of the result stream- Parameters:
mapper
- aValueMapper
that computes a new value for each input record- Returns:
- A
KStream
that contains records with unmodified key and new values (possibly of a different type). - See Also:
-
mapValues
SeemapValues(ValueMapper)
.Takes an additional
Named
parameter that is used to name the processor in the topology. -
mapValues
SeemapValues(ValueMapper)
.Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and incorrect results.
-
mapValues
<VOut> KStream<K,VOut> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VOut> mapper, Named named) SeemapValues(ValueMapperWithKey)
.Takes an additional
Named
parameter that is used to name the processor in the topology. -
map
<KOut,VOut> KStream<KOut,VOut> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KOut, ? extends VOut>> mapper) Create a newKStream
that consists of a modified record for each record in this stream. The providedKeyValueMapper
is applied to each input record and computes a new output record (possibly of a different key and/or value type) for it. Thus, an input record<K,V>
can be transformed into an output record<K':V'>
. This is a stateless record-by-record operation (cf.process(ProcessorSupplier, String...)
for stateful record processing or if you need access to the record's timestamp, headers, or other metadata).
The example below normalizes the String key to upper-case letters and counts the number of tokens in the value string.

    KStream<String, String> inputStream = builder.stream("topic");
    KStream<String, Integer> outputStream = inputStream.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() {
        @Override
        public KeyValue<String, Integer> apply(String key, String value) {
            // upper-case key, token count as the new value
            return new KeyValue<>(key.toUpperCase(), value.split(" ").length);
        }
    });

The provided KeyValueMapper must return a KeyValue type and must not return null.
Mapping records might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. mapValues(ValueMapper)).
- Type Parameters:
KOut
- the key type of the result streamVOut
- the value type of the result stream- Parameters:
mapper
- aKeyValueMapper
that computes a newKeyValue
pair for each input record- Returns:
- A
KStream
that contains records with new key and new value (possibly of different types). - See Also:
-
map
<KOut,VOut> KStream<KOut,VOut> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KOut, ? extends VOut>> mapper, Named named) Seemap(KeyValueMapper)
.Takes an additional
Named
parameter that is used to name the processor in the topology. -
flatMap
<KOut,VOut> KStream<KOut,VOut> flatMap(KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KOut, ? extends VOut>>> mapper) Create a newKStream
that consists of zero or more records for each record in this stream. The providedKeyValueMapper
is applied to each input record and computes zero or more output records (possibly of a different key and/or value type) for it. Thus, an input record<K,V>
can be transformed into output records<K':V'>, <K':V'>, ...
. This is a stateless record-by-record operation (cf.process(ProcessorSupplier, String...)
for stateful record processing or if you need access to the record's timestamp, headers, or other metadata).
The example below splits input records <null:String> containing sentences as values into their words and emits a record <word:1> for each word.

    // requires java.util.ArrayList and java.util.List
    KStream<byte[], String> inputStream = builder.stream("topic");
    KStream<String, Integer> outputStream = inputStream.flatMap(
        new KeyValueMapper<byte[], String, Iterable<KeyValue<String, Integer>>>() {
            @Override
            public Iterable<KeyValue<String, Integer>> apply(byte[] key, String value) {
                String[] tokens = value.split(" ");
                List<KeyValue<String, Integer>> result = new ArrayList<>(tokens.length);
                for (String token : tokens) {
                    result.add(new KeyValue<>(token, 1));
                }
                return result;
            }
        });

The provided KeyValueMapper must return an Iterable (e.g., any Collection type) and the return value must not be null.
Flat-mapping records might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. flatMapValues(ValueMapper)).
- Type Parameters:
KOut
- the key type of the result streamVOut
- the value type of the result stream- Parameters:
mapper - a KeyValueMapper<K, V, Iterable<KeyValue<K', V'>>> that computes zero or more new KeyValue pairs for each input record
- Returns:
- A
KStream
that contains more or fewer records with new keys and values (possibly of different types). - See Also:
-
flatMap
<KR,VOut> KStream<KR,VOut> flatMap(KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VOut>>> mapper, Named named) SeeflatMap(KeyValueMapper)
.Takes an additional
Named
parameter that is used to name the processor in the topology. -
flatMapValues
<VOut> KStream<K,VOut> flatMapValues(ValueMapper<? super V, ? extends Iterable<? extends VOut>> mapper) Create a newKStream
that consists of zero or more records with modified value for each record in this stream. The providedValueMapper
is applied to each input record value and computes zero or more output values (possibly of a different type) for it. Thus, an input record<K,V>
can be transformed into output records<K:V'>, <K:V'>, ...
. If you need read access to the input record key, useflatMapValues(ValueMapperWithKey)
. This is a stateless record-by-record operation (cf.processValues(FixedKeyProcessorSupplier, String...)
for stateful record processing or if you need access to the record's timestamp, headers, or other metadata).
The example below splits input records <null:String> containing sentences as values into their words.

    // requires java.util.Arrays
    KStream<byte[], String> inputStream = builder.stream("topic");
    KStream<byte[], String> outputStream = inputStream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
        @Override
        public Iterable<String> apply(String value) {
            return Arrays.asList(value.split(" "));
        }
    });

The provided ValueMapper must return an Iterable (e.g., any Collection type) and the return value must not be null.
Splitting a record into multiple records with the same key preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. flatMap(KeyValueMapper)).
- Type Parameters:
VOut
- the value type of the result stream- Parameters:
mapper
- aValueMapper<V, Iterable<V>>
that computes zero or more new values for each input record- Returns:
- A
KStream
that contains more or fewer records with unmodified keys but new values (possibly of a different type). - See Also:
-
flatMapValues
<VOut> KStream<K,VOut> flatMapValues(ValueMapper<? super V, ? extends Iterable<? extends VOut>> mapper, Named named) SeeflatMapValues(ValueMapper)
.Takes an additional
Named
parameter that is used to name the processor in the topology. -
flatMapValues
<VOut> KStream<K,VOut> flatMapValues(ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VOut>> mapper) SeeflatMapValues(ValueMapper)
.Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and incorrect results.
-
flatMapValues
<VOut> KStream<K,VOut> flatMapValues(ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VOut>> mapper, Named named) SeeflatMapValues(ValueMapperWithKey)
.Takes an additional
Named
parameter that is used to name the processor in the topology. -
print
Print the records of thisKStream
using the options provided byPrinted
. Note that this is mainly for debugging/testing purposes, and it will try to flush on each record print. It SHOULD NOT be used in production if performance is a concern.
- Parameters:
printed
- options for printing
-
foreach
Perform an action on each record of thisKStream
. This is a stateless record-by-record operation (cf.process(ProcessorSupplier, String...)
for stateful record processing or if you need access to the record's timestamp, headers, or other metadata).Foreach
is a terminal operation that may trigger side effects (such as logging or statistics collection) and returns void
(cf.peek(ForeachAction)
).Note that this operation may execute multiple times for a single record in failure cases, and it is not guarded by "exactly-once processing guarantees".
- Parameters:
action
- an action to perform on each record
-
foreach
Seeforeach(ForeachAction)
.Takes an additional
Named
parameter that is used to name the processor in the topology. -
peek
Perform an action on each record of thisKStream
. This is a stateless record-by-record operation (cf.process(ProcessorSupplier, String...)
for stateful record processing or if you need access to the record's timestamp, headers, or other metadata).Peek
is a non-terminal operation that may trigger side effects (such as logging or statistics collection) and returns an unchanged KStream
(cf.foreach(ForeachAction)
).Note that this operation may execute multiple times for a single record in failure cases, and it is not guarded by "exactly-once processing guarantees".
- Parameters:
action
- an action to perform on each record- Returns:
- An unmodified
KStream
.
-
peek
Seepeek(ForeachAction)
.Takes an additional
Named
parameter that is used to name the processor in the topology. -
split
BranchedKStream<K,V> split()Split thisKStream
into different branches. The returnedBranchedKStream
instance can be used for routing the records to different branches depending on evaluation against the supplied predicates. Records are evaluated against the predicates in the order they are provided with the first matching predicate accepting the record. Branching is a stateless record-by-record operation. SeeBranchedKStream
for a detailed description and usage example.Splitting a
KStream
guarantees that each input record is sent to at most one resultKStream
. There is no operator for broadcasting/multicasting records into multiple resultKStream
. If you want to broadcast records, you can apply multiple downstream operators to the sameKStream
instance:

    // Broadcasting: every record of `stream` is sent to all three operators for processing
    KStream<...> stream1 = stream.map(...);
    KStream<...> stream2 = stream.mapValues(...);
    KStream<...> stream3 = stream.flatMap(...);

Multicasting can be achieved with broadcasting into multiple filter operations:

    // Multicasting: every record of `stream` is sent to all three filters, and thus, may be part of
    // multiple result streams, `stream1`, `stream2`, and/or `stream3`
    KStream<...> stream1 = stream.filter(predicate1);
    KStream<...> stream2 = stream.filter(predicate2);
    KStream<...> stream3 = stream.filter(predicate3);

- Returns:
- A
BranchedKStream
that provides methods for routing the records to different branches. - See Also:
-
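The first-match rule described for split() can be illustrated with a plain-Java model. This is only a sketch: the class FirstMatchRouting, the route method, and the branch names are hypothetical and do not use the BranchedKStream API. Predicates are tried in insertion order and each record goes to at most one branch. What happens to records that match no predicate is not modeled here; see BranchedKStream for the actual behavior.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical plain-Java model of first-match routing; not the BranchedKStream API.
public class FirstMatchRouting {

    // Sends each record to the first branch (in insertion order) whose predicate matches.
    static <T> Map<String, List<T>> route(List<T> records,
                                          LinkedHashMap<String, Predicate<T>> branches) {
        Map<String, List<T>> result = new LinkedHashMap<>();
        for (String name : branches.keySet()) {
            result.put(name, new ArrayList<>());
        }
        for (T rec : records) {
            for (Map.Entry<String, Predicate<T>> branch : branches.entrySet()) {
                if (branch.getValue().test(rec)) {
                    result.get(branch.getKey()).add(rec);
                    break; // at most one branch receives the record
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Predicate<Integer>> branches = new LinkedHashMap<>();
        branches.put("small", v -> v < 10);
        branches.put("even", v -> v % 2 == 0);
        // 4 matches both predicates but only lands in "small"; 15 matches neither.
        System.out.println(route(List.of(4, 12, 15), branches)); // {small=[4], even=[12]}
    }
}
```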
split
-
merge
Merge thisKStream
and the givenKStream
.There is no ordering guarantee between records from this
KStream
and records from the providedKStream
in the merged stream. Relative order is preserved within each input stream though (i.e., records within one input stream are processed in order).- Parameters:
otherStream
- a stream which is to be merged into this stream- Returns:
- A merged stream containing all records from this and the provided
KStream
- See Also:
-
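The ordering guarantee of merge can be made concrete with a small plain-Java check. This is a model only: MergeOrderModel and preservesOrder are hypothetical names, and lists of strings stand in for streams. Any interleaving is acceptable as long as each input's records appear in their original relative order.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical plain-Java model of the merge ordering guarantee; not Kafka Streams code.
public class MergeOrderModel {

    // Returns true if `input` occurs in `merged` as a subsequence,
    // i.e., the relative order of the input's records is preserved.
    static <T> boolean preservesOrder(List<T> input, List<T> merged) {
        Iterator<T> expected = input.iterator();
        if (!expected.hasNext()) {
            return true;
        }
        T next = expected.next();
        for (T rec : merged) {
            if (rec.equals(next)) {
                if (!expected.hasNext()) {
                    return true;
                }
                next = expected.next();
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> left = List.of("a1", "a2", "a3");
        List<String> right = List.of("b1", "b2");
        List<String> interleaved = List.of("a1", "b1", "a2", "a3", "b2");
        List<String> reordered = List.of("a2", "a1", "b1", "b2", "a3");
        System.out.println(preservesOrder(left, interleaved));  // true
        System.out.println(preservesOrder(right, interleaved)); // true
        System.out.println(preservesOrder(left, reordered));    // false
    }
}
```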
merge
Seemerge(KStream)
.Takes an additional
Named
parameter that is used to name the processor in the topology. -
repartition
Materialize this stream to an auto-generated repartition topic and create a newKStream
from the auto-generated topic.The created topic is considered an internal topic and is meant to be used only by the current Kafka Streams instance. The topic will be named as "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
StreamsConfig
via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. The number of partitions for the repartition topic is determined based on the upstream topics' partition numbers. Furthermore, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.
You can retrieve all generated internal topic names via
Topology.describe()
. To explicitly set key/value serdes, specify the number of used partitions or the partitioning strategy, or to customize the name of the repartition topic, userepartition(Repartitioned)
.- Returns:
- A
KStream
that contains the exact same, but repartitioned records as thisKStream
.
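The naming pattern "${applicationId}-<name>-repartition" can be spelled out as a one-line helper. This is a sketch under stated assumptions: the class, the method, and the sample values "orders-app" and "by-customer" are hypothetical, and the real "<name>" part is generated internally unless it is customized. Topology.describe() remains the reliable way to look up the actual topic names.

```java
// Hypothetical helper that spells out the documented naming pattern; not a Kafka Streams API.
public class RepartitionTopicName {

    // Builds "${applicationId}-<name>-repartition" from its two variable parts.
    static String repartitionTopicName(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    public static void main(String[] args) {
        // Both arguments are made-up sample values.
        System.out.println(repartitionTopicName("orders-app", "by-customer"));
        // prints "orders-app-by-customer-repartition"
    }
}
```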
-
repartition
Seerepartition()
. -
to
Materialize this stream to a topic. The topic should be manually created before it is used (i.e., before the Kafka Streams application is started).To explicitly set key/value serdes or the partitioning strategy, use
to(String, Produced)
.- Parameters:
topic
- the output topic name- See Also:
-
to
Seeto(String)
. -
to
Materialize the records of this stream to different topics. The provided TopicNameExtractor
is applied to each input record to compute the output topic name. All topics should be manually created before they are used (i.e., before the Kafka Streams application is started).To explicitly set key/value serdes or the partitioning strategy, use
to(TopicNameExtractor, Produced)
.- Parameters:
topicExtractor
- the extractor to determine the name of the Kafka topic to write to for each record- See Also:
-
to
-
toTable
Convert this stream to aKTable
. The conversion is a logical operation and only changes the "interpretation" of the records, i.e., each record of this stream is a "fact/event" and is re-interpreted as a "change/update-per-key" now (cf.KStream
vsKTable
). The resultingKTable
is essentially a changelog stream. To "upsert" the records of this stream into a materializedKTable
(i.e., into a state store), usetoTable(Materialized)
.Note that
null
keys are not supported byKTables
and records withnull
key will be dropped.If a key changing operator was used before this operation (e.g.,
selectKey(KeyValueMapper)
,map(KeyValueMapper)
,flatMap(KeyValueMapper)
orprocess(ProcessorSupplier, String...)
) Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic such that the resultingKTable
is correctly partitioned by its key.This internal repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
StreamsConfig
via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. The number of partitions for the repartition topic is determined based on the upstream topics' partition numbers. Furthermore, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.
Note: If the result KTable is materialized, it is not possible to apply "source topic optimization", because repartition topics are considered transient and do not allow recovering the result KTable in case of a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
You can retrieve all generated internal topic names via
Topology.describe()
. To customize the name of the repartition topic, usetoTable(Named)
. For more control over the repartitioning, userepartition(Repartitioned)
beforetoTable()
.- Returns:
- A
KTable
that contains the same records as thisKStream
.
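The change in interpretation from "fact/event" to "update-per-key" can be illustrated with a plain-Java model. This is not Kafka Streams code: StreamToTableModel and its toTable method are hypothetical, a list of Map.Entry stands in for the stream, and a LinkedHashMap stands in for the table. Records with a null key are dropped, and a later record for the same key replaces the earlier one. Records with null values are not modeled here.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the stream-to-table interpretation; not Kafka Streams code.
public class StreamToTableModel {

    // Re-interprets each record as an update for its key; the latest value per key wins.
    static <K, V> Map<K, V> toTable(List<Map.Entry<K, V>> stream) {
        Map<K, V> table = new LinkedHashMap<>();
        for (Map.Entry<K, V> rec : stream) {
            if (rec.getKey() == null) {
                continue; // records with a null key are dropped
            }
            table.put(rec.getKey(), rec.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> stream = new ArrayList<>();
        stream.add(new SimpleEntry<>("alice", 1));
        stream.add(new SimpleEntry<>(null, 7));
        stream.add(new SimpleEntry<>("bob", 2));
        stream.add(new SimpleEntry<>("alice", 3));
        System.out.println(toTable(stream)); // {alice=3, bob=2}
    }
}
```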
-
toTable
SeetoTable()
. -
toTable
KTable<K,V> toTable(Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized) SeetoTable()
. -
toTable
KTable<K,V> toTable(Named named, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized) SeetoTable()
. -
groupByKey
KGroupedStream<K,V> groupByKey()Group the records by their current key into aKGroupedStream
while preserving the original values.KGroupedStream
can be further grouped with other streams to form aCogroupedKStream
. (Co-)Grouping a stream on the record key is required before a windowing or aggregation operator can be applied to the data (cf.KGroupedStream
). By default, the current key is used as grouping key, but a new grouping key can be set viagroupBy(KeyValueMapper)
. In either case, if the grouping key isnull
, the record will be dropped.If a key changing operator was used before this operation (e.g.,
selectKey(KeyValueMapper)
,map(KeyValueMapper)
,flatMap(KeyValueMapper)
orprocess(ProcessorSupplier, String...)
) Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic such that the resultingKGroupedStream
is correctly partitioned by the grouping key, before the downstream windowing/aggregation will be applied.This internal repartition topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
StreamsConfig
via parameterAPPLICATION_ID_CONFIG
, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. The number of partitions for the repartition topic is determined based on the partition counts of the upstream topics. Furthermore, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.
You can retrieve all generated internal topic names via
Topology.describe()
. To explicitly set key/value serdes or to customize the name of the repartition topic, usegroupByKey(Grouped)
. For more control over the repartitioning, userepartition(Repartitioned)
beforegroupByKey()
.- Returns:
- A
KGroupedStream
that contains the grouped records of the originalKStream
.
-
groupByKey
SeegroupByKey()
.Takes an additional
Grouped
parameter that allows explicitly setting key/value serdes or customizing the name of the potentially created internal repartition topic. -
groupBy
Group the records of thisKStream
on a new key (in contrast togroupByKey()
). This operation is semantically equivalent toselectKey(KeyValueMapper)
followed bygroupByKey()
.Because a new key is selected, an internal repartitioning topic will be created in Kafka. See
groupByKey()
for more details about auto-repartitioning.- Type Parameters:
KOut
- the new key type of the resultKGroupedStream
- Parameters:
keySelector
- aKeyValueMapper
that computes a new key for grouping
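The grouping rule described for groupBy(KeyValueMapper) (compute a new key, group by it, drop records whose grouping key is null) can be sketched in plain Java. This is only a model of the semantics, not Kafka Streams code: the class GroupBySketch, its methods, and the "category:item" input encoding are hypothetical, and repartitioning is not modeled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** Plain-Java model of groupBy semantics; this is not Kafka Streams code. */
final class GroupBySketch {

    /** Step 1 (like selectKey): compute the new key. Step 2 (like groupByKey): group by it. */
    static <K, V, KOut> Map<KOut, List<V>> groupBy(
            List<Map.Entry<K, V>> records,
            BiFunction<K, V, KOut> keySelector) {
        Map<KOut, List<V>> groups = new LinkedHashMap<>();
        for (Map.Entry<K, V> rec : records) {
            KOut groupingKey = keySelector.apply(rec.getKey(), rec.getValue());
            if (groupingKey == null) {
                continue; // a null grouping key means the record is dropped
            }
            // The original value is preserved; only the grouping key changes.
            groups.computeIfAbsent(groupingKey, k -> new ArrayList<>()).add(rec.getValue());
        }
        return groups;
    }

    /** Hypothetical input: user -> "category:item", regrouped by category. */
    static Map<String, List<String>> demo() {
        List<Map.Entry<String, String>> purchases = List.of(
                Map.entry("alice", "fruit:apple"),
                Map.entry("bob", "tools:saw"),
                Map.entry("alice", "unlabeled"),
                Map.entry("carol", "fruit:pear"));
        return groupBy(purchases, (user, item) -> {
            int colon = item.indexOf(':');
            return colon < 0 ? null : item.substring(0, colon);
        });
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the demo, the "unlabeled" purchase yields a null grouping key and therefore does not appear in any group.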
-
groupBy
<KOut> KGroupedStream<KOut,V> groupBy(KeyValueMapper<? super K, ? super V, KOut> keySelector, Grouped<KOut, V> grouped) SeegroupBy(KeyValueMapper)
.Takes an additional
Grouped
parameter that allows explicitly setting key/value serdes or customizing the name of the created internal repartition topic. -
join
<VRight,VOut> KStream<K,VOut> join(KStream<K, VRight> rightStream, ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows) Join records of this (left) stream with another (right)KStream
's records using a windowed inner equi-join. The join is computed using the records' key as join attribute, i.e., leftRecord.key == rightRecord.key
. Furthermore, two records are only joined if their timestamps are close to each other as defined by the givenJoinWindows
, i.e., the window defines an additional join predicate on the record timestamps.For each pair of records meeting both join predicates the provided
ValueJoiner
will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records. If you need read access to the join key, usejoin(KStream, ValueJoinerWithKey, JoinWindows)
. If an input record's key or value isnull
the input record will be dropped, and no join computation is triggered. Similarly, so-called late records, i.e., records with a timestamp belonging to an already closed window (based on stream-time progress, window size, and grace period), will be dropped.Example (assuming all input records belong to the correct windows):
left      right     result
<K1:A>
<K2:B>    <K2:b>    <K2:ValueJoiner(B,b)>
          <K3:c>
Both KStreams
(or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case (and if no auto-repartitioning happens, see further below), you would need to call repartition(Repartitioned)
(for at least one of the twoKStreams
) before doing the join and specify the matching number of partitions viaRepartitioned
parameter to align the partition count for both inputs to each other. Furthermore, bothKStreams
need to be co-partitioned on the join key (i.e., use the same partitioner). Note: Kafka Streams cannot verify the used partitioner, so it is the user's responsibility to ensure that the same partitioner is used for both inputs for the join.If a key changing operator was used before this operation on either input stream (e.g.,
selectKey(KeyValueMapper)
,map(KeyValueMapper)
,flatMap(KeyValueMapper)
orprocess(ProcessorSupplier, String...)
) Kafka Streams will automatically repartition the data of the corresponding input stream, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic such that data is correctly partitioned by the join key.The repartitioning topic(s) will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
StreamsConfig
via parameterAPPLICATION_ID_CONFIG
, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. The number of partitions for the repartition topic(s) is determined based on the partition numbers of both upstream topics, and Kafka Streams will automatically align the number of partitions if required for co-partitioning. Furthermore, the topic(s) will be created with infinite retention time and data will be automatically purged by Kafka Streams.Both of the joined
KStream
s will be materialized in local state stores. For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storename>-changelog", where "applicationId" is user-specified inStreamsConfig
via parameterAPPLICATION_ID_CONFIG
, "<storename>" is an internally generated name, and "-changelog" is a fixed suffix.
You can retrieve all generated internal topic names via
Topology.describe()
. To explicitly set key/value serdes, to customize the names of the repartition and changelog topic, or to customize the used state store, usejoin(KStream, ValueJoiner, JoinWindows, StreamJoined)
. For more control over the repartitioning, userepartition(Repartitioned)
on either input before join()
.- Type Parameters:
VRight
- the value type of the right streamVOut
- the value type of the result stream- Parameters:
rightStream
- theKStream
to be joined with this streamjoiner
- aValueJoiner
that computes the join result for a pair of matching recordswindows
- the specification of theJoinWindows
- Returns:
- A
KStream
that contains join-records, one for each matched record-pair, with the corresponding key and a value computed by the givenValueJoiner
. - See Also:
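The two join predicates described above (equal keys, and timestamps within the join window) can be modeled in plain Java as follows. This is a simplified sketch, not Kafka Streams code: the class WindowedInnerJoinSketch and its Rec helper are hypothetical, a symmetric window is assumed, and late records, grace periods, and state stores are not modeled. The timestamps in the demo are made up so that the example input from the table above produces the same single result.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a windowed inner equi-join; this is not Kafka Streams code. */
final class WindowedInnerJoinSketch {

    /** A timestamped key-value record (hypothetical helper type). */
    static final class Rec {
        final String key;
        final String value;
        final long timestampMs;

        Rec(String key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    /**
     * Pairs each left record with each right record that has the same key and a
     * timestamp at most windowMs away. Records with a null key or value are skipped.
     */
    static List<String> join(List<Rec> left, List<Rec> right, long windowMs) {
        List<String> results = new ArrayList<>();
        for (Rec l : left) {
            if (l.key == null || l.value == null) {
                continue; // dropped: no join computation is triggered
            }
            for (Rec r : right) {
                if (r.key == null || r.value == null) {
                    continue;
                }
                boolean sameKey = l.key.equals(r.key);
                boolean inWindow = Math.abs(l.timestampMs - r.timestampMs) <= windowMs;
                if (sameKey && inWindow) {
                    results.add(l.key + ":ValueJoiner(" + l.value + "," + r.value + ")");
                }
            }
        }
        return results;
    }

    /** Reproduces the example input, with assumed timestamps and a 5 ms window. */
    static List<String> demo() {
        List<Rec> left = List.of(new Rec("K1", "A", 0L), new Rec("K2", "B", 10L));
        List<Rec> right = List.of(new Rec("K2", "b", 12L), new Rec("K3", "c", 20L));
        return join(left, right, 5L);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only K2 has records on both sides within the window, so the demo yields one result record; moving the right K2 record far outside the window would yield none.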
-
join
<VRight,VOut> KStream<K,VOut> join(KStream<K, VRight> rightStream, ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows) Seejoin(KStream, ValueJoiner, JoinWindows)
.Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and incorrect results.
-
join
<VRight,VOut> KStream<K,VOut> join(KStream<K, VRight> rightStream, ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows, StreamJoined<K, V, VRight> streamJoined) -
join
<VRight,VOut> KStream<K,VOut> join(KStream<K, VRight> rightStream, ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows, StreamJoined<K, V, VRight> streamJoined) Seejoin(KStream, ValueJoiner, JoinWindows)
.Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and incorrect results.
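The co-partitioning requirement described for join(KStream, ValueJoiner, JoinWindows) can be illustrated with a small plain-Java sketch. It assumes a hypothetical partitioner that maps a key to hashCode modulo the partition count; Kafka's actual default partitioner hashes the serialized key bytes differently, so this only shows the principle: with different partition counts (or different partitioners), records with the same key can land in different partitions and would never meet in a join task.

```java
/** Plain-Java illustration of co-partitioning; the partitioner here is a stand-in. */
final class CoPartitioningSketch {

    /** Hypothetical partitioner: non-negative hash of the key modulo the partition count. */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** True if the same key maps to the same partition number on both inputs. */
    static boolean coLocated(String key, int leftPartitions, int rightPartitions) {
        return partitionFor(key, leftPartitions) == partitionFor(key, rightPartitions);
    }

    public static void main(String[] args) {
        // Same partition count on both sides: the key is always co-located.
        System.out.println(coLocated("K1", 4, 4));
        // Different partition counts: the same key may end up in different partitions.
        System.out.println(coLocated("K1", 4, 6));
    }
}
```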
-
leftJoin
<VRight,VOut> KStream<K,VOut> leftJoin(KStream<K, VRight> rightStream, ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows) Join records of this (left) stream with another (right)KStream
's records using a windowed left equi-join. In contrast to aninner join
, all records from this stream will produce at least one output record (more details below). The join is computed using the records' key as join attribute, i.e., leftRecord.key == rightRecord.key
. Furthermore, two records are only joined if their timestamps are close to each other as defined by the givenJoinWindows
, i.e., the window defines an additional join predicate on the record timestamps.For each pair of records meeting both join predicates the provided
ValueJoiner
will be called to compute a value (with arbitrary type) for the result record. Furthermore, for each input record of thisKStream
that does not have any join-partner in the right stream (i.e., no record with the same key within the join interval),ValueJoiner
will be called with anull
value for the right stream.Note: By default, non-joining records from this stream are buffered until their join window closes, and corresponding left-join results for these records are emitted with some delay. If you want to get left-join results without any delay, you can use
JoinWindows#of(Duration) [deprecated]
instead. However, such an "eager" left-join result could be a spurious result, because the same record may find actual join partners later, producing additional inner-join results.The key of the result record is the same as for both joining input records, or the left input record's key for a left-join result. If you need read access to the join key, use
leftJoin(KStream, ValueJoinerWithKey, JoinWindows)
. If a left input record's value isnull
the input record will be dropped, and no join computation is triggered. Note, that for left input records,null
keys are supported (in contrast toinner join
), resulting in a left join result. If a right input record's key or value isnull
the input record will be dropped, and no join computation is triggered. For input records of either side, so-called late records, i.e., records with a timestamp belonging to an already closed window (based on stream-time progress, window size, and grace period), will be dropped.
Example (assuming all input records belong to the correct windows, not taking actual emit/window-close time for left-join results, or eager/spurious results into account):
left      right     result
<K1:A>              <K1:ValueJoiner(A,null)>
<K2:B>    <K2:b>    <K2:ValueJoiner(B,b)>
          <K3:c>
For more details about co-partitioning requirements, (auto-)repartitioning, and more, see join(KStream, ValueJoiner, JoinWindows)
.- Returns:
- A
KStream
that contains join-records, one for each matched record-pair plus one for each non-matching record of thisKStream
, with the corresponding key and a value computed by the givenValueJoiner
. - See Also:
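The difference between delayed and "eager" left-join results described in the note above can be modeled in plain Java. This is a heavily simplified sketch, not the Kafka Streams implementation: the class LeftJoinEmitSketch and its Event helper are hypothetical, records are assumed to arrive in timestamp order (so each record's timestamp is treated as stream-time), and the grace period is assumed to be zero.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Plain-Java model of eager vs. delayed left-join emission; not Kafka Streams code. */
final class LeftJoinEmitSketch {

    /** A record from the left or right input (hypothetical helper type). */
    static final class Event {
        final boolean fromLeft;
        final String key;
        final String value;
        final long ts;

        Event(boolean fromLeft, String key, String value, long ts) {
            this.fromLeft = fromLeft;
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    static boolean matches(Event l, Event r, long windowMs) {
        return l.key.equals(r.key) && Math.abs(l.ts - r.ts) <= windowMs;
    }

    static String format(Event l, Event r) {
        return l.key + ":ValueJoiner(" + l.value + "," + (r == null ? "null" : r.value) + ")";
    }

    static List<String> process(List<Event> events, long windowMs, boolean eager) {
        List<Event> leftSeen = new ArrayList<>();
        List<Event> rightSeen = new ArrayList<>();
        List<Event> pending = new ArrayList<>(); // unmatched left records (delayed mode only)
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            // Delayed mode: once stream-time passes a window's end, emit the left-join result.
            for (Iterator<Event> it = pending.iterator(); it.hasNext(); ) {
                Event p = it.next();
                if (p.ts + windowMs < e.ts) {
                    out.add(format(p, null));
                    it.remove();
                }
            }
            if (e.fromLeft) {
                boolean matched = false;
                for (Event r : rightSeen) {
                    if (matches(e, r, windowMs)) {
                        out.add(format(e, r));
                        matched = true;
                    }
                }
                leftSeen.add(e);
                if (!matched) {
                    if (eager) {
                        out.add(format(e, null)); // may turn out to be spurious
                    } else {
                        pending.add(e);
                    }
                }
            } else {
                for (Event l : leftSeen) {
                    if (matches(l, e, windowMs)) {
                        out.add(format(l, e));
                        pending.remove(l); // a join partner was found in time
                    }
                }
                rightSeen.add(e);
            }
        }
        return out;
    }

    /** Made-up input: K1:A finds a partner later, K2:B never does. */
    static List<String> demo(boolean eager) {
        List<Event> events = List.of(
                new Event(true, "K1", "A", 0L),
                new Event(false, "K1", "a", 3L),
                new Event(true, "K2", "B", 4L),
                new Event(false, "K9", "z", 20L));
        return process(events, 5L, eager);
    }
}
```

In this model, the eager variant emits K1:ValueJoiner(A,null) before the real partner arrives, which is the spurious result the note warns about, while the delayed variant emits a null-joined result only for K2:B after its window has closed.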
-
leftJoin
<VRight,VOut> KStream<K,VOut> leftJoin(KStream<K, VRight> rightStream, ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows) SeeleftJoin(KStream, ValueJoiner, JoinWindows)
.Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and incorrect results.
-
leftJoin
<VRight,VOut> KStream<K,VOut> leftJoin(KStream<K, VRight> rightStream, ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows, StreamJoined<K, V, VRight> streamJoined) -
leftJoin
<VRight,VOut> KStream<K,VOut> leftJoin(KStream<K, VRight> rightStream, ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows, StreamJoined<K, V, VRight> streamJoined) SeeleftJoin(KStream, ValueJoiner, JoinWindows)
.Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and incorrect results.
-
outerJoin
<VRight,VOut> KStream<K,VOut> outerJoin(KStream<K, VRight> otherStream, ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows) Join records of this (left) stream with another (right)KStream
's records using a windowed outer equi-join. In contrast to aninner join
orleft join
, all records from both streams will produce at least one output record (more details below). The join is computed using the records' key as join attribute, i.e., leftRecord.key == rightRecord.key
. Furthermore, two records are only joined if their timestamps are close to each other as defined by the givenJoinWindows
, i.e., the window defines an additional join predicate on the record timestamps.For each pair of records meeting both join predicates the provided
ValueJoiner
will be called to compute a value (with arbitrary type) for the result record. Furthermore, for each input record of eitherKStream
that does not have any join-partner in the other stream (i.e., no record with the same key within the join interval),ValueJoiner
will be called with anull
value for right/left stream, respectively.Note: By default, non-joining records from either stream are buffered until their join window closes, and corresponding outer-join results for these records are emitted with some delay. If you want to get outer-join results without any delay, you can use
JoinWindows#of(Duration) [deprecated]
instead. However, such an "eager" outer-join result could be a spurious result, because the same record may find actual join partners later, producing additional inner-join results.The key of the result record is the same as for both joining input records, or the left/right input record's key for an outer-join result, respectively. If you need read access to the join key, use
outerJoin(KStream, ValueJoinerWithKey, JoinWindows)
. If an input record's value isnull
the input record will be dropped, and no join computation is triggered. Note, that input records withnull
keys are supported (in contrast toinner join
), resulting in left/right join results. For input records of either side, so-called late records, i.e., records with a timestamp belonging to an already closed window (based on stream-time progress, window size, and grace period), will be dropped.
Example (assuming all input records belong to the correct windows, not taking actual emit/window-close time for outer-join results, or eager/spurious results into account):
left      right     result
<K1:A>              <K1:ValueJoiner(A,null)>
<K2:B>    <K2:b>    <K2:ValueJoiner(B,b)>
          <K3:c>    <K3:ValueJoiner(null,c)>
For more details about co-partitioning requirements, (auto-)repartitioning, and more, see join(KStream, ValueJoiner, JoinWindows)
.- Returns:
- A
KStream
that contains join-records, one for each matched record-pair plus one for each non-matching record of either inputKStream
, with the corresponding key and a value computed by the givenValueJoiner
. - See Also:
-
outerJoin
<VRight,VOut> KStream<K,VOut> outerJoin(KStream<K, VRight> otherStream, ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows) SeeouterJoin(KStream, ValueJoiner, JoinWindows)
.Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning.
-
outerJoin
<VRight,VOut> KStream<K,VOut> outerJoin(KStream<K, VRight> otherStream, ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows, StreamJoined<K, V, VRight> streamJoined) -
outerJoin
<VRight,VOut> KStream<K,VOut> outerJoin(KStream<K, VRight> otherStream, ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows, StreamJoined<K, V, VRight> streamJoined) SeeouterJoin(KStream, ValueJoiner, JoinWindows)
.Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning.
-
join
<TableValue,VOut> KStream<K,VOut> join(KTable<K, TableValue> table, ValueJoiner<? super V, ? super TableValue, ? extends VOut> joiner) Join records of this stream withKTable
's records using non-windowed inner equi-join. The join is a primary key table lookup join with join attributestreamRecord.key == tableRecord.key
. "Table lookup join" means, that results are only computed ifKStream
records are processed. This is done by performing a lookup for matching records into the internalKTable
state. In contrast, processingKTable
input records will only update the internalKTable
state and will not produce any result records.For each
KStream
record that finds a joining record in theKTable
the providedValueJoiner
will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records. If you need read access to the join key, usejoin(KTable, ValueJoinerWithKey)
. If aKStream
input record's key or value isnull
the input record will be dropped, and no join computation is triggered. If aKTable
input record's key isnull
the input record will be dropped, and the table state won't be updated.KTable
input records withnull
values are considered deletes (so-called tombstone) for the table.Example:
KStream     KTable      state       result
<K1:A>
            <K1:b>      <K1:b>
<K1:C>                  <K1:b>      <K1:ValueJoiner(C,b)>
KStream
records are processed by performing a lookup for matching records in the current (i.e., processing time) internalKTable
state. This default implementation does not handle out-of-order records in either input of the join well. Seejoin(KTable, ValueJoiner, Joined)
on how to configure a stream-table join to handle out-of-order data.KStream
andKTable
(or to be more precise, their underlying source topics) need to have the same number of partitions (cf.join(GlobalKTable, KeyValueMapper, ValueJoiner)
). If this is not the case (and if no auto-repartitioning happens for theKStream
, see further below), you would need to callrepartition(Repartitioned)
for thisKStream
before doing the join, specifying the same number of partitions viaRepartitioned
parameter as the givenKTable
. Furthermore,KStream
andKTable
need to be co-partitioned on the join key (i.e., use the same partitioner). Note: Kafka Streams cannot verify the used partitioner, so it is the user's responsibility to ensure that the same partitioner is used for both inputs of the join.If a key changing operator was used on this
KStream
before this operation (e.g.,selectKey(KeyValueMapper)
,map(KeyValueMapper)
,flatMap(KeyValueMapper)
orprocess(ProcessorSupplier, String...)
) Kafka Streams will automatically repartition the data of thisKStream
, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic such that data is correctly partitioned by theKTable
's key.The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
StreamsConfig
via parameterAPPLICATION_ID_CONFIG
, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. The number of partitions for the repartition topic is determined based on the number of partitions of the KTable
. Furthermore, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.
You can retrieve all generated internal topic names via
Topology.describe()
. To explicitly set key/value serdes or to customize the name of the repartition topic, use join(KTable, ValueJoiner, Joined)
. For more control over the repartitioning, userepartition(Repartitioned)
beforejoin()
.- Type Parameters:
TableValue
- the value type of the tableVOut
- the value type of the result stream- Parameters:
table
- theKTable
to be joined with this streamjoiner
- aValueJoiner
that computes the join result for a pair of matching records- Returns:
- A
KStream
that contains join-records, one for each matched stream record, with the corresponding key and a value computed by the givenValueJoiner
. - See Also:
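The "table lookup join" behavior described above (table records only update state, stream records trigger a lookup) can be modeled in plain Java. This is a sketch of the semantics only, not Kafka Streams code: the class StreamTableJoinSketch and its method names are hypothetical, and partitioning and timestamps are not modeled. The demo replays the example above and then adds a tombstone.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a stream-table inner lookup join; not Kafka Streams code. */
final class StreamTableJoinSketch {
    private final Map<String, String> tableState = new HashMap<>();
    private final List<String> results = new ArrayList<>();

    /** A table record only updates the state; a null value is a tombstone (delete). */
    void onTableRecord(String key, String value) {
        if (key == null) {
            return; // dropped, table state is not updated
        }
        if (value == null) {
            tableState.remove(key);
        } else {
            tableState.put(key, value);
        }
    }

    /** A stream record triggers a lookup; a result is produced only if a match exists. */
    void onStreamRecord(String key, String value) {
        if (key == null || value == null) {
            return; // dropped, no join computation is triggered
        }
        String tableValue = tableState.get(key);
        if (tableValue != null) {
            results.add(key + ":ValueJoiner(" + value + "," + tableValue + ")");
        }
    }

    List<String> results() {
        return results;
    }

    static List<String> demo() {
        StreamTableJoinSketch join = new StreamTableJoinSketch();
        join.onStreamRecord("K1", "A"); // table is still empty: no result
        join.onTableRecord("K1", "b");  // updates state only
        join.onStreamRecord("K1", "C"); // finds <K1:b> and produces a result
        join.onTableRecord("K1", null); // tombstone removes K1 from the state
        join.onStreamRecord("K1", "D"); // no match again: no result
        return join.results();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```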
-
join
<TableValue,VOut> KStream<K,VOut> join(KTable<K, TableValue> table, ValueJoinerWithKey<? super K, ? super V, ? super TableValue, ? extends VOut> joiner) Seejoin(KTable, ValueJoiner)
.Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and incorrect results.
-
join
<TableValue,VOut> KStream<K,VOut> join(KTable<K, TableValue> table, ValueJoiner<? super V, ? super TableValue, ? extends VOut> joiner, Joined<K, V, TableValue> joined) Join records of this stream withKTable
's records using non-windowed inner equi-join. In contrast tojoin(KTable, ValueJoiner)
, but only if the usedKTable
is backed by aVersionedKeyValueStore
, the additionalJoined
parameter allows to specify a join grace-period, to handle out-of-order data gracefully.For details about stream-table semantics, including co-partitioning requirements, (auto-)repartitioning, and more see
join(KTable, ValueJoiner)
. If you specify a grace-period to handle out-of-order data, see further details below.To handle out-of-order records, the input
KTable
must use aVersionedKeyValueStore
(specified via aMaterialized
parameter when theKTable
is created), and a joingrace-period
must be specified. For this case,KStream
records are buffered until the end of the grace period and theKTable
lookup is performed with some delay. Given that theKTable
state is versioned, the lookup can use "event time", allowing out-of-orderKStream
records, to join to the right (older) version of aKTable
record with the same key. Also,KTable
out-of-order updates are handled correctly by the versioned state store. Note that using a join grace-period introduces the notion of late records, i.e., records with a timestamp smaller than the defined grace-period allows; these late records will be dropped, and no join computation is triggered. Using a versioned state store for the KTable
also implies that the definedhistory retention
provides a cut-off point, and late records will be dropped, not updating theKTable
state.If a join grace-period is specified, the
KStream
will be materialized in a local state store. For failure and recovery this store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storename>-changelog", where "applicationId" is user-specified inStreamsConfig
via parameterAPPLICATION_ID_CONFIG
, "<storename>" is an internally generated name, and "-changelog" is a fixed suffix.
You can retrieve all generated internal topic names via
Topology.describe()
. To customize the name of the changelog topic, use the Joined
input parameter. -
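The "event time" lookup that a versioned table state makes possible can be sketched in plain Java. This is a toy model, not the VersionedKeyValueStore API: the class VersionedLookupSketch and its methods are hypothetical, and history retention, tombstones, and the buffering of stream records during the grace period are not modeled. It only shows how a stream record with an older timestamp can be joined to the table version that was valid at that time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of a timestamp-versioned table state; not the Kafka Streams store API. */
final class VersionedLookupSketch {
    // key -> (valid-from timestamp -> value)
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();

    /** Records a new version of the value for this key, valid from timestampMs on. */
    void put(String key, String value, long timestampMs) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestampMs, value);
    }

    /** Returns the value that was valid at timestampMs, or null if there was none. */
    String getAsOf(String key, long timestampMs) {
        TreeMap<Long, String> history = versions.get(key);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, String> entry = history.floorEntry(timestampMs);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        VersionedLookupSketch table = new VersionedLookupSketch();
        table.put("K1", "b1", 10L);
        table.put("K1", "b2", 20L);
        // An out-of-order stream record with timestamp 15 still sees the older version.
        System.out.println(table.getAsOf("K1", 15L));
        // A stream record with timestamp 25 sees the latest version.
        System.out.println(table.getAsOf("K1", 25L));
    }
}
```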
join
<TableValue,VOut> KStream<K,VOut> join(KTable<K, TableValue> table, ValueJoinerWithKey<? super K, ? super V, ? super TableValue, ? extends VOut> joiner, Joined<K, V, TableValue> joined) Seejoin(KTable, ValueJoiner, Joined)
.Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and incorrect results.
-
leftJoin
<VTable,VOut> KStream<K,VOut> leftJoin(KTable<K, VTable> table, ValueJoiner<? super V, ? super VTable, ? extends VOut> joiner) Join records of this stream withKTable
's records using non-windowed left equi-join. In contrast to aninner join
, all records from this stream will produce an output record (more details below). The join is a primary key table lookup join with join attributestreamRecord.key == tableRecord.key
. "Table lookup join" means, that results are only computed ifKStream
records are processed. This is done by performing a lookup for matching records into the internalKTable
state. In contrast, processingKTable
input records will only update the internalKTable
state and will not produce any result records.For each
KStream
record, regardless of whether it finds a joining record in the KTable
, the providedValueJoiner
will be called to compute a value (with arbitrary type) for the result record. If noKTable
record with matching key was found during the lookup,ValueJoiner
will be called with anull
value for the table record. The key of the result record is the same as for both joining input records, or the KStream
input record's key for a left-join result. If you need read access to the join key, useleftJoin(KTable, ValueJoinerWithKey)
. If aKStream
input record's value isnull
the input record will be dropped, and no join computation is triggered. Note, thatnull
keys forKStream
input records are supported (in contrast toinner join
) resulting in a left join result. If aKTable
input record's key isnull
the input record will be dropped, and the table state won't be updated.KTable
input records withnull
values are considered deletes (so-called tombstone) for the table.Example:
KStream     KTable      state       result
<K1:A>                              <K1:ValueJoiner(A,null)>
            <K1:b>      <K1:b>
<K1:C>                  <K1:b>      <K1:ValueJoiner(C,b)>
KStream
records are processed by performing a lookup for matching records in the current (i.e., processing time) internalKTable
state. This default implementation does not handle out-of-order records in either input of the join well. SeeleftJoin(KTable, ValueJoiner, Joined)
on how to configure a stream-table join to handle out-of-order data.
For more details about co-partitioning requirements, (auto-)repartitioning, and more, see
join(KTable, ValueJoiner)
.- Returns:
- A
KStream
that contains join-records, one for each matched stream record plus one for each non-matching stream record, with the corresponding key and a value computed by the givenValueJoiner
. - See Also:
-
leftJoin
<VTable,VOut> KStream<K,VOut> leftJoin(KTable<K, VTable> table, ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner) SeeleftJoin(KTable, ValueJoiner)
.Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and incorrect results.
-
leftJoin
<VTable,VOut> KStream<K,VOut> leftJoin(KTable<K, VTable> table, ValueJoiner<? super V, ? super VTable, ? extends VOut> joiner, Joined<K, V, VTable> joined) Join records of this stream withKTable
's records using non-windowed left equi-join. In contrast toleftJoin(KTable, ValueJoiner)
, but only if the usedKTable
is backed by aVersionedKeyValueStore
, the additionalJoined
parameter allows to specify a join grace-period, to handle out-of-order data gracefully.For details about left-stream-table-join semantics see
leftJoin(KTable, ValueJoiner)
. For co-partitioning requirements, (auto-)repartitioning, and more seejoin(KTable, ValueJoiner)
. If you specify a grace-period to handle out-of-order data, seejoin(KTable, ValueJoiner, Joined)
. -
leftJoin
<VTable,VOut> KStream<K,VOut> leftJoin(KTable<K, VTable> table, ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner, Joined<K, V, VTable> joined) SeeleftJoin(KTable, ValueJoiner, Joined)
.Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and incorrect results.
-
join
<GlobalKey,GlobalValue,VOut> KStream<K,VOut> join(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner) Join records of this stream with GlobalKTable
's records using non-windowed inner equi-join. The join is a primary key table lookup join with join attributekeyValueMapper.map(streamRecord) == tableRecord.key
. "Table lookup join" means, that results are only computed ifKStream
records are processed. This is done by performing a lookup for matching records in the current (i.e., processing time) internalGlobalKTable
state. In contrast, processingGlobalKTable
input records will only update the internalGlobalKTable
state and will not produce any result records.For each
KStream
record that finds a joining record in theGlobalKTable
the providedValueJoiner
will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as the stream record's key. If you need read access to theKStream
key, usejoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
. If aKStream
input record's value isnull
or if the providedkeySelector
returnsnull
, the input record will be dropped, and no join computation is triggered. If aGlobalKTable
input record's key isnull
the input record will be dropped, and the table state won't be updated.GlobalKTable
input records withnull
values are considered deletes (so-called tombstone) for the table.Example, using the first value attribute as join key:
KStream         GlobalKTable    state       result
<K1:(GK1,A)>
                <GK1:b>         <GK1:b>
<K1:(GK1,C)>                    <GK1:b>     <K1:ValueJoiner((GK1,C),b)>
In contrast to join(KTable, ValueJoiner)
, there is no co-partitioning requirement between thisKStream
and theGlobalKTable
. Also note that there are no ordering guarantees between the updates on the left and the right side of this join, since updates to theGlobalKTable
are in no way synchronized. Therefore, the result of the join is inherently non-deterministic.- Type Parameters:
GlobalKey
- the key type of the global tableGlobalValue
- the value type of the global tableVOut
- the value type of the result stream- Parameters:
globalTable
- theGlobalKTable
to be joined with this streamkeySelector
- aKeyValueMapper
that computes the join key for stream input recordsjoiner
- aValueJoiner
that computes the join result for a pair of matching records- Returns:
- A
KStream
that contains join-records, one for each matched stream record, with the corresponding key and a value computed by the givenValueJoiner
. - See Also:
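The role of the keySelector in the GlobalKTable join can be modeled in plain Java. This is a sketch of the semantics only, not Kafka Streams code: the class GlobalTableJoinSketch, its method names, and the string encoding "(GK1,A)" for a two-attribute value are hypothetical. As in the example above, the first value attribute serves as the join key, while the result record keeps the stream record's key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** Plain-Java model of a stream-GlobalKTable inner join; not Kafka Streams code. */
final class GlobalTableJoinSketch {
    private final Map<String, String> globalState = new HashMap<>();
    private final List<String> results = new ArrayList<>();
    private final BiFunction<String, String, String> keySelector;

    GlobalTableJoinSketch(BiFunction<String, String, String> keySelector) {
        this.keySelector = keySelector;
    }

    /** A global table record only updates the state; a null value is a tombstone. */
    void onGlobalTableRecord(String key, String value) {
        if (key == null) {
            return; // dropped, table state is not updated
        }
        if (value == null) {
            globalState.remove(key);
        } else {
            globalState.put(key, value);
        }
    }

    /** A stream record is mapped to a join key, which is then looked up in the state. */
    void onStreamRecord(String key, String value) {
        if (value == null) {
            return; // dropped
        }
        String joinKey = keySelector.apply(key, value);
        if (joinKey == null) {
            return; // dropped: the selector produced no join key
        }
        String tableValue = globalState.get(joinKey);
        if (tableValue != null) {
            // The result keeps the stream record's key, not the join key.
            results.add(key + ":ValueJoiner(" + value + "," + tableValue + ")");
        }
    }

    static List<String> demo() {
        // Assumed value encoding "(GK1,A)": the first attribute is the join key.
        GlobalTableJoinSketch join = new GlobalTableJoinSketch(
                (streamKey, value) -> value.substring(1, value.indexOf(',')));
        join.onStreamRecord("K1", "(GK1,A)"); // global table still empty: no result
        join.onGlobalTableRecord("GK1", "b");
        join.onStreamRecord("K1", "(GK1,C)"); // joins with <GK1:b>
        return join.results;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```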
-
join
<GlobalKey,GlobalValue,VOut> KStream<K,VOut> join(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner) See join(GlobalKTable, KeyValueMapper, ValueJoiner)
.Note that the
KStream
key is read-only and must not be modified, as this can lead to corrupt partitioning and incorrect results. -
join
<GlobalKey,GlobalValue,VOut> KStream<K,VOut> join(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner, Named named) See join(GlobalKTable, KeyValueMapper, ValueJoiner)
.Takes an additional
Named
parameter that is used to name the processor in the topology. -
join
<GlobalKey,GlobalValue,VOut> KStream<K,VOut> join(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner, Named named) See join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
.Takes an additional
Named
parameter that is used to name the processor in the topology. -
leftJoin
<GlobalKey,GlobalValue,VOut> KStream<K,VOut> leftJoin(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner) Join records of this stream with GlobalKTable
's records using non-windowed left equi-join. In contrast to aninner join
, all records from this stream will produce an output record (more details below). The join is a primary key table lookup join with join attributekeyValueMapper.map(streamRecord) == tableRecord.key
. "Table lookup join" means, that results are only computed ifKStream
records are processed. This is done by performing a lookup for matching records in the current (i.e., processing time) internalGlobalKTable
state. In contrast, processingGlobalKTable
input records will only update the internalGlobalKTable
state and will not produce any result records.For each
KStream
record, regardless of whether it finds a joining record in the GlobalKTable
, the providedValueJoiner
will be called to compute a value (with arbitrary type) for the result record. If noGlobalKTable
record with matching key was found during the lookup,ValueJoiner
will be called with anull
value for the global table record. The key of the result record is the same as for both joining input records, or the KStream
input record's key for a left-join result. If you need read access to theKStream
key, useleftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
. If aKStream
input record's value isnull
or if the providedkeySelector
returnsnull
, the input record will be dropped, and no join computation is triggered. Note, thatnull
keys forKStream
input records are supported (in contrast toinner join
) resulting in a left join result. If aGlobalKTable
input record's key isnull
the input record will be dropped, and the table state won't be updated.GlobalKTable
input records withnull
values are considered deletes (so-called tombstone) for the table.Example, using the first value attribute as join key:
KStream         GlobalKTable    state       result
<K1:(GK1,A)>                                <K1:ValueJoiner((GK1,A),null)>
                <GK1:b>         <GK1:b>
<K1:(GK1,C)>                    <GK1:b>     <K1:ValueJoiner((GK1,C),b)>
In contrast to leftJoin(KTable, ValueJoiner)
, there is no co-partitioning requirement between thisKStream
and theGlobalKTable
. Also note that there are no ordering guarantees between the updates on the left and the right side of this join, since updates to theGlobalKTable
are in no way synchronized. Therefore, the result of the join is inherently non-deterministic.- Type Parameters:
GlobalKey - the key type of the global table
GlobalValue - the value type of the global table
VOut - the value type of the result stream
- Parameters:
globalTable - the GlobalKTable to be joined with this stream
keySelector - a KeyValueMapper that computes the join key for stream input records
joiner - a ValueJoiner that computes the join result for a pair of matching records
- Returns:
A KStream that contains join-records, one for each matched stream record plus one for each non-matching stream record, with the corresponding key and a value computed by the given ValueJoiner.
- See Also:
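The lookup semantics described above can be replayed with a small stand-alone model. This is only a sketch under stated assumptions: it uses plain Java collections, not Kafka Streams classes, and the names LeftLookupJoinSketch, onTableRecord, onStreamRecord, and runExample are hypothetical. Stream values are modeled as two-element String arrays so that the first attribute can serve as the join key, as in the example table.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Stand-alone model of the left lookup join semantics; NOT Kafka Streams classes.
public class LeftLookupJoinSketch {

    // Hypothetical stand-in for the internal GlobalKTable state.
    private final Map<String, String> tableState = new HashMap<>();
    // Collected result records, rendered as "<key:value>".
    private final List<String> results = new ArrayList<>();

    // A table-side record only updates the state; it never produces a result.
    public void onTableRecord(String key, String value) {
        if (key == null) {
            return; // null key: record is dropped, state is unchanged
        }
        if (value == null) {
            tableState.remove(key); // null value: tombstone, i.e. a delete
        } else {
            tableState.put(key, value);
        }
    }

    // A stream-side record triggers a lookup and yields exactly one result,
    // unless its value or the selected join key is null.
    public void onStreamRecord(String key, String[] value,
                               BiFunction<String, String[], String> keySelector,
                               BiFunction<String[], String, String> joiner) {
        if (value == null) {
            return; // dropped, no join computation
        }
        String joinKey = keySelector.apply(key, value);
        if (joinKey == null) {
            return; // dropped, no join computation
        }
        String tableValue = tableState.get(joinKey); // null if there is no match
        // The result keeps the stream record's key, not the table key.
        results.add("<" + key + ":" + joiner.apply(value, tableValue) + ">");
    }

    public List<String> results() {
        return results;
    }

    // Replays the three events from the example table in arrival order.
    public static List<String> runExample() {
        LeftLookupJoinSketch join = new LeftLookupJoinSketch();
        BiFunction<String, String[], String> firstAttribute = (k, v) -> v[0];
        BiFunction<String[], String, String> joiner =
            (v, t) -> "ValueJoiner((" + v[0] + "," + v[1] + ")," + t + ")";
        join.onStreamRecord("K1", new String[] {"GK1", "A"}, firstAttribute, joiner);
        join.onTableRecord("GK1", "b");
        join.onStreamRecord("K1", new String[] {"GK1", "C"}, firstAttribute, joiner);
        return join.results();
    }

    public static void main(String[] args) {
        runExample().forEach(System.out::println);
    }
}
```

Because the table-side update arrives between the two stream records, only the second stream record finds a match. Reordering the calls in runExample changes the results, which illustrates why the join outcome depends on arrival order.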
-
leftJoin
<GlobalKey, GlobalValue, VOut> KStream<K,VOut> leftJoin(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner)
See leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner).
Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and incorrect results.
-
leftJoin
<GlobalKey, GlobalValue, VOut> KStream<K,VOut> leftJoin(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner, Named named)
See leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner).
Takes an additional Named parameter that is used to name the processor in the topology.
-
leftJoin
<GlobalKey, GlobalValue, VOut> KStream<K,VOut> leftJoin(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner, Named named)
See leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey).
Takes an additional Named parameter that is used to name the processor in the topology.
-
process
<KOut,VOut> KStream<KOut,VOut> process(ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier, String... stateStoreNames)
Process all records in this stream, one record at a time, by applying a Processor (provided by the given ProcessorSupplier) to each input record. The Processor can emit any number of result records via ProcessorContext.forward(Record) (possibly of a different key and/or value type).
By default, the processor is stateless (similar to flatMap(KeyValueMapper, Named), however, it also has access to the record's timestamp and headers), but previously added state stores can be connected by providing their names as additional parameters, making the processor stateful. There are two different kinds of state stores, which can be added to the underlying Topology:
- state stores for processing (i.e., read/write access)
- read-only state stores
If the processorSupplier provides state stores via ConnectedStoreProvider.stores(), the corresponding StoreBuilders will be added to the topology and connected to this processor automatically, without the need to provide the store names as parameters to this method. Additionally, even if a processor is stateless, it can still access all global state stores (read-only). There is no need to connect global stores to processors.
All state stores that are connected to a processor, and all global stores, can be accessed via context.getStateStore(String) using the context provided via Processor#init():
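    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class MyProcessor implements Processor<String, Integer, String, Integer> {
        private ProcessorContext<String, Integer> context;
        private KeyValueStore<String, String> store;

        @Override
        public void init(final ProcessorContext<String, Integer> context) {
            // keep the context and look up the connected store by name
            this.context = context;
            this.store = context.getStateStore("myStore");
        }

        @Override
        public void process(final Record<String, Integer> record) {
            // can access this.context and this.store
        }
    }
Furthermore, the provided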
ProcessorContext gives access to topology, runtime, and record metadata, and allows you to schedule punctuations and to request offset commits.
In contrast to grouping/aggregation and joins, even if the processor is stateful and an upstream operation was key changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before process(). At the same time, this method is considered a key changing operation by itself, and might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. processValues(FixedKeyProcessorSupplier, String...)).
- Parameters:
processorSupplier - the supplier used to obtain Processor instances
stateStoreNames - the names of state stores that the processor should be able to access
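The contract described for process() (zero or more forwarded records per input, optionally backed by state) can be illustrated with a stand-alone model. This is a minimal sketch under stated assumptions, not the Kafka Streams API: MiniProcessor, ThresholdProcessor, and runExample are hypothetical names, the forward callback stands in for ProcessorContext.forward(Record), and a HashMap stands in for a connected key-value state store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Stand-alone model of the process() contract; NOT Kafka Streams classes.
public class ProcessorContractSketch {

    // Hypothetical mini processor: may forward zero or more records per input,
    // possibly with different key and value types than the input.
    interface MiniProcessor<KIn, VIn, KOut, VOut> {
        void process(KIn key, VIn value, BiConsumer<KOut, VOut> forward);
    }

    // Stateful example: keeps a running sum per key and forwards a record
    // only once the sum reaches 10, then resets the state for that key.
    static class ThresholdProcessor implements MiniProcessor<String, Integer, String, String> {
        // Stand-in for a connected read/write state store.
        private final Map<String, Integer> store = new HashMap<>();

        @Override
        public void process(String key, Integer value, BiConsumer<String, String> forward) {
            int sum = store.getOrDefault(key, 0) + value;
            if (sum >= 10) {
                forward.accept(key, "total=" + sum); // output value type differs from input
                store.remove(key);
            } else {
                store.put(key, sum); // nothing forwarded for this input
            }
        }
    }

    // Feeds four records through the processor and collects what it forwards.
    public static List<String> runExample() {
        ThresholdProcessor processor = new ThresholdProcessor();
        List<String> out = new ArrayList<>();
        BiConsumer<String, String> forward = (k, v) -> out.add(k + ":" + v);
        processor.process("a", 4, forward);  // sum for a is 4, nothing forwarded
        processor.process("b", 12, forward); // sum for b is 12, forwarded
        processor.process("a", 7, forward);  // sum for a is 11, forwarded
        processor.process("a", 1, forward);  // state was reset, sum is 1
        return out;
    }

    public static void main(String[] args) {
        runExample().forEach(System.out::println);
    }
}
```

Removing the map would make the model stateless, which corresponds to calling process() without any state store names.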
-
process
<KOut,VOut> KStream<KOut,VOut> process(ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier, Named named, String... stateStoreNames)
See process(ProcessorSupplier, String...).
Takes an additional Named parameter that is used to name the processor in the topology.
-
processValues
<VOut> KStream<K,VOut> processValues(FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier, String... stateStoreNames)
Process all records in this stream, one record at a time, by applying a FixedKeyProcessor (provided by the given FixedKeyProcessorSupplier) to each input record. This method is similar to process(ProcessorSupplier, String...), however the key of the input Record cannot be modified.
Because the key cannot be modified, this method is not a key changing operation and preserves data co-location with respect to the key (cf. flatMapValues(ValueMapper)). Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream.
However, because the key cannot be modified, some restrictions apply to a FixedKeyProcessor compared to a Processor: for example, forwarding result records from a Punctuator is not possible.
-
processValues
<VOut> KStream<K,VOut> processValues(FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier, Named named, String... stateStoreNames)
See processValues(FixedKeyProcessorSupplier, String...).
Takes an additional Named parameter that is used to name the processor in the topology.
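The difference between a key changing operation (process) and a key preserving one (processValues) comes down to whether a record can end up belonging to a different partition. The sketch below is a simplified illustration under stated assumptions: partitionFor uses String.hashCode() modulo the partition count as a stand-in, which is not how Kafka's default partitioner hashes serialized keys, and the class and method names are hypothetical.

```java
// Stand-alone illustration of key-based co-location; NOT Kafka Streams classes.
public class KeyColocationSketch {

    // Assumption: a simple hash-mod partitioner used purely for illustration.
    public static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // True if a record whose key changed from oldKey to newKey would still
    // belong to the partition it currently resides in.
    public static boolean staysColocated(String oldKey, String newKey, int numPartitions) {
        return partitionFor(oldKey, numPartitions) == partitionFor(newKey, numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 4;
        // Key unchanged (as with processValues): always co-located.
        System.out.println(staysColocated("a", "a", partitions));
        // Key changed (possible with process): may belong to another partition.
        System.out.println(staysColocated("a", "b", partitions));
        // A changed key can also land on the same partition by coincidence.
        System.out.println(staysColocated("a", "e", partitions));
    }
}
```

Since a changed key only sometimes maps to a different partition, a downstream key-based operator cannot rely on co-location after process(), whereas an unchanged key keeps it by construction.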
-