Interface KStream<K,V>
- Type Parameters:
  K - Type of keys
  V - Type of values
KStream is an abstraction of a record stream of KeyValue pairs, i.e., each record is an
independent entity/event in the real world.
For example, a user X might buy two items I1 and I2, and thus there might be two records <K:I1>, <K:I2>
in the stream.
A KStream is either defined from one or multiple Kafka topics that
are consumed message by message or the result of a KStream transformation.
A KTable can also be converted into a KStream.
A KStream can be transformed record by record, joined with another KStream, KTable,
GlobalKTable, or can be aggregated into a KTable.
The Kafka Streams DSL can be mixed-and-matched with the Processor API (PAPI) (cf. Topology) via
process(...),
transform(...), and
transformValues(...).
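For orientation, a minimal DSL sketch (the topic names, types, and the upper-casing step are illustrative assumptions, not part of this interface):
StreamsBuilder builder = new StreamsBuilder();
// Consume a topic as a record stream (topic name is a placeholder).
KStream<String, String> source = builder.stream("input-topic");
// Stateless record-by-record transformation, then write to an output topic.
source.mapValues(value -> value.toUpperCase())
      .to("output-topic");
Topology topology = builder.build();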
-
Method Summary
Overloads that differ only by an additional Named parameter are abbreviated as [, Named named].
- branch(Predicate...) / branch(Named, Predicate...)
  Deprecated. Since 2.8. Use split() / split(Named) instead.
- filter(Predicate) / filter(Predicate, Named)
  Create a new KStream that consists of all records of this stream which satisfy the given predicate.
- filterNot(Predicate) / filterNot(Predicate, Named)
  Create a new KStream that consists of all records of this stream which do not satisfy the given predicate.
- <KR,VR> KStream<KR,VR> flatMap(KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper [, Named named])
  Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily).
- flatMapValues(ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper [, Named named])
  flatMapValues(ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper [, Named named])
  Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream.
- <K1,V1> KStream<K1,V1> flatTransform(TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1,V1>>> transformerSupplier [, Named named], String... stateStoreNames)
  Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily).
- flatTransformValues(ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier [, Named named], String... stateStoreNames)
  flatTransformValues(ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier [, Named named], String... stateStoreNames)
  Transform the value of each input record into zero or more new values (with possibly a new type) and emit, for each new value, a record with the same key as the input record.
- void foreach(ForeachAction<? super K, ? super V> action [, Named named])
  Perform an action on each record of KStream.
- <KR> KGroupedStream<KR,V> groupBy(KeyValueMapper<? super K, ? super V, KR> keySelector [, Grouped grouped])
  Group the records of this KStream on a new key that is selected using the provided KeyValueMapper, using default serializers and deserializers or the Serdes specified by Grouped.
- groupByKey() / groupByKey(Grouped<K, V> grouped)
  Group the records by their current key into a KGroupedStream while preserving the original values, using default serializers and deserializers or the serializers defined by Grouped.
- join(GlobalKTable<GK, GV> globalTable, KeyValueMapper keySelector, ValueJoiner or ValueJoinerWithKey joiner [, Named named])
  Join records of this stream with GlobalKTable's records using non-windowed inner equi join.
- join(KStream<K, VO> otherStream, ValueJoiner or ValueJoinerWithKey joiner, JoinWindows windows [, StreamJoined<K, V, VO> streamJoined])
  Join records of this stream with another KStream's records using windowed inner equi join, with default serializers and deserializers or with the StreamJoined instance configuring the key serde, both value serdes, and the state stores used.
- join(KTable<K, VT> table, ValueJoiner or ValueJoinerWithKey joiner [, Joined<K, V, VT> joined])
  Join records of this stream with KTable's records using non-windowed inner equi join.
- leftJoin(GlobalKTable<GK, GV> globalTable, KeyValueMapper keySelector, ValueJoiner or ValueJoinerWithKey valueJoiner [, Named named])
  Join records of this stream with GlobalKTable's records using non-windowed left equi join.
- leftJoin(KStream<K, VO> otherStream, ValueJoiner or ValueJoinerWithKey joiner, JoinWindows windows [, StreamJoined<K, V, VO> streamJoined])
  Join records of this stream with another KStream's records using windowed left equi join.
- leftJoin(KTable<K, VT> table, ValueJoiner or ValueJoinerWithKey joiner [, Joined<K, V, VT> joined])
  Join records of this stream with KTable's records using non-windowed left equi join.
- <KR,VR> KStream<KR,VR> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper [, Named named])
  Transform each record of the input stream into a new record in the output stream (both key and value type can be altered arbitrarily).
- mapValues(ValueMapper<? super V, ? extends VR> mapper [, Named named])
  mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper [, Named named])
  Transform the value of each input record into a new value (with possibly a new type) of the output record.
- merge(KStream stream [, Named named])
  Merge this stream and the given stream into one larger stream.
- outerJoin(KStream<K, VO> otherStream, ValueJoiner or ValueJoinerWithKey joiner, JoinWindows windows [, StreamJoined<K, V, VO> streamJoined])
  Join records of this stream with another KStream's records using windowed outer equi join.
- peek(ForeachAction<? super K, ? super V> action [, Named named])
  Perform an action on each record of KStream.
- void print(Printed printed)
  Print the records of this KStream using the options provided by Printed; mainly for debugging/testing purposes, and it will try to flush on each record print.
- void process(ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier [, Named named], String... stateStoreNames)
  Process all records in this stream, one record at a time, by applying a Processor (provided by the given ProcessorSupplier).
- void process(ProcessorSupplier<? super K, ? super V> processorSupplier [, Named named], String... stateStoreNames)
  Deprecated. Since 3.0.
- repartition() / repartition(Repartitioned<K, V> repartitioned)
  Materialize this stream to an auto-generated repartition topic and create a new KStream from that topic, using default serializers, deserializers, and the producer's DefaultPartitioner, or using the key serde, value serde, StreamPartitioner, number of partitions, and topic name part as defined by Repartitioned.
- selectKey(KeyValueMapper<? super K, ? super V, ? extends KR> mapper [, Named named])
  Set a new key (with possibly new type) for each input record.
- split() / split(Named named)
  Split this stream into different branches.
- through(String topic [, Produced produced])
  Deprecated. Since 2.6; use repartition() or repartition(Repartitioned) instead.
- void to(String topic [, Produced produced])
  Materialize this stream to a topic, using default serializers specified in the config and the producer's DefaultPartitioner, or using the provided Produced instance.
- void to(TopicNameExtractor<K, V> topicExtractor [, Produced produced])
  Dynamically materialize this stream to topics determined per record by the TopicNameExtractor.
- toTable() / toTable(Named named) / toTable(Materialized materialized) / toTable(Named named, Materialized materialized)
  Convert this stream to a KTable.
- <K1,V1> KStream<K1,V1> transform(TransformerSupplier<? super K, ? super V, KeyValue<K1,V1>> transformerSupplier [, Named named], String... stateStoreNames)
  Transform each record of the input stream into zero or one record in the output stream (both key and value type can be altered arbitrarily).
- transformValues(ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier [, Named named], String... stateStoreNames)
  transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier [, Named named], String... stateStoreNames)
  Transform the value of each input record into a new value (with possibly a new type) of the output record.
-
Method Details
-
filter
Create a new KStream that consists of all records of this stream which satisfy the given predicate. All records that do not satisfy the predicate are dropped. This is a stateless record-by-record operation.
- Parameters:
  predicate - a filter Predicate that is applied to each record
- Returns:
  a KStream that contains only those records that satisfy the given predicate
- See Also:
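A minimal usage sketch (assuming an existing KStream<String, String> named stream; the Predicate is given as a lambda):
// Keep only records whose value is non-null and longer than five characters.
KStream<String, String> longValues = stream.filter((key, value) -> value != null && value.length() > 5);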
-
filter
Create a new KStream that consists of all records of this stream which satisfy the given predicate. All records that do not satisfy the predicate are dropped. This is a stateless record-by-record operation.
-
filterNot
Create a new KStream that consists of all records of this stream which do not satisfy the given predicate. All records that do satisfy the predicate are dropped. This is a stateless record-by-record operation.
- Parameters:
  predicate - a filter Predicate that is applied to each record
- Returns:
  a KStream that contains only those records that do not satisfy the given predicate
- See Also:
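A minimal sketch, mirroring the filter example above (the stream variable and the emptiness rule are assumptions):
// Drop records with null or empty values; keep everything else.
KStream<String, String> nonEmpty = stream.filterNot((key, value) -> value == null || value.isEmpty());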
-
filterNot
Create a new KStream that consists of all records of this stream which do not satisfy the given predicate. All records that do satisfy the predicate are dropped. This is a stateless record-by-record operation.
-
selectKey
Set a new key (with possibly new type) for each input record. The provided KeyValueMapper is applied to each input record and computes a new key for it. Thus, an input record <K,V> can be transformed into an output record <K':V>. This is a stateless record-by-record operation.
For example, you can use this transformation to set a key for a key-less input record <null,V> by extracting a key from the value within your KeyValueMapper. The example below computes the new key as the length of the value string.
KStream<Byte[], String> keyLessStream = builder.stream("key-less-topic");
KStream<Integer, String> keyedStream = keyLessStream.selectKey(new KeyValueMapper<Byte[], String, Integer>() {
    public Integer apply(Byte[] key, String value) {
        return value.length();
    }
});
Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation or join) is applied to the result KStream.
- Type Parameters:
  KR - the new key type of the result stream
- Parameters:
  mapper - a KeyValueMapper that computes a new key for each record
- Returns:
  a KStream that contains records with new key (possibly of different type) and unmodified value
- See Also:
-
selectKey
Set a new key (with possibly new type) for each input record. The provided KeyValueMapper is applied to each input record and computes a new key for it. Thus, an input record <K,V> can be transformed into an output record <K':V>. This is a stateless record-by-record operation.
For example, you can use this transformation to set a key for a key-less input record <null,V> by extracting a key from the value within your KeyValueMapper. The example below computes the new key as the length of the value string.
KStream<Byte[], String> keyLessStream = builder.stream("key-less-topic");
KStream<Integer, String> keyedStream = keyLessStream.selectKey(new KeyValueMapper<Byte[], String, Integer>() {
    public Integer apply(Byte[] key, String value) {
        return value.length();
    }
});
Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation or join) is applied to the result KStream.
- Type Parameters:
  KR - the new key type of the result stream
- Parameters:
  mapper - a KeyValueMapper that computes a new key for each record
  named - a Named config used to name the processor in the topology
- Returns:
  a KStream that contains records with new key (possibly of different type) and unmodified value
- See Also:
-
map
<KR,VR> KStream<KR,VR> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper) Transform each record of the input stream into a new record in the output stream (both key and value type can be altered arbitrarily). The providedKeyValueMapperis applied to each input record and computes a new output record. Thus, an input record<K,V>can be transformed into an output record<K':V'>. This is a stateless record-by-record operation (cf.transform(TransformerSupplier, String...)for stateful record transformation).The example below normalizes the String key to upper-case letters and counts the number of token of the value string.
The providedKStream<String, String> inputStream = builder.stream("topic"); KStream<String, Integer> outputStream = inputStream.map(new KeyValueMapper<String, String, KeyValue<String, Integer>> { KeyValue<String, Integer> apply(String key, String value) { return new KeyValue<>(key.toUpperCase(), value.split(" ").length); } });KeyValueMappermust return aKeyValuetype and must not returnnull.Mapping records might result in an internal data redistribution if a key based operator (like an aggregation or join) is applied to the result
KStream. (cf.mapValues(ValueMapper))- Type Parameters:
KR- the key type of the result streamVR- the value type of the result stream- Parameters:
mapper- aKeyValueMapperthat computes a new output record- Returns:
- a
KStreamthat contains records with new key and value (possibly both of different type) - See Also:
-
selectKey(KeyValueMapper)flatMap(KeyValueMapper)mapValues(ValueMapper)mapValues(ValueMapperWithKey)flatMapValues(ValueMapper)flatMapValues(ValueMapperWithKey)transform(TransformerSupplier, String...)transformValues(ValueTransformerSupplier, String...)transformValues(ValueTransformerWithKeySupplier, String...)
-
map
<KR,VR> KStream<KR,VR> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper, Named named)
Transform each record of the input stream into a new record in the output stream (both key and value type can be altered arbitrarily). The provided KeyValueMapper is applied to each input record and computes a new output record. Thus, an input record <K,V> can be transformed into an output record <K':V'>. This is a stateless record-by-record operation (cf. transform(TransformerSupplier, String...) for stateful record transformation).
The example below normalizes the String key to upper-case letters and counts the number of tokens of the value string.
KStream<String, String> inputStream = builder.stream("topic");
KStream<String, Integer> outputStream = inputStream.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() {
    public KeyValue<String, Integer> apply(String key, String value) {
        return new KeyValue<>(key.toUpperCase(), value.split(" ").length);
    }
});
The provided KeyValueMapper must return a KeyValue type and must not return null.
Mapping records might result in an internal data redistribution if a key based operator (like an aggregation or join) is applied to the result KStream. (cf. mapValues(ValueMapper))
- Type Parameters:
  KR - the key type of the result stream
  VR - the value type of the result stream
- Parameters:
  mapper - a KeyValueMapper that computes a new output record
  named - a Named config used to name the processor in the topology
- Returns:
  a KStream that contains records with new key and value (possibly both of different type)
- See Also:
  selectKey(KeyValueMapper), flatMap(KeyValueMapper), mapValues(ValueMapper), mapValues(ValueMapperWithKey), flatMapValues(ValueMapper), flatMapValues(ValueMapperWithKey), transform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...), transformValues(ValueTransformerWithKeySupplier, String...)
-
mapValues
Transform the value of each input record into a new value (with possibly a new type) of the output record. The provided ValueMapper is applied to each input record value and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation (cf. transformValues(ValueTransformerSupplier, String...) for stateful value transformation).
The example below counts the number of tokens of the value string.
KStream<String, String> inputStream = builder.stream("topic");
KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapper<String, Integer>() {
    public Integer apply(String value) {
        return value.split(" ").length;
    }
});
Setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) is applied to the result KStream. (cf. map(KeyValueMapper))
- Type Parameters:
  VR - the value type of the result stream
- Parameters:
  mapper - a ValueMapper that computes a new output value
- Returns:
  a KStream that contains records with unmodified key and new values (possibly of different type)
- See Also:
-
mapValues
Transform the value of each input record into a new value (with possibly a new type) of the output record. The provided ValueMapper is applied to each input record value and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation (cf. transformValues(ValueTransformerSupplier, String...) for stateful value transformation).
The example below counts the number of tokens of the value string.
KStream<String, String> inputStream = builder.stream("topic");
KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapper<String, Integer>() {
    public Integer apply(String value) {
        return value.split(" ").length;
    }
});
Setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) is applied to the result KStream. (cf. map(KeyValueMapper))
- Type Parameters:
  VR - the value type of the result stream
- Parameters:
  mapper - a ValueMapper that computes a new output value
  named - a Named config used to name the processor in the topology
- Returns:
  a KStream that contains records with unmodified key and new values (possibly of different type)
- See Also:
-
mapValues
Transform the value of each input record into a new value (with possibly a new type) of the output record. The provided ValueMapperWithKey is applied to each input record value and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation (cf. transformValues(ValueTransformerWithKeySupplier, String...) for stateful value transformation).
The example below counts the number of tokens of key and value strings.
KStream<String, String> inputStream = builder.stream("topic");
KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapperWithKey<String, String, Integer>() {
    public Integer apply(String readOnlyKey, String value) {
        return readOnlyKey.split(" ").length + value.split(" ").length;
    }
});
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. So, setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) is applied to the result KStream. (cf. map(KeyValueMapper))
- Type Parameters:
  VR - the value type of the result stream
- Parameters:
  mapper - a ValueMapperWithKey that computes a new output value
- Returns:
  a KStream that contains records with unmodified key and new values (possibly of different type)
- See Also:
-
mapValues
<VR> KStream<K,VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper, Named named)
Transform the value of each input record into a new value (with possibly a new type) of the output record. The provided ValueMapperWithKey is applied to each input record value and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation (cf. transformValues(ValueTransformerWithKeySupplier, String...) for stateful value transformation).
The example below counts the number of tokens of key and value strings.
KStream<String, String> inputStream = builder.stream("topic");
KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapperWithKey<String, String, Integer>() {
    public Integer apply(String readOnlyKey, String value) {
        return readOnlyKey.split(" ").length + value.split(" ").length;
    }
});
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. So, setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) is applied to the result KStream. (cf. map(KeyValueMapper))
- Type Parameters:
  VR - the value type of the result stream
- Parameters:
  mapper - a ValueMapperWithKey that computes a new output value
  named - a Named config used to name the processor in the topology
- Returns:
  a KStream that contains records with unmodified key and new values (possibly of different type)
- See Also:
-
flatMap
<KR,VR> KStream<KR,VR> flatMap(KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper)
Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily). The provided KeyValueMapper is applied to each input record and computes zero or more output records. Thus, an input record <K,V> can be transformed into output records <K':V'>, <K'':V''>, .... This is a stateless record-by-record operation (cf. transform(TransformerSupplier, String...) for stateful record transformation).
The example below splits input records <null:String> containing sentences as values into their words and emits a record <word:1> for each word.
KStream<byte[], String> inputStream = builder.stream("topic");
KStream<String, Integer> outputStream = inputStream.flatMap(
    new KeyValueMapper<byte[], String, Iterable<KeyValue<String, Integer>>>() {
        public Iterable<KeyValue<String, Integer>> apply(byte[] key, String value) {
            String[] tokens = value.split(" ");
            List<KeyValue<String, Integer>> result = new ArrayList<>(tokens.length);
            for (String token : tokens) {
                result.add(new KeyValue<>(token, 1));
            }
            return result;
        }
    });
The provided KeyValueMapper must return an Iterable (e.g., any Collection type) and the return value must not be null.
Flat-mapping records might result in an internal data redistribution if a key based operator (like an aggregation or join) is applied to the result KStream. (cf. flatMapValues(ValueMapper))
- Type Parameters:
  KR - the key type of the result stream
  VR - the value type of the result stream
- Parameters:
  mapper - a KeyValueMapper that computes the new output records
- Returns:
  a KStream that contains more or less records with new key and value (possibly of different type)
- See Also:
  selectKey(KeyValueMapper), map(KeyValueMapper), mapValues(ValueMapper), mapValues(ValueMapperWithKey), flatMapValues(ValueMapper), flatMapValues(ValueMapperWithKey), transform(TransformerSupplier, String...), flatTransform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...), transformValues(ValueTransformerWithKeySupplier, String...), flatTransformValues(ValueTransformerSupplier, String...), flatTransformValues(ValueTransformerWithKeySupplier, String...)
-
flatMap
<KR,VR> KStream<KR,VR> flatMap(KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper, Named named)
Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily). The provided KeyValueMapper is applied to each input record and computes zero or more output records. Thus, an input record <K,V> can be transformed into output records <K':V'>, <K'':V''>, .... This is a stateless record-by-record operation (cf. transform(TransformerSupplier, String...) for stateful record transformation).
The example below splits input records <null:String> containing sentences as values into their words and emits a record <word:1> for each word.
KStream<byte[], String> inputStream = builder.stream("topic");
KStream<String, Integer> outputStream = inputStream.flatMap(
    new KeyValueMapper<byte[], String, Iterable<KeyValue<String, Integer>>>() {
        public Iterable<KeyValue<String, Integer>> apply(byte[] key, String value) {
            String[] tokens = value.split(" ");
            List<KeyValue<String, Integer>> result = new ArrayList<>(tokens.length);
            for (String token : tokens) {
                result.add(new KeyValue<>(token, 1));
            }
            return result;
        }
    });
The provided KeyValueMapper must return an Iterable (e.g., any Collection type) and the return value must not be null.
Flat-mapping records might result in an internal data redistribution if a key based operator (like an aggregation or join) is applied to the result KStream. (cf. flatMapValues(ValueMapper))
- Type Parameters:
  KR - the key type of the result stream
  VR - the value type of the result stream
- Parameters:
  mapper - a KeyValueMapper that computes the new output records
  named - a Named config used to name the processor in the topology
- Returns:
  a KStream that contains more or less records with new key and value (possibly of different type)
- See Also:
  selectKey(KeyValueMapper), map(KeyValueMapper), mapValues(ValueMapper), mapValues(ValueMapperWithKey), flatMapValues(ValueMapper), flatMapValues(ValueMapperWithKey), transform(TransformerSupplier, String...), flatTransform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...), transformValues(ValueTransformerWithKeySupplier, String...), flatTransformValues(ValueTransformerSupplier, String...), flatTransformValues(ValueTransformerWithKeySupplier, String...)
-
flatMapValues
Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream. Transform the value of each input record into zero or more records with the same (unmodified) key in the output stream (value type can be altered arbitrarily). The provided ValueMapper is applied to each input record and computes zero or more output values. Thus, an input record <K,V> can be transformed into output records <K:V'>, <K:V''>, .... This is a stateless record-by-record operation (cf. transformValues(ValueTransformerSupplier, String...) for stateful value transformation).
The example below splits input records <null:String> containing sentences as values into their words.
KStream<byte[], String> inputStream = builder.stream("topic");
KStream<byte[], String> outputStream = inputStream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    public Iterable<String> apply(String value) {
        return Arrays.asList(value.split(" "));
    }
});
The provided ValueMapper must return an Iterable (e.g., any Collection type) and the return value must not be null.
Splitting a record into multiple records with the same key preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) is applied to the result KStream. (cf. flatMap(KeyValueMapper))
- Type Parameters:
  VR - the value type of the result stream
- Parameters:
  mapper - a ValueMapper that computes the new output values
- Returns:
  a KStream that contains more or less records with unmodified keys and new values of different type
- See Also:
  selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper), mapValues(ValueMapper), mapValues(ValueMapperWithKey), transform(TransformerSupplier, String...), flatTransform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...), transformValues(ValueTransformerWithKeySupplier, String...), flatTransformValues(ValueTransformerSupplier, String...), flatTransformValues(ValueTransformerWithKeySupplier, String...)
-
flatMapValues
<VR> KStream<K,VR> flatMapValues(ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper, Named named)
Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream. Transform the value of each input record into zero or more records with the same (unmodified) key in the output stream (value type can be altered arbitrarily). The provided ValueMapper is applied to each input record and computes zero or more output values. Thus, an input record <K,V> can be transformed into output records <K:V'>, <K:V''>, .... This is a stateless record-by-record operation (cf. transformValues(ValueTransformerSupplier, String...) for stateful value transformation).
The example below splits input records <null:String> containing sentences as values into their words.
KStream<byte[], String> inputStream = builder.stream("topic");
KStream<byte[], String> outputStream = inputStream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    public Iterable<String> apply(String value) {
        return Arrays.asList(value.split(" "));
    }
});
The provided ValueMapper must return an Iterable (e.g., any Collection type) and the return value must not be null.
Splitting a record into multiple records with the same key preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) is applied to the result KStream. (cf. flatMap(KeyValueMapper))
- Type Parameters:
  VR - the value type of the result stream
- Parameters:
  mapper - a ValueMapper that computes the new output values
  named - a Named config used to name the processor in the topology
- Returns:
  a KStream that contains more or less records with unmodified keys and new values of different type
- See Also:
  selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper), mapValues(ValueMapper), mapValues(ValueMapperWithKey), transform(TransformerSupplier, String...), flatTransform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...), transformValues(ValueTransformerWithKeySupplier, String...), flatTransformValues(ValueTransformerSupplier, String...), flatTransformValues(ValueTransformerWithKeySupplier, String...)
-
flatMapValues
<VR> KStream<K,VR> flatMapValues(ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper)
Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream. Transform the value of each input record into zero or more records with the same (unmodified) key in the output stream (value type can be altered arbitrarily). The provided ValueMapperWithKey is applied to each input record and computes zero or more output values. Thus, an input record <K,V> can be transformed into output records <K:V'>, <K:V''>, .... This is a stateless record-by-record operation (cf. transformValues(ValueTransformerWithKeySupplier, String...) for stateful value transformation).
The example below splits input records <Integer:String>, with key=1, containing sentences as values into their words.
KStream<Integer, String> inputStream = builder.stream("topic");
KStream<Integer, String> outputStream = inputStream.flatMapValues(new ValueMapperWithKey<Integer, String, Iterable<String>>() {
    public Iterable<String> apply(Integer readOnlyKey, String value) {
        if (readOnlyKey == 1) {
            return Arrays.asList(value.split(" "));
        } else {
            return Arrays.asList(value);
        }
    }
});
The provided ValueMapperWithKey must return an Iterable (e.g., any Collection type) and the return value must not be null.
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. So, splitting a record into multiple records with the same key preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) is applied to the result KStream. (cf. flatMap(KeyValueMapper))
- Type Parameters:
  VR - the value type of the result stream
- Parameters:
  mapper - a ValueMapperWithKey that computes the new output values
- Returns:
  a KStream that contains more or less records with unmodified keys and new values of different type
- See Also:
  selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper), mapValues(ValueMapper), mapValues(ValueMapperWithKey), transform(TransformerSupplier, String...), flatTransform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...), transformValues(ValueTransformerWithKeySupplier, String...), flatTransformValues(ValueTransformerSupplier, String...), flatTransformValues(ValueTransformerWithKeySupplier, String...)
-
flatMapValues
<VR> KStream<K,VR> flatMapValues(ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper, Named named)
Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream. Transform the value of each input record into zero or more records with the same (unmodified) key in the output stream (value type can be altered arbitrarily). The provided ValueMapperWithKey is applied to each input record and computes zero or more output values. Thus, an input record <K,V> can be transformed into output records <K:V'>, <K:V''>, .... This is a stateless record-by-record operation (cf. transformValues(ValueTransformerWithKeySupplier, String...) for stateful value transformation).
The example below splits input records <Integer:String>, with key=1, containing sentences as values into their words.
KStream<Integer, String> inputStream = builder.stream("topic");
KStream<Integer, String> outputStream = inputStream.flatMapValues(new ValueMapperWithKey<Integer, String, Iterable<String>>() {
    public Iterable<String> apply(Integer readOnlyKey, String value) {
        if (readOnlyKey == 1) {
            return Arrays.asList(value.split(" "));
        } else {
            return Arrays.asList(value);
        }
    }
});
The provided ValueMapperWithKey must return an Iterable (e.g., any Collection type) and the return value must not be null.
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. So, splitting a record into multiple records with the same key preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) is applied to the result KStream. (cf. flatMap(KeyValueMapper))
- Type Parameters:
  VR - the value type of the result stream
- Parameters:
  mapper - a ValueMapperWithKey that computes the new output values
  named - a Named config used to name the processor in the topology
- Returns:
  a KStream that contains more or less records with unmodified keys and new values of different type
- See Also:
  selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper), mapValues(ValueMapper), mapValues(ValueMapperWithKey), transform(TransformerSupplier, String...), flatTransform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...), transformValues(ValueTransformerWithKeySupplier, String...), flatTransformValues(ValueTransformerSupplier, String...), flatTransformValues(ValueTransformerWithKeySupplier, String...)
-
print
Print the records of this KStream using the options provided by Printed. Note that this is mainly for debugging/testing purposes, and it will try to flush on each record print. It SHOULD NOT be used for production usage if performance requirements are concerned.
- Parameters:
  printed - options for printing
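A short sketch of printing to stdout with a label (the label text is an arbitrary choice):
// Print each record to System.out, prefixed with the label "debug".
stream.print(Printed.<String, String>toSysOut().withLabel("debug"));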
-
foreach
Perform an action on each record of KStream. This is a stateless record-by-record operation (cf. process(ProcessorSupplier, String...)). Note that this is a terminal operation that returns void.
- Parameters:
  action - an action to perform on each record
- See Also:
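For example, a terminal side effect could look like the following sketch (System.out is used only for illustration):
// Terminal operation: consumes the stream and returns void.
stream.foreach((key, value) -> System.out.println(key + " -> " + value));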
-
foreach
Perform an action on each record of KStream. This is a stateless record-by-record operation (cf. process(ProcessorSupplier, String...)). Note that this is a terminal operation that returns void.
- Parameters:
  action - an action to perform on each record
  named - a Named config used to name the processor in the topology
- See Also:
-
peek
Perform an action on each record of KStream. This is a stateless record-by-record operation (cf. process(ProcessorSupplier, String...)).
Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection) and returns an unchanged stream.
Note that since this operation is stateless, it may execute multiple times for a single record in failure cases.
- Parameters:
  action - an action to perform on each record
- Returns:
  itself
- See Also:
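A sketch of peek used as a pass-through debugging step (the logging target and the downstream step are assumptions):
KStream<String, String> observed = stream
        // Side effect only; key and value are forwarded unchanged.
        .peek((key, value) -> System.out.println("seen: " + key + "=" + value))
        .mapValues(value -> value.trim());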
-
peek
Perform an action on each record of KStream. This is a stateless record-by-record operation (cf. process(ProcessorSupplier, String...)).
Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection) and returns an unchanged stream.
Note that since this operation is stateless, it may execute multiple times for a single record in failure cases.
- Parameters:
  action - an action to perform on each record
  named - a Named config used to name the processor in the topology
- Returns:
  itself
- See Also:
-
branch
Deprecated. Since 2.8. Use split() instead.
Creates an array of KStream from this stream by branching the records in the original stream based on the supplied predicates. Each record is evaluated against the supplied predicates, and predicates are evaluated in order. Each stream in the result array corresponds position-wise (index) to the predicate in the supplied predicates. The branching happens on first match: a record in the original stream is assigned to the corresponding result stream for the first predicate that evaluates to true, and is assigned to this stream only. A record will be dropped if none of the predicates evaluate to true. This is a stateless record-by-record operation.
- Parameters:
  predicates - the ordered list of Predicate instances
- Returns:
  multiple distinct substreams of this KStream
-
branch
Deprecated. Since 2.8. Use split(Named) instead.
Creates an array of KStream from this stream by branching the records in the original stream based on the supplied predicates. Each record is evaluated against the supplied predicates, and predicates are evaluated in order. Each stream in the result array corresponds position-wise (index) to the predicate in the supplied predicates. The branching happens on first match: a record in the original stream is assigned to the corresponding result stream for the first predicate that evaluates to true, and is assigned to this stream only. A record will be dropped if none of the predicates evaluate to true. This is a stateless record-by-record operation.
-
split
BranchedKStream<K,V> split()
Split this stream into different branches. The returned BranchedKStream instance can be used for routing the records to different branches depending on evaluation against the supplied predicates.
Note: Stream branching is a stateless record-by-record operation. Please check BranchedKStream for a detailed description and usage example.
- Returns:
  a BranchedKStream that provides methods for routing the records to different branches.
-
split
Split this stream into different branches. The returned BranchedKStream instance can be used for routing the records to different branches depending on evaluation against the supplied predicates.
Note: Stream branching is a stateless record-by-record operation. Please check BranchedKStream for a detailed description and usage example.
- Parameters:
  named - a Named config used to name the processor in the topology and also to set the name prefix for the resulting branches (see BranchedKStream)
- Returns:
  a BranchedKStream that provides methods for routing the records to different branches.
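A sketch of the split(Named)/BranchedKStream API (the branch names and predicate are illustrative; see BranchedKStream for the exact naming rules):
// Route records whose value starts with "A" to one branch, everything else to a default branch.
Map<String, KStream<String, String>> branches = stream
        .split(Named.as("split-"))
        .branch((key, value) -> value.startsWith("A"), Branched.as("a"))
        .defaultBranch(Branched.as("other"));
// The resulting map is keyed by "split-a" and "split-other".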
-
merge
Merge this stream and the given stream into one larger stream.
There is no ordering guarantee between records from this KStream and records from the provided KStream in the merged stream. Relative order is preserved within each input stream though (i.e., records within one input stream are processed in order).
- Parameters:
  stream - a stream which is to be merged into this stream
- Returns:
  a merged stream containing all records from this and the provided KStream
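For example (assuming two input topics whose records share the same key and value types):
KStream<String, String> clicksEu = builder.stream("clicks-eu");
KStream<String, String> clicksUs = builder.stream("clicks-us");
// All records from both streams end up in one stream; no ordering guarantee across the inputs.
KStream<String, String> allClicks = clicksEu.merge(clicksUs);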
-
merge
Merge this stream and the given stream into one larger stream.
There is no ordering guarantee between records from this KStream and records from the provided KStream in the merged stream. Relative order is preserved within each input stream though (i.e., records within one input stream are processed in order).
- Parameters:
  stream - a stream which is to be merged into this stream
  named - a Named config used to name the processor in the topology
- Returns:
  a merged stream containing all records from this and the provided KStream
-
through
Deprecated. Since 2.6; use repartition() instead.
Materialize this stream to a topic and create a new KStream from the topic using default serializers, deserializers, and the producer's DefaultPartitioner. The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is started).
This is similar to calling #to(someTopicName) and StreamsBuilder#stream(someTopicName). Note that through() uses a hard-coded timestamp extractor and does not allow customizing it, to ensure correct timestamp propagation.
- Parameters:
  topic - the topic name
- Returns:
  a KStream that contains the exact same (and potentially repartitioned) records as this KStream
-
through
Deprecated. Since 2.6; use repartition(Repartitioned) instead.
Materialize this stream to a topic and create a new KStream from the topic using the Produced instance for configuration of the key serde, value serde, and StreamPartitioner. The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is started).
This is similar to calling to(someTopic, Produced.with(keySerde, valueSerde)) and StreamsBuilder#stream(someTopicName, Consumed.with(keySerde, valueSerde)). Note that through() uses a hard-coded timestamp extractor and does not allow customizing it, to ensure correct timestamp propagation.
- Parameters:
  topic - the topic name
  produced - the options to use when producing to the topic
- Returns:
  a KStream that contains the exact same (and potentially repartitioned) records as this KStream
-
repartition
Materialize this stream to an auto-generated repartition topic and create a new KStream from the auto-generated topic using default serializers, deserializers, and the producer's DefaultPartitioner. The number of partitions is determined based on the upstream topics' partition numbers.
The created topic is considered an internal topic and is meant to be used only by the current Kafka Streams instance. Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams. The topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
- Returns:
  a KStream that contains the exact same repartitioned records as this KStream.
-
repartition
Materialize this stream to an auto-generated repartition topic and create a new KStream from the auto-generated topic using the key serde, value serde, StreamPartitioner, number of partitions, and topic name part as defined by Repartitioned.
The created topic is considered an internal topic and is meant to be used only by the current Kafka Streams instance. Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams. The topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is either provided via Repartitioned.as(String) or an internally generated name, and "-repartition" is a fixed suffix.
- Parameters:
  repartitioned - the Repartitioned instance used to specify Serdes, the StreamPartitioner which determines how records are distributed among partitions of the topic, part of the topic name, and the number of partitions for the repartition topic.
- Returns:
  a KStream that contains the exact same repartitioned records as this KStream.
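A sketch of forcing a repartition with explicit serdes, a name part, and a partition count (all concrete values here are assumptions):
KStream<String, String> repartitioned = stream.repartition(
        Repartitioned.<String, String>as("by-user")     // topic becomes "${applicationId}-by-user-repartition"
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String())
                .withNumberOfPartitions(6));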
-
to
Materialize this stream to a topic using default serializers specified in the config and the producer's DefaultPartitioner. The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is started).
- Parameters:
  topic - the topic name
-
to
Materialize this stream to a topic using the provided Produced instance. The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is started).
- Parameters:
  topic - the topic name
  produced - the options to use when producing to the topic
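For example, with explicit serdes (the topic name and serdes are placeholders):
stream.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));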
-
to
Dynamically materialize this stream to topics using default serializers specified in the config and the producer's DefaultPartitioner. The topic name for each record to send to is dynamically determined based on the TopicNameExtractor.
- Parameters:
  topicExtractor - the extractor to determine the name of the Kafka topic to write to for each record
-
to
Dynamically materialize this stream to topics using the provided Produced instance. The topic name for each record to send to is dynamically determined based on the TopicNameExtractor.
- Parameters:
  topicExtractor - the extractor to determine the name of the Kafka topic to write to for each record
  produced - the options to use when producing to the topic
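A sketch of dynamic routing via a TopicNameExtractor lambda (the topic names and routing rule are assumptions):
// TopicNameExtractor receives the key, value, and record context of each record.
stream.to(
        (key, value, recordContext) -> value.startsWith("ERROR") ? "error-topic" : "event-topic",
        Produced.with(Serdes.String(), Serdes.String()));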
-
toTable
Convert this stream to a KTable. If a key changing operator was used before this operation (e.g., selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper) or transform(TransformerSupplier, String...)) an internal repartitioning topic will be created in Kafka. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. You can retrieve all generated internal topic names via Topology.describe().
For this case, all data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting KTable is partitioned correctly on its key. Note that you cannot enable the StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG config for this case, because repartition topics are considered transient and do not allow recovering the result KTable in case of a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record that was a "fact/event" is now re-interpreted as an update (cf. KStream vs KTable).
- Returns:
a KTable that contains the same records as this KStream
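A minimal sketch; the types and the stream variable are assumed from the earlier examples:
  // assumes: import org.apache.kafka.streams.kstream.KTable;
  // Re-interpret the event stream as a changelog: later records for a key overwrite earlier ones.
  KTable<String, Long> table = stream.toTable();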
-
toTable
Convert this stream to aKTable.If a key changing operator was used before this operation (e.g.,
selectKey(KeyValueMapper),map(KeyValueMapper),flatMap(KeyValueMapper)ortransform(TransformerSupplier, String...)) an internal repartitioning topic will be created in Kafka. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified inStreamsConfigvia parameterAPPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.You can retrieve all generated internal topic names via
Topology.describe().For this case, all data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting
KTable is partitioned correctly on its key. Note that you cannot enable the StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG config for this case, because repartition topics are considered transient and do not allow recovering the result KTable in case of a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance. Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record that was a "fact/event" is now re-interpreted as an update (cf. KStream vs KTable).
-
toTable
KTable<K,V> toTable(Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized) Convert this stream to aKTable.If a key changing operator was used before this operation (e.g.,
selectKey(KeyValueMapper),map(KeyValueMapper),flatMap(KeyValueMapper)ortransform(TransformerSupplier, String...)) an internal repartitioning topic will be created in Kafka. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified inStreamsConfigvia parameterAPPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.You can retrieve all generated internal topic names via
Topology.describe().For this case, all data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting
KTable is partitioned correctly on its key. Note that you cannot enable the StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG config for this case, because repartition topics are considered transient and do not allow recovering the result KTable in case of a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance. Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record that was a "fact/event" is now re-interpreted as an update (cf. KStream vs KTable).
- Parameters:
materialized - an instance of Materialized used to describe how the state store of the resulting table should be materialized.
- Returns:
a KTable that contains the same records as this KStream
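A minimal sketch of a materialized conversion; the store name "latest-value-store" and the serdes are illustrative assumptions:
  // assumes: import org.apache.kafka.common.utils.Bytes;
  //          import org.apache.kafka.streams.kstream.Materialized;
  //          import org.apache.kafka.streams.state.KeyValueStore;
  KTable<String, Long> table = stream.toTable(
      Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("latest-value-store")
          .withKeySerde(Serdes.String())
          .withValueSerde(Serdes.Long()));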
-
toTable
KTable<K,V> toTable(Named named, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized) Convert this stream to aKTable.If a key changing operator was used before this operation (e.g.,
selectKey(KeyValueMapper),map(KeyValueMapper),flatMap(KeyValueMapper)ortransform(TransformerSupplier, String...)) an internal repartitioning topic will be created in Kafka. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified inStreamsConfigvia parameterAPPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.You can retrieve all generated internal topic names via
Topology.describe().For this case, all data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting
KTable is partitioned correctly on its key. Note that you cannot enable the StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG config for this case, because repartition topics are considered transient and do not allow recovering the result KTable in case of a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance. Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record that was a "fact/event" is now re-interpreted as an update (cf. KStream vs KTable).
- Parameters:
named - a Named config used to name the processor in the topology
materialized - an instance of Materialized used to describe how the state store of the resulting table should be materialized.
- Returns:
a KTable that contains the same records as this KStream
-
groupBy
Group the records of this KStream on a new key that is selected using the provided KeyValueMapper and default serializers and deserializers. KGroupedStream can be further grouped with other streams to form a CogroupedKStream. Grouping a stream on the record key is required before an aggregation operator can be applied to the data (cf. KGroupedStream). The KeyValueMapper selects a new key (which may or may not be of the same type) while preserving the original values. If the new record key is null the record will not be included in the resulting KGroupedStream. Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a later operator depends on the newly selected key. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
StreamsConfigvia parameterAPPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.You can retrieve all generated internal topic names via
Topology.describe().All data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting
KGroupedStreamis partitioned on the new key.This operation is equivalent to calling
selectKey(KeyValueMapper)followed bygroupByKey(). If the key type is changed, it is recommended to usegroupBy(KeyValueMapper, Grouped)instead.- Type Parameters:
KR- the key type of the resultKGroupedStream- Parameters:
keySelector- aKeyValueMapperthat computes a new key for grouping- Returns:
- a
KGroupedStreamthat contains the grouped records of the originalKStream
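A minimal sketch, assuming non-empty String keys and Long values as in the earlier examples; the grouping logic is purely illustrative:
  // assumes: import org.apache.kafka.streams.kstream.KGroupedStream;
  // Re-key by the first character of the key; a repartition topic may be created later
  // if a downstream aggregation depends on the new key.
  KGroupedStream<String, Long> grouped =
      stream.groupBy((key, value) -> key.substring(0, 1));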
-
groupBy
<KR> KGroupedStream<KR,V> groupBy(KeyValueMapper<? super K, ? super V, KR> keySelector, Grouped<KR, V> grouped) Group the records of thisKStreamon a new key that is selected using the providedKeyValueMapperandSerdes as specified byGrouped.KGroupedStreamcan be further grouped with other streams to form aCogroupedKStream. Grouping a stream on the record key is required before an aggregation operator can be applied to the data (cf.KGroupedStream). TheKeyValueMapperselects a new key (which may or may not be of the same type) while preserving the original values. If the new record key isnullthe record will not be included in the resultingKGroupedStream.Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a later operator depends on the newly selected key. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
StreamsConfigvia parameterAPPLICATION_ID_CONFIG, "<name>" is either provided viaGrouped.as(String)or an internally generated name.You can retrieve all generated internal topic names via
Topology.describe().All data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting
KGroupedStreamis partitioned on the new key.This operation is equivalent to calling
selectKey(KeyValueMapper)followed bygroupByKey().- Type Parameters:
KR- the key type of the resultKGroupedStream- Parameters:
keySelector- aKeyValueMapperthat computes a new key for groupinggrouped- theGroupedinstance used to specifySerdesand part of the name for a repartition topic if repartitioning is required.- Returns:
- a
KGroupedStreamthat contains the grouped records of the originalKStream
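The same re-keying as sketched above, but with explicit serdes and a name part for the potential repartition topic; all names are illustrative assumptions:
  // assumes: import org.apache.kafka.streams.kstream.Grouped;
  KGroupedStream<String, Long> grouped = stream.groupBy(
      (key, value) -> key.substring(0, 1),
      Grouped.with("group-by-prefix", Serdes.String(), Serdes.Long()));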
-
groupByKey
KGroupedStream<K,V> groupByKey()Group the records by their current key into aKGroupedStreamwhile preserving the original values and default serializers and deserializers.KGroupedStreamcan be further grouped with other streams to form aCogroupedKStream. Grouping a stream on the record key is required before an aggregation operator can be applied to the data (cf.KGroupedStream). If a record key isnullthe record will not be included in the resultingKGroupedStream.If a key changing operator was used before this operation (e.g.,
selectKey(KeyValueMapper),map(KeyValueMapper),flatMap(KeyValueMapper)ortransform(TransformerSupplier, String...)) an internal repartitioning topic may need to be created in Kafka if a later operator depends on the newly selected key. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified inStreamsConfigvia parameterAPPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.You can retrieve all generated internal topic names via
Topology.describe().For this case, all data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting
KGroupedStreamis partitioned correctly on its key. If the last key changing operator changed the key type, it is recommended to usegroupByKey(org.apache.kafka.streams.kstream.Grouped)instead.- Returns:
- a
KGroupedStreamthat contains the grouped records of the originalKStream - See Also:
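A minimal sketch, continuing the assumed String/Long stream; counting is just one of the aggregations available on the resulting KGroupedStream:
  // Group by the existing key (no re-keying) and count records per key.
  KTable<String, Long> countsByKey = stream.groupByKey().count();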
-
groupByKey
Group the records by their current key into aKGroupedStreamwhile preserving the original values and using the serializers as defined byGrouped.KGroupedStreamcan be further grouped with other streams to form aCogroupedKStream. Grouping a stream on the record key is required before an aggregation operator can be applied to the data (cf.KGroupedStream). If a record key isnullthe record will not be included in the resultingKGroupedStream.If a key changing operator was used before this operation (e.g.,
selectKey(KeyValueMapper),map(KeyValueMapper),flatMap(KeyValueMapper)ortransform(TransformerSupplier, String...)) an internal repartitioning topic may need to be created in Kafka if a later operator depends on the newly selected key. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified inStreamsConfigvia parameterAPPLICATION_ID_CONFIG, <name> is either provided viaGrouped.as(String)or an internally generated name, and "-repartition" is a fixed suffix.You can retrieve all generated internal topic names via
Topology.describe().For this case, all data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting
KGroupedStreamis partitioned correctly on its key.- Parameters:
grouped- theGroupedinstance used to specifySerdesand part of the name for a repartition topic if repartitioning is required.- Returns:
- a
KGroupedStreamthat contains the grouped records of the originalKStream - See Also:
-
join
<VO,VR> KStream<K,VR> join(KStream<K, VO> otherStream, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows) Join records of this stream with anotherKStream's records using windowed inner equi join with default serializers and deserializers. The join is computed on the records' key with join attributethisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the givenJoinWindows, i.e., the window defines an additional join predicate on the record timestamps.For each pair of records meeting both join predicates the provided
ValueJoinerwill be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records. If an input record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream.Example (assuming all input records belong to the correct windows):
this        other       result
<K1:A>
<K2:B>      <K2:b>      <K2:ValueJoiner(B,b)>
            <K3:c>
Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before doing the join and specify the "correct" number of partitions via the Repartitioned parameter. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. Repartitioning can happen for one or both of the joining
KStreams. For this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the join inputKStreamis partitioned correctly on its key.Both of the joining
KStreams will be materialized in local state stores with auto-generated store names. For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storename>-changelog", where "applicationId" is user-specified inStreamsConfigvia parameterAPPLICATION_ID_CONFIG, "storeName" is an internally generated name, and "-changelog" is a fixed suffix.You can retrieve all generated internal topic names via
Topology.describe().- Type Parameters:
VO- the value type of the other streamVR- the value type of the result stream- Parameters:
otherStream- theKStreamto be joined with this streamjoiner- aValueJoinerthat computes the join result for a pair of matching recordswindows- the specification of theJoinWindows- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoiner, one for each matched record-pair with the same key and within the joining window intervals - See Also:
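For illustration, a minimal windowed inner join; the topic names, value types, and the 5-minute window are assumptions, and JoinWindows.ofTimeDifferenceWithNoGrace is available in Kafka Streams 3.0+:
  // assumes: import java.time.Duration;
  //          import org.apache.kafka.streams.kstream.JoinWindows;
  KStream<String, String> clicks = builder.stream("clicks");
  KStream<String, String> views = builder.stream("views");
  KStream<String, String> joined = clicks.join(
      views,
      (clickValue, viewValue) -> clickValue + "/" + viewValue,          // ValueJoiner
      JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)));  // join window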
-
join
<VO,VR> KStream<K,VR> join(KStream<K, VO> otherStream, ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner, JoinWindows windows) Join records of this stream with anotherKStream's records using windowed inner equi join with default serializers and deserializers. The join is computed on the records' key with join attributethisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the givenJoinWindows, i.e., the window defines an additional join predicate on the record timestamps.For each pair of records meeting both join predicates the provided
ValueJoinerWithKeywill be called to compute a value (with arbitrary type) for the result record. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. The key of the result record is the same as for both joining input records. If an input record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream.Example (assuming all input records belong to the correct windows):
this        other       result
<K1:A>
<K2:B>      <K2:b>      <K2:ValueJoinerWithKey(K2,B,b)>
            <K3:c>
Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before doing the join and specify the "correct" number of partitions via the Repartitioned parameter. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. Repartitioning can happen for one or both of the joining
KStreams. For this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the join inputKStreamis partitioned correctly on its key.Both of the joining
KStreams will be materialized in local state stores with auto-generated store names. For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storename>-changelog", where "applicationId" is user-specified inStreamsConfigvia parameterAPPLICATION_ID_CONFIG, "storeName" is an internally generated name, and "-changelog" is a fixed suffix.You can retrieve all generated internal topic names via
Topology.describe().- Type Parameters:
VO- the value type of the other streamVR- the value type of the result stream- Parameters:
otherStream- theKStreamto be joined with this streamjoiner- aValueJoinerWithKeythat computes the join result for a pair of matching recordswindows- the specification of theJoinWindows- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoinerWithKey, one for each matched record-pair with the same key and within the joining window intervals - See Also:
-
join
<VO,VR> KStream<K,VR> join(KStream<K, VO> otherStream, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, StreamJoined<K, V, VO> streamJoined) Join records of this stream with anotherKStream's records using windowed inner equi join using theStreamJoinedinstance for configuration of thekey serde,this stream's value serde,the other stream's value serde, and used state stores. The join is computed on the records' key with join attributethisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the givenJoinWindows, i.e., the window defines an additional join predicate on the record timestamps.For each pair of records meeting both join predicates the provided
ValueJoinerwill be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records. If an input record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream.Example (assuming all input records belong to the correct windows):
this        other       result
<K1:A>
<K2:B>      <K2:b>      <K2:ValueJoiner(B,b)>
            <K3:c>
Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before doing the join and specify the "correct" number of partitions via the Repartitioned parameter. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. Repartitioning can happen for one or both of the joining
KStreams. For this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the join inputKStreamis partitioned correctly on its key.Both of the joining
KStreams will be materialized in local state stores with auto-generated store names, unless a name is provided via aMaterializedinstance. For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storename>-changelog", where "applicationId" is user-specified inStreamsConfigvia parameterAPPLICATION_ID_CONFIG, "storeName" is an internally generated name, and "-changelog" is a fixed suffix.You can retrieve all generated internal topic names via
Topology.describe().- Type Parameters:
VO- the value type of the other streamVR- the value type of the result stream- Parameters:
otherStream- theKStreamto be joined with this streamjoiner- aValueJoinerthat computes the join result for a pair of matching recordswindows- the specification of theJoinWindowsstreamJoined- aStreamJoinedused to configure join stores- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoiner, one for each matched record-pair with the same key and within the joining window intervals - See Also:
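The same join as sketched earlier, configured explicitly via StreamJoined; the serdes and the store name are illustrative assumptions:
  // assumes: import org.apache.kafka.streams.kstream.StreamJoined;
  KStream<String, String> joined = clicks.join(
      views,
      (clickValue, viewValue) -> clickValue + "/" + viewValue,
      JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
      StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
          .withStoreName("clicks-views-join"));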
-
join
<VO,VR> KStream<K,VR> join(KStream<K, VO> otherStream, ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, StreamJoined<K, V, VO> streamJoined) Join records of this stream with anotherKStream's records using windowed inner equi join using theStreamJoinedinstance for configuration of thekey serde,this stream's value serde,the other stream's value serde, and used state stores. The join is computed on the records' key with join attributethisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the givenJoinWindows, i.e., the window defines an additional join predicate on the record timestamps.For each pair of records meeting both join predicates the provided
ValueJoinerWithKeywill be called to compute a value (with arbitrary type) for the result record. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. The key of the result record is the same as for both joining input records. If an input record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream.Example (assuming all input records belong to the correct windows):
this        other       result
<K1:A>
<K2:B>      <K2:b>      <K2:ValueJoinerWithKey(K2,B,b)>
            <K3:c>
Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before doing the join and specify the "correct" number of partitions via the Repartitioned parameter. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. Repartitioning can happen for one or both of the joining
KStreams. For this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the join inputKStreamis partitioned correctly on its key.Both of the joining
KStreams will be materialized in local state stores with auto-generated store names, unless a name is provided via aMaterializedinstance. For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storename>-changelog", where "applicationId" is user-specified inStreamsConfigvia parameterAPPLICATION_ID_CONFIG, "storeName" is an internally generated name, and "-changelog" is a fixed suffix.You can retrieve all generated internal topic names via
Topology.describe().- Type Parameters:
VO- the value type of the other streamVR- the value type of the result stream- Parameters:
otherStream- theKStreamto be joined with this streamjoiner- aValueJoinerWithKeythat computes the join result for a pair of matching recordswindows- the specification of theJoinWindowsstreamJoined- aStreamJoinedused to configure join stores- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoinerWithKey, one for each matched record-pair with the same key and within the joining window intervals - See Also:
-
leftJoin
<VO,VR> KStream<K,VR> leftJoin(KStream<K, VO> otherStream, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows) Join records of this stream with anotherKStream's records using windowed left equi join with default serializers and deserializers. In contrast toinner-join, all records from this stream will produce at least one output record (cf. below). The join is computed on the records' key with join attributethisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the givenJoinWindows, i.e., the window defines an additional join predicate on the record timestamps.For each pair of records meeting both join predicates the provided
ValueJoinerwill be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records. Furthermore, for each input record of thisKStreamthat does not satisfy the join predicate the providedValueJoinerwill be called with anullvalue for the other stream. If an input record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream.Example (assuming all input records belong to the correct windows):
this        other       result
<K1:A>                  <K1:ValueJoiner(A,null)>
<K2:B>      <K2:b>      <K2:ValueJoiner(B,b)>
            <K3:c>
Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before doing the join and specify the "correct" number of partitions via the Repartitioned parameter. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. Repartitioning can happen for one or both of the joining
KStreams. For this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the join inputKStreamis partitioned correctly on its key.Both of the joining
KStreams will be materialized in local state stores with auto-generated store names. For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storename>-changelog", where "applicationId" is user-specified inStreamsConfigvia parameterAPPLICATION_ID_CONFIG, "storeName" is an internally generated name, and "-changelog" is a fixed suffix.You can retrieve all generated internal topic names via
Topology.describe().- Type Parameters:
VO- the value type of the other streamVR- the value type of the result stream- Parameters:
otherStream- theKStreamto be joined with this streamjoiner- aValueJoinerthat computes the join result for a pair of matching recordswindows- the specification of theJoinWindows- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of thisKStreamand within the joining window intervals - See Also:
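A minimal left-join sketch, reusing the assumed clicks/views streams from the inner-join example; the joiner must tolerate a null value for the other stream:
  KStream<String, String> leftJoined = clicks.leftJoin(
      views,
      (clickValue, viewValue) -> clickValue + "/" + (viewValue == null ? "none" : viewValue),
      JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)));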
-
leftJoin
<VO,VR> KStream<K,VR> leftJoin(KStream<K, VO> otherStream, ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner, JoinWindows windows) Join records of this stream with anotherKStream's records using windowed left equi join with default serializers and deserializers. In contrast toinner-join, all records from this stream will produce at least one output record (cf. below). The join is computed on the records' key with join attributethisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the givenJoinWindows, i.e., the window defines an additional join predicate on the record timestamps.For each pair of records meeting both join predicates the provided
ValueJoinerWithKeywill be called to compute a value (with arbitrary type) for the result record. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. The key of the result record is the same as for both joining input records. Furthermore, for each input record of thisKStreamthat does not satisfy the join predicate the providedValueJoinerWithKeywill be called with anullvalue for the other stream. If an input record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream.Example (assuming all input records belong to the correct windows):
this        other       result
<K1:A>                  <K1:ValueJoinerWithKey(K1,A,null)>
<K2:B>      <K2:b>      <K2:ValueJoinerWithKey(K2,B,b)>
            <K3:c>
Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before doing the join and specify the "correct" number of partitions via the Repartitioned parameter. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. Repartitioning can happen for one or both of the joining
KStreams. For this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the join inputKStreamis partitioned correctly on its key.Both of the joining
KStreams will be materialized in local state stores with auto-generated store names. For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storename>-changelog", where "applicationId" is user-specified inStreamsConfigvia parameterAPPLICATION_ID_CONFIG, "storeName" is an internally generated name, and "-changelog" is a fixed suffix.You can retrieve all generated internal topic names via
Topology.describe().- Type Parameters:
VO- the value type of the other streamVR- the value type of the result stream- Parameters:
otherStream- theKStreamto be joined with this streamjoiner- aValueJoinerWithKeythat computes the join result for a pair of matching recordswindows- the specification of theJoinWindows- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoinerWithKey, one for each matched record-pair with the same key plus one for each non-matching record of thisKStreamand within the joining window intervals - See Also:
-
leftJoin
<VO,VR> KStream<K,VR> leftJoin(KStream<K, VO> otherStream, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, StreamJoined<K, V, VO> streamJoined) Join records of this stream with anotherKStream's records using windowed left equi join using theStreamJoinedinstance for configuration of thekey serde,this stream's value serde,the other stream's value serde, and used state stores. In contrast toinner-join, all records from this stream will produce at least one output record (cf. below). The join is computed on the records' key with join attributethisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the givenJoinWindows, i.e., the window defines an additional join predicate on the record timestamps.For each pair of records meeting both join predicates the provided
ValueJoinerwill be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records. Furthermore, for each input record of thisKStreamthat does not satisfy the join predicate the providedValueJoinerwill be called with anullvalue for the other stream. If an input record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream.Example (assuming all input records belong to the correct windows):
this        other       result
<K1:A>                  <K1:ValueJoiner(A,null)>
<K2:B>      <K2:b>      <K2:ValueJoiner(B,b)>
            <K3:c>
Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before doing the join and specify the "correct" number of partitions via the Repartitioned parameter. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. Repartitioning can happen for one or both of the joining
KStreams. For this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the join inputKStreamis partitioned correctly on its key.Both of the joining
KStreams will be materialized in local state stores with auto-generated store names, unless a name is provided via aMaterializedinstance. For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storename>-changelog", where "applicationId" is user-specified inStreamsConfigvia parameterAPPLICATION_ID_CONFIG, "storeName" is an internally generated name, and "-changelog" is a fixed suffix.You can retrieve all generated internal topic names via
Topology.describe().- Type Parameters:
VO- the value type of the other streamVR- the value type of the result stream- Parameters:
otherStream- theKStreamto be joined with this streamjoiner- aValueJoinerthat computes the join result for a pair of matching recordswindows- the specification of theJoinWindowsstreamJoined- aStreamJoinedinstance to configure serdes and state stores- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of thisKStreamand within the joining window intervals - See Also:
-
leftJoin
<VO,VR> KStream<K,VR> leftJoin(KStream<K, VO> otherStream, ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, StreamJoined<K, V, VO> streamJoined) Join records of this stream with anotherKStream's records using windowed left equi join using theStreamJoinedinstance for configuration of thekey serde,this stream's value serde,the other stream's value serde, and used state stores. In contrast toinner-join, all records from this stream will produce at least one output record (cf. below). The join is computed on the records' key with join attributethisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the givenJoinWindows, i.e., the window defines an additional join predicate on the record timestamps.For each pair of records meeting both join predicates the provided
ValueJoinerWithKeywill be called to compute a value (with arbitrary type) for the result record. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. The key of the result record is the same as for both joining input records. Furthermore, for each input record of thisKStreamthat does not satisfy the join predicate the providedValueJoinerWithKeywill be called with anullvalue for the other stream. If an input record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream.Example (assuming all input records belong to the correct windows):
this        other       result
<K1:A>                  <K1:ValueJoinerWithKey(K1,A,null)>
<K2:B>      <K2:b>      <K2:ValueJoinerWithKey(K2,B,b)>
            <K3:c>
Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before doing the join and specify the "correct" number of partitions via the Repartitioned parameter. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. Repartitioning can happen for one or both of the joining
KStreams. For this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the join inputKStreamis partitioned correctly on its key.Both of the joining
KStreams will be materialized in local state stores with auto-generated store names, unless a name is provided via aMaterializedinstance. For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storename>-changelog", where "applicationId" is user-specified inStreamsConfigvia parameterAPPLICATION_ID_CONFIG, "storeName" is an internally generated name, and "-changelog" is a fixed suffix.You can retrieve all generated internal topic names via
Topology.describe().- Type Parameters:
VO- the value type of the other streamVR- the value type of the result stream- Parameters:
otherStream- theKStreamto be joined with this streamjoiner- aValueJoinerWithKeythat computes the join result for a pair of matching recordswindows- the specification of theJoinWindowsstreamJoined- aStreamJoinedinstance to configure serdes and state stores- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoinerWithKey, one for each matched record-pair with the same key plus one for each non-matching record of thisKStreamand within the joining window intervals - See Also:
-
outerJoin
<VO,VR> KStream<K,VR> outerJoin(KStream<K, VO> otherStream, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows) Join records of this stream with anotherKStream's records using windowed outer equi join with default serializers and deserializers. In contrast toinner-joinorleft-join, all records from both streams will produce at least one output record (cf. below). The join is computed on the records' key with join attributethisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the givenJoinWindows, i.e., the window defines an additional join predicate on the record timestamps.For each pair of records meeting both join predicates the provided
ValueJoinerwill be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records. Furthermore, for each input record of bothKStreams that does not satisfy the join predicate the providedValueJoinerwill be called with anullvalue for the this/other stream, respectively. If an input record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream.Example (assuming all input records belong to the correct windows):
this        other       result
<K1:A>                  <K1:ValueJoiner(A,null)>
<K2:B>      <K2:b>      <K2:ValueJoiner(null,b)>
                        <K2:ValueJoiner(B,b)>
            <K3:c>      <K3:ValueJoiner(null,c)>
Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before doing the join and specify the "correct" number of partitions via the Repartitioned parameter. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. Repartitioning can happen for one or both of the joining
KStreams. For this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the join inputKStreamis partitioned correctly on its key.Both of the joining
KStreams will be materialized in local state stores with auto-generated store names. For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storename>-changelog", where "applicationId" is user-specified inStreamsConfigvia parameterAPPLICATION_ID_CONFIG, "storeName" is an internally generated name, and "-changelog" is a fixed suffix.You can retrieve all generated internal topic names via
Topology.describe().- Type Parameters:
VO- the value type of the other streamVR- the value type of the result stream- Parameters:
otherStream- theKStreamto be joined with this streamjoiner- aValueJoinerthat computes the join result for a pair of matching recordswindows- the specification of theJoinWindows- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of bothKStreamand within the joining window intervals - See Also:
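A minimal outer-join sketch, again with the assumed clicks/views streams; here the joiner must tolerate a null value on either side:
  KStream<String, String> outerJoined = clicks.outerJoin(
      views,
      (clickValue, viewValue) ->
          (clickValue == null ? "none" : clickValue) + "/" + (viewValue == null ? "none" : viewValue),
      JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)));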
-
outerJoin
<VO,VR> KStream<K,VR> outerJoin(KStream<K, VO> otherStream, ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner, JoinWindows windows) Join records of this stream with anotherKStream's records using windowed outer equi join with default serializers and deserializers. In contrast toinner-joinorleft-join, all records from both streams will produce at least one output record (cf. below). The join is computed on the records' key with join attributethisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the givenJoinWindows, i.e., the window defines an additional join predicate on the record timestamps.For each pair of records meeting both join predicates the provided
ValueJoinerWithKeywill be called to compute a value (with arbitrary type) for the result record. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. The key of the result record is the same as for both joining input records. Furthermore, for each input record of bothKStreams that does not satisfy the join predicate the providedValueJoinerWithKeywill be called with anullvalue for the this/other stream, respectively. If an input record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream.Example (assuming all input records belong to the correct windows):
this        other       result
<K1:A>                  <K1:ValueJoinerWithKey(K1,A,null)>
<K2:B>      <K2:b>      <K2:ValueJoinerWithKey(K2,null,b)>
                        <K2:ValueJoinerWithKey(K2,B,b)>
            <K3:c>      <K3:ValueJoinerWithKey(K3,null,c)>
Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before doing the join and specify the "correct" number of partitions via the Repartitioned parameter. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. Repartitioning can happen for one or both of the joining
KStreams. For this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the join inputKStreamis partitioned correctly on its key.Both of the joining
KStreams will be materialized in local state stores with auto-generated store names. For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storename>-changelog", where "applicationId" is user-specified inStreamsConfigvia parameterAPPLICATION_ID_CONFIG, "storeName" is an internally generated name, and "-changelog" is a fixed suffix.You can retrieve all generated internal topic names via
Topology.describe().- Type Parameters:
VO- the value type of the other streamVR- the value type of the result stream- Parameters:
otherStream- theKStreamto be joined with this streamjoiner- aValueJoinerWithKeythat computes the join result for a pair of matching recordswindows- the specification of theJoinWindows- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoinerWithKey, one for each matched record-pair with the same key plus one for each non-matching record of bothKStreamand within the joining window intervals - See Also:
-
outerJoin
<VO,VR> KStream<K,VR> outerJoin(KStream<K, VO> otherStream, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, StreamJoined<K, V, VO> streamJoined) Join records of this stream with anotherKStream's records using windowed outer equi join using theStreamJoinedinstance for configuration of thekey serde,this stream's value serde,the other stream's value serde, and used state stores. In contrast toinner-joinorleft-join, all records from both streams will produce at least one output record (cf. below). The join is computed on the records' key with join attributethisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the givenJoinWindows, i.e., the window defines an additional join predicate on the record timestamps.For each pair of records meeting both join predicates the provided
ValueJoinerwill be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records. Furthermore, for each input record of bothKStreams that does not satisfy the join predicate the providedValueJoinerwill be called with anullvalue for this/other stream, respectively. If an input record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream.Example (assuming all input records belong to the correct windows):
this        other       result
<K1:A>                  <K1:ValueJoiner(A,null)>
<K2:B>      <K2:b>      <K2:ValueJoiner(null,b)>
                        <K2:ValueJoiner(B,b)>
            <K3:c>      <K3:ValueJoiner(null,c)>
Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before doing the join and specify the "correct" number of partitions via the Repartitioned parameter. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. Repartitioning can happen for one or both of the joining
KStreams. For this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the join inputKStreamis partitioned correctly on its key.Both of the joining
KStreams will be materialized in local state stores with auto-generated store names, unless a name is provided via aMaterializedinstance. For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storename>-changelog", where "applicationId" is user-specified inStreamsConfigvia parameterAPPLICATION_ID_CONFIG, "storeName" is an internally generated name, and "-changelog" is a fixed suffix.You can retrieve all generated internal topic names via
Topology.describe().- Type Parameters:
VO- the value type of the other streamVR- the value type of the result stream- Parameters:
otherStream- theKStreamto be joined with this streamjoiner- aValueJoinerthat computes the join result for a pair of matching recordswindows- the specification of theJoinWindowsstreamJoined- aStreamJoinedinstance to configure serdes and state stores- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of bothKStreamand within the joining window intervals - See Also:
-
outerJoin
<VO,VR> KStream<K,VR> outerJoin(KStream<K, VO> otherStream, ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, StreamJoined<K, V, VO> streamJoined) Join records of this stream with anotherKStream's records using windowed outer equi join using theStreamJoinedinstance for configuration of thekey serde,this stream's value serde,the other stream's value serde, and used state stores. In contrast toinner-joinorleft-join, all records from both streams will produce at least one output record (cf. below). The join is computed on the records' key with join attributethisKStream.key == otherKStream.key. Furthermore, two records are only joined if their timestamps are close to each other as defined by the givenJoinWindows, i.e., the window defines an additional join predicate on the record timestamps.For each pair of records meeting both join predicates the provided
ValueJoinerWithKeywill be called to compute a value (with arbitrary type) for the result record. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. The key of the result record is the same as for both joining input records. Furthermore, for each input record of bothKStreams that does not satisfy the join predicate the providedValueJoinerWithKeywill be called with anullvalue for this/other stream, respectively. If an input record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream.Example (assuming all input records belong to the correct windows):
this        other       result
<K1:A>                  <K1:ValueJoinerWithKey(K1,A,null)>
<K2:B>      <K2:b>      <K2:ValueJoinerWithKey(K2,null,b)>
                        <K2:ValueJoinerWithKey(K2,B,b)>
            <K3:c>      <K3:ValueJoinerWithKey(K3,null,c)>
Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before doing the join and specify the "correct" number of partitions via the Repartitioned parameter. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. Repartitioning can happen for one or both of the joining
KStreams. For this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the join inputKStreamis partitioned correctly on its key.Both of the joining
KStreams will be materialized in local state stores with auto-generated store names, unless a name is provided via aMaterializedinstance. For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka. The changelog topic will be named "${applicationId}-<storename>-changelog", where "applicationId" is user-specified inStreamsConfigvia parameterAPPLICATION_ID_CONFIG, "storeName" is an internally generated name, and "-changelog" is a fixed suffix.You can retrieve all generated internal topic names via
Topology.describe().- Type Parameters:
VO- the value type of the other streamVR- the value type of the result stream- Parameters:
otherStream- theKStreamto be joined with this streamjoiner- aValueJoinerWithKeythat computes the join result for a pair of matching recordswindows- the specification of theJoinWindowsstreamJoined- aStreamJoinedinstance to configure serdes and state stores- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoinerWithKey, one for each matched record-pair with the same key plus one for each non-matching record of bothKStreamand within the joining window intervals - See Also:
-
join
<VT,VR> KStream<K,VR> join(KTable<K, VT> table, ValueJoiner<? super V, ? super VT, ? extends VR> joiner) Join records of this stream withKTable's records using non-windowed inner equi join with default serializers and deserializers. The join is a primary key table lookup join with join attributestream.key == table.key. "Table lookup join" means, that results are only computed ifKStreamrecords are processed. This is done by performing a lookup for matching records in the current (i.e., processing time) internalKTablestate. In contrast, processingKTableinput records will only update the internalKTablestate and will not produce any result records.For each
KStreamrecord that finds a corresponding record inKTablethe providedValueJoinerwill be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records. If anKStreaminput record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream.Example:
KStream  | KTable  | state   | result
<K1:A>   |         |         |
         | <K1:b>  | <K1:b>  |
<K1:C>   |         | <K1:b>  | <K1:ValueJoiner(C,b)>

Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case, you would need to call repartition(Repartitioned) for this KStream before doing the join, specifying the same number of partitions via Repartitioned parameter as the given KTable. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner); cf. join(GlobalKTable, KeyValueMapper, ValueJoiner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. You can retrieve all generated internal topic names via
Topology.describe().Repartitioning can happen only for this
KStreambut not for the providedKTable. For this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the join inputKStreamis partitioned correctly on its key.- Type Parameters:
VT- the value type of the tableVR- the value type of the result stream- Parameters:
table- theKTableto be joined with this streamjoiner- aValueJoinerthat computes the join result for a pair of matching records- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoiner, one for each matched record-pair with the same key - See Also:
-
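As an illustrative sketch of this table lookup join (the "orders" and "customers" topics and the string concatenation are assumptions):

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> orders = builder.stream("orders");
KTable<String, String> customers = builder.table("customers");

// ValueJoiner receives (stream value, table value); output is only produced for matching keys
KStream<String, String> enriched = orders.join(
    customers,
    (orderValue, customerValue) -> customerValue + ":" + orderValue);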
join
<VT,VR> KStream<K,VR> join(KTable<K, VT> table, ValueJoinerWithKey<? super K, ? super V, ? super VT, ? extends VR> joiner) Join records of this stream withKTable's records using non-windowed inner equi join with default serializers and deserializers. The join is a primary key table lookup join with join attributestream.key == table.key. "Table lookup join" means, that results are only computed ifKStreamrecords are processed. This is done by performing a lookup for matching records in the current (i.e., processing time) internalKTablestate. In contrast, processingKTableinput records will only update the internalKTablestate and will not produce any result records.For each
KStreamrecord that finds a corresponding record inKTablethe providedValueJoinerWithKeywill be called to compute a value (with arbitrary type) for the result record. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. The key of the result record is the same as for both joining input records. If anKStreaminput record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream.Example:
KStream  | KTable  | state   | result
<K1:A>   |         |         |
         | <K1:b>  | <K1:b>  |
<K1:C>   |         | <K1:b>  | <K1:ValueJoinerWithKey(K1,C,b)>

Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case, you would need to call repartition(Repartitioned) for this KStream before doing the join, specifying the same number of partitions via Repartitioned parameter as the given KTable. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner); cf. join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. You can retrieve all generated internal topic names via
Topology.describe().Repartitioning can happen only for this
KStreambut not for the providedKTable. For this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the join inputKStreamis partitioned correctly on its key.- Type Parameters:
VT- the value type of the tableVR- the value type of the result stream- Parameters:
table- theKTableto be joined with this streamjoiner- aValueJoinerWithKeythat computes the join result for a pair of matching records- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoinerWithKey, one for each matched record-pair with the same key - See Also:
-
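The ValueJoinerWithKey variant additionally exposes the read-only key to the joiner; a hedged sketch reusing the hypothetical orders/customers setup from the previous example:

KStream<String, String> enriched = orders.join(
    customers,
    (customerId, orderValue, customerValue) -> customerId + "=" + customerValue + ":" + orderValue);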
join
<VT,VR> KStream<K,VR> join(KTable<K, VT> table, ValueJoiner<? super V, ? super VT, ? extends VR> joiner, Joined<K, V, VT> joined) Join records of this stream withKTable's records using non-windowed inner equi join with default serializers and deserializers. The join is a primary key table lookup join with join attributestream.key == table.key. "Table lookup join" means, that results are only computed ifKStreamrecords are processed. This is done by performing a lookup for matching records in the current (i.e., processing time) internalKTablestate. In contrast, processingKTableinput records will only update the internalKTablestate and will not produce any result records.For each
KStreamrecord that finds a corresponding record inKTablethe providedValueJoinerwill be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records. If anKStreaminput record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream.Example:
KStream  | KTable  | state   | result
<K1:A>   |         |         |
         | <K1:b>  | <K1:b>  |
<K1:C>   |         | <K1:b>  | <K1:ValueJoiner(C,b)>

Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case, you would need to call repartition(Repartitioned) for this KStream before doing the join, specifying the same number of partitions via Repartitioned parameter as the given KTable. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner); cf. join(GlobalKTable, KeyValueMapper, ValueJoiner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. You can retrieve all generated internal topic names via
Topology.describe().Repartitioning can happen only for this
KStreambut not for the providedKTable. For this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the join inputKStreamis partitioned correctly on its key.- Type Parameters:
VT- the value type of the tableVR- the value type of the result stream- Parameters:
table- theKTableto be joined with this streamjoiner- aValueJoinerthat computes the join result for a pair of matching recordsjoined- aJoinedinstance that defines the serdes to be used to serialize/deserialize inputs of the joined streams- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoiner, one for each matched record-pair with the same key - See Also:
-
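Where the configured default serdes do not fit the join inputs, a Joined instance can carry them explicitly; a sketch under the same hypothetical orders/customers setup:

KStream<String, String> enriched = orders.join(
    customers,
    (orderValue, customerValue) -> customerValue + ":" + orderValue,
    Joined.with(Serdes.String(),    // key serde
                Serdes.String(),    // stream value serde
                Serdes.String()));  // table value serde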
join
<VT,VR> KStream<K,VR> join(KTable<K, VT> table, ValueJoinerWithKey<? super K, ? super V, ? super VT, ? extends VR> joiner, Joined<K, V, VT> joined) Join records of this stream withKTable's records using non-windowed inner equi join with default serializers and deserializers. The join is a primary key table lookup join with join attributestream.key == table.key. "Table lookup join" means, that results are only computed ifKStreamrecords are processed. This is done by performing a lookup for matching records in the current (i.e., processing time) internalKTablestate. In contrast, processingKTableinput records will only update the internalKTablestate and will not produce any result records.For each
KStreamrecord that finds a corresponding record inKTablethe providedValueJoinerWithKeywill be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. If anKStreaminput record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream.Example:
KStream  | KTable  | state   | result
<K1:A>   |         |         |
         | <K1:b>  | <K1:b>  |
<K1:C>   |         | <K1:b>  | <K1:ValueJoinerWithKey(K1,C,b)>

Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case, you would need to call repartition(Repartitioned) for this KStream before doing the join, specifying the same number of partitions via Repartitioned parameter as the given KTable. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner); cf. join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. You can retrieve all generated internal topic names via
Topology.describe().Repartitioning can happen only for this
KStreambut not for the providedKTable. For this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the join inputKStreamis partitioned correctly on its key.- Type Parameters:
VT- the value type of the tableVR- the value type of the result stream- Parameters:
table- theKTableto be joined with this streamjoiner- aValueJoinerWithKeythat computes the join result for a pair of matching recordsjoined- aJoinedinstance that defines the serdes to be used to serialize/deserialize inputs of the joined streams- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoinerWithKey, one for each matched record-pair with the same key - See Also:
-
leftJoin
<VT,VR> KStream<K,VR> leftJoin(KTable<K, VT> table, ValueJoiner<? super V, ? super VT, ? extends VR> joiner) Join records of this stream withKTable's records using non-windowed left equi join with default serializers and deserializers. In contrast toinner-join, all records from this stream will produce an output record (cf. below). The join is a primary key table lookup join with join attributestream.key == table.key. "Table lookup join" means, that results are only computed ifKStreamrecords are processed. This is done by performing a lookup for matching records in the current (i.e., processing time) internalKTablestate. In contrast, processingKTableinput records will only update the internalKTablestate and will not produce any result records.For each
KStreamrecord whether or not it finds a corresponding record inKTablethe providedValueJoinerwill be called to compute a value (with arbitrary type) for the result record. If noKTablerecord was found during lookup, anullvalue will be provided toValueJoiner. The key of the result record is the same as for both joining input records. If anKStreaminput record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream.Example:
KStream  | KTable  | state   | result
<K1:A>   |         |         | <K1:ValueJoiner(A,null)>
         | <K1:b>  | <K1:b>  |
<K1:C>   |         | <K1:b>  | <K1:ValueJoiner(C,b)>

Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case, you would need to call repartition(Repartitioned) for this KStream before doing the join, specifying the same number of partitions via Repartitioned parameter as the given KTable. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner); cf. join(GlobalKTable, KeyValueMapper, ValueJoiner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. You can retrieve all generated internal topic names via
Topology.describe().Repartitioning can happen only for this
KStreambut not for the providedKTable. For this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the join inputKStreamis partitioned correctly on its key.- Type Parameters:
VT- the value type of the tableVR- the value type of the result stream- Parameters:
table- theKTableto be joined with this streamjoiner- aValueJoinerthat computes the join result for a pair of matching records- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoiner, one output for each inputKStreamrecord - See Also:
-
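A sketch of the left join under the same hypothetical orders/customers setup; every orders record produces output, and customerValue is null when no table record exists:

KStream<String, String> enriched = orders.leftJoin(
    customers,
    (orderValue, customerValue) ->
        (customerValue == null ? "unknown" : customerValue) + ":" + orderValue);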
leftJoin
<VT,VR> KStream<K,VR> leftJoin(KTable<K, VT> table, ValueJoinerWithKey<? super K, ? super V, ? super VT, ? extends VR> joiner) Join records of this stream withKTable's records using non-windowed left equi join with default serializers and deserializers. In contrast toinner-join, all records from this stream will produce an output record (cf. below). The join is a primary key table lookup join with join attributestream.key == table.key. "Table lookup join" means, that results are only computed ifKStreamrecords are processed. This is done by performing a lookup for matching records in the current (i.e., processing time) internalKTablestate. In contrast, processingKTableinput records will only update the internalKTablestate and will not produce any result records.For each
KStreamrecord whether or not it finds a corresponding record inKTablethe providedValueJoinerWithKeywill be called to compute a value (with arbitrary type) for the result record. If noKTablerecord was found during lookup, anullvalue will be provided toValueJoinerWithKey. The key of the result record is the same as for both joining input records. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. If anKStreaminput record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream.Example:
KStream  | KTable  | state   | result
<K1:A>   |         |         | <K1:ValueJoinerWithKey(K1,A,null)>
         | <K1:b>  | <K1:b>  |
<K1:C>   |         | <K1:b>  | <K1:ValueJoinerWithKey(K1,C,b)>

Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case, you would need to call repartition(Repartitioned) for this KStream before doing the join, specifying the same number of partitions via Repartitioned parameter as the given KTable. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner); cf. join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. You can retrieve all generated internal topic names via
Topology.describe().Repartitioning can happen only for this
KStreambut not for the providedKTable. For this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the join inputKStreamis partitioned correctly on its key.- Type Parameters:
VT- the value type of the tableVR- the value type of the result stream- Parameters:
table- theKTableto be joined with this streamjoiner- aValueJoinerWithKeythat computes the join result for a pair of matching records- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoinerWithKey, one output for each inputKStreamrecord - See Also:
-
leftJoin
<VT,VR> KStream<K,VR> leftJoin(KTable<K, VT> table, ValueJoiner<? super V, ? super VT, ? extends VR> joiner, Joined<K, V, VT> joined) Join records of this stream withKTable's records using non-windowed left equi join with default serializers and deserializers. In contrast toinner-join, all records from this stream will produce an output record (cf. below). The join is a primary key table lookup join with join attributestream.key == table.key. "Table lookup join" means, that results are only computed ifKStreamrecords are processed. This is done by performing a lookup for matching records in the current (i.e., processing time) internalKTablestate. In contrast, processingKTableinput records will only update the internalKTablestate and will not produce any result records.For each
KStreamrecord whether or not it finds a corresponding record inKTablethe providedValueJoinerwill be called to compute a value (with arbitrary type) for the result record. If noKTablerecord was found during lookup, anullvalue will be provided toValueJoiner. The key of the result record is the same as for both joining input records. If anKStreaminput record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream.Example:
KStream  | KTable  | state   | result
<K1:A>   |         |         | <K1:ValueJoiner(A,null)>
         | <K1:b>  | <K1:b>  |
<K1:C>   |         | <K1:b>  | <K1:ValueJoiner(C,b)>

Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case, you would need to call repartition(Repartitioned) for this KStream before doing the join, specifying the same number of partitions via Repartitioned parameter as the given KTable. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner); cf. join(GlobalKTable, KeyValueMapper, ValueJoiner). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. You can retrieve all generated internal topic names via
Topology.describe().Repartitioning can happen only for this
KStreambut not for the providedKTable. For this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the join inputKStreamis partitioned correctly on its key.- Type Parameters:
VT- the value type of the tableVR- the value type of the result stream- Parameters:
table- theKTableto be joined with this streamjoiner- aValueJoinerthat computes the join result for a pair of matching recordsjoined- aJoinedinstance that defines the serdes to be used to serialize/deserialize inputs and outputs of the joined streams- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoiner, one output for each inputKStreamrecord - See Also:
-
leftJoin
<VT,VR> KStream<K,VR> leftJoin(KTable<K, VT> table, ValueJoinerWithKey<? super K, ? super V, ? super VT, ? extends VR> joiner, Joined<K, V, VT> joined) Join records of this stream withKTable's records using non-windowed left equi join with default serializers and deserializers. In contrast toinner-join, all records from this stream will produce an output record (cf. below). The join is a primary key table lookup join with join attributestream.key == table.key. "Table lookup join" means, that results are only computed ifKStreamrecords are processed. This is done by performing a lookup for matching records in the current (i.e., processing time) internalKTablestate. In contrast, processingKTableinput records will only update the internalKTablestate and will not produce any result records.For each
KStreamrecord whether or not it finds a corresponding record inKTablethe providedValueJoinerWithKeywill be called to compute a value (with arbitrary type) for the result record. If noKTablerecord was found during lookup, anullvalue will be provided toValueJoinerWithKey. The key of the result record is the same as for both joining input records. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. If anKStreaminput record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream.Example:
KStream  | KTable  | state   | result
<K1:A>   |         |         | <K1:ValueJoinerWithKey(K1,A,null)>
         | <K1:b>  | <K1:b>  |
<K1:C>   |         | <K1:b>  | <K1:ValueJoinerWithKey(K1,C,b)>

Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions. If this is not the case, you would need to call repartition(Repartitioned) for this KStream before doing the join, specifying the same number of partitions via Repartitioned parameter as the given KTable. Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner); cf. join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey). If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. You can retrieve all generated internal topic names via
Topology.describe().Repartitioning can happen only for this
KStreambut not for the providedKTable. For this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the join inputKStreamis partitioned correctly on its key.- Type Parameters:
VT- the value type of the tableVR- the value type of the result stream- Parameters:
table- theKTableto be joined with this streamjoiner- aValueJoinerWithKeythat computes the join result for a pair of matching recordsjoined- aJoinedinstance that defines the serdes to be used to serialize/deserialize inputs and outputs of the joined streams- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoinerWithKey, one output for each inputKStreamrecord - See Also:
-
join
<GK,GV,RV> KStream<K,RV> join(GlobalKTable<GK, GV> globalTable, KeyValueMapper<? super K, ? super V, ? extends GK> keySelector, ValueJoiner<? super V, ? super GV, ? extends RV> joiner) Join records of this stream with GlobalKTable's records using non-windowed inner equi join. The join is a primary key table lookup join with join attribute keyValueMapper.map(stream.keyValue) == table.key. "Table lookup join" means that results are only computed if KStream records are processed. This is done by performing a lookup for matching records in the current internal GlobalKTable state. In contrast, processing GlobalKTable input records will only update the internal GlobalKTable state and will not produce any result records. For each
KStreamrecord that finds a corresponding record inGlobalKTablethe providedValueJoinerwill be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as the key of thisKStream. If aKStreaminput record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream. IfkeyValueMapperreturnsnullimplying no match exists, no output record will be added to the resultingKStream.- Type Parameters:
GK- the key type ofGlobalKTableGV- the value type of theGlobalKTableRV- the value type of the resultingKStream- Parameters:
globalTable- theGlobalKTableto be joined with this streamkeySelector- instance ofKeyValueMapperused to map from the (key, value) of this stream to the key of theGlobalKTablejoiner- aValueJoinerthat computes the join result for a pair of matching records- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoiner, one output for each inputKStreamrecord - See Also:
-
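A sketch of a GlobalKTable lookup join; the "products" table, the assumption that the stream value holds a product id, and the joiner are all illustrative:

GlobalKTable<String, String> products = builder.globalTable("products");
KStream<String, String> orders = builder.stream("orders");   // value assumed to hold a product id

KStream<String, String> enriched = orders.join(
    products,
    (orderId, productId) -> productId,                               // KeyValueMapper: (stream key, stream value) -> table key
    (orderValue, productValue) -> orderValue + "->" + productValue); // ValueJoiner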
join
<GK,GV,RV> KStream<K,RV> join(GlobalKTable<GK, GV> globalTable, KeyValueMapper<? super K, ? super V, ? extends GK> keySelector, ValueJoinerWithKey<? super K, ? super V, ? super GV, ? extends RV> joiner) Join records of this stream with GlobalKTable's records using non-windowed inner equi join. The join is a primary key table lookup join with join attribute keyValueMapper.map(stream.keyValue) == table.key. "Table lookup join" means that results are only computed if KStream records are processed. This is done by performing a lookup for matching records in the current internal GlobalKTable state. In contrast, processing GlobalKTable input records will only update the internal GlobalKTable state and will not produce any result records. For each
KStreamrecord that finds a corresponding record inGlobalKTablethe providedValueJoinerWithKeywill be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as the key of thisKStream. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. If aKStreaminput record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream. IfkeyValueMapperreturnsnullimplying no match exists, no output record will be added to the resultingKStream.- Type Parameters:
GK- the key type ofGlobalKTableGV- the value type of theGlobalKTableRV- the value type of the resultingKStream- Parameters:
globalTable- theGlobalKTableto be joined with this streamkeySelector- instance ofKeyValueMapperused to map from the (key, value) of this stream to the key of theGlobalKTablejoiner- aValueJoinerWithKeythat computes the join result for a pair of matching records- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoinerWithKey, one output for each inputKStreamrecord - See Also:
-
join
<GK,GV,RV> KStream<K,RV> join(GlobalKTable<GK, GV> globalTable, KeyValueMapper<? super K, ? super V, ? extends GK> keySelector, ValueJoiner<? super V, ? super GV, ? extends RV> joiner, Named named) Join records of this stream with GlobalKTable's records using non-windowed inner equi join. The join is a primary key table lookup join with join attribute keyValueMapper.map(stream.keyValue) == table.key. "Table lookup join" means that results are only computed if KStream records are processed. This is done by performing a lookup for matching records in the current internal GlobalKTable state. In contrast, processing GlobalKTable input records will only update the internal GlobalKTable state and will not produce any result records. For each
KStreamrecord that finds a corresponding record inGlobalKTablethe providedValueJoinerwill be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as the key of thisKStream. If aKStreaminput record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream. IfkeyValueMapperreturnsnullimplying no match exists, no output record will be added to the resultingKStream.- Type Parameters:
GK- the key type ofGlobalKTableGV- the value type of theGlobalKTableRV- the value type of the resultingKStream- Parameters:
globalTable- theGlobalKTableto be joined with this streamkeySelector- instance ofKeyValueMapperused to map from the (key, value) of this stream to the key of theGlobalKTablejoiner- aValueJoinerthat computes the join result for a pair of matching recordsnamed- aNamedconfig used to name the processor in the topology- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoiner, one output for each inputKStreamrecord - See Also:
-
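The Named variant only adds an explicit processor name, which can make Topology.describe() output easier to read; a sketch under the same hypothetical setup:

KStream<String, String> enriched = orders.join(
    products,
    (orderId, productId) -> productId,
    (orderValue, productValue) -> orderValue + "->" + productValue,
    Named.as("order-product-join"));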
join
<GK,GV,RV> KStream<K,RV> join(GlobalKTable<GK, GV> globalTable, KeyValueMapper<? super K, ? super V, ? extends GK> keySelector, ValueJoinerWithKey<? super K, ? super V, ? super GV, ? extends RV> joiner, Named named) Join records of this stream with GlobalKTable's records using non-windowed inner equi join. The join is a primary key table lookup join with join attribute keyValueMapper.map(stream.keyValue) == table.key. "Table lookup join" means that results are only computed if KStream records are processed. This is done by performing a lookup for matching records in the current internal GlobalKTable state. In contrast, processing GlobalKTable input records will only update the internal GlobalKTable state and will not produce any result records. For each
KStreamrecord that finds a corresponding record inGlobalKTablethe providedValueJoinerWithKeywill be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as the key of thisKStream. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. If aKStreaminput record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream. IfkeyValueMapperreturnsnullimplying no match exists, no output record will be added to the resultingKStream.- Type Parameters:
GK- the key type ofGlobalKTableGV- the value type of theGlobalKTableRV- the value type of the resultingKStream- Parameters:
globalTable- theGlobalKTableto be joined with this streamkeySelector- instance ofKeyValueMapperused to map from the (key, value) of this stream to the key of theGlobalKTablejoiner- aValueJoinerWithKeythat computes the join result for a pair of matching recordsnamed- aNamedconfig used to name the processor in the topology- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoinerWithKey, one output for each inputKStreamrecord - See Also:
-
leftJoin
<GK,GV,RV> KStream<K,RV> leftJoin(GlobalKTable<GK, GV> globalTable, KeyValueMapper<? super K, ? super V, ? extends GK> keySelector, ValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner) Join records of this stream with GlobalKTable's records using non-windowed left equi join. In contrast to inner-join, all records from this stream will produce an output record (cf. below). The join is a primary key table lookup join with join attribute keyValueMapper.map(stream.keyValue) == table.key. "Table lookup join" means that results are only computed if KStream records are processed. This is done by performing a lookup for matching records in the current internal GlobalKTable state. In contrast, processing GlobalKTable input records will only update the internal GlobalKTable state and will not produce any result records. For each
KStreamrecord whether or not it finds a corresponding record inGlobalKTablethe providedValueJoinerwill be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as thisKStream. If aKStreaminput record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream. IfkeyValueMapperreturnsnullimplying no match exists, anullvalue will be provided toValueJoiner. If noGlobalKTablerecord was found during lookup, anullvalue will be provided toValueJoiner.- Type Parameters:
GK- the key type ofGlobalKTableGV- the value type of theGlobalKTableRV- the value type of the resultingKStream- Parameters:
globalTable- theGlobalKTableto be joined with this streamkeySelector- instance ofKeyValueMapperused to map from the (key, value) of this stream to the key of theGlobalKTablevalueJoiner- aValueJoinerthat computes the join result for a pair of matching records- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoiner, one output for each inputKStreamrecord - See Also:
-
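A sketch of the left variant under the same hypothetical orders/products setup; unmatched stream records are still emitted, with a null table value passed to the joiner:

KStream<String, String> enriched = orders.leftJoin(
    products,
    (orderId, productId) -> productId,
    (orderValue, productValue) -> orderValue + "->" + (productValue == null ? "n/a" : productValue));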
leftJoin
<GK,GV,RV> KStream<K,RV> leftJoin(GlobalKTable<GK, GV> globalTable, KeyValueMapper<? super K, ? super V, ? extends GK> keySelector, ValueJoinerWithKey<? super K, ? super V, ? super GV, ? extends RV> valueJoiner) Join records of this stream with GlobalKTable's records using non-windowed left equi join. In contrast to inner-join, all records from this stream will produce an output record (cf. below). The join is a primary key table lookup join with join attribute keyValueMapper.map(stream.keyValue) == table.key. "Table lookup join" means that results are only computed if KStream records are processed. This is done by performing a lookup for matching records in the current internal GlobalKTable state. In contrast, processing GlobalKTable input records will only update the internal GlobalKTable state and will not produce any result records. For each
KStream record, whether or not it finds a corresponding record in GlobalKTable, the provided ValueJoinerWithKey will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as the key of this KStream. Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. If a KStream input record key or value is null the record will not be included in the join operation and thus no output record will be added to the resulting KStream. If keyValueMapper returns null implying no match exists, a null value will be provided to ValueJoinerWithKey. If no GlobalKTable record was found during lookup, a null value will be provided to ValueJoinerWithKey.- Type Parameters:
GK- the key type ofGlobalKTableGV- the value type of theGlobalKTableRV- the value type of the resultingKStream- Parameters:
globalTable- theGlobalKTableto be joined with this streamkeySelector- instance ofKeyValueMapperused to map from the (key, value) of this stream to the key of theGlobalKTablevalueJoiner- aValueJoinerWithKeythat computes the join result for a pair of matching records- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoinerWithKey, one output for each inputKStreamrecord - See Also:
-
leftJoin
<GK,GV,RV> KStream<K,RV> leftJoin(GlobalKTable<GK, GV> globalTable, KeyValueMapper<? super K, ? super V, ? extends GK> keySelector, ValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner, Named named) Join records of this stream with GlobalKTable's records using non-windowed left equi join. In contrast to inner-join, all records from this stream will produce an output record (cf. below). The join is a primary key table lookup join with join attribute keyValueMapper.map(stream.keyValue) == table.key. "Table lookup join" means that results are only computed if KStream records are processed. This is done by performing a lookup for matching records in the current internal GlobalKTable state. In contrast, processing GlobalKTable input records will only update the internal GlobalKTable state and will not produce any result records. For each
KStreamrecord whether or not it finds a corresponding record inGlobalKTablethe providedValueJoinerwill be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as thisKStream. If aKStreaminput record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream. IfkeyValueMapperreturnsnullimplying no match exists, anullvalue will be provided toValueJoiner. If noGlobalKTablerecord was found during lookup, anullvalue will be provided toValueJoiner.- Type Parameters:
GK- the key type ofGlobalKTableGV- the value type of theGlobalKTableRV- the value type of the resultingKStream- Parameters:
globalTable- theGlobalKTableto be joined with this streamkeySelector- instance ofKeyValueMapperused to map from the (key, value) of this stream to the key of theGlobalKTablevalueJoiner- aValueJoinerthat computes the join result for a pair of matching recordsnamed- aNamedconfig used to name the processor in the topology- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoiner, one output for each inputKStreamrecord - See Also:
-
leftJoin
<GK,GV,RV> KStream<K,RV> leftJoin(GlobalKTable<GK, GV> globalTable, KeyValueMapper<? super K, ? super V, ? extends GK> keySelector, ValueJoinerWithKey<? super K, ? super V, ? super GV, ? extends RV> valueJoiner, Named named) Join records of this stream with GlobalKTable's records using non-windowed left equi join. In contrast to inner-join, all records from this stream will produce an output record (cf. below). The join is a primary key table lookup join with join attribute keyValueMapper.map(stream.keyValue) == table.key. "Table lookup join" means that results are only computed if KStream records are processed. This is done by performing a lookup for matching records in the current internal GlobalKTable state. In contrast, processing GlobalKTable input records will only update the internal GlobalKTable state and will not produce any result records. For each
KStreamrecord whether or not it finds a corresponding record inGlobalKTablethe providedValueJoinerWithKeywill be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as thisKStream. If aKStreaminput record key or value isnullthe record will not be included in the join operation and thus no output record will be added to the resultingKStream. IfkeyValueMapperreturnsnullimplying no match exists, anullvalue will be provided toValueJoinerWithKey. If noGlobalKTablerecord was found during lookup, anullvalue will be provided toValueJoinerWithKey.- Type Parameters:
GK- the key type ofGlobalKTableGV- the value type of theGlobalKTableRV- the value type of the resultingKStream- Parameters:
globalTable- theGlobalKTableto be joined with this streamkeySelector- instance ofKeyValueMapperused to map from the (key, value) of this stream to the key of theGlobalKTablevalueJoiner- aValueJoinerWithKeythat computes the join result for a pair of matching recordsnamed- aNamedconfig used to name the processor in the topology- Returns:
- a
KStreamthat contains join-records for each key and values computed by the givenValueJoinerWithKey, one output for each inputKStreamrecord - See Also:
-
transform
<K1,V1> KStream<K1,V1> transform(TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames) Transform each record of the input stream into zero or one record in the output stream (both key and value type can be altered arbitrarily). ATransformer(provided by the givenTransformerSupplier) is applied to each input record and returns zero or one output record. Thus, an input record<K,V>can be transformed into an output record<K':V'>. Attaching a state store makes this a stateful record-by-record operation (cf.map()). If you choose not to attach one, this operation is similar to the statelessmap()but allows access to theProcessorContextand record metadata. This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. Furthermore, viaPunctuator#punctuate(), the processing progress can be observed and additional periodic actions can be performed.In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the
StoreBuilders viaTopology.addStateStore(StoreBuilder, String...), and specify the store names viastateStoreNamesso they will be connected to the transformer.
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
        Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
                Serdes.String(),
                Serdes.String());
// add store
builder.addStateStore(keyValueStoreBuilder);

KStream outputStream = inputStream.transform(new TransformerSupplier() {
    public Transformer get() {
        return new MyTransformer();
    }
}, "myTransformState");

The second strategy is for the given TransformerSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.

class MyTransformerSupplier implements TransformerSupplier {
    // supply transformer
    Transformer get() {
        return new MyTransformer();
    }

    // provide store(s) that will be added and connected to the associated transformer
    // the store name from the builder ("myTransformState") is used to access the store later via the ProcessorContext
    Set<StoreBuilder> stores() {
        StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
                        Serdes.String(),
                        Serdes.String());
        return Collections.singleton(keyValueStoreBuilder);
    }
}
...
KStream outputStream = inputStream.transform(new MyTransformerSupplier());

With either strategy, within the
Transformer, the state is obtained via theProcessorContext. To trigger periodic actions viapunctuate(), a schedule must be registered. TheTransformermust return aKeyValuetype intransform(). The return value ofTransformer#transform()may benull, in which case no record is emitted.
class MyTransformer implements Transformer {
    private ProcessorContext context;
    private StateStore state;

    void init(ProcessorContext context) {
        this.context = context;
        this.state = context.getStateStore("myTransformState");
        // punctuate each second; can access this.state
        context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
    }

    KeyValue transform(K key, V value) {
        // can access this.state
        return new KeyValue(key, value); // can emit a single value via return -- can also be null
    }

    void close() {
        // can access this.state
    }
}

Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before transform(). Transforming records might result in an internal data redistribution if a key based operator (like an aggregation or join) is applied to the result
KStream. (cf.transformValues())Note that it is possible to emit multiple records for each input record by using
context#forward()inTransformer#transform()andPunctuator#punctuate(). Be aware that a mismatch between the types of the emitted records and the type of the stream would only be detected at runtime. To ensure type-safety at compile-time,context#forward()should not be used inTransformer#transform()andPunctuator#punctuate(). If inTransformer#transform()multiple records need to be emitted for each input record, it is recommended to useflatTransform(). The supplier should always generate a new instance each timeTransformerSupplier.get()gets called. Creating a singleTransformerobject and returning the same object reference inTransformerSupplier.get()would be a violation of the supplier pattern and leads to runtime exceptions.- Type Parameters:
K1- the key type of the new streamV1- the value type of the new stream- Parameters:
transformerSupplier- an instance ofTransformerSupplierthat generates a newly constructedTransformerstateStoreNames- the names of the state stores used by the processor; not required if the supplier implementsConnectedStoreProvider.stores()- Returns:
- a
KStreamthat contains more or less records with new key and value (possibly of different type) - See Also:
-
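As a concrete, purely illustrative sketch of the first strategy: a store named "seenKeys" is registered manually and a Transformer uses it to drop records whose key was seen before. The store name, the deduplication logic, and the builder/input variables are assumptions, not part of the API:

StoreBuilder<KeyValueStore<String, Long>> seenKeysStore =
        Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("seenKeys"),
                Serdes.String(),
                Serdes.Long());
builder.addStateStore(seenKeysStore);

KStream<String, String> deduplicated = input.transform(
    () -> new Transformer<String, String, KeyValue<String, String>>() {
        private KeyValueStore<String, Long> store;

        @Override
        public void init(ProcessorContext context) {
            store = (KeyValueStore<String, Long>) context.getStateStore("seenKeys");
        }

        @Override
        public KeyValue<String, String> transform(String key, String value) {
            if (store.get(key) != null) {
                return null;                      // duplicate key: emit nothing
            }
            store.put(key, System.currentTimeMillis());
            return KeyValue.pair(key, value);     // first occurrence: forward unchanged
        }

        @Override
        public void close() { }
    },
    "seenKeys");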
transform
<K1,V1> KStream<K1,V1> transform(TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier, Named named, String... stateStoreNames) Transform each record of the input stream into zero or one record in the output stream (both key and value type can be altered arbitrarily). ATransformer(provided by the givenTransformerSupplier) is applied to each input record and returns zero or one output record. Thus, an input record<K,V>can be transformed into an output record<K':V'>. Attaching a state store makes this a stateful record-by-record operation (cf.map()). If you choose not to attach one, this operation is similar to the statelessmap()but allows access to theProcessorContextand record metadata. This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. Furthermore, viaPunctuator#punctuate(), the processing progress can be observed and additional periodic actions can be performed.In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the
StoreBuilders viaTopology.addStateStore(StoreBuilder, String...), and specify the store names viastateStoreNamesso they will be connected to the transformer.
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
        Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
                Serdes.String(),
                Serdes.String());
// add store
builder.addStateStore(keyValueStoreBuilder);

KStream outputStream = inputStream.transform(new TransformerSupplier() {
    public Transformer get() {
        return new MyTransformer();
    }
}, "myTransformState");

The second strategy is for the given TransformerSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.

class MyTransformerSupplier implements TransformerSupplier {
    // supply transformer
    Transformer get() {
        return new MyTransformer();
    }

    // provide store(s) that will be added and connected to the associated transformer
    // the store name from the builder ("myTransformState") is used to access the store later via the ProcessorContext
    Set<StoreBuilder> stores() {
        StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
                        Serdes.String(),
                        Serdes.String());
        return Collections.singleton(keyValueStoreBuilder);
    }
}
...
KStream outputStream = inputStream.transform(new MyTransformerSupplier());

With either strategy, within the
Transformer, the state is obtained via theProcessorContext. To trigger periodic actions viapunctuate(), a schedule must be registered. TheTransformermust return aKeyValuetype intransform(). The return value ofTransformer#transform()may benull, in which case no record is emitted.
class MyTransformer implements Transformer {
    private ProcessorContext context;
    private StateStore state;

    void init(ProcessorContext context) {
        this.context = context;
        this.state = context.getStateStore("myTransformState");
        // punctuate each second; can access this.state
        context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
    }

    KeyValue transform(K key, V value) {
        // can access this.state
        return new KeyValue(key, value); // can emit a single value via return -- can also be null
    }

    void close() {
        // can access this.state
    }
}

Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before transform(). Transforming records might result in an internal data redistribution if a key based operator (like an aggregation or join) is applied to the result
KStream. (cf.transformValues())Note that it is possible to emit multiple records for each input record by using
context#forward()inTransformer#transform()andPunctuator#punctuate(). Be aware that a mismatch between the types of the emitted records and the type of the stream would only be detected at runtime. To ensure type-safety at compile-time,context#forward()should not be used inTransformer#transform()andPunctuator#punctuate(). If inTransformer#transform()multiple records need to be emitted for each input record, it is recommended to useflatTransform(). The supplier should always generate a new instance each timeTransformerSupplier.get()gets called. Creating a singleTransformerobject and returning the same object reference inTransformerSupplier.get()would be a violation of the supplier pattern and leads to runtime exceptions.- Type Parameters:
K1- the key type of the new streamV1- the value type of the new stream- Parameters:
transformerSupplier- an instance ofTransformerSupplierthat generates a newly constructedTransformernamed- aNamedconfig used to name the processor in the topologystateStoreNames- the names of the state stores used by the processor; not required if the supplier implementsConnectedStoreProvider.stores()- Returns:
- a
KStreamthat contains more or less records with new key and value (possibly of different type) - See Also:
-
flatTransform
<K1,V1> KStream<K1,V1> flatTransform(TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier, String... stateStoreNames) Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily). ATransformer(provided by the givenTransformerSupplier) is applied to each input record and returns zero or more output records. Thus, an input record<K,V>can be transformed into output records<K':V'>, <K'':V''>, .... Attaching a state store makes this a stateful record-by-record operation (cf.flatMap()). If you choose not to attach one, this operation is similar to the statelessflatMap()but allows access to theProcessorContextand record metadata. Furthermore, viaPunctuator#punctuate()the processing progress can be observed and additional periodic actions can be performed.In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the
StoreBuilders viaTopology.addStateStore(StoreBuilder, String...), and specify the store names viastateStoreNamesso they will be connected to the transformer.
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
        Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
                Serdes.String(),
                Serdes.String());
// add store
builder.addStateStore(keyValueStoreBuilder);

KStream outputStream = inputStream.flatTransform(new TransformerSupplier() {
    public Transformer get() {
        return new MyTransformer();
    }
}, "myTransformState");

The second strategy is for the given TransformerSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.

class MyTransformerSupplier implements TransformerSupplier {
    // supply transformer
    Transformer get() {
        return new MyTransformer();
    }

    // provide store(s) that will be added and connected to the associated transformer
    // the store name from the builder ("myTransformState") is used to access the store later via the ProcessorContext
    Set<StoreBuilder> stores() {
        StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
                        Serdes.String(),
                        Serdes.String());
        return Collections.singleton(keyValueStoreBuilder);
    }
}
...
KStream outputStream = inputStream.flatTransform(new MyTransformerSupplier());

With either strategy, within the
Transformer, the state is obtained via theProcessorContext. To trigger periodic actions viapunctuate(), a schedule must be registered. TheTransformermust return anIterabletype (e.g., anyCollectiontype) intransform(). The return value ofTransformer#transform()may benull, which is equal to returning an emptyIterable, i.e., no records are emitted.
class MyTransformer implements Transformer {
    private ProcessorContext context;
    private StateStore state;

    void init(ProcessorContext context) {
        this.context = context;
        this.state = context.getStateStore("myTransformState");
        // punctuate each second; can access this.state
        context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
    }

    Iterable<KeyValue> transform(K key, V value) {
        // can access this.state
        List<KeyValue> result = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            result.add(new KeyValue(key, value));
        }
        return result; // emits a list of key-value pairs via return
    }

    void close() {
        // can access this.state
    }
}

Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before flatTransform(). Transforming records might result in an internal data redistribution if a key based operator (like an aggregation or join) is applied to the result
KStream. (cf.transformValues())Note that it is possible to emit records by using
context#forward()inTransformer#transform()andPunctuator#punctuate(). Be aware that a mismatch between the types of the emitted records and the type of the stream would only be detected at runtime. To ensure type-safety at compile-time,context#forward()should not be used inTransformer#transform()andPunctuator#punctuate(). The supplier should always generate a new instance each timeTransformerSupplier.get()gets called. Creating a singleTransformerobject and returning the same object reference inTransformerSupplier.get()would be a violation of the supplier pattern and leads to runtime exceptions.- Type Parameters:
K1- the key type of the new streamV1- the value type of the new stream- Parameters:
transformerSupplier- an instance ofTransformerSupplierthat generates a newly constructedTransformerstateStoreNames- the names of the state stores used by the processor; not required if the supplier implementsConnectedStoreProvider.stores()- Returns:
- a
KStreamthat contains more or less records with new key and value (possibly of different type) - See Also:
-
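A stateless, illustrative sketch of flatTransform: each input line is split into words and one output record is emitted per word; no state store is attached, and the lines stream is an assumed input:

KStream<String, String> words = lines.flatTransform(
    () -> new Transformer<String, String, Iterable<KeyValue<String, String>>>() {
        @Override
        public void init(ProcessorContext context) { }

        @Override
        public Iterable<KeyValue<String, String>> transform(String key, String line) {
            List<KeyValue<String, String>> out = new ArrayList<>();
            for (String word : line.split("\\s+")) {
                out.add(KeyValue.pair(key, word));
            }
            return out;
        }

        @Override
        public void close() { }
    });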
flatTransform
<K1,V1> KStream<K1,V1> flatTransform(TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier, Named named, String... stateStoreNames) Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily). ATransformer(provided by the givenTransformerSupplier) is applied to each input record and returns zero or more output records. Thus, an input record<K,V>can be transformed into output records<K':V'>, <K'':V''>, .... Attaching a state store makes this a stateful record-by-record operation (cf.flatMap()). If you choose not to attach one, this operation is similar to the statelessflatMap()but allows access to theProcessorContextand record metadata. Furthermore, viaPunctuator#punctuate()the processing progress can be observed and additional periodic actions can be performed.In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the transformer.

 // create store
 StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"), Serdes.String(), Serdes.String());
 // add store
 builder.addStateStore(keyValueStoreBuilder);

 KStream outputStream = inputStream.flatTransform(new TransformerSupplier() {
     public Transformer get() { return new MyTransformer(); }
 }, "myTransformState");

The second strategy is for the given TransformerSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.

 class MyTransformerSupplier implements TransformerSupplier {
     // supply transformer
     Transformer get() { return new MyTransformer(); }

     // provide store(s) that will be added and connected to the associated transformer
     // the store name from the builder ("myTransformState") is used to access the store later via the ProcessorContext
     Set<StoreBuilder> stores() {
         StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                 Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"), Serdes.String(), Serdes.String());
         return Collections.singleton(keyValueStoreBuilder);
     }
 }
 ...
 KStream outputStream = inputStream.flatTransform(new MyTransformerSupplier());

With either strategy, within the
Transformer, the state is obtained via theProcessorContext. To trigger periodic actions viapunctuate(), a schedule must be registered. TheTransformermust return anIterabletype (e.g., anyCollectiontype) intransform(). The return value ofTransformer#transform()may benull, which is equal to returning an emptyIterable, i.e., no records are emitted.
 class MyTransformer implements Transformer {
     private ProcessorContext context;
     private StateStore state;

     void init(ProcessorContext context) {
         this.context = context;
         this.state = context.getStateStore("myTransformState");
         // punctuate each second; can access this.state
         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
     }

     Iterable<KeyValue> transform(K key, V value) {
         // can access this.state
         List<KeyValue> result = new ArrayList<>();
         for (int i = 0; i < 3; i++) {
             result.add(new KeyValue(key, value));
         }
         return result; // emits a list of key-value pairs via return
     }

     void close() {
         // can access this.state
     }
 }

Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before flatTransform().
Transforming records might result in an internal data redistribution if a key based operator (like an aggregation or join) is applied to the result
KStream. (cf.transformValues())Note that it is possible to emit records by using
context#forward()inTransformer#transform()andPunctuator#punctuate(). Be aware that a mismatch between the types of the emitted records and the type of the stream would only be detected at runtime. To ensure type-safety at compile-time,context#forward()should not be used inTransformer#transform()andPunctuator#punctuate(). The supplier should always generate a new instance each timeTransformerSupplier.get()gets called. Creating a singleTransformerobject and returning the same object reference inTransformerSupplier.get()would be a violation of the supplier pattern and leads to runtime exceptions.- Type Parameters:
K1- the key type of the new streamV1- the value type of the new stream- Parameters:
transformerSupplier- an instance ofTransformerSupplierthat generates a newly constructedTransformernamed- aNamedconfig used to name the processor in the topologystateStoreNames- the names of the state stores used by the processor; not required if the supplier implementsConnectedStoreProvider.stores()- Returns:
- a
KStreamthat contains more or less records with new key and value (possibly of different type) - See Also:
-
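As a minimal sketch (assuming the hypothetical TokenSplitter and inputStream from the earlier example), the Named overload only adds an explicit processor name, which keeps the node name in the topology stable:

 KStream<String, String> exploded = inputStream.flatTransform(
     TokenSplitter::new,
     Named.as("token-splitter")); // no state store names needed for a stateless transformer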
transformValues
<VR> KStream<K,VR> transformValues(ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier, String... stateStoreNames) Transform the value of each input record into a new value (with possibly a new type) of the output record. AValueTransformer(provided by the givenValueTransformerSupplier) is applied to each input record value and computes a new value for it. Thus, an input record<K,V>can be transformed into an output record<K:V'>. Attaching a state store makes this a stateful record-by-record operation (cf.mapValues()). If you choose not to attach one, this operation is similar to the statelessmapValues()but allows access to theProcessorContextand record metadata. Furthermore, viaPunctuator.punctuate(long)the processing progress can be observed and additional periodic actions can be performed.In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the transformer.

 // create store
 StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), Serdes.String(), Serdes.String());
 // add store
 builder.addStateStore(keyValueStoreBuilder);

 KStream outputStream = inputStream.transformValues(new ValueTransformerSupplier() {
     public ValueTransformer get() { return new MyValueTransformer(); }
 }, "myValueTransformState");

The second strategy is for the given ValueTransformerSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.

 class MyValueTransformerSupplier implements ValueTransformerSupplier {
     // supply transformer
     ValueTransformer get() { return new MyValueTransformer(); }

     // provide store(s) that will be added and connected to the associated transformer
     // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
     Set<StoreBuilder> stores() {
         StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                 Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), Serdes.String(), Serdes.String());
         return Collections.singleton(keyValueStoreBuilder);
     }
 }
 ...
 KStream outputStream = inputStream.transformValues(new MyValueTransformerSupplier());

With either strategy, within the
ValueTransformer, the state is obtained via theProcessorContext. To trigger periodic actions viapunctuate(), a schedule must be registered. TheValueTransformermust return the new value intransform(). In contrast totransform(), no additionalKeyValuepairs can be emitted viaProcessorContext.forward(). AStreamsExceptionis thrown if theValueTransformertries to emit aKeyValuepair.
 class MyValueTransformer implements ValueTransformer {
     private StateStore state;

     void init(ProcessorContext context) {
         this.state = context.getStateStore("myValueTransformState");
         // punctuate each second, can access this.state
         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
     }

     NewValueType transform(V value) {
         // can access this.state
         return new NewValueType(); // or null
     }

     void close() {
         // can access this.state
     }
 }

Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before transformValues().
Setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) is applied to the result
KStream. (cf.transform(TransformerSupplier, String...))- Type Parameters:
VR- the value type of the result stream- Parameters:
valueTransformerSupplier- an instance ofValueTransformerSupplierthat generates a newly constructedValueTransformerThe supplier should always generate a new instance. Creating a singleValueTransformerobject and returning the same object reference inValueTransformeris a violation of the supplier pattern and leads to runtime exceptions.stateStoreNames- the names of the state stores used by the processor; not required if the supplier implementsConnectedStoreProvider.stores()- Returns:
- a
KStreamthat contains records with unmodified key and new values (possibly of different type) - See Also:
-
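A hypothetical sketch of the second strategy for this method: a ValueTransformerSupplier that both supplies a stateful ValueTransformer and registers its own store via ConnectedStoreProvider.stores(). The store name "seq-store", the class name, and the sequence-tagging logic are illustrative assumptions, not part of the Kafka API.

 import java.util.Collections;
 import java.util.Set;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.ValueTransformer;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;

 // hypothetical: tag each value with a running, per-task sequence number kept in "seq-store"
 class SequenceTaggerSupplier implements ValueTransformerSupplier<String, String> {
     public ValueTransformer<String, String> get() {
         return new ValueTransformer<String, String>() {
             private KeyValueStore<String, Long> store;

             public void init(final ProcessorContext context) {
                 store = context.getStateStore("seq-store");
             }

             public String transform(final String value) {
                 final Long previous = store.get("seq");
                 final long next = (previous == null ? 0L : previous) + 1L;
                 store.put("seq", next);
                 return next + ":" + value;
             }

             public void close() { }
         };
     }

     // the store is added to the topology and connected automatically, so no stateStoreNames argument is needed
     public Set<StoreBuilder<?>> stores() {
         return Collections.singleton(
             Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("seq-store"), Serdes.String(), Serdes.Long()));
     }
 }

 // usage, assuming an existing KStream<String, String> inputStream:
 // KStream<String, String> tagged = inputStream.transformValues(new SequenceTaggerSupplier());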
transformValues
<VR> KStream<K,VR> transformValues(ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier, Named named, String... stateStoreNames) Transform the value of each input record into a new value (with possibly a new type) of the output record. AValueTransformer(provided by the givenValueTransformerSupplier) is applied to each input record value and computes a new value for it. Thus, an input record<K,V>can be transformed into an output record<K:V'>. Attaching a state store makes this a stateful record-by-record operation (cf.mapValues()). If you choose not to attach one, this operation is similar to the statelessmapValues()but allows access to theProcessorContextand record metadata. This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. Furthermore, viaPunctuator.punctuate(long)the processing progress can be observed and additional periodic actions can be performed.In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the transformer.

 // create store
 StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), Serdes.String(), Serdes.String());
 // add store
 builder.addStateStore(keyValueStoreBuilder);

 KStream outputStream = inputStream.transformValues(new ValueTransformerSupplier() {
     public ValueTransformer get() { return new MyValueTransformer(); }
 }, "myValueTransformState");

The second strategy is for the given ValueTransformerSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.

 class MyValueTransformerSupplier implements ValueTransformerSupplier {
     // supply transformer
     ValueTransformer get() { return new MyValueTransformer(); }

     // provide store(s) that will be added and connected to the associated transformer
     // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
     Set<StoreBuilder> stores() {
         StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                 Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), Serdes.String(), Serdes.String());
         return Collections.singleton(keyValueStoreBuilder);
     }
 }
 ...
 KStream outputStream = inputStream.transformValues(new MyValueTransformerSupplier());

With either strategy, within the
ValueTransformer, the state is obtained via theProcessorContext. To trigger periodic actions viapunctuate(), a schedule must be registered. TheValueTransformermust return the new value intransform(). In contrast totransform(), no additionalKeyValuepairs can be emitted viaProcessorContext.forward(). AStreamsExceptionis thrown if theValueTransformertries to emit aKeyValuepair.
 class MyValueTransformer implements ValueTransformer {
     private StateStore state;

     void init(ProcessorContext context) {
         this.state = context.getStateStore("myValueTransformState");
         // punctuate each second, can access this.state
         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
     }

     NewValueType transform(V value) {
         // can access this.state
         return new NewValueType(); // or null
     }

     void close() {
         // can access this.state
     }
 }

Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before transformValues().
Setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) is applied to the result
KStream. (cf.transform(TransformerSupplier, String...))- Type Parameters:
VR- the value type of the result stream- Parameters:
valueTransformerSupplier- an instance ofValueTransformerSupplierthat generates a newly constructedValueTransformerThe supplier should always generate a new instance. Creating a singleValueTransformerobject and returning the same object reference inValueTransformeris a violation of the supplier pattern and leads to runtime exceptions.named- aNamedconfig used to name the processor in the topologystateStoreNames- the names of the state stores used by the processor; not required if the supplier implementsConnectedStoreProvider.stores()- Returns:
- a
KStreamthat contains records with unmodified key and new values (possibly of different type) - See Also:
-
transformValues
<VR> KStream<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier, String... stateStoreNames) Transform the value of each input record into a new value (with possibly a new type) of the output record. AValueTransformerWithKey(provided by the givenValueTransformerWithKeySupplier) is applied to each input record value and computes a new value for it. Thus, an input record<K,V>can be transformed into an output record<K:V'>. Attaching a state store makes this a stateful record-by-record operation (cf.mapValues()). If you choose not to attach one, this operation is similar to the statelessmapValues()but allows access to theProcessorContextand record metadata. This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. Furthermore, viaPunctuator.punctuate(long)the processing progress can be observed and additional periodic actions can be performed.In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the transformer.

 // create store
 StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), Serdes.String(), Serdes.String());
 // add store
 builder.addStateStore(keyValueStoreBuilder);

 KStream outputStream = inputStream.transformValues(new ValueTransformerWithKeySupplier() {
     public ValueTransformerWithKey get() { return new MyValueTransformerWithKey(); }
 }, "myValueTransformState");

The second strategy is for the given ValueTransformerWithKeySupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.

 class MyValueTransformerWithKeySupplier implements ValueTransformerWithKeySupplier {
     // supply transformer
     ValueTransformerWithKey get() { return new MyValueTransformerWithKey(); }

     // provide store(s) that will be added and connected to the associated transformer
     // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
     Set<StoreBuilder> stores() {
         StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                 Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), Serdes.String(), Serdes.String());
         return Collections.singleton(keyValueStoreBuilder);
     }
 }
 ...
 KStream outputStream = inputStream.transformValues(new MyValueTransformerWithKeySupplier());

With either strategy, within the
ValueTransformerWithKey, the state is obtained via theProcessorContext. To trigger periodic actions viapunctuate(), a schedule must be registered. TheValueTransformerWithKeymust return the new value intransform(). In contrast totransform()andflatTransform(), no additionalKeyValuepairs can be emitted viaProcessorContext.forward(). AStreamsExceptionis thrown if theValueTransformerWithKeytries to emit aKeyValuepair.
 class MyValueTransformerWithKey implements ValueTransformerWithKey {
     private StateStore state;

     void init(ProcessorContext context) {
         this.state = context.getStateStore("myValueTransformState");
         // punctuate each second, can access this.state
         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
     }

     NewValueType transform(K readOnlyKey, V value) {
         // can access this.state and use read-only key
         return new NewValueType(readOnlyKey); // or null
     }

     void close() {
         // can access this.state
     }
 }

Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before transformValues().
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. So, setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) is applied to the result
KStream. (cf.transform(TransformerSupplier, String...))- Type Parameters:
VR- the value type of the result stream- Parameters:
valueTransformerSupplier- an instance ofValueTransformerWithKeySupplierthat generates a newly constructedValueTransformerWithKeyThe supplier should always generate a new instance. Creating a singleValueTransformerWithKeyobject and returning the same object reference inValueTransformerWithKeyis a violation of the supplier pattern and leads to runtime exceptions.stateStoreNames- the names of the state stores used by the processor; not required if the supplier implementsConnectedStoreProvider.stores()- Returns:
- a
KStreamthat contains records with unmodified key and new values (possibly of different type) - See Also:
-
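An illustrative sketch with assumed names throughout: a ValueTransformerWithKey that uses the read-only key to keep a per-key counter in a store named "counts-store". The store is assumed to have been added to the topology via Topology.addStateStore(...) and is connected here via the stateStoreNames argument.

 import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueStore;

 // hypothetical: count how often each key has been seen and append the count to the value
 class PerKeyCounter implements ValueTransformerWithKey<String, String, String> {
     private KeyValueStore<String, Long> counts;

     public void init(final ProcessorContext context) {
         counts = context.getStateStore("counts-store"); // assumed to exist in the topology
     }

     public String transform(final String readOnlyKey, final String value) {
         final Long previous = counts.get(readOnlyKey);
         final long next = (previous == null ? 0L : previous) + 1L;
         counts.put(readOnlyKey, next);
         return value + " (seen " + next + " times)";
     }

     public void close() { }
 }

 // usage, assuming an existing KStream<String, String> inputStream:
 // KStream<String, String> counted = inputStream.transformValues(PerKeyCounter::new, "counts-store");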
transformValues
<VR> KStream<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier, Named named, String... stateStoreNames) Transform the value of each input record into a new value (with possibly a new type) of the output record. AValueTransformerWithKey(provided by the givenValueTransformerWithKeySupplier) is applied to each input record value and computes a new value for it. Thus, an input record<K,V>can be transformed into an output record<K:V'>. Attaching a state store makes this a stateful record-by-record operation (cf.mapValues()). If you choose not to attach one, this operation is similar to the statelessmapValues()but allows access to theProcessorContextand record metadata. This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. Furthermore, viaPunctuator.punctuate(long)the processing progress can be observed and additional periodic actions can be performed.In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the transformer.

 // create store
 StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), Serdes.String(), Serdes.String());
 // add store
 builder.addStateStore(keyValueStoreBuilder);

 KStream outputStream = inputStream.transformValues(new ValueTransformerWithKeySupplier() {
     public ValueTransformerWithKey get() { return new MyValueTransformerWithKey(); }
 }, "myValueTransformState");

The second strategy is for the given ValueTransformerWithKeySupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.

 class MyValueTransformerWithKeySupplier implements ValueTransformerWithKeySupplier {
     // supply transformer
     ValueTransformerWithKey get() { return new MyValueTransformerWithKey(); }

     // provide store(s) that will be added and connected to the associated transformer
     // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
     Set<StoreBuilder> stores() {
         StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                 Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), Serdes.String(), Serdes.String());
         return Collections.singleton(keyValueStoreBuilder);
     }
 }
 ...
 KStream outputStream = inputStream.transformValues(new MyValueTransformerWithKeySupplier());

With either strategy, within the
ValueTransformerWithKey, the state is obtained via theProcessorContext. To trigger periodic actions viapunctuate(), a schedule must be registered. TheValueTransformerWithKeymust return the new value intransform(). In contrast totransform()andflatTransform(), no additionalKeyValuepairs can be emitted viaProcessorContext.forward(). AStreamsExceptionis thrown if theValueTransformerWithKeytries to emit aKeyValuepair.
 class MyValueTransformerWithKey implements ValueTransformerWithKey {
     private StateStore state;

     void init(ProcessorContext context) {
         this.state = context.getStateStore("myValueTransformState");
         // punctuate each second, can access this.state
         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
     }

     NewValueType transform(K readOnlyKey, V value) {
         // can access this.state and use read-only key
         return new NewValueType(readOnlyKey); // or null
     }

     void close() {
         // can access this.state
     }
 }

Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before transformValues().
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. So, setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) is applied to the result
KStream. (cf.transform(TransformerSupplier, String...))- Type Parameters:
VR- the value type of the result stream- Parameters:
valueTransformerSupplier- an instance ofValueTransformerWithKeySupplierthat generates a newly constructedValueTransformerWithKeyThe supplier should always generate a new instance. Creating a singleValueTransformerWithKeyobject and returning the same object reference inValueTransformerWithKeyis a violation of the supplier pattern and leads to runtime exceptions.named- aNamedconfig used to name the processor in the topologystateStoreNames- the names of the state stores used by the processor; not required if the supplier implementsConnectedStoreProvider.stores()- Returns:
- a
KStreamthat contains records with unmodified key and new values (possibly of different type) - See Also:
-
flatTransformValues
<VR> KStream<K,VR> flatTransformValues(ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier, String... stateStoreNames) Transform the value of each input record into zero or more new values (with possibly a new type) and emit for each new value a record with the same key of the input record and the value. AValueTransformer(provided by the givenValueTransformerSupplier) is applied to each input record value and computes zero or more new values. Thus, an input record<K,V>can be transformed into output records<K:V'>, <K:V''>, .... Attaching a state store makes this a stateful record-by-record operation (cf.mapValues()). If you choose not to attach one, this operation is similar to the statelessmapValues()but allows access to theProcessorContextand record metadata. This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. Furthermore, viaPunctuator#punctuate()the processing progress can be observed and additional periodic actions can be performed.In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the transformer.

 // create store
 StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), Serdes.String(), Serdes.String());
 // add store
 builder.addStateStore(keyValueStoreBuilder);

 KStream outputStream = inputStream.flatTransformValues(new ValueTransformerSupplier() {
     public ValueTransformer get() { return new MyValueTransformer(); }
 }, "myValueTransformState");

The second strategy is for the given ValueTransformerSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.

 class MyValueTransformerSupplier implements ValueTransformerSupplier {
     // supply transformer
     ValueTransformer get() { return new MyValueTransformer(); }

     // provide store(s) that will be added and connected to the associated transformer
     // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
     Set<StoreBuilder> stores() {
         StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                 Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), Serdes.String(), Serdes.String());
         return Collections.singleton(keyValueStoreBuilder);
     }
 }
 ...
 KStream outputStream = inputStream.flatTransformValues(new MyValueTransformerSupplier());

With either strategy, within the
ValueTransformer, the state is obtained via theProcessorContext. To trigger periodic actions viapunctuate(), a schedule must be registered. TheValueTransformermust return anIterabletype (e.g., anyCollectiontype) intransform(). If the return value ofValueTransformer#transform()is an emptyIterableornull, no records are emitted. In contrast totransform()andflatTransform(), no additionalKeyValuepairs can be emitted viaProcessorContext.forward(). AStreamsExceptionis thrown if theValueTransformertries to emit aKeyValuepair.
 class MyValueTransformer implements ValueTransformer {
     private StateStore state;

     void init(ProcessorContext context) {
         this.state = context.getStateStore("myValueTransformState");
         // punctuate each second, can access this.state
         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
     }

     Iterable<NewValueType> transform(V value) {
         // can access this.state
         List<NewValueType> result = new ArrayList<>();
         for (int i = 0; i < 3; i++) {
             result.add(new NewValueType(value));
         }
         return result; // values
     }

     void close() {
         // can access this.state
     }
 }

Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before flatTransformValues().
Setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) is applied to the result
KStream. (cf.flatTransform())- Type Parameters:
VR- the value type of the result stream- Parameters:
valueTransformerSupplier- an instance ofValueTransformerSupplierthat generates a newly constructedValueTransformerThe supplier should always generate a new instance. Creating a singleValueTransformerobject and returning the same object reference inValueTransformeris a violation of the supplier pattern and leads to runtime exceptions.stateStoreNames- the names of the state stores used by the processor; not required if the supplier implementsConnectedStoreProvider.stores()- Returns:
- a
KStreamthat contains more or less records with unmodified key and new values (possibly of different type) - See Also:
-
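A minimal stateless sketch for this method (hypothetical class and stream names): each sentence value is split into one record per word, all keeping the input record's key, so no repartitioning is needed downstream.

 import java.util.Arrays;
 import org.apache.kafka.streams.kstream.ValueTransformer;
 import org.apache.kafka.streams.processor.ProcessorContext;

 // hypothetical: split a sentence value into one lower-cased word per output record
 class WordSplitter implements ValueTransformer<String, Iterable<String>> {
     public void init(final ProcessorContext context) { }

     public Iterable<String> transform(final String value) {
         return Arrays.asList(value.toLowerCase().split("\\W+"));
     }

     public void close() { }
 }

 // usage, assuming an existing KStream<String, String> inputStream:
 // KStream<String, String> words = inputStream.flatTransformValues(WordSplitter::new);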
flatTransformValues
<VR> KStream<K,VR> flatTransformValues(ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier, Named named, String... stateStoreNames) Transform the value of each input record into zero or more new values (with possibly a new type) and emit for each new value a record with the same key of the input record and the value. AValueTransformer(provided by the givenValueTransformerSupplier) is applied to each input record value and computes zero or more new values. Thus, an input record<K,V>can be transformed into output records<K:V'>, <K:V''>, .... Attaching a state store makes this a stateful record-by-record operation (cf.mapValues()). If you choose not to attach one, this operation is similar to the statelessmapValues()but allows access to theProcessorContextand record metadata. This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. Furthermore, viaPunctuator#punctuate()the processing progress can be observed and additional periodic actions can be performed.In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the transformer.

 // create store
 StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), Serdes.String(), Serdes.String());
 // add store
 builder.addStateStore(keyValueStoreBuilder);

 KStream outputStream = inputStream.flatTransformValues(new ValueTransformerSupplier() {
     public ValueTransformer get() { return new MyValueTransformer(); }
 }, "myValueTransformState");

The second strategy is for the given ValueTransformerSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.

 class MyValueTransformerSupplier implements ValueTransformerSupplier {
     // supply transformer
     ValueTransformer get() { return new MyValueTransformer(); }

     // provide store(s) that will be added and connected to the associated transformer
     // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
     Set<StoreBuilder> stores() {
         StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                 Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), Serdes.String(), Serdes.String());
         return Collections.singleton(keyValueStoreBuilder);
     }
 }
 ...
 KStream outputStream = inputStream.flatTransformValues(new MyValueTransformerSupplier());

With either strategy, within the
ValueTransformer, the state is obtained via theProcessorContext. To trigger periodic actions viapunctuate(), a schedule must be registered. TheValueTransformermust return anIterabletype (e.g., anyCollectiontype) intransform(). If the return value ofValueTransformer#transform()is an emptyIterableornull, no records are emitted. In contrast totransform()andflatTransform(), no additionalKeyValuepairs can be emitted viaProcessorContext.forward(). AStreamsExceptionis thrown if theValueTransformertries to emit aKeyValuepair.
 class MyValueTransformer implements ValueTransformer {
     private StateStore state;

     void init(ProcessorContext context) {
         this.state = context.getStateStore("myValueTransformState");
         // punctuate each second, can access this.state
         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
     }

     Iterable<NewValueType> transform(V value) {
         // can access this.state
         List<NewValueType> result = new ArrayList<>();
         for (int i = 0; i < 3; i++) {
             result.add(new NewValueType(value));
         }
         return result; // values
     }

     void close() {
         // can access this.state
     }
 }

Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before flatTransformValues().
Setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) is applied to the result
KStream. (cf.flatTransform())- Type Parameters:
VR- the value type of the result stream- Parameters:
valueTransformerSupplier- an instance ofValueTransformerSupplierthat generates a newly constructedValueTransformerThe supplier should always generate a new instance. Creating a singleValueTransformerobject and returning the same object reference inValueTransformeris a violation of the supplier pattern and leads to runtime exceptions.named- aNamedconfig used to name the processor in the topologystateStoreNames- the names of the state stores used by the processor; not required if the supplier implementsConnectedStoreProvider.stores()- Returns:
- a
KStreamthat contains more or less records with unmodified key and new values (possibly of different type) - See Also:
-
flatTransformValues
<VR> KStream<K,VR> flatTransformValues(ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier, String... stateStoreNames) Transform the value of each input record into zero or more new values (with possibly a new type) and emit for each new value a record with the same key of the input record and the value. AValueTransformerWithKey(provided by the givenValueTransformerWithKeySupplier) is applied to each input record value and computes zero or more new values. Thus, an input record<K,V>can be transformed into output records<K:V'>, <K:V''>, .... Attaching a state store makes this a stateful record-by-record operation (cf.flatMapValues()). If you choose not to attach one, this operation is similar to the statelessflatMapValues()but allows access to theProcessorContextand record metadata. This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. Furthermore, viaPunctuator.punctuate(long)the processing progress can be observed and additional periodic actions can be performed.In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the transformer.

 // create store
 StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), Serdes.String(), Serdes.String());
 // add store
 builder.addStateStore(keyValueStoreBuilder);

 KStream outputStream = inputStream.flatTransformValues(new ValueTransformerWithKeySupplier() {
     public ValueTransformerWithKey get() { return new MyValueTransformerWithKey(); }
 }, "myValueTransformState");

The second strategy is for the given ValueTransformerWithKeySupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.

 class MyValueTransformerWithKeySupplier implements ValueTransformerWithKeySupplier {
     // supply transformer
     ValueTransformerWithKey get() { return new MyValueTransformerWithKey(); }

     // provide store(s) that will be added and connected to the associated transformer
     // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
     Set<StoreBuilder> stores() {
         StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                 Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), Serdes.String(), Serdes.String());
         return Collections.singleton(keyValueStoreBuilder);
     }
 }
 ...
 KStream outputStream = inputStream.flatTransformValues(new MyValueTransformerWithKeySupplier());

With either strategy, within the
ValueTransformerWithKey, the state is obtained via theProcessorContext. To trigger periodic actions viapunctuate(), a schedule must be registered. TheValueTransformerWithKeymust return anIterabletype (e.g., anyCollectiontype) intransform(). If the return value ofValueTransformerWithKey#transform()is an emptyIterableornull, no records are emitted. In contrast totransform()andflatTransform(), no additionalKeyValuepairs can be emitted viaProcessorContext.forward(). AStreamsExceptionis thrown if theValueTransformerWithKeytries to emit aKeyValuepair.
 class MyValueTransformerWithKey implements ValueTransformerWithKey {
     private StateStore state;

     void init(ProcessorContext context) {
         this.state = context.getStateStore("myValueTransformState");
         // punctuate each second, can access this.state
         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
     }

     Iterable<NewValueType> transform(K readOnlyKey, V value) {
         // can access this.state and use read-only key
         List<NewValueType> result = new ArrayList<>();
         for (int i = 0; i < 3; i++) {
             result.add(new NewValueType(readOnlyKey));
         }
         return result; // values
     }

     void close() {
         // can access this.state
     }
 }

Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before flatTransformValues().
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. So, setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) is applied to the result
KStream. (cf.flatTransform())- Type Parameters:
VR- the value type of the result stream- Parameters:
valueTransformerSupplier- an instance ofValueTransformerWithKeySupplierthat generates a newly constructedValueTransformerWithKeyThe supplier should always generate a new instance. Creating a singleValueTransformerWithKeyobject and returning the same object reference inValueTransformerWithKeyis a violation of the supplier pattern and leads to runtime exceptions.stateStoreNames- the names of the state stores used by the processor; not required if the supplier implementsConnectedStoreProvider.stores()- Returns:
- a
KStreamthat contains more or less records with unmodified key and new values (possibly of different type) - See Also:
-
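A hypothetical stateful sketch for this overload: a per-key de-duplication that emits zero or one record per input by comparing against the last value stored for the read-only key. The store name "last-seen-store" and class name are assumptions; the store is assumed to have been added via Topology.addStateStore(...).

 import java.util.Collections;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueStore;

 // hypothetical: drop a record if its value equals the last value seen for the same key
 class DeduplicateByKey implements ValueTransformerWithKey<String, String, Iterable<String>> {
     private KeyValueStore<String, String> lastSeen;

     public void init(final ProcessorContext context) {
         lastSeen = context.getStateStore("last-seen-store"); // assumed to exist in the topology
     }

     public Iterable<String> transform(final String readOnlyKey, final String value) {
         if (value != null && value.equals(lastSeen.get(readOnlyKey))) {
             return Collections.emptyList(); // duplicate: emit nothing
         }
         lastSeen.put(readOnlyKey, value);
         return Collections.singletonList(value); // emit exactly one record with the same key
     }

     public void close() { }
 }

 // usage, assuming an existing KStream<String, String> inputStream:
 // KStream<String, String> deduped = inputStream.flatTransformValues(DeduplicateByKey::new, "last-seen-store");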
flatTransformValues
<VR> KStream<K,VR> flatTransformValues(ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier, Named named, String... stateStoreNames) Transform the value of each input record into zero or more new values (with possibly a new type) and emit for each new value a record with the same key of the input record and the value. AValueTransformerWithKey(provided by the givenValueTransformerWithKeySupplier) is applied to each input record value and computes zero or more new values. Thus, an input record<K,V>can be transformed into output records<K:V'>, <K:V''>, .... Attaching a state store makes this a stateful record-by-record operation (cf.flatMapValues()). If you choose not to attach one, this operation is similar to the statelessflatMapValues()but allows access to theProcessorContextand record metadata. This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. Furthermore, viaPunctuator.punctuate(long)the processing progress can be observed and additional periodic actions can be performed.In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the transformer.

 // create store
 StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), Serdes.String(), Serdes.String());
 // add store
 builder.addStateStore(keyValueStoreBuilder);

 KStream outputStream = inputStream.flatTransformValues(new ValueTransformerWithKeySupplier() {
     public ValueTransformerWithKey get() { return new MyValueTransformerWithKey(); }
 }, "myValueTransformState");

The second strategy is for the given ValueTransformerWithKeySupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.

 class MyValueTransformerWithKeySupplier implements ValueTransformerWithKeySupplier {
     // supply transformer
     ValueTransformerWithKey get() { return new MyValueTransformerWithKey(); }

     // provide store(s) that will be added and connected to the associated transformer
     // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
     Set<StoreBuilder> stores() {
         StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                 Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), Serdes.String(), Serdes.String());
         return Collections.singleton(keyValueStoreBuilder);
     }
 }
 ...
 KStream outputStream = inputStream.flatTransformValues(new MyValueTransformerWithKeySupplier());

With either strategy, within the
ValueTransformerWithKey, the state is obtained via theProcessorContext. To trigger periodic actions viapunctuate(), a schedule must be registered. TheValueTransformerWithKeymust return anIterabletype (e.g., anyCollectiontype) intransform(). If the return value ofValueTransformerWithKey#transform()is an emptyIterableornull, no records are emitted. In contrast totransform()andflatTransform(), no additionalKeyValuepairs can be emitted viaProcessorContext.forward(). AStreamsExceptionis thrown if theValueTransformerWithKeytries to emit aKeyValuepair.
 class MyValueTransformerWithKey implements ValueTransformerWithKey {
     private StateStore state;

     void init(ProcessorContext context) {
         this.state = context.getStateStore("myValueTransformState");
         // punctuate each second, can access this.state
         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
     }

     Iterable<NewValueType> transform(K readOnlyKey, V value) {
         // can access this.state and use read-only key
         List<NewValueType> result = new ArrayList<>();
         for (int i = 0; i < 3; i++) {
             result.add(new NewValueType(readOnlyKey));
         }
         return result; // values
     }

     void close() {
         // can access this.state
     }
 }

Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before flatTransformValues().
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. So, setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) is applied to the result
KStream. (cf.flatTransform())- Type Parameters:
VR- the value type of the result stream- Parameters:
valueTransformerSupplier- an instance ofValueTransformerWithKeySupplierthat generates a newly constructedValueTransformerWithKeyThe supplier should always generate a new instance. Creating a singleValueTransformerWithKeyobject and returning the same object reference inValueTransformerWithKeyis a violation of the supplier pattern and leads to runtime exceptions.named- aNamedconfig used to name the processor in the topologystateStoreNames- the names of the state stores used by the processor; not required if the supplier implementsConnectedStoreProvider.stores()- Returns:
- a
KStreamthat contains more or less records with unmodified key and new values (possibly of different type) - See Also:
-
process
@Deprecated void process(ProcessorSupplier<? super K, ? super V> processorSupplier, String... stateStoreNames) Deprecated.Since 3.0. Useprocess(org.apache.kafka.streams.processor.api.ProcessorSupplier, java.lang.String...)instead.Process all records in this stream, one record at a time, by applying aProcessor(provided by the givenProcessorSupplier). Attaching a state store makes this a stateful record-by-record operation (cf.foreach(ForeachAction)). If you choose not to attach one, this operation is similar to the statelessforeach(ForeachAction)but allows access to theProcessorContextand record metadata. This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. Furthermore, viaPunctuator.punctuate(long)the processing progress can be observed and additional periodic actions can be performed. Note that this is a terminal operation that returns void.In order for the processor to use state stores, the stores must be added to the topology and connected to the processor using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the processor.

 // create store
 StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"), Serdes.String(), Serdes.String());
 // add store
 builder.addStateStore(keyValueStoreBuilder);

 inputStream.process(new ProcessorSupplier() {
     public Processor get() { return new MyProcessor(); }
 }, "myProcessorState");

The second strategy is for the given ProcessorSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the processor.

 class MyProcessorSupplier implements ProcessorSupplier {
     // supply processor
     Processor get() { return new MyProcessor(); }

     // provide store(s) that will be added and connected to the associated processor
     // the store name from the builder ("myProcessorState") is used to access the store later via the ProcessorContext
     Set<StoreBuilder> stores() {
         StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                 Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"), Serdes.String(), Serdes.String());
         return Collections.singleton(keyValueStoreBuilder);
     }
 }
 ...
 inputStream.process(new MyProcessorSupplier());

With either strategy, within the
Processor, the state is obtained via theProcessorContext. To trigger periodic actions viapunctuate(), a schedule must be registered.
 class MyProcessor implements Processor {
     private StateStore state;

     void init(ProcessorContext context) {
         this.state = context.getStateStore("myProcessorState");
         // punctuate each second, can access this.state
         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
     }

     void process(K key, V value) {
         // can access this.state
     }

     void close() {
         // can access this.state
     }
 }

Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before process().
- Parameters:
processorSupplier- an instance ofProcessorSupplierthat generates a newly constructedProcessorThe supplier should always generate a new instance. Creating a singleProcessorobject and returning the same object reference inProcessorSupplier.get()is a violation of the supplier pattern and leads to runtime exceptions.stateStoreNames- the names of the state stores used by the processor; not required if the supplier implementsConnectedStoreProvider.stores()- See Also:
-
process
void process(ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier, String... stateStoreNames) Process all records in this stream, one record at a time, by applying aProcessor(provided by the givenProcessorSupplier). Attaching a state store makes this a stateful record-by-record operation (cf.foreach(ForeachAction)). If you choose not to attach one, this operation is similar to the statelessforeach(ForeachAction)but allows access to theProcessorContextand record metadata. This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. Furthermore, viaPunctuator.punctuate(long)the processing progress can be observed and additional periodic actions can be performed. Note that this is a terminal operation that returns void.In order for the processor to use state stores, the stores must be added to the topology and connected to the processor using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the processor.

 // create store
 StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"), Serdes.String(), Serdes.String());
 // add store
 builder.addStateStore(keyValueStoreBuilder);

 inputStream.process(new ProcessorSupplier() {
     public Processor get() { return new MyProcessor(); }
 }, "myProcessorState");

The second strategy is for the given ProcessorSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the processor.

 class MyProcessorSupplier implements ProcessorSupplier {
     // supply processor
     Processor get() { return new MyProcessor(); }

     // provide store(s) that will be added and connected to the associated processor
     // the store name from the builder ("myProcessorState") is used to access the store later via the ProcessorContext
     Set<StoreBuilder> stores() {
         StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                 Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"), Serdes.String(), Serdes.String());
         return Collections.singleton(keyValueStoreBuilder);
     }
 }
 ...
 inputStream.process(new MyProcessorSupplier());

With either strategy, within the
Processor, the state is obtained via theProcessorContext. To trigger periodic actions viapunctuate(), a schedule must be registered.
 class MyProcessor implements Processor<K, V, Void, Void> {
     private StateStore state;

     void init(ProcessorContext<Void, Void> context) {
         this.state = context.getStateStore("myProcessorState");
         // punctuate each second, can access this.state
         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
     }

     void process(Record<K, V> record) {
         // can access this.state
     }

     void close() {
         // can access this.state
     }
 }

Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before process().
- Parameters:
processorSupplier- an instance ofProcessorSupplierthat generates a newly constructedProcessorThe supplier should always generate a new instance. Creating a singleProcessorobject and returning the same object reference inProcessorSupplier.get()is a violation of the supplier pattern and leads to runtime exceptions.stateStoreNames- the names of the state stores used by the processor; not required if the supplier implementsConnectedStoreProvider.stores()- See Also:
-
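As a more complete, hedged sketch of this overload (the class name, the store name "count-store", and the counting logic are assumptions, not part of the Kafka API), a ProcessorSupplier can implement stores() so that its state store is added and connected automatically; because the output types are Void, nothing can be forwarded and the operation stays terminal.

 import java.util.Collections;
 import java.util.Set;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;

 // hypothetical: count records per key in a connected store; emits nothing downstream
 class CountingProcessorSupplier implements ProcessorSupplier<String, String, Void, Void> {
     public Processor<String, String, Void, Void> get() {
         return new Processor<String, String, Void, Void>() {
             private KeyValueStore<String, Long> store;

             public void init(final ProcessorContext<Void, Void> context) {
                 store = context.getStateStore("count-store");
             }

             public void process(final Record<String, String> record) {
                 final Long previous = store.get(record.key());
                 store.put(record.key(), (previous == null ? 0L : previous) + 1L);
             }
         };
     }

     // the store is added and connected automatically, so no stateStoreNames argument is needed
     public Set<StoreBuilder<?>> stores() {
         return Collections.singleton(
             Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("count-store"), Serdes.String(), Serdes.Long()));
     }
 }

 // usage (terminal operation, returns void), assuming an existing KStream<String, String> inputStream:
 // inputStream.process(new CountingProcessorSupplier());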
process
@Deprecated void process(ProcessorSupplier<? super K, ? super V> processorSupplier, Named named, String... stateStoreNames) Deprecated.Process all records in this stream, one record at a time, by applying aProcessor(provided by the givenProcessorSupplier). Attaching a state store makes this a stateful record-by-record operation (cf.foreach(ForeachAction)). If you choose not to attach one, this operation is similar to the statelessforeach(ForeachAction)but allows access to theProcessorContextand record metadata. This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. Furthermore, viaPunctuator.punctuate(long)the processing progress can be observed and additional periodic actions can be performed. Note that this is a terminal operation that returns void.In order for the processor to use state stores, the stores must be added to the topology and connected to the processor using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the processor.

 // create store
 StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"), Serdes.String(), Serdes.String());
 // add store
 builder.addStateStore(keyValueStoreBuilder);

 inputStream.process(new ProcessorSupplier() {
     public Processor get() { return new MyProcessor(); }
 }, "myProcessorState");

The second strategy is for the given ProcessorSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the processor.

 class MyProcessorSupplier implements ProcessorSupplier {
     // supply processor
     Processor get() { return new MyProcessor(); }

     // provide store(s) that will be added and connected to the associated processor
     // the store name from the builder ("myProcessorState") is used to access the store later via the ProcessorContext
     Set<StoreBuilder> stores() {
         StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                 Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"), Serdes.String(), Serdes.String());
         return Collections.singleton(keyValueStoreBuilder);
     }
 }
 ...
 inputStream.process(new MyProcessorSupplier());

With either strategy, within the
Processor, the state is obtained via theProcessorContext. To trigger periodic actions viapunctuate(), a schedule must be registered.
 class MyProcessor implements Processor {
     private StateStore state;

     void init(ProcessorContext context) {
         this.state = context.getStateStore("myProcessorState");
         // punctuate each second, can access this.state
         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
     }

     void process(K key, V value) {
         // can access this.state
     }

     void close() {
         // can access this.state
     }
 }

Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before process().
- Parameters:
processorSupplier - an instance of ProcessorSupplier that generates a newly constructed Processor. The supplier should always generate a new instance. Creating a single Processor object and returning the same object reference in ProcessorSupplier.get() is a violation of the supplier pattern and leads to runtime exceptions.
named - a Named config used to name the processor in the topology
stateStoreNames - the names of the state stores used by the processor
- See Also:
-
process
void process(ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier, Named named, String... stateStoreNames) Process all records in this stream, one record at a time, by applying aProcessor(provided by the givenProcessorSupplier). Attaching a state store makes this a stateful record-by-record operation (cf.foreach(ForeachAction)). If you choose not to attach one, this operation is similar to the statelessforeach(ForeachAction)but allows access to theProcessorContextand record metadata. This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. Furthermore, viaPunctuator.punctuate(long)the processing progress can be observed and additional periodic actions can be performed. Note that this is a terminal operation that returns void.In order for the processor to use state stores, the stores must be added to the topology and connected to the processor using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the processor.

 // create store
 StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"), Serdes.String(), Serdes.String());
 // add store
 builder.addStateStore(keyValueStoreBuilder);

 inputStream.process(new ProcessorSupplier() {
     public Processor get() { return new MyProcessor(); }
 }, "myProcessorState");

The second strategy is for the given ProcessorSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the processor.

 class MyProcessorSupplier implements ProcessorSupplier {
     // supply processor
     Processor get() { return new MyProcessor(); }

     // provide store(s) that will be added and connected to the associated processor
     // the store name from the builder ("myProcessorState") is used to access the store later via the ProcessorContext
     Set<StoreBuilder> stores() {
         StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                 Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"), Serdes.String(), Serdes.String());
         return Collections.singleton(keyValueStoreBuilder);
     }
 }
 ...
 inputStream.process(new MyProcessorSupplier());

With either strategy, within the
Processor, the state is obtained via theProcessorContext. To trigger periodic actions viapunctuate(), a schedule must be registered.
 class MyProcessor implements Processor<K, V, Void, Void> {
     private StateStore state;

     void init(ProcessorContext<Void, Void> context) {
         this.state = context.getStateStore("myProcessorState");
         // punctuate each second, can access this.state
         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
     }

     void process(Record<K, V> record) {
         // can access this.state
     }

     void close() {
         // can access this.state
     }
 }

Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before process().
- Parameters:
processorSupplier - an instance of ProcessorSupplier that generates a newly constructed Processor. The supplier should always generate a new instance. Creating a single Processor object and returning the same object reference in ProcessorSupplier.get() is a violation of the supplier pattern and leads to runtime exceptions.
named - a Named config used to name the processor in the topology
stateStoreNames - the names of the state stores used by the processor
- See Also:
-
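A minimal sketch of this Named overload, reusing the hypothetical CountingProcessorSupplier from the earlier example; because that supplier implements stores(), no store names are passed:

 // assuming an existing KStream<String, String> inputStream
 inputStream.process(new CountingProcessorSupplier(), Named.as("record-counter"));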