Type Parameters:
K - Type of keys
V - Type of values

@InterfaceStability.Unstable
public interface KStream<K,V>
KStream is an abstraction of a record stream of KeyValue pairs, i.e., each record is an
independent entity/event in the real world.
For example, a user X might buy two items I1 and I2, and thus there might be two records <K:I1>, <K:I2>
in the stream.
A KStream is either defined from one or multiple Kafka topics that
are consumed message by message or the result of a KStream transformation.
A KTable can also be converted into a KStream.
A KStream can be transformed record by record, joined with another KStream, KTable,
GlobalKTable, or can be aggregated into a KTable.
The Kafka Streams DSL can be mixed-and-matched with the Processor API (PAPI) (cf. TopologyBuilder) via
process(...),
transform(...), and
transformValues(...).
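For example, a minimal topology might read a topic, transform each record, and write the result back to Kafka. The following is a sketch only, assuming String keys and values with matching default serdes configured in props (application id, bootstrap servers, etc.); topic names are illustrative:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream("input-topic");
source.filter((key, value) -> value != null)    // drop records without a value
      .mapValues(value -> value.toUpperCase()) // record-by-record value transformation
      .to("output-topic");                     // materialize the result to a topic
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();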
See Also:
KTable, KGroupedStream, KStreamBuilder.stream(String...)

| Modifier and Type | Method and Description |
|---|---|
| KStream<K,V>[] | branch(Predicate<? super K,? super V>... predicates) Creates an array of KStream from this stream by branching the records in the original stream based on the supplied predicates. |
| KStream<K,V> | filter(Predicate<? super K,? super V> predicate) Create a new KStream that consists of all records of this stream which satisfy the given predicate. |
| KStream<K,V> | filterNot(Predicate<? super K,? super V> predicate) Create a new KStream that consists of all records of this stream which do not satisfy the given predicate. |
| <KR,VR> KStream<KR,VR> | flatMap(KeyValueMapper<? super K,? super V,? extends Iterable<? extends KeyValue<? extends KR,? extends VR>>> mapper) Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily). |
| <VR> KStream<K,VR> | flatMapValues(ValueMapper<? super V,? extends Iterable<? extends VR>> processor) Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream. |
| void | foreach(ForeachAction<? super K,? super V> action) Perform an action on each record of KStream. |
| <KR> KGroupedStream<KR,V> | groupBy(KeyValueMapper<? super K,? super V,KR> selector) Group the records of this KStream on a new key that is selected using the provided KeyValueMapper and default serializers and deserializers. |
| <KR> KGroupedStream<KR,V> | groupBy(KeyValueMapper<? super K,? super V,KR> selector, Serde<KR> keySerde, Serde<V> valSerde) Group the records of this KStream on a new key that is selected using the provided KeyValueMapper. |
| KGroupedStream<K,V> | groupByKey() Group the records by their current key into a KGroupedStream while preserving the original values and default serializers and deserializers. |
| KGroupedStream<K,V> | groupByKey(Serde<K> keySerde, Serde<V> valSerde) Group the records by their current key into a KGroupedStream while preserving the original values. |
| <GK,GV,RV> KStream<K,RV> | join(GlobalKTable<GK,GV> globalKTable, KeyValueMapper<? super K,? super V,? extends GK> keyValueMapper, ValueJoiner<? super V,? super GV,? extends RV> joiner) Join records of this stream with GlobalKTable's records using non-windowed inner equi join. |
| <VO,VR> KStream<K,VR> | join(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows) Join records of this stream with another KStream's records using windowed inner equi join with default serializers and deserializers. |
| <VO,VR> KStream<K,VR> | join(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValueSerde, Serde<VO> otherValueSerde) Join records of this stream with another KStream's records using windowed inner equi join. |
| <VT,VR> KStream<K,VR> | join(KTable<K,VT> table, ValueJoiner<? super V,? super VT,? extends VR> joiner) Join records of this stream with KTable's records using non-windowed inner equi join with default serializers and deserializers. |
| <VT,VR> KStream<K,VR> | join(KTable<K,VT> table, ValueJoiner<? super V,? super VT,? extends VR> joiner, Serde<K> keySerde, Serde<V> valSerde) Join records of this stream with KTable's records using non-windowed inner equi join. |
| <GK,GV,RV> KStream<K,RV> | leftJoin(GlobalKTable<GK,GV> globalKTable, KeyValueMapper<? super K,? super V,? extends GK> keyValueMapper, ValueJoiner<? super V,? super GV,? extends RV> valueJoiner) Join records of this stream with GlobalKTable's records using non-windowed left equi join. |
| <VO,VR> KStream<K,VR> | leftJoin(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows) Join records of this stream with another KStream's records using windowed left equi join with default serializers and deserializers. |
| <VO,VR> KStream<K,VR> | leftJoin(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValSerde, Serde<VO> otherValueSerde) Join records of this stream with another KStream's records using windowed left equi join. |
| <VT,VR> KStream<K,VR> | leftJoin(KTable<K,VT> table, ValueJoiner<? super V,? super VT,? extends VR> joiner) Join records of this stream with KTable's records using non-windowed left equi join with default serializers and deserializers. |
| <VT,VR> KStream<K,VR> | leftJoin(KTable<K,VT> table, ValueJoiner<? super V,? super VT,? extends VR> joiner, Serde<K> keySerde, Serde<V> valSerde) Join records of this stream with KTable's records using non-windowed left equi join. |
| <KR,VR> KStream<KR,VR> | map(KeyValueMapper<? super K,? super V,? extends KeyValue<? extends KR,? extends VR>> mapper) Transform each record of the input stream into a new record in the output stream (both key and value type can be altered arbitrarily). |
| <VR> KStream<K,VR> | mapValues(ValueMapper<? super V,? extends VR> mapper) Transform the value of each input record into a new value (with possible new type) of the output record. |
| <VO,VR> KStream<K,VR> | outerJoin(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows) Join records of this stream with another KStream's records using windowed outer equi join with default serializers and deserializers. |
| <VO,VR> KStream<K,VR> | outerJoin(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValueSerde, Serde<VO> otherValueSerde) Join records of this stream with another KStream's records using windowed outer equi join. |
| void | print() Print the records of this stream to System.out. |
| void | print(Serde<K> keySerde, Serde<V> valSerde) Print the records of this stream to System.out. |
| void | print(Serde<K> keySerde, Serde<V> valSerde, String streamName) Print the records of this stream to System.out. |
| void | print(String streamName) Print the records of this stream to System.out. |
| void | process(ProcessorSupplier<? super K,? super V> processorSupplier, String... stateStoreNames) Process all records in this stream, one record at a time, by applying a Processor (provided by the given ProcessorSupplier). |
| <KR> KStream<KR,V> | selectKey(KeyValueMapper<? super K,? super V,? extends KR> mapper) Set a new key (with possibly new type) for each input record. |
| KStream<K,V> | through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<? super K,? super V> partitioner, String topic) Materialize this stream to a topic and create a new KStream from the topic using a customizable StreamPartitioner to determine the distribution of records to partitions. |
| KStream<K,V> | through(Serde<K> keySerde, Serde<V> valSerde, String topic) Materialize this stream to a topic and create a new KStream from the topic. |
| KStream<K,V> | through(StreamPartitioner<? super K,? super V> partitioner, String topic) Materialize this stream to a topic and create a new KStream from the topic using default serializers and deserializers and a customizable StreamPartitioner to determine the distribution of records to partitions. |
| KStream<K,V> | through(String topic) Materialize this stream to a topic and create a new KStream from the topic using default serializers and deserializers and producer's DefaultPartitioner. |
| void | to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<? super K,? super V> partitioner, String topic) Materialize this stream to a topic using a customizable StreamPartitioner to determine the distribution of records to partitions. |
| void | to(Serde<K> keySerde, Serde<V> valSerde, String topic) Materialize this stream to a topic. |
| void | to(StreamPartitioner<? super K,? super V> partitioner, String topic) Materialize this stream to a topic using default serializers specified in the config and a customizable StreamPartitioner to determine the distribution of records to partitions. |
| void | to(String topic) Materialize this stream to a topic using default serializers specified in the config and producer's DefaultPartitioner. |
| <K1,V1> KStream<K1,V1> | transform(TransformerSupplier<? super K,? super V,KeyValue<K1,V1>> transformerSupplier, String... stateStoreNames) Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily). |
| <VR> KStream<K,VR> | transformValues(ValueTransformerSupplier<? super V,? extends VR> valueTransformerSupplier, String... stateStoreNames) Transform the value of each input record into a new value (with possible new type) of the output record. |
| void | writeAsText(String filePath) Write the records of this stream to a file at the given path. |
| void | writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) Write the records of this stream to a file at the given path. |
| void | writeAsText(String filePath, String streamName) Write the records of this stream to a file at the given path. |
| void | writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde) Write the records of this stream to a file at the given path. |
KStream<K,V> filter(Predicate<? super K,? super V> predicate)
Create a new KStream that consists of all records of this stream which satisfy the given predicate.
All records that do not satisfy the predicate are dropped.
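For example, the following sketch keeps only records whose value is non-null and longer than five characters (stream and type names are illustrative):
KStream<String, String> longValues = stream.filter(new Predicate<String, String>() {
    @Override
    public boolean test(String key, String value) {
        return value != null && value.length() > 5;
    }
});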
This is a stateless record-by-record operation.

Parameters:
predicate - a filter Predicate that is applied to each record
Returns:
a KStream that contains only those records that satisfy the given predicate
See Also:
filterNot(Predicate)

KStream<K,V> filterNot(Predicate<? super K,? super V> predicate)
Create a new KStream that consists of all records of this stream which do not satisfy the given
predicate.
All records that do satisfy the predicate are dropped.
This is a stateless record-by-record operation.

Parameters:
predicate - a filter Predicate that is applied to each record
Returns:
a KStream that contains only those records that do not satisfy the given predicate
See Also:
filter(Predicate)

<KR> KStream<KR,V> selectKey(KeyValueMapper<? super K,? super V,? extends KR> mapper)
Set a new key (with possibly new type) for each input record.
The provided KeyValueMapper is applied to each input record and computes a new key for it.
Thus, an input record <K,V> can be transformed into an output record <K':V>.
This is a stateless record-by-record operation.
For example, you can use this transformation to set a key for a key-less input record <null,V> by
extracting a key from the value within your KeyValueMapper. The example below computes the new key as the
length of the value string.
KStream<Byte[], String> keyLessStream = builder.stream("key-less-topic");
KStream<Integer, String> keyedStream = keyLessStream.selectKey(new KeyValueMapper<Byte[], String, Integer>() {
    @Override
    public Integer apply(Byte[] key, String value) {
        return value.length();
    }
});
Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation or
join) is applied to the result KStream.
Type Parameters:
KR - the new key type of the result stream
Parameters:
mapper - a KeyValueMapper that computes a new key for each record
Returns:
a KStream that contains records with new key (possibly of different type) and unmodified value
See Also:
map(KeyValueMapper), flatMap(KeyValueMapper), mapValues(ValueMapper), flatMapValues(ValueMapper)

<KR,VR> KStream<KR,VR> map(KeyValueMapper<? super K,? super V,? extends KeyValue<? extends KR,? extends VR>> mapper)
Transform each record of the input stream into a new record in the output stream (both key and value type can be altered arbitrarily).
The provided KeyValueMapper is applied to each input record and computes a new output record.
Thus, an input record <K,V> can be transformed into an output record <K':V'>.
This is a stateless record-by-record operation (cf. transform(TransformerSupplier, String...) for
stateful record transformation).
The example below normalizes the String key to upper-case letters and counts the number of tokens of the value string.
KStream<String, String> inputStream = builder.stream("topic");
KStream<String, Integer> outputStream = inputStream.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() {
    @Override
    public KeyValue<String, Integer> apply(String key, String value) {
        return new KeyValue<>(key.toUpperCase(), value.split(" ").length);
    }
});
The provided KeyValueMapper must return a KeyValue type and must not return null.
Mapping records might result in an internal data redistribution if a key based operator (like an aggregation or
join) is applied to the result KStream. (cf. mapValues(ValueMapper))
Type Parameters:
KR - the key type of the result stream
VR - the value type of the result stream
Parameters:
mapper - a KeyValueMapper that computes a new output record
Returns:
a KStream that contains records with new key and value (possibly both of different type)
See Also:
selectKey(KeyValueMapper), flatMap(KeyValueMapper), mapValues(ValueMapper), flatMapValues(ValueMapper), transform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...)

<VR> KStream<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper)
Transform the value of each input record into a new value (with possible new type) of the output record.
The provided ValueMapper is applied to each input record value and computes a new value for it.
Thus, an input record <K,V> can be transformed into an output record <K:V'>.
This is a stateless record-by-record operation (cf.
transformValues(ValueTransformerSupplier, String...) for stateful value transformation).
The example below counts the number of tokens of the value string.
KStream<String, String> inputStream = builder.stream("topic");
KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapper<String, Integer>() {
    @Override
    public Integer apply(String value) {
        return value.split(" ").length;
}
});
Setting a new value preserves data co-location with respect to the key.
Thus, no internal data redistribution is required if a key based operator (like an aggregation or join)
is applied to the result KStream. (cf. map(KeyValueMapper))
Type Parameters:
VR - the value type of the result stream
Parameters:
mapper - a ValueMapper that computes a new output value
Returns:
a KStream that contains records with unmodified key and new values (possibly of different type)
See Also:
selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper), flatMapValues(ValueMapper), transform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...)

<KR,VR> KStream<KR,VR> flatMap(KeyValueMapper<? super K,? super V,? extends Iterable<? extends KeyValue<? extends KR,? extends VR>>> mapper)
Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily).
The provided KeyValueMapper is applied to each input record and computes zero or more output records.
Thus, an input record <K,V> can be transformed into output records <K':V'>, <K'':V''>, ....
This is a stateless record-by-record operation (cf. transform(TransformerSupplier, String...) for
stateful record transformation).
The example below splits input records <null:String> containing sentences as values into their words
and emits a record <word:1> for each word.
KStream<byte[], String> inputStream = builder.stream("topic");
KStream<String, Integer> outputStream = inputStream.flatMap(new KeyValueMapper<byte[], String, Iterable<KeyValue<String, Integer>>>() {
    @Override
    public Iterable<KeyValue<String, Integer>> apply(byte[] key, String value) {
        String[] tokens = value.split(" ");
        List<KeyValue<String, Integer>> result = new ArrayList<>(tokens.length);
        for (String token : tokens) {
            result.add(new KeyValue<>(token, 1));
        }
        return result;
    }
});
The provided KeyValueMapper must return an Iterable (e.g., any Collection type)
and the return value must not be null.
Flat-mapping records might result in an internal data redistribution if a key based operator (like an aggregation
or join) is applied to the result KStream. (cf. flatMapValues(ValueMapper))
Type Parameters:
KR - the key type of the result stream
VR - the value type of the result stream
Parameters:
mapper - a KeyValueMapper that computes the new output records
Returns:
a KStream that contains more or less records with new key and value (possibly of different type)
See Also:
selectKey(KeyValueMapper), map(KeyValueMapper), mapValues(ValueMapper), flatMapValues(ValueMapper), transform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...)

<VR> KStream<K,VR> flatMapValues(ValueMapper<? super V,? extends Iterable<? extends VR>> processor)
Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream.
Transform the value of each input record into zero or more records with the same (unmodified) key in the output
stream (value type can be altered arbitrarily).
The provided ValueMapper is applied to each input record and computes zero or more output values.
Thus, an input record <K,V> can be transformed into output records <K:V'>, <K:V''>, ....
This is a stateless record-by-record operation (cf. transformValues(ValueTransformerSupplier, String...)
for stateful value transformation).
The example below splits input records <null:String> containing sentences as values into their words.
KStream<byte[], String> inputStream = builder.stream("topic");
KStream<byte[], String> outputStream = inputStream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    @Override
    public Iterable<String> apply(String value) {
        return Arrays.asList(value.split(" "));
}
});
The provided ValueMapper must return an Iterable (e.g., any Collection type)
and the return value must not be null.
Splitting a record into multiple records with the same key preserves data co-location with respect to the key.
Thus, no internal data redistribution is required if a key based operator (like an aggregation or join)
is applied to the result KStream. (cf. flatMap(KeyValueMapper))
Type Parameters:
VR - the value type of the result stream
Parameters:
processor - a ValueMapper that computes the new output values
Returns:
a KStream that contains more or less records with unmodified keys and new values of different type
See Also:
selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper), mapValues(ValueMapper), transform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...)

void print()
Print the records of this stream to System.out.
This function will use the generated name of the parent processor node to label the key/value pairs printed to
the console.
The default serde will be used to deserialize the key or value in case the type is byte[] before calling
toString() on the deserialized object.
Implementors will need to override toString() for keys and values that are not of type String,
Integer etc. to get meaningful information.
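For example, a quick inspection sketch during development:
stream.print(); // each record is written to System.out as it flows through this node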
void print(String streamName)
Print the records of this stream to System.out.
This function will use the given name to label the key/value pairs printed to the console.
The default serde will be used to deserialize the key or value in case the type is byte[] before calling
toString() on the deserialized object.
Implementors will need to override toString() for keys and values that are not of type String,
Integer etc. to get meaningful information.
Parameters:
streamName - the name used to label the key/value pairs printed to the console

void print(Serde<K> keySerde, Serde<V> valSerde)
Print the records of this stream to System.out.
This function will use the generated name of the parent processor node to label the key/value pairs printed to
the console.
The provided serde will be used to deserialize the key or value in case the type is byte[] before calling
toString() on the deserialized object.
Implementors will need to override toString() for keys and values that are not of type String,
Integer etc. to get meaningful information.
Parameters:
keySerde - key serde used to deserialize key if type is byte[]
valSerde - value serde used to deserialize value if type is byte[]

void print(Serde<K> keySerde, Serde<V> valSerde, String streamName)
Print the records of this stream to System.out.
The provided serde will be used to deserialize the key or value in case the type is byte[] before calling
toString() on the deserialized object.
Implementors will need to override toString() for keys and values that are not of type String,
Integer etc. to get meaningful information.
Parameters:
keySerde - key serde used to deserialize key if type is byte[]
valSerde - value serde used to deserialize value if type is byte[]
streamName - the name used to label the key/value pairs printed to the console

void writeAsText(String filePath)
Write the records of this stream to a file at the given path.
The default serde will be used to deserialize the key or value in case the type is byte[] before calling
toString() on the deserialized object.
Implementors will need to override toString() for keys and values that are not of type String,
Integer etc. to get meaningful information.
Parameters:
filePath - name of the file to write to

void writeAsText(String filePath, String streamName)
Write the records of this stream to a file at the given path.
The default serde will be used to deserialize the key or value in case the type is byte[] before calling
toString() on the deserialized object.
Implementors will need to override toString() for keys and values that are not of type String,
Integer etc. to get meaningful information.
Parameters:
filePath - name of the file to write to
streamName - the name used to label the key/value pairs written to the file

void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde)
Write the records of this stream to a file at the given path.
The provided serde will be used to deserialize the key or value in case the type is byte[] before calling
toString() on the deserialized object.
Implementors will need to override toString() for keys and values that are not of type String,
Integer etc. to get meaningful information.
Parameters:
filePath - name of the file to write to
keySerde - key serde used to deserialize key if type is byte[]
valSerde - value serde used to deserialize value if type is byte[]

void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde)
Write the records of this stream to a file at the given path.
The provided serde will be used to deserialize the key or value in case the type is byte[]
before calling toString() on the deserialized object.
Implementors will need to override toString() for keys and values that are not of type String,
Integer etc. to get meaningful information.
Parameters:
filePath - name of the file to write to
streamName - the name used to label the key/value pairs written to the file
keySerde - key serde used to deserialize key if type is byte[]
valSerde - value serde used to deserialize value if type is byte[]

void foreach(ForeachAction<? super K,? super V> action)
Perform an action on each record of KStream.
This is a stateless record-by-record operation (cf. process(ProcessorSupplier, String...)).
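For example, a sketch that logs every record as a side effect (type parameters are illustrative):
stream.foreach(new ForeachAction<String, Long>() {
    @Override
    public void apply(String key, Long value) {
        System.out.println(key + " -> " + value);
    }
});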
Note that this is a terminal operation that returns void.

Parameters:
action - an action to perform on each record
See Also:
process(ProcessorSupplier, String...)

KStream<K,V>[] branch(Predicate<? super K,? super V>... predicates)
Creates an array of KStream from this stream by branching the records in the original stream based on
the supplied predicates.
Each record is evaluated against the supplied predicates, and predicates are evaluated in order.
Each stream in the result array corresponds position-wise (index) to the predicate in the supplied predicates.
The branching happens on first-match: A record in the original stream is assigned to the corresponding result
stream for the first predicate that evaluates to true, and is assigned to this stream only.
A record will be dropped if none of the predicates evaluate to true.
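For example, a sketch that splits a stream into three substreams by key prefix; because branching is first-match, the catch-all predicate is placed last (predicates are illustrative):
KStream<String, String>[] branches = stream.branch(
    (key, value) -> key.startsWith("A"),  // branches[0]
    (key, value) -> key.startsWith("B"),  // branches[1]
    (key, value) -> true                  // branches[2] gets all remaining records
);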
This is a stateless record-by-record operation.

Parameters:
predicates - the ordered list of Predicate instances
Returns:
multiple distinct substreams of this KStream

KStream<K,V> through(String topic)
Materialize this stream to a topic and create a new KStream from the topic using default serializers and
deserializers and producer's DefaultPartitioner.
The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
started).
This is equivalent to calling #to(someTopicName) and KStreamBuilder#stream(someTopicName).
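For example, a sketch that redistributes a stream via a manually pre-created topic (the topic name is illustrative):
KStream<String, String> repartitioned = stream.through("pre-created-topic");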
Parameters:
topic - the topic name
Returns:
a KStream that contains the exact same (and potentially repartitioned) records as this KStream

KStream<K,V> through(StreamPartitioner<? super K,? super V> partitioner, String topic)
Materialize this stream to a topic and create a new KStream from the topic using default serializers and
deserializers and a customizable StreamPartitioner to determine the distribution of records to partitions.
The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
started).
This is equivalent to calling #to(StreamPartitioner, someTopicName) and
KStreamBuilder#stream(someTopicName).
Parameters:
partitioner - the function used to determine how records are distributed among partitions of the topic;
if not specified, producer's DefaultPartitioner will be used
topic - the topic name
Returns:
a KStream that contains the exact same (and potentially repartitioned) records as this KStream

KStream<K,V> through(Serde<K> keySerde, Serde<V> valSerde, String topic)
Materialize this stream to a topic and create a new KStream from the topic.
The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
started).
If keySerde provides a WindowedSerializer for the key WindowedStreamPartitioner is
used—otherwise producer's DefaultPartitioner is used.
This is equivalent to calling #to(keySerde, valSerde, someTopicName) and
KStreamBuilder#stream(keySerde, valSerde, someTopicName).
Parameters:
keySerde - key serde used to send key-value pairs;
if not specified, the default key serde defined in the configuration will be used
valSerde - value serde used to send key-value pairs;
if not specified, the default value serde defined in the configuration will be used
topic - the topic name
Returns:
a KStream that contains the exact same (and potentially repartitioned) records as this KStream

KStream<K,V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<? super K,? super V> partitioner, String topic)
Materialize this stream to a topic and create a new KStream from the topic using a customizable
StreamPartitioner to determine the distribution of records to partitions.
The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
started).
This is equivalent to calling #to(keySerde, valSerde,
StreamPartitioner, someTopicName) and KStreamBuilder#stream(keySerde, valSerde, someTopicName).
Parameters:
keySerde - key serde used to send key-value pairs;
if not specified, the default key serde defined in the configuration will be used
valSerde - value serde used to send key-value pairs;
if not specified, the default value serde defined in the configuration will be used
partitioner - the function used to determine how records are distributed among partitions of the topic;
if not specified and keySerde provides a WindowedSerializer for the key, WindowedStreamPartitioner will be used—otherwise DefaultPartitioner will be used
topic - the topic name
Returns:
a KStream that contains the exact same (and potentially repartitioned) records as this KStream

void to(String topic)
Materialize this stream to a topic using default serializers specified in the config and producer's DefaultPartitioner.
The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
started).

Parameters:
topic - the topic name

void to(StreamPartitioner<? super K,? super V> partitioner, String topic)
Materialize this stream to a topic using default serializers specified in the config and a customizable
StreamPartitioner to determine the distribution of records to partitions.
The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
started).

Parameters:
partitioner - the function used to determine how records are distributed among partitions of the topic;
if not specified, producer's DefaultPartitioner will be used
topic - the topic name

void to(Serde<K> keySerde, Serde<V> valSerde, String topic)
Materialize this stream to a topic. If keySerde provides a WindowedSerializer
for the key WindowedStreamPartitioner is used—otherwise producer's DefaultPartitioner is
used.
The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
started).

Parameters:
keySerde - key serde used to send key-value pairs;
if not specified, the default serde defined in the configs will be used
valSerde - value serde used to send key-value pairs;
if not specified, the default serde defined in the configs will be used
topic - the topic name

void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<? super K,? super V> partitioner, String topic)
Materialize this stream to a topic using a customizable StreamPartitioner to determine the distribution
of records to partitions.
The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
started).

Parameters:
keySerde - key serde used to send key-value pairs;
if not specified, the default serde defined in the configs will be used
valSerde - value serde used to send key-value pairs;
if not specified, the default serde defined in the configs will be used
partitioner - the function used to determine how records are distributed among partitions of the topic;
if not specified and keySerde provides a WindowedSerializer for the key, WindowedStreamPartitioner will be used—otherwise DefaultPartitioner will be used
topic - the topic name

<K1,V1> KStream<K1,V1> transform(TransformerSupplier<? super K,? super V,KeyValue<K1,V1>> transformerSupplier, String... stateStoreNames)
Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily).
A Transformer (provided by the given TransformerSupplier) is applied to each input record and
computes zero or more output records.
Thus, an input record <K,V> can be transformed into output records <K':V'>, <K'':V''>, ....
This is a stateful record-by-record operation (cf. flatMap(KeyValueMapper)).
Furthermore, via Transformer.punctuate(long) the processing progress can be observed and additional
periodic actions can be performed.
In order to assign a state, the state must be created and registered beforehand:
// create store
StateStoreSupplier myStore = Stores.create("myTransformState")
.withKeys(...)
.withValues(...)
.persistent() // optional
.build();
// register store
builder.addStore(myStore);
KStream outputStream = inputStream.transform(new TransformerSupplier() { ... }, "myTransformState");
Within the Transformer, the state is obtained via the
ProcessorContext.
To trigger periodic actions via punctuate(), a schedule must be registered.
The Transformer must return a KeyValue type in transform() and punctuate().
new TransformerSupplier() {
Transformer get() {
return new Transformer() {
private ProcessorContext context;
private StateStore state;
void init(ProcessorContext context) {
this.context = context;
this.state = context.getStateStore("myTransformState");
context.schedule(1000); // call #punctuate() each 1000ms
}
KeyValue transform(K key, V value) {
// can access this.state
// can emit as many new KeyValue pairs as required via this.context#forward()
return new KeyValue(key, value); // can emit a single value via return -- can also be null
}
KeyValue punctuate(long timestamp) {
// can access this.state
// can emit as many new KeyValue pairs as required via this.context#forward()
return null; // don't return result -- can also be "new KeyValue()"
}
void close() {
// can access this.state
// can emit as many new KeyValue pairs as required via this.context#forward()
}
}
}
}
Transforming records might result in an internal data redistribution if a key based operator (like an aggregation
or join) is applied to the result KStream.
(cf. transformValues(ValueTransformerSupplier, String...))
Type Parameters:
K1 - the key type of the new stream
V1 - the value type of the new stream
Parameters:
transformerSupplier - an instance of TransformerSupplier that generates a Transformer
stateStoreNames - the names of the state stores used by the processor
Returns:
a KStream that contains more or less records with new key and value (possibly of different type)
See Also:
flatMap(KeyValueMapper), transformValues(ValueTransformerSupplier, String...), process(ProcessorSupplier, String...)

<VR> KStream<K,VR> transformValues(ValueTransformerSupplier<? super V,? extends VR> valueTransformerSupplier, String... stateStoreNames)
Transform the value of each input record into a new value (with possible new type) of the output record.
A ValueTransformer (provided by the given ValueTransformerSupplier) is applied to each input
record value and computes a new value for it.
Thus, an input record <K,V> can be transformed into an output record <K:V'>.
This is a stateful record-by-record operation (cf. mapValues(ValueMapper)).
Furthermore, via ValueTransformer.punctuate(long) the processing progress can be observed and additional
periodic actions can be performed.
In order to assign a state, the state must be created and registered beforehand:
// create store
StateStoreSupplier myStore = Stores.create("myValueTransformState")
.withKeys(...)
.withValues(...)
.persistent() // optional
.build();
// register store
builder.addStore(myStore);
KStream outputStream = inputStream.transformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState");
Within the ValueTransformer, the state is obtained via the
ProcessorContext.
To trigger periodic actions via punctuate(), a schedule must be
registered.
In contrast to transform(), no additional KeyValue
pairs should be emitted via ProcessorContext.forward().
new ValueTransformerSupplier() {
ValueTransformer get() {
return new ValueTransformer() {
private StateStore state;
void init(ProcessorContext context) {
this.state = context.getStateStore("myValueTransformState");
context.schedule(1000); // call #punctuate() each 1000ms
}
NewValueType transform(V value) {
// can access this.state
return new NewValueType(); // or null
}
NewValueType punctuate(long timestamp) {
// can access this.state
return null; // don't return result -- can also be "new NewValueType()" (current key will be used to build KeyValue pair)
}
void close() {
// can access this.state
}
}
}
}
Setting a new value preserves data co-location with respect to the key.
Thus, no internal data redistribution is required if a key based operator (like an aggregation or join)
is applied to the result KStream. (cf. transform(TransformerSupplier, String...))
Type Parameters:
VR - the value type of the result stream
Parameters:
valueTransformerSupplier - an instance of ValueTransformerSupplier that generates a ValueTransformer
stateStoreNames - the names of the state stores used by the processor
Returns:
a KStream that contains records with unmodified key and new values (possibly of different type)
See Also:
mapValues(ValueMapper), transform(TransformerSupplier, String...)

void process(ProcessorSupplier<? super K,? super V> processorSupplier, String... stateStoreNames)
Process all records in this stream, one record at a time, by applying a Processor (provided by the given
ProcessorSupplier).
This is a stateful record-by-record operation (cf. foreach(ForeachAction)).
Furthermore, via Processor.punctuate(long) the processing progress can be observed and additional
periodic actions can be performed.
Note that this is a terminal operation that returns void.
In order to assign a state, the state must be created and registered beforehand:
// create store
StateStoreSupplier myStore = Stores.create("myProcessorState")
.withKeys(...)
.withValues(...)
.persistent() // optional
.build();
// register store
builder.addStore(myStore);
inputStream.process(new ProcessorSupplier() { ... }, "myProcessorState");
Within the Processor, the state is obtained via the
ProcessorContext.
To trigger periodic actions via punctuate(),
a schedule must be registered.
new ProcessorSupplier() {
Processor get() {
return new Processor() {
private StateStore state;
void init(ProcessorContext context) {
this.state = context.getStateStore("myProcessorState");
context.schedule(1000); // call #punctuate() each 1000ms
}
void process(K key, V value) {
// can access this.state
}
void punctuate(long timestamp) {
// can access this.state
}
void close() {
// can access this.state
}
}
}
}
Parameters:
processorSupplier - an instance of ProcessorSupplier that generates a Processor
stateStoreNames - the names of the state stores used by the processor
See Also:
foreach(ForeachAction), transform(TransformerSupplier, String...)

KGroupedStream<K,V> groupByKey()
Group the records by their current key into a KGroupedStream while preserving the original values
and default serializers and deserializers.
Grouping a stream on the record key is required before an aggregation operator can be applied to the data
(cf. KGroupedStream).
If a record key is null the record will not be included in the resulting KGroupedStream.
If a key changing operator was used before this operation (e.g., selectKey(KeyValueMapper),
map(KeyValueMapper), flatMap(KeyValueMapper), or
transform(TransformerSupplier, String...)), and no data redistribution happened afterwards (e.g., via
through(String)) an internal repartitioning topic will be created in Kafka.
This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
StreamsConfig via parameter APPLICATION_ID_CONFIG, "XXX" is
an internally generated name, and "-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via KafkaStreams.toString().
For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
records to it, and rereading all records from it, such that the resulting KGroupedStream is partitioned
correctly on its key.
If the last key changing operator changed the key type, it is recommended to use
groupByKey(Serde, Serde) instead.
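For example, a sketch that groups by the existing key and then counts records per key; count(String) is an aggregation offered by the resulting KGroupedStream, and the store name is illustrative:
KTable<String, Long> counts = stream.groupByKey().count("counts-store");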
Returns:
a KGroupedStream that contains the grouped records of the original KStream
See Also:
groupBy(KeyValueMapper)

KGroupedStream<K,V> groupByKey(Serde<K> keySerde, Serde<V> valSerde)
Group the records by their current key into a KGroupedStream while preserving the original values.
Grouping a stream on the record key is required before an aggregation operator can be applied to the data
(cf. KGroupedStream).
If a record key is null the record will not be included in the resulting KGroupedStream.
If a key changing operator was used before this operation (e.g., selectKey(KeyValueMapper),
map(KeyValueMapper), flatMap(KeyValueMapper), or
transform(TransformerSupplier, String...)), and no data redistribution happened afterwards (e.g., via
through(String)) an internal repartitioning topic will be created in Kafka.
This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
StreamsConfig via parameter APPLICATION_ID_CONFIG, "XXX" is
an internally generated name, and "-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via KafkaStreams.toString().
For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
records to it, and rereading all records from it, such that the resulting KGroupedStream is partitioned
correctly on its key.
Parameters:
keySerde - key serdes for materializing this stream;
if not specified, the default serdes defined in the configs will be used
valSerde - value serdes for materializing this stream;
if not specified, the default serdes defined in the configs will be used
Returns:
a KGroupedStream that contains the grouped records of the original KStream

<KR> KGroupedStream<KR,V> groupBy(KeyValueMapper<? super K,? super V,KR> selector)
Group the records of this KStream on a new key that is selected using the provided KeyValueMapper
and default serializers and deserializers.
Grouping a stream on the record key is required before an aggregation operator can be applied to the data
(cf. KGroupedStream).
The KeyValueMapper selects a new key (with potentially different type) while preserving the original values.
If the new record key is null the record will not be included in the resulting KGroupedStream.
Because a new key is selected, an internal repartitioning topic will be created in Kafka.
This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
StreamsConfig via parameter APPLICATION_ID_CONFIG, "XXX" is
an internally generated name, and "-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via KafkaStreams.toString().
All data of this stream will be redistributed through the repartitioning topic by writing all records to it,
and rereading all records from it, such that the resulting KGroupedStream is partitioned on the new key.
This operation is equivalent to calling selectKey(KeyValueMapper) followed by groupByKey().
If the key type is changed, it is recommended to use groupBy(KeyValueMapper, Serde, Serde) instead.
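For example, a sketch that groups an order stream by a user id extracted from the record value; Order and getUserId() are hypothetical, and selecting a new key triggers the repartitioning described above:
KGroupedStream<String, Order> ordersByUser = orders.groupBy(new KeyValueMapper<String, Order, String>() {
    @Override
    public String apply(String key, Order order) {
        return order.getUserId(); // hypothetical accessor on the value type
    }
});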
Type Parameters:
KR - the key type of the result KGroupedStream
Parameters:
selector - a KeyValueMapper that computes a new key for grouping
Returns:
a KGroupedStream that contains the grouped records of the original KStream

<KR> KGroupedStream<KR,V> groupBy(KeyValueMapper<? super K,? super V,KR> selector, Serde<KR> keySerde, Serde<V> valSerde)
Group the records of this KStream on a new key that is selected using the provided KeyValueMapper.
Grouping a stream on the record key is required before an aggregation operator can be applied to the data
(cf. KGroupedStream).
The KeyValueMapper selects a new key (with potentially different type) while preserving the original values.
If the new record key is null the record will not be included in the resulting KGroupedStream.
Because a new key is selected, an internal repartitioning topic will be created in Kafka.
This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
StreamsConfig via parameter
APPLICATION_ID_CONFIG, "XXX" is an internally generated name, and
"-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via KafkaStreams.toString().
All data of this stream will be redistributed through the repartitioning topic by writing all records to it,
and rereading all records from it, such that the resulting KGroupedStream is partitioned on the new key.
This is equivalent to calling selectKey(KeyValueMapper) followed by groupByKey(Serde, Serde).
Type Parameters:
KR - the key type of the result KGroupedStream
Parameters:
selector - a KeyValueMapper that computes a new key for grouping
keySerde - key serdes for materializing this stream;
if not specified, the default serdes defined in the configs will be used
valSerde - value serdes for materializing this stream;
if not specified, the default serdes defined in the configs will be used
Returns:
a KGroupedStream that contains the grouped records of the original KStream
See Also:
groupByKey()

<VO,VR> KStream<K,VR> join(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows)
Join records of this stream with another KStream's records using windowed inner equi join with default
serializers and deserializers.
The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key.
Furthermore, two records are only joined if their timestamps are close to each other as defined by the given
JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.
For each pair of records meeting both join predicates the provided ValueJoiner will be called to compute
a value (with arbitrary type) for the result record.
The key of the result record is the same as for both joining input records.
If an input record key or value is null the record will not be included in the join operation and thus no
output record will be added to the resulting KStream.
Example (assuming all input records belong to the correct windows):
| this | other | result |
|---|---|---|
| <K1:A> | | |
| <K2:B> | <K2:b> | <K2:ValueJoiner(B,b)> |
| | <K3:c> | |
Both input streams (or to be more precise, their underlying source topics) need to have the same number of
partitions. If this is not the case, you would need to call through(String) (for one input stream) before doing the
join, using a pre-created topic with the "correct" number of partitions.
Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
user-specified in StreamsConfig via parameter
APPLICATION_ID_CONFIG, "XXX" is an internally generated name, and
"-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via KafkaStreams.toString().
Repartitioning can happen for one or both of the joining KStreams.
For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
records to it, and rereading all records from it, such that the join input KStream is partitioned
correctly on its key.
Both of the joining KStreams will be materialized in local state stores with auto-generated store names.
For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-storeName-changelog", where "applicationId" is user-specified
in StreamsConfig via parameter
APPLICATION_ID_CONFIG, "storeName" is an
internally generated name, and "-changelog" is a fixed suffix.
You can retrieve all generated internal topic names via KafkaStreams.toString().
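For example, a sketch that joins records of two streams whose timestamps are at most five minutes apart (stream names and the joined value format are illustrative):
KStream<String, String> joined = leftStream.join(
    rightStream,
    new ValueJoiner<String, String, String>() {
        @Override
        public String apply(String leftValue, String rightValue) {
            return leftValue + "/" + rightValue;
        }
    },
    JoinWindows.of(TimeUnit.MINUTES.toMillis(5)));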
Type Parameters:
VO - the value type of the other stream
VR - the value type of the result stream
Parameters:
otherStream - the KStream to be joined with this stream
joiner - a ValueJoiner that computes the join result for a pair of matching records
windows - the specification of the JoinWindows
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key and within the joining window intervals
See Also:
leftJoin(KStream, ValueJoiner, JoinWindows), outerJoin(KStream, ValueJoiner, JoinWindows)

<VO,VR> KStream<K,VR> join(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValueSerde, Serde<VO> otherValueSerde)
Join records of this stream with another KStream's records using windowed inner equi join.
The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key.
Furthermore, two records are only joined if their timestamps are close to each other as defined by the given
JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.
For each pair of records meeting both join predicates the provided ValueJoiner will be called to compute
a value (with arbitrary type) for the result record.
The key of the result record is the same as for both joining input records.
If an input record key or value is null the record will not be included in the join operation and thus no
output record will be added to the resulting KStream.
Example (assuming all input records belong to the correct windows):
| this | other | result |
|---|---|---|
| <K1:A> | | |
| <K2:B> | <K2:b> | <K2:ValueJoiner(B,b)> |
| | <K3:c> | |
Both input streams (or to be more precise, their underlying source topics) need to have the same number of
partitions. If this is not the case, you would need to call through(String) (for one input stream) before doing the
join, using a pre-created topic with the "correct" number of partitions.
Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
user-specified in StreamsConfig via parameter
APPLICATION_ID_CONFIG, "XXX" is an internally generated name, and
"-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via KafkaStreams.toString().
Repartitioning can happen for one or both of the joining KStreams.
For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
records to it, and rereading all records from it, such that the join input KStream is partitioned
correctly on its key.
Both of the joining KStreams will be materialized in local state stores with auto-generated store names.
For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-storeName-changelog", where "applicationId" is user-specified
in StreamsConfig via parameter APPLICATION_ID_CONFIG,
"storeName" is an internally generated name, and "-changelog" is a fixed suffix.
You can retrieve all generated internal topic names via KafkaStreams.toString().
Type Parameters:
VO - the value type of the other stream
VR - the value type of the result stream
Parameters:
otherStream - the KStream to be joined with this stream
joiner - a ValueJoiner that computes the join result for a pair of matching records
windows - the specification of the JoinWindows
keySerde - key serdes for materializing both streams;
if not specified, the default serdes defined in the configs will be used
thisValueSerde - value serdes for materializing this stream;
if not specified, the default serdes defined in the configs will be used
otherValueSerde - value serdes for materializing the other stream;
if not specified, the default serdes defined in the configs will be used
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key and within the joining window intervals
See Also:
leftJoin(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde), outerJoin(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde)

<VO,VR> KStream<K,VR> leftJoin(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows)
Join records of this stream with another KStream's records using windowed left equi join with default
serializers and deserializers.
In contrast to inner-join, all records from this stream will
produce at least one output record (cf. below).
The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key.
Furthermore, two records are only joined if their timestamps are close to each other as defined by the given
JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.
For each pair of records meeting both join predicates the provided ValueJoiner will be called to compute
a value (with arbitrary type) for the result record.
The key of the result record is the same as for both joining input records.
Furthermore, for each input record of this KStream that does not satisfy the join predicate the provided
ValueJoiner will be called with a null value for the other stream.
If an input record key or value is null the record will not be included in the join operation and thus no
output record will be added to the resulting KStream.
Example (assuming all input records belong to the correct windows):
| this | other | result |
|---|---|---|
| <K1:A> | | <K1:ValueJoiner(A,null)> |
| <K2:B> | <K2:b> | <K2:ValueJoiner(B,b)> |
| | <K3:c> | |
Both input streams (or to be more precise, their underlying source topics) need to have the same number of
partitions. If this is not the case, you would need to call through(String) (for one input stream) before doing the
join, using a pre-created topic with the "correct" number of partitions.
Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
user-specified in StreamsConfig via parameter
APPLICATION_ID_CONFIG, "XXX" is an internally generated name, and
"-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via KafkaStreams.toString().
Repartitioning can happen for one or both of the joining KStreams.
For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
records to it, and rereading all records from it, such that the join input KStream is partitioned
correctly on its key.
Both of the joining KStreams will be materialized in local state stores with auto-generated store names.
For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-storeName-changelog", where "applicationId" is user-specified
in StreamsConfig via parameter APPLICATION_ID_CONFIG,
"storeName" is an internally generated name, and "-changelog" is a fixed suffix.
You can retrieve all generated internal topic names via KafkaStreams.toString().
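For example, a sketch of a left join in which unmatched records of this stream are emitted with a null right-hand value (names are illustrative):
KStream<String, String> enriched = leftStream.leftJoin(
    rightStream,
    (leftValue, rightValue) -> rightValue == null ? leftValue : leftValue + "/" + rightValue,
    JoinWindows.of(TimeUnit.MINUTES.toMillis(5)));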
Type Parameters:
VO - the value type of the other stream
VR - the value type of the result stream
Parameters:
otherStream - the KStream to be joined with this stream
joiner - a ValueJoiner that computes the join result for a pair of matching records
windows - the specification of the JoinWindows
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of this KStream and within the joining window intervals
See Also:
join(KStream, ValueJoiner, JoinWindows), outerJoin(KStream, ValueJoiner, JoinWindows)

<VO,VR> KStream<K,VR> leftJoin(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValSerde, Serde<VO> otherValueSerde)
Join records of this stream with another KStream's records using windowed left equi join.
In contrast to inner-join, all records from
this stream will produce at least one output record (cf. below).
The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key.
Furthermore, two records are only joined if their timestamps are close to each other as defined by the given
JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.
For each pair of records meeting both join predicates the provided ValueJoiner will be called to compute
a value (with arbitrary type) for the result record.
The key of the result record is the same as for both joining input records.
Furthermore, for each input record of this KStream that does not satisfy the join predicate the provided
ValueJoiner will be called with a null value for the other stream.
If an input record key or value is null the record will not be included in the join operation and thus no
output record will be added to the resulting KStream.
Example (assuming all input records belong to the correct windows):
| this | other | result |
|---|---|---|
| <K1:A> | | <K1:ValueJoiner(A,null)> |
| <K2:B> | <K2:b> | <K2:ValueJoiner(B,b)> |
| | <K3:c> | |
Both input streams (or to be more precise, their underlying source topics) need to have the same number of
partitions. If this is not the case, you would need to call through(String) (for one input stream) before doing the
join, using a pre-created topic with the "correct" number of partitions.
Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
user-specified in StreamsConfig via parameter
APPLICATION_ID_CONFIG, "XXX" is an internally generated name, and
"-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via KafkaStreams.toString().
Repartitioning can happen for one or both of the joining KStreams.
For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
records to it, and rereading all records from it, such that the join input KStream is partitioned
correctly on its key.
Both of the joining KStreams will be materialized in local state stores with auto-generated store names.
For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-storeName-changelog", where "applicationId" is user-specified
in StreamsConfig via parameter
APPLICATION_ID_CONFIG, "storeName" is an
internally generated name, and "-changelog" is a fixed suffix.
You can retrieve all generated internal topic names via KafkaStreams.toString().
Type Parameters:
VO - the value type of the other stream
VR - the value type of the result stream
Parameters:
otherStream - the KStream to be joined with this stream
joiner - a ValueJoiner that computes the join result for a pair of matching records
windows - the specification of the JoinWindows
keySerde - key serdes for materializing the other stream;
if not specified, the default serdes defined in the configs will be used
thisValSerde - value serdes for materializing this stream;
if not specified, the default serdes defined in the configs will be used
otherValueSerde - value serdes for materializing the other stream;
if not specified, the default serdes defined in the configs will be used
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of this KStream and within the joining window intervals
See Also:
join(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde), outerJoin(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde)

<VO,VR> KStream<K,VR> outerJoin(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows)
Join records of this stream with another KStream's records using windowed outer equi join with default
serializers and deserializers.
In contrast to inner-join or
left-join, all records from both streams will produce at
least one output record (cf. below).
The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key.
Furthermore, two records are only joined if their timestamps are close to each other as defined by the given
JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.
For each pair of records meeting both join predicates the provided ValueJoiner will be called to compute
a value (with arbitrary type) for the result record.
The key of the result record is the same as for both joining input records.
Furthermore, for each input record of both KStreams that does not satisfy the join predicate the provided
ValueJoiner will be called with a null value for this/other stream, respectively.
If an input record key or value is null the record will not be included in the join operation and thus no
output record will be added to the resulting KStream.
Example (assuming all input records belong to the correct windows):
| this | other | result |
|---|---|---|
| <K1:A> | | <K1:ValueJoiner(A,null)> |
| <K2:B> | <K2:b> | <K2:ValueJoiner(null,b)> <K2:ValueJoiner(B,b)> |
| | <K3:c> | <K3:ValueJoiner(null,c)> |
Both input streams (or to be more precise, their underlying source topics) need to have the same number of
partitions. If this is not the case, you would need to call through(String) (for one input stream) before doing the
join, using a pre-created topic with the "correct" number of partitions.
Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
user-specified in StreamsConfig via parameter
APPLICATION_ID_CONFIG, "XXX" is an internally generated name, and
"-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via KafkaStreams.toString().
Repartitioning can happen for one or both of the joining KStreams.
For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
records to it, and rereading all records from it, such that the join input KStream is partitioned
correctly on its key.
Both of the joining KStreams will be materialized in local state stores with auto-generated store names.
For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-storeName-changelog", where "applicationId" is user-specified
in StreamsConfig via parameter APPLICATION_ID_CONFIG,
"storeName" is an internally generated name, and "-changelog" is a fixed suffix.
You can retrieve all generated internal topic names via KafkaStreams.toString().
Type Parameters:
VO - the value type of the other stream
VR - the value type of the result stream
Parameters:
otherStream - the KStream to be joined with this stream
joiner - a ValueJoiner that computes the join result for a pair of matching records
windows - the specification of the JoinWindows
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of both KStreams, within the joining window intervals
See Also:
join(KStream, ValueJoiner, JoinWindows),
leftJoin(KStream, ValueJoiner, JoinWindows)
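A minimal sketch of the default-serde variant above; the stream contents are hypothetical, and the point is only that every record from either side produces at least one output:

```java
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

public class OuterJoinSketch {
    // Every record of either stream produces at least one output record;
    // exactly one ValueJoiner argument is null for a non-matching record.
    static KStream<String, String> outerJoin(KStream<String, String> left,
                                             KStream<String, String> right) {
        return left.outerJoin(
                right,
                (v1, v2) -> v1 + "/" + v2,         // v1 or v2 may be null
                JoinWindows.of(5 * 60 * 1000L));   // +/- 5-minute join window
    }
}
```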
<VO,VR> KStream<K,VR> outerJoin(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, Serde<K> keySerde, Serde<V> thisValueSerde, Serde<VO> otherValueSerde)

Join records of this stream with another KStream's records using windowed outer equi join.
In contrast to inner-join or
left-join, all records from both
streams will produce at least one output record (cf. below).
The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key.
Furthermore, two records are only joined if their timestamps are close to each other as defined by the given
JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.
For each pair of records meeting both join predicates the provided ValueJoiner will be called to compute
a value (with arbitrary type) for the result record.
The key of the result record is the same as for both joining input records.
Furthermore, for each input record of both KStreams that does not satisfy the join predicate the provided
ValueJoiner will be called with a null value for this/other stream, respectively.
If an input record key or value is null the record will not be included in the join operation and thus no
output record will be added to the resulting KStream.
Example (assuming all input records belong to the correct windows):
| this | other | result |
|---|---|---|
| <K1:A> | | <K1:ValueJoiner(A,null)> |
| <K2:B> | <K2:b> | <K2:ValueJoiner(null,b)> <K2:ValueJoiner(B,b)> |
| | <K3:c> | <K3:ValueJoiner(null,c)> |
Both input streams (or to be more precise, their underlying source topics) need to have the same number of
partitions. If this is not the case, you would need to call through(String) (for one input stream) before doing the
join, using a pre-created topic with the "correct" number of partitions.
Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
user-specified in StreamsConfig via parameter
APPLICATION_ID_CONFIG, "XXX" is an internally generated name, and
"-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via KafkaStreams.toString().
Repartitioning can happen for one or both of the joining KStreams.
For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
records to it, and rereading all records from it, such that the join input KStream is partitioned
correctly on its key.
Both of the joining KStreams will be materialized in local state stores with auto-generated store names.
For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-storeName-changelog", where "applicationId" is user-specified
in StreamsConfig via parameter APPLICATION_ID_CONFIG,
"storeName" is an internally generated name, and "-changelog" is a fixed suffix.
You can retrieve all generated internal topic names via KafkaStreams.toString().
Type Parameters:
VO - the value type of the other stream
VR - the value type of the result stream
Parameters:
otherStream - the KStream to be joined with this stream
joiner - a ValueJoiner that computes the join result for a pair of matching records
windows - the specification of the JoinWindows
keySerde - key serdes for materializing both streams, if not specified the default serdes defined in the configs will be used
thisValueSerde - value serdes for materializing this stream, if not specified the default serdes defined in the configs will be used
otherValueSerde - value serdes for materializing the other stream, if not specified the default serdes defined in the configs will be used
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of both KStreams, within the joining window intervals
See Also:
join(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde),
leftJoin(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde)
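All of the stream-stream joins above share the co-partitioning requirement. A hedged sketch of avoiding an automatic repartition by routing one stream through a pre-created topic; the topic name "orders-by-key" is purely illustrative and assumed to match the other input's partition count and partitioner:

```java
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

public class CopartitionSketch {
    // "orders-by-key" is a hypothetical, pre-created topic with the same
    // number of partitions (and the same partitioner) as the other input.
    static KStream<String, String> copartitionAndJoin(KStream<String, String> orders,
                                                      KStream<String, String> payments) {
        KStream<String, String> copartitioned = orders.through("orders-by-key");
        return copartitioned.join(
                payments,
                (order, payment) -> order + "|" + payment,
                JoinWindows.of(60 * 1000L));
    }
}
```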
<VT,VR> KStream<K,VR> join(KTable<K,VT> table, ValueJoiner<? super V,? super VT,? extends VR> joiner)

Join records of this stream with KTable's records using non-windowed inner equi join with default
serializers and deserializers.
The join is a primary key table lookup join with join attribute stream.key == table.key.
"Table lookup join" means, that results are only computed if KStream records are processed.
This is done by performing a lookup for matching records in the current (i.e., processing time) internal
KTable state.
In contrast, processing KTable input records will only update the internal KTable state and
will not produce any result records.
For each KStream record that finds a corresponding record in KTable the provided
ValueJoiner will be called to compute a value (with arbitrary type) for the result record.
The key of the result record is the same as for both joining input records.
If a KStream input record key or value is null the record will not be included in the join
operation and thus no output record will be added to the resulting KStream.
Example:
| KStream | KTable | state | result |
|---|---|---|---|
| <K1:A> | | | |
| | <K1:b> | <K1:b> | |
| <K1:C> | | <K1:b> | <K1:ValueJoiner(C,b)> |
Both input streams (or to be more precise, their underlying source topics) need to have the same number of
partitions. If this is not the case, you would need to call through(String) for this KStream before doing
the join, using a pre-created topic with the same number of partitions as the given KTable.
Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner);
cf. join(GlobalKTable, KeyValueMapper, ValueJoiner).
If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
user-specified in StreamsConfig via parameter
APPLICATION_ID_CONFIG, "XXX" is an internally generated name, and
"-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via KafkaStreams.toString().
Repartitioning can happen only for this KStream but not for the provided KTable.
For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
records to it, and rereading all records from it, such that the join input KStream is partitioned
correctly on its key.
Type Parameters:
VT - the value type of the table
VR - the value type of the result stream
Parameters:
table - the KTable to be joined with this stream
joiner - a ValueJoiner that computes the join result for a pair of matching records
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key
See Also:
leftJoin(KTable, ValueJoiner),
join(GlobalKTable, KeyValueMapper, ValueJoiner)
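A minimal stream-table lookup sketch for this overload; the topic and store names ("clicks", "profiles", "profiles-store") are assumptions for the example:

```java
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class StreamTableJoinSketch {
    static KStream<String, String> build(KStreamBuilder builder) {
        KStream<String, String> clicks = builder.stream("clicks");
        // The table is continuously updated from its topic; each stream record
        // is looked up against the table state at processing time.
        KTable<String, String> profiles = builder.table("profiles", "profiles-store");
        // Inner join: clicks whose key has no profile entry are dropped.
        return clicks.join(profiles, (click, profile) -> profile + " -> " + click);
    }
}
```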
<VT,VR> KStream<K,VR> join(KTable<K,VT> table, ValueJoiner<? super V,? super VT,? extends VR> joiner, Serde<K> keySerde, Serde<V> valSerde)

Join records of this stream with KTable's records using non-windowed inner equi join.
The join is a primary key table lookup join with join attribute stream.key == table.key.
"Table lookup join" means, that results are only computed if KStream records are processed.
This is done by performing a lookup for matching records in the current (i.e., processing time) internal
KTable state.
In contrast, processing KTable input records will only update the internal KTable state and
will not produce any result records.
For each KStream record that finds a corresponding record in KTable the provided
ValueJoiner will be called to compute a value (with arbitrary type) for the result record.
The key of the result record is the same as for both joining input records.
If a KStream input record key or value is null the record will not be included in the join
operation and thus no output record will be added to the resulting KStream.
Example:
| KStream | KTable | state | result |
|---|---|---|---|
| <K1:A> | | | |
| | <K1:b> | <K1:b> | |
| <K1:C> | | <K1:b> | <K1:ValueJoiner(C,b)> |
Both input streams (or to be more precise, their underlying source topics) need to have the same number of
partitions. If this is not the case, you would need to call through(String) for this KStream before doing
the join, using a pre-created topic with the same number of partitions as the given KTable.
Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner);
cf. join(GlobalKTable, KeyValueMapper, ValueJoiner).
If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
user-specified in StreamsConfig via parameter
APPLICATION_ID_CONFIG, "XXX" is an internally generated name, and
"-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via KafkaStreams.toString().
Repartitioning can happen only for this KStream but not for the provided KTable.
For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
records to it, and rereading all records from it, such that the join input KStream is partitioned
correctly on its key.
Type Parameters:
VT - the value type of the table
VR - the value type of the result stream
Parameters:
table - the KTable to be joined with this stream
joiner - a ValueJoiner that computes the join result for a pair of matching records
keySerde - key serdes for materializing this stream, if not specified the default serdes defined in the configs will be used
valSerde - value serdes for materializing this stream, if not specified the default serdes defined in the configs will be used
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key
See Also:
leftJoin(KTable, ValueJoiner, Serde, Serde),
join(GlobalKTable, KeyValueMapper, ValueJoiner)
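A sketch of the explicit-serde variant, assuming the same hypothetical clicks/profiles types as above; the serdes matter only when Kafka Streams has to (re)write this stream to an internal topic:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class StreamTableJoinWithSerdesSketch {
    // The explicit serdes describe this stream's key/value types so that an
    // internal repartition topic can be written and re-read if one is needed.
    static KStream<String, String> build(KStream<String, String> clicks,
                                         KTable<String, String> profiles) {
        return clicks.join(
                profiles,
                (click, profile) -> profile + " -> " + click,
                Serdes.String(),   // keySerde
                Serdes.String());  // valSerde
    }
}
```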
<VT,VR> KStream<K,VR> leftJoin(KTable<K,VT> table, ValueJoiner<? super V,? super VT,? extends VR> joiner)

Join records of this stream with KTable's records using non-windowed left equi join with default
serializers and deserializers.
In contrast to inner-join, all records from this stream will produce an
output record (cf. below).
The join is a primary key table lookup join with join attribute stream.key == table.key.
"Table lookup join" means, that results are only computed if KStream records are processed.
This is done by performing a lookup for matching records in the current (i.e., processing time) internal
KTable state.
In contrast, processing KTable input records will only update the internal KTable state and
will not produce any result records.
For each KStream record, whether or not it finds a corresponding record in KTable, the provided
ValueJoiner will be called to compute a value (with arbitrary type) for the result record.
If no KTable record was found during lookup, a null value will be provided to ValueJoiner.
The key of the result record is the same as for both joining input records.
If a KStream input record key or value is null the record will not be included in the join
operation and thus no output record will be added to the resulting KStream.
Example:
| KStream | KTable | state | result |
|---|---|---|---|
| <K1:A> | | | <K1:ValueJoiner(A,null)> |
| | <K1:b> | <K1:b> | |
| <K1:C> | | <K1:b> | <K1:ValueJoiner(C,b)> |
Both input streams (or to be more precise, their underlying source topics) need to have the same number of
partitions. If this is not the case, you would need to call through(String) for this KStream before doing
the join, using a pre-created topic with the same number of partitions as the given KTable.
Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner);
cf. join(GlobalKTable, KeyValueMapper, ValueJoiner).
If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
user-specified in StreamsConfig via parameter
APPLICATION_ID_CONFIG, "XXX" is an internally generated name, and
"-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via KafkaStreams.toString().
Repartitioning can happen only for this KStream but not for the provided KTable.
For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
records to it, and rereading all records from it, such that the join input KStream is partitioned
correctly on its key.
Type Parameters:
VT - the value type of the table
VR - the value type of the result stream
Parameters:
table - the KTable to be joined with this stream
joiner - a ValueJoiner that computes the join result for a pair of matching records
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner, one output for each input KStream record
See Also:
join(KTable, ValueJoiner),
leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
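A minimal sketch of this left variant, reusing the hypothetical clicks/profiles example; the key point is that the ValueJoiner must handle a null table-side value:

```java
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class StreamTableLeftJoinSketch {
    // Left join: every click yields an output; `profile` is null when the
    // table has no entry for the click's key, so provide a fallback value.
    static KStream<String, String> build(KStream<String, String> clicks,
                                         KTable<String, String> profiles) {
        return clicks.leftJoin(
                profiles,
                (click, profile) -> (profile == null ? "anonymous" : profile) + " -> " + click);
    }
}
```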
<VT,VR> KStream<K,VR> leftJoin(KTable<K,VT> table, ValueJoiner<? super V,? super VT,? extends VR> joiner, Serde<K> keySerde, Serde<V> valSerde)

Join records of this stream with KTable's records using non-windowed left equi join.
In contrast to inner-join, all records from this stream will produce an
output record (cf. below).
The join is a primary key table lookup join with join attribute stream.key == table.key.
"Table lookup join" means, that results are only computed if KStream records are processed.
This is done by performing a lookup for matching records in the current (i.e., processing time) internal
KTable state.
In contrast, processing KTable input records will only update the internal KTable state and
will not produce any result records.
For each KStream record, whether or not it finds a corresponding record in KTable, the provided
ValueJoiner will be called to compute a value (with arbitrary type) for the result record.
If no KTable record was found during lookup, a null value will be provided to ValueJoiner.
The key of the result record is the same as for both joining input records.
If a KStream input record key or value is null the record will not be included in the join
operation and thus no output record will be added to the resulting KStream.
Example:
| KStream | KTable | state | result |
|---|---|---|---|
| <K1:A> | | | <K1:ValueJoiner(A,null)> |
| | <K1:b> | <K1:b> | |
| <K1:C> | | <K1:b> | <K1:ValueJoiner(C,b)> |
Both input streams (or to be more precise, their underlying source topics) need to have the same number of
partitions. If this is not the case, you would need to call through(String) for this KStream before doing
the join, using a pre-created topic with the same number of partitions as the given KTable.
Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner);
cf. join(GlobalKTable, KeyValueMapper, ValueJoiner).
If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
user-specified in StreamsConfig via parameter
APPLICATION_ID_CONFIG, "XXX" is an internally generated name, and
"-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via KafkaStreams.toString().
Repartitioning can happen only for this KStream but not for the provided KTable.
For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
records to it, and rereading all records from it, such that the join input KStream is partitioned
correctly on its key.
Type Parameters:
VT - the value type of the table
VR - the value type of the result stream
Parameters:
table - the KTable to be joined with this stream
joiner - a ValueJoiner that computes the join result for a pair of matching records
keySerde - key serdes for materializing this stream, if not specified the default serdes defined in the configs will be used
valSerde - value serdes for materializing this stream, if not specified the default serdes defined in the configs will be used
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner, one output for each input KStream record
See Also:
join(KTable, ValueJoiner, Serde, Serde),
leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
<GK,GV,RV> KStream<K,RV> join(GlobalKTable<GK,GV> globalKTable, KeyValueMapper<? super K,? super V,? extends GK> keyValueMapper, ValueJoiner<? super V,? super GV,? extends RV> joiner)

Join records of this stream with GlobalKTable's records using non-windowed inner equi join.
The join is a primary key table lookup join with join attribute
keyValueMapper.map(stream.keyValue) == table.key.
"Table lookup join" means, that results are only computed if KStream records are processed.
This is done by performing a lookup for matching records in the current internal GlobalKTable
state.
In contrast, processing GlobalKTable input records will only update the internal GlobalKTable
state and will not produce any result records.
For each KStream record that finds a corresponding record in GlobalKTable the provided
ValueJoiner will be called to compute a value (with arbitrary type) for the result record.
The key of the result record is the same as the key of this KStream.
If a KStream input record key or value is null the record will not be included in the join
operation and thus no output record will be added to the resulting KStream.
Type Parameters:
GK - the key type of GlobalKTable
GV - the value type of the GlobalKTable
RV - the value type of the resulting KStream
Parameters:
globalKTable - the GlobalKTable to be joined with this stream
keyValueMapper - instance of KeyValueMapper used to map from the (key, value) of this stream to the key of the GlobalKTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner, one output for each input KStream record
See Also:
leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
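A minimal sketch of this join; the topic and store names ("orders", "products", "products-store") are hypothetical, as is the assumption that an order's value carries the product id used as the table key:

```java
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class GlobalTableJoinSketch {
    static KStream<String, String> build(KStreamBuilder builder) {
        // A GlobalKTable is fully replicated to every application instance,
        // so this join needs no co-partitioning and never repartitions.
        GlobalKTable<String, String> products = builder.globalTable("products", "products-store");
        KStream<String, String> orders = builder.stream("orders");
        return orders.join(
                products,
                (orderKey, orderValue) -> orderValue,           // KeyValueMapper: the order value is assumed to be the product id
                (order, product) -> order + " of " + product);  // ValueJoiner
    }
}
```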
<GK,GV,RV> KStream<K,RV> leftJoin(GlobalKTable<GK,GV> globalKTable, KeyValueMapper<? super K,? super V,? extends GK> keyValueMapper, ValueJoiner<? super V,? super GV,? extends RV> valueJoiner)

Join records of this stream with GlobalKTable's records using non-windowed left equi join.
In contrast to inner-join, all records from this stream
will produce an output record (cf. below).
The join is a primary key table lookup join with join attribute
keyValueMapper.map(stream.keyValue) == table.key.
"Table lookup join" means, that results are only computed if KStream records are processed.
This is done by performing a lookup for matching records in the current internal GlobalKTable
state.
In contrast, processing GlobalKTable input records will only update the internal GlobalKTable
state and will not produce any result records.
For each KStream record, whether or not it finds a corresponding record in GlobalKTable, the
provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record.
If no GlobalKTable record was found during lookup, a null value will be provided to
ValueJoiner.
The key of the result record is the same as the key of this KStream.
If a KStream input record key or value is null the record will not be included in the join
operation and thus no output record will be added to the resulting KStream.
Type Parameters:
GK - the key type of GlobalKTable
GV - the value type of the GlobalKTable
RV - the value type of the resulting KStream
Parameters:
globalKTable - the GlobalKTable to be joined with this stream
keyValueMapper - instance of KeyValueMapper used to map from the (key, value) of this stream to the key of the GlobalKTable
valueJoiner - a ValueJoiner that computes the join result for a pair of matching records
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner, one output for each input KStream record
See Also:
join(GlobalKTable, KeyValueMapper, ValueJoiner)
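Finally, a sketch of the left variant under the same hypothetical orders/products setup; unlike the inner join, a lookup miss still produces an output, with a null table-side value:

```java
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

public class GlobalTableLeftJoinSketch {
    // Left join: every order yields an output even when the lookup misses;
    // `product` is then null.
    static KStream<String, String> build(KStream<String, String> orders,
                                         GlobalKTable<String, String> products) {
        return orders.leftJoin(
                products,
                (orderKey, orderValue) -> orderValue,  // map (key, value) to the GlobalKTable key
                (order, product) -> order + " of " + (product == null ? "unknown" : product));
    }
}
```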