Type Parameters:
K - Type of keys
V - Type of values

public interface KStream<K,V>
KStream is an abstraction of a record stream of KeyValue pairs, i.e., each record is an independent entity/event in the real world.
For example, a user X might buy two items I1 and I2, and thus there might be two records <K:I1>, <K:I2> in the stream.

A KStream is either defined from one or multiple Kafka topics that are consumed message by message, or is the result of a KStream transformation.
A KTable can also be converted into a KStream.

A KStream can be transformed record by record, joined with another KStream, KTable, or GlobalKTable, or aggregated into a KTable.
The Kafka Streams DSL can be mixed and matched with the Processor API (PAPI) (cf. Topology) via process(...), transform(...), and transformValues(...).

See Also:
KTable, KGroupedStream, StreamsBuilder.stream(String)
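The following minimal plain-Java sketch shows the difference between a record stream and a table, using the purchase example above. It is not Kafka Streams code. StreamVsTableModel and its nested Rec type are hypothetical names. The table view is simplified to "latest value per key" and ignores details such as deletes via null values. The stream view keeps both purchases by user X. The table view keeps only the most recent one.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model, not the Kafka Streams API.
final class StreamVsTableModel {
    // Stand-in for a KeyValue pair.
    record Rec<K, V>(K key, V value) {}

    // Stream semantics: every record is an independent event, so all are kept.
    static <K, V> List<Rec<K, V>> asStream(List<Rec<K, V>> records) {
        return new ArrayList<>(records);
    }

    // Simplified table semantics: a later record for the same key
    // overwrites the earlier one (null-value deletes are not modeled).
    static <K, V> Map<K, V> asTable(List<Rec<K, V>> records) {
        Map<K, V> table = new LinkedHashMap<>();
        for (Rec<K, V> r : records) {
            table.put(r.key(), r.value());
        }
        return table;
    }
}
```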
Modifier and Type | Method and Description |
---|---|
KStream<K,V>[] | branch(Named named, Predicate<? super K,? super V>... predicates) - Creates an array of KStream from this stream by branching the records in the original stream based on the supplied predicates. |
KStream<K,V>[] | branch(Predicate<? super K,? super V>... predicates) - Creates an array of KStream from this stream by branching the records in the original stream based on the supplied predicates. |
KStream<K,V> | filter(Predicate<? super K,? super V> predicate) - Create a new KStream that consists of all records of this stream which satisfy the given predicate. |
KStream<K,V> | filter(Predicate<? super K,? super V> predicate, Named named) - Create a new KStream that consists of all records of this stream which satisfy the given predicate. |
KStream<K,V> | filterNot(Predicate<? super K,? super V> predicate) - Create a new KStream that consists of all records of this stream which do not satisfy the given predicate. |
KStream<K,V> | filterNot(Predicate<? super K,? super V> predicate, Named named) - Create a new KStream that consists of all records of this stream which do not satisfy the given predicate. |
<KR,VR> KStream<KR,VR> | flatMap(KeyValueMapper<? super K,? super V,? extends Iterable<? extends KeyValue<? extends KR,? extends VR>>> mapper) - Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily). |
<KR,VR> KStream<KR,VR> | flatMap(KeyValueMapper<? super K,? super V,? extends Iterable<? extends KeyValue<? extends KR,? extends VR>>> mapper, Named named) - Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily). |
<VR> KStream<K,VR> | flatMapValues(ValueMapper<? super V,? extends Iterable<? extends VR>> mapper) - Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream. |
<VR> KStream<K,VR> | flatMapValues(ValueMapper<? super V,? extends Iterable<? extends VR>> mapper, Named named) - Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream. |
<VR> KStream<K,VR> | flatMapValues(ValueMapperWithKey<? super K,? super V,? extends Iterable<? extends VR>> mapper) - Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream. |
<VR> KStream<K,VR> | flatMapValues(ValueMapperWithKey<? super K,? super V,? extends Iterable<? extends VR>> mapper, Named named) - Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream. |
<K1,V1> KStream<K1,V1> | flatTransform(TransformerSupplier<? super K,? super V,Iterable<KeyValue<K1,V1>>> transformerSupplier, Named named, String... stateStoreNames) - Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily). |
<K1,V1> KStream<K1,V1> | flatTransform(TransformerSupplier<? super K,? super V,Iterable<KeyValue<K1,V1>>> transformerSupplier, String... stateStoreNames) - Transform each record of the input stream into zero or more records in the output stream (both key and value type can be altered arbitrarily). |
<VR> KStream<K,VR> | flatTransformValues(ValueTransformerSupplier<? super V,Iterable<VR>> valueTransformerSupplier, Named named, String... stateStoreNames) - Transform the value of each input record into zero or more new values (with possibly a new type) and emit, for each new value, a record with the same key as the input record and that value. |
<VR> KStream<K,VR> | flatTransformValues(ValueTransformerSupplier<? super V,Iterable<VR>> valueTransformerSupplier, String... stateStoreNames) - Transform the value of each input record into zero or more new values (with possibly a new type) and emit, for each new value, a record with the same key as the input record and that value. |
<VR> KStream<K,VR> | flatTransformValues(ValueTransformerWithKeySupplier<? super K,? super V,Iterable<VR>> valueTransformerSupplier, Named named, String... stateStoreNames) - Transform the value of each input record into zero or more new values (with possibly a new type) and emit, for each new value, a record with the same key as the input record and that value. |
<VR> KStream<K,VR> | flatTransformValues(ValueTransformerWithKeySupplier<? super K,? super V,Iterable<VR>> valueTransformerSupplier, String... stateStoreNames) - Transform the value of each input record into zero or more new values (with possibly a new type) and emit, for each new value, a record with the same key as the input record and that value. |
void | foreach(ForeachAction<? super K,? super V> action) - Perform an action on each record of KStream. |
void | foreach(ForeachAction<? super K,? super V> action, Named named) - Perform an action on each record of KStream. |
<KR> KGroupedStream<KR,V> | groupBy(KeyValueMapper<? super K,? super V,KR> keySelector) - Group the records of this KStream on a new key that is selected using the provided KeyValueMapper and default serializers and deserializers. |
<KR> KGroupedStream<KR,V> | groupBy(KeyValueMapper<? super K,? super V,KR> keySelector, Grouped<KR,V> grouped) - Group the records of this KStream on a new key that is selected using the provided KeyValueMapper and Serdes as specified by Grouped. |
<KR> KGroupedStream<KR,V> | groupBy(KeyValueMapper<? super K,? super V,KR> keySelector, Serialized<KR,V> serialized) - Deprecated since 2.1. Use groupBy(KeyValueMapper, Grouped) instead. |
KGroupedStream<K,V> | groupByKey() - Group the records by their current key into a KGroupedStream while preserving the original values, using default serializers and deserializers. |
KGroupedStream<K,V> | groupByKey(Grouped<K,V> grouped) - Group the records by their current key into a KGroupedStream while preserving the original values, using the serializers as defined by Grouped. |
KGroupedStream<K,V> | groupByKey(Serialized<K,V> serialized) - Deprecated since 2.1. Use groupByKey(Grouped) instead. |
<GK,GV,RV> KStream<K,RV> | join(GlobalKTable<GK,GV> globalTable, KeyValueMapper<? super K,? super V,? extends GK> keySelector, ValueJoiner<? super V,? super GV,? extends RV> joiner) - Join records of this stream with GlobalKTable's records using non-windowed inner equi join. |
<GK,GV,RV> KStream<K,RV> | join(GlobalKTable<GK,GV> globalTable, KeyValueMapper<? super K,? super V,? extends GK> keySelector, ValueJoiner<? super V,? super GV,? extends RV> joiner, Named named) - Join records of this stream with GlobalKTable's records using non-windowed inner equi join. |
<VO,VR> KStream<K,VR> | join(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows) - Join records of this stream with another KStream's records using windowed inner equi join with default serializers and deserializers. |
<VO,VR> KStream<K,VR> | join(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, Joined<K,V,VO> joined) - Deprecated since 2.4. Use join(KStream, ValueJoiner, JoinWindows, StreamJoined) instead. |
<VO,VR> KStream<K,VR> | join(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, StreamJoined<K,V,VO> streamJoined) - Join records of this stream with another KStream's records using windowed inner equi join, with the StreamJoined instance configuring the key serde, this stream's value serde, the other stream's value serde, and the state stores used. |
<VT,VR> KStream<K,VR> | join(KTable<K,VT> table, ValueJoiner<? super V,? super VT,? extends VR> joiner) - Join records of this stream with KTable's records using non-windowed inner equi join with default serializers and deserializers. |
<VT,VR> KStream<K,VR> | join(KTable<K,VT> table, ValueJoiner<? super V,? super VT,? extends VR> joiner, Joined<K,V,VT> joined) - Join records of this stream with KTable's records using non-windowed inner equi join with the serializers and deserializers defined by Joined. |
<GK,GV,RV> KStream<K,RV> | leftJoin(GlobalKTable<GK,GV> globalTable, KeyValueMapper<? super K,? super V,? extends GK> keySelector, ValueJoiner<? super V,? super GV,? extends RV> valueJoiner) - Join records of this stream with GlobalKTable's records using non-windowed left equi join. |
<GK,GV,RV> KStream<K,RV> | leftJoin(GlobalKTable<GK,GV> globalTable, KeyValueMapper<? super K,? super V,? extends GK> keySelector, ValueJoiner<? super V,? super GV,? extends RV> valueJoiner, Named named) - Join records of this stream with GlobalKTable's records using non-windowed left equi join. |
<VO,VR> KStream<K,VR> | leftJoin(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows) - Join records of this stream with another KStream's records using windowed left equi join with default serializers and deserializers. |
<VO,VR> KStream<K,VR> | leftJoin(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, Joined<K,V,VO> joined) - Deprecated since 2.4. Use leftJoin(KStream, ValueJoiner, JoinWindows, StreamJoined) instead. |
<VO,VR> KStream<K,VR> | leftJoin(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, StreamJoined<K,V,VO> streamJoined) - Join records of this stream with another KStream's records using windowed left equi join, with the StreamJoined instance configuring the key serde, this stream's value serde, the other stream's value serde, and the state stores used. |
<VT,VR> KStream<K,VR> | leftJoin(KTable<K,VT> table, ValueJoiner<? super V,? super VT,? extends VR> joiner) - Join records of this stream with KTable's records using non-windowed left equi join with default serializers and deserializers. |
<VT,VR> KStream<K,VR> | leftJoin(KTable<K,VT> table, ValueJoiner<? super V,? super VT,? extends VR> joiner, Joined<K,V,VT> joined) - Join records of this stream with KTable's records using non-windowed left equi join with the serializers and deserializers defined by Joined. |
<KR,VR> KStream<KR,VR> | map(KeyValueMapper<? super K,? super V,? extends KeyValue<? extends KR,? extends VR>> mapper) - Transform each record of the input stream into a new record in the output stream (both key and value type can be altered arbitrarily). |
<KR,VR> KStream<KR,VR> | map(KeyValueMapper<? super K,? super V,? extends KeyValue<? extends KR,? extends VR>> mapper, Named named) - Transform each record of the input stream into a new record in the output stream (both key and value type can be altered arbitrarily). |
<VR> KStream<K,VR> | mapValues(ValueMapper<? super V,? extends VR> mapper) - Transform the value of each input record into a new value (with possibly a new type) of the output record. |
<VR> KStream<K,VR> | mapValues(ValueMapper<? super V,? extends VR> mapper, Named named) - Transform the value of each input record into a new value (with possibly a new type) of the output record. |
<VR> KStream<K,VR> | mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper) - Transform the value of each input record into a new value (with possibly a new type) of the output record. |
<VR> KStream<K,VR> | mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Named named) - Transform the value of each input record into a new value (with possibly a new type) of the output record. |
KStream<K,V> | merge(KStream<K,V> stream) - Merge this stream and the given stream into one larger stream. |
KStream<K,V> | merge(KStream<K,V> stream, Named named) - Merge this stream and the given stream into one larger stream. |
<VO,VR> KStream<K,VR> | outerJoin(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows) - Join records of this stream with another KStream's records using windowed outer equi join with default serializers and deserializers. |
<VO,VR> KStream<K,VR> | outerJoin(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, Joined<K,V,VO> joined) - Deprecated since 2.4. Use outerJoin(KStream, ValueJoiner, JoinWindows, StreamJoined) instead. |
<VO,VR> KStream<K,VR> | outerJoin(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, StreamJoined<K,V,VO> streamJoined) - Join records of this stream with another KStream's records using windowed outer equi join, with the StreamJoined instance configuring the key serde, this stream's value serde, the other stream's value serde, and the state stores used. |
KStream<K,V> | peek(ForeachAction<? super K,? super V> action) - Perform an action on each record of KStream. |
KStream<K,V> | peek(ForeachAction<? super K,? super V> action, Named named) - Perform an action on each record of KStream. |
void | print(Printed<K,V> printed) - Print the records of this KStream using the options provided by Printed. Note that this is mainly for debugging/testing purposes, and it will try to flush on each record print. |
void | process(ProcessorSupplier<? super K,? super V> processorSupplier, Named named, String... stateStoreNames) - Process all records in this stream, one record at a time, by applying a Processor (provided by the given ProcessorSupplier). |
void | process(ProcessorSupplier<? super K,? super V> processorSupplier, String... stateStoreNames) - Process all records in this stream, one record at a time, by applying a Processor (provided by the given ProcessorSupplier). |
KStream<K,V> | repartition() - Materialize this stream to an auto-generated repartition topic and create a new KStream from the auto-generated topic using default serializers, deserializers, and the producer's DefaultPartitioner. |
KStream<K,V> | repartition(Repartitioned<K,V> repartitioned) - Materialize this stream to an auto-generated repartition topic and create a new KStream from the auto-generated topic using the key serde, value serde, StreamPartitioner, number of partitions, and topic name part as defined by Repartitioned. |
<KR> KStream<KR,V> | selectKey(KeyValueMapper<? super K,? super V,? extends KR> mapper) - Set a new key (with possibly a new type) for each input record. |
<KR> KStream<KR,V> | selectKey(KeyValueMapper<? super K,? super V,? extends KR> mapper, Named named) - Set a new key (with possibly a new type) for each input record. |
KStream<K,V> | through(String topic) - Deprecated since 2.6. Use repartition() instead. |
KStream<K,V> | through(String topic, Produced<K,V> produced) - Deprecated since 2.6. Use repartition(Repartitioned) instead. |
void | to(String topic) - Materialize this stream to a topic using default serializers specified in the config and the producer's DefaultPartitioner. |
void | to(String topic, Produced<K,V> produced) - Materialize this stream to a topic using the provided Produced instance. |
void | to(TopicNameExtractor<K,V> topicExtractor) - Dynamically materialize this stream to topics using default serializers specified in the config and the producer's DefaultPartitioner. |
void | to(TopicNameExtractor<K,V> topicExtractor, Produced<K,V> produced) - Dynamically materialize this stream to topics using the provided Produced instance. |
KTable<K,V> | toTable() - Convert this stream to a KTable. |
KTable<K,V> | toTable(Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized) - Convert this stream to a KTable. |
KTable<K,V> | toTable(Named named) - Convert this stream to a KTable. |
KTable<K,V> | toTable(Named named, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized) - Convert this stream to a KTable. |
<K1,V1> KStream<K1,V1> | transform(TransformerSupplier<? super K,? super V,KeyValue<K1,V1>> transformerSupplier, Named named, String... stateStoreNames) - Transform each record of the input stream into zero or one record in the output stream (both key and value type can be altered arbitrarily). |
<K1,V1> KStream<K1,V1> | transform(TransformerSupplier<? super K,? super V,KeyValue<K1,V1>> transformerSupplier, String... stateStoreNames) - Transform each record of the input stream into zero or one record in the output stream (both key and value type can be altered arbitrarily). |
<VR> KStream<K,VR> | transformValues(ValueTransformerSupplier<? super V,? extends VR> valueTransformerSupplier, Named named, String... stateStoreNames) - Transform the value of each input record into a new value (with possibly a new type) of the output record. |
<VR> KStream<K,VR> | transformValues(ValueTransformerSupplier<? super V,? extends VR> valueTransformerSupplier, String... stateStoreNames) - Transform the value of each input record into a new value (with possibly a new type) of the output record. |
<VR> KStream<K,VR> | transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> valueTransformerSupplier, Named named, String... stateStoreNames) - Transform the value of each input record into a new value (with possibly a new type) of the output record. |
<VR> KStream<K,VR> | transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> valueTransformerSupplier, String... stateStoreNames) - Transform the value of each input record into a new value (with possibly a new type) of the output record. |
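The branch(...) rows above say only that records are split "based on the supplied predicates". The documented KStream#branch contract uses first-match routing. Predicates are evaluated in order, and a record goes only to the stream of the first predicate that returns true. A record that matches no predicate is dropped. The following plain-Java sketch models that routing over an in-memory list. BranchModel and Rec are hypothetical names, and BiPredicate stands in for Kafka's Predicate. This is not the Kafka Streams implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical plain-Java model of first-match branching.
final class BranchModel {
    record Rec<K, V>(K key, V value) {}

    @SafeVarargs
    static <K, V> List<List<Rec<K, V>>> branch(List<Rec<K, V>> input,
                                              BiPredicate<? super K, ? super V>... predicates) {
        // One output list per predicate, in the same order as the predicates.
        List<List<Rec<K, V>>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>());
        }
        for (Rec<K, V> r : input) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(r.key(), r.value())) {
                    branches.get(i).add(r);
                    break; // first match wins; later predicates are not tried
                }
            }
            // A record that matched no predicate falls through and is dropped.
        }
        return branches;
    }
}
```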

KStream<K,V> filter(Predicate<? super K,? super V> predicate)
Create a new KStream that consists of all records of this stream which satisfy the given predicate.
All records that do not satisfy the predicate are dropped.
This is a stateless record-by-record operation.
Parameters:
predicate - a filter Predicate that is applied to each record
Returns:
a KStream that contains only those records that satisfy the given predicate
See Also:
filterNot(Predicate)

KStream<K,V> filter(Predicate<? super K,? super V> predicate, Named named)
Create a new KStream that consists of all records of this stream which satisfy the given predicate.
All records that do not satisfy the predicate are dropped.
This is a stateless record-by-record operation.
Parameters:
predicate - a filter Predicate that is applied to each record
named - a Named config used to name the processor in the topology
Returns:
a KStream that contains only those records that satisfy the given predicate
See Also:
filterNot(Predicate)
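The following plain-Java sketch models the filter semantics described above on an in-memory list. Each record is tested on its own, and records that fail the predicate are dropped. FilterModel and Rec are hypothetical names, and java.util.function.BiPredicate stands in for Kafka's Predicate. This is not the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical plain-Java model of KStream#filter semantics.
final class FilterModel {
    record Rec<K, V>(K key, V value) {}

    // Keeps records for which the predicate holds and drops all others.
    // No state is carried from one record to the next (stateless).
    static <K, V> List<Rec<K, V>> filter(List<Rec<K, V>> input,
                                         BiPredicate<? super K, ? super V> predicate) {
        List<Rec<K, V>> out = new ArrayList<>();
        for (Rec<K, V> r : input) {
            if (predicate.test(r.key(), r.value())) {
                out.add(r);
            }
        }
        return out;
    }
}
```

With the real API, the equivalent call would be `stream.filter((key, value) -> value > 10)`. Predicate declares a single `test(key, value)` method, so it can be written as a lambda.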

KStream<K,V> filterNot(Predicate<? super K,? super V> predicate)
Create a new KStream that consists of all records of this stream which do not satisfy the given predicate.
All records that do satisfy the predicate are dropped.
This is a stateless record-by-record operation.
Parameters:
predicate - a filter Predicate that is applied to each record
Returns:
a KStream that contains only those records that do not satisfy the given predicate
See Also:
filter(Predicate)

KStream<K,V> filterNot(Predicate<? super K,? super V> predicate, Named named)
Create a new KStream that consists of all records of this stream which do not satisfy the given predicate.
All records that do satisfy the predicate are dropped.
This is a stateless record-by-record operation.
Parameters:
predicate - a filter Predicate that is applied to each record
named - a Named config used to name the processor in the topology
Returns:
a KStream that contains only those records that do not satisfy the given predicate
See Also:
filter(Predicate)
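filterNot(predicate) keeps exactly the records that filter(predicate) would drop. The sketch below models this on an in-memory list. FilterNotModel and Rec are hypothetical names, and BiPredicate stands in for Kafka's Predicate. It is not Kafka Streams code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical plain-Java model of KStream#filterNot semantics.
final class FilterNotModel {
    record Rec<K, V>(K key, V value) {}

    // Keeps records for which the predicate does NOT hold; matching records are dropped.
    static <K, V> List<Rec<K, V>> filterNot(List<Rec<K, V>> input,
                                            BiPredicate<? super K, ? super V> predicate) {
        List<Rec<K, V>> out = new ArrayList<>();
        for (Rec<K, V> r : input) {
            if (!predicate.test(r.key(), r.value())) {
                out.add(r);
            }
        }
        return out;
    }
}
```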

<KR> KStream<KR,V> selectKey(KeyValueMapper<? super K,? super V,? extends KR> mapper)
Set a new key (with possibly a new type) for each input record.
The provided KeyValueMapper is applied to each input record and computes a new key for it.
Thus, an input record <K,V> can be transformed into an output record <K':V>.
This is a stateless record-by-record operation.
For example, you can use this transformation to set a key for a key-less input record <null,V> by extracting a key from the value within your KeyValueMapper.
The example below computes the new key as the length of the value string.

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
StreamsBuilder builder = new StreamsBuilder();
KStream<byte[], String> keyLessStream = builder.stream("key-less-topic");
// key-less input: derive the new key from the value
KStream<Integer, String> keyedStream = keyLessStream.selectKey(new KeyValueMapper<byte[], String, Integer>() {
    public Integer apply(byte[] key, String value) {
        return value.length();
    }
});

Setting a new key might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream.
Type Parameters:
KR - the new key type of the result stream
Parameters:
mapper - a KeyValueMapper that computes a new key for each record
Returns:
a KStream that contains records with a new key (possibly of a different type) and an unmodified value
See Also:
map(KeyValueMapper), flatMap(KeyValueMapper), mapValues(ValueMapper), mapValues(ValueMapperWithKey), flatMapValues(ValueMapper), flatMapValues(ValueMapperWithKey)

<KR> KStream<KR,V> selectKey(KeyValueMapper<? super K,? super V,? extends KR> mapper, Named named)
Set a new key (with possibly a new type) for each input record.
The provided KeyValueMapper is applied to each input record and computes a new key for it.
Thus, an input record <K,V> can be transformed into an output record <K':V>.
This is a stateless record-by-record operation.
For example, you can use this transformation to set a key for a key-less input record <null,V> by extracting a key from the value within your KeyValueMapper.
The example below computes the new key as the length of the value string.

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
StreamsBuilder builder = new StreamsBuilder();
KStream<byte[], String> keyLessStream = builder.stream("key-less-topic");
// key-less input: derive the new key from the value
KStream<Integer, String> keyedStream = keyLessStream.selectKey(new KeyValueMapper<byte[], String, Integer>() {
    public Integer apply(byte[] key, String value) {
        return value.length();
    }
});

Setting a new key might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream.
Type Parameters:
KR - the new key type of the result stream
Parameters:
mapper - a KeyValueMapper that computes a new key for each record
named - a Named config used to name the processor in the topology
Returns:
a KStream that contains records with a new key (possibly of a different type) and an unmodified value
See Also:
map(KeyValueMapper), flatMap(KeyValueMapper), mapValues(ValueMapper), mapValues(ValueMapperWithKey), flatMapValues(ValueMapper), flatMapValues(ValueMapperWithKey)
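Here is a plain-Java sketch of the selectKey semantics. The mapper sees both key and value, only the key is replaced, and the value passes through unchanged. Like the key-less example above, it derives an Integer key from the length of the value. SelectKeyModel and Rec are hypothetical names, and BiFunction stands in for KeyValueMapper. The repartitioning that a later key-based operator may trigger is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical plain-Java model of KStream#selectKey semantics.
final class SelectKeyModel {
    record Rec<K, V>(K key, V value) {}

    // Replaces each record's key with mapper(key, value); the value is untouched.
    static <K, V, KR> List<Rec<KR, V>> selectKey(List<Rec<K, V>> input,
                                                 BiFunction<? super K, ? super V, ? extends KR> mapper) {
        List<Rec<KR, V>> out = new ArrayList<>(input.size());
        for (Rec<K, V> r : input) {
            KR newKey = mapper.apply(r.key(), r.value());
            out.add(new Rec<KR, V>(newKey, r.value()));
        }
        return out;
    }
}
```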

<KR,VR> KStream<KR,VR> map(KeyValueMapper<? super K,? super V,? extends KeyValue<? extends KR,? extends VR>> mapper)
Transform each record of the input stream into a new record in the output stream (both key and value type can be altered arbitrarily).
The provided KeyValueMapper is applied to each input record and computes a new output record.
Thus, an input record <K,V> can be transformed into an output record <K':V'>.
This is a stateless record-by-record operation (cf. transform(TransformerSupplier, String...) for stateful record transformation).
The example below normalizes the String key to upper-case letters and counts the number of tokens in the value string.

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> inputStream = builder.stream("topic");
KStream<String, Integer> outputStream = inputStream.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() {
    public KeyValue<String, Integer> apply(String key, String value) {
        return new KeyValue<>(key.toUpperCase(), value.split(" ").length);
    }
});

The provided KeyValueMapper must return a KeyValue type and must not return null.
Mapping records might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. mapValues(ValueMapper)).
Type Parameters:
KR - the key type of the result stream
VR - the value type of the result stream
Parameters:
mapper - a KeyValueMapper that computes a new output record
Returns:
a KStream that contains records with new key and value (possibly both of different type)
See Also:
selectKey(KeyValueMapper), flatMap(KeyValueMapper), mapValues(ValueMapper), mapValues(ValueMapperWithKey), flatMapValues(ValueMapper), flatMapValues(ValueMapperWithKey), transform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...), transformValues(ValueTransformerWithKeySupplier, String...)

<KR,VR> KStream<KR,VR> map(KeyValueMapper<? super K,? super V,? extends KeyValue<? extends KR,? extends VR>> mapper, Named named)
Transform each record of the input stream into a new record in the output stream (both key and value type can be altered arbitrarily).
The provided KeyValueMapper is applied to each input record and computes a new output record.
Thus, an input record <K,V> can be transformed into an output record <K':V'>.
This is a stateless record-by-record operation (cf. transform(TransformerSupplier, String...) for stateful record transformation).
The example below normalizes the String key to upper-case letters and counts the number of tokens in the value string.

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> inputStream = builder.stream("topic");
KStream<String, Integer> outputStream = inputStream.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() {
    public KeyValue<String, Integer> apply(String key, String value) {
        return new KeyValue<>(key.toUpperCase(), value.split(" ").length);
    }
});

The provided KeyValueMapper must return a KeyValue type and must not return null.
Mapping records might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. mapValues(ValueMapper)).
Type Parameters:
KR - the key type of the result stream
VR - the value type of the result stream
Parameters:
mapper - a KeyValueMapper that computes a new output record
named - a Named config used to name the processor in the topology
Returns:
a KStream that contains records with new key and value (possibly both of different type)
See Also:
selectKey(KeyValueMapper), flatMap(KeyValueMapper), mapValues(ValueMapper), mapValues(ValueMapperWithKey), flatMapValues(ValueMapper), flatMapValues(ValueMapperWithKey), transform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...), transformValues(ValueTransformerWithKeySupplier, String...)
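The sketch below models map on an in-memory list. It includes the rule that the mapper must not return null, which this model enforces with Objects.requireNonNull. How Kafka Streams itself reacts to a null result is not modeled. MapModel and Rec are hypothetical names, and BiFunction stands in for KeyValueMapper. The test applies the same upper-case/token-count mapping as the example above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;

// Hypothetical plain-Java model of KStream#map semantics.
final class MapModel {
    record Rec<K, V>(K key, V value) {}

    // Each input record yields exactly one output record; key and value may both change type.
    static <K, V, KR, VR> List<Rec<KR, VR>> map(List<Rec<K, V>> input,
                                                BiFunction<? super K, ? super V, Rec<KR, VR>> mapper) {
        List<Rec<KR, VR>> out = new ArrayList<>(input.size());
        for (Rec<K, V> r : input) {
            // Model of the "must not return null" contract.
            Rec<KR, VR> mapped = Objects.requireNonNull(mapper.apply(r.key(), r.value()),
                    "mapper must not return null");
            out.add(mapped);
        }
        return out;
    }
}
```

With the real API, the anonymous class in the example above can also be written as a lambda, for instance `inputStream.map((key, value) -> KeyValue.pair(key.toUpperCase(), value.split(" ").length))`.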

<VR> KStream<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper)
Transform the value of each input record into a new value (with possibly a new type) of the output record.
The provided ValueMapper is applied to each input record value and computes a new value for it.
Thus, an input record <K,V> can be transformed into an output record <K:V'>.
This is a stateless record-by-record operation (cf. transformValues(ValueTransformerSupplier, String...) for stateful value transformation).
The example below counts the number of tokens in the value string.

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> inputStream = builder.stream("topic");
// the key is kept as-is; only the value changes
KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapper<String, Integer>() {
    public Integer apply(String value) {
        return value.split(" ").length;
    }
});

Setting a new value preserves data co-location with respect to the key.
Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. map(KeyValueMapper)).
Type Parameters:
VR - the value type of the result stream
Parameters:
mapper - a ValueMapper that computes a new output value
Returns:
a KStream that contains records with unmodified keys and new values (possibly of a different type)
See Also:
selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper), flatMapValues(ValueMapper), flatMapValues(ValueMapperWithKey), transform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...), transformValues(ValueTransformerWithKeySupplier, String...)

<VR> KStream<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper, Named named)
Transform the value of each input record into a new value (with possibly a new type) of the output record.
The provided ValueMapper is applied to each input record value and computes a new value for it.
Thus, an input record <K,V> can be transformed into an output record <K:V'>.
This is a stateless record-by-record operation (cf. transformValues(ValueTransformerSupplier, String...) for stateful value transformation).
The example below counts the number of tokens in the value string.

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> inputStream = builder.stream("topic");
// the key is kept as-is; only the value changes
KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapper<String, Integer>() {
    public Integer apply(String value) {
        return value.split(" ").length;
    }
});

Setting a new value preserves data co-location with respect to the key.
Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. map(KeyValueMapper)).
Type Parameters:
VR - the value type of the result stream
Parameters:
mapper - a ValueMapper that computes a new output value
named - a Named config used to name the processor in the topology
Returns:
a KStream that contains records with unmodified keys and new values (possibly of a different type)
See Also:
selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper), flatMapValues(ValueMapper), flatMapValues(ValueMapperWithKey), transform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...), transformValues(ValueTransformerWithKeySupplier, String...)
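Here is a plain-Java sketch of mapValues. Only the value is recomputed, and the key is copied through untouched. That unchanged key is the property that lets Kafka Streams skip repartitioning after this operation. MapValuesModel and Rec are hypothetical names, and java.util.function.Function stands in for ValueMapper. This is not the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical plain-Java model of KStream#mapValues(ValueMapper) semantics.
final class MapValuesModel {
    record Rec<K, V>(K key, V value) {}

    static <K, V, VR> List<Rec<K, VR>> mapValues(List<Rec<K, V>> input,
                                                Function<? super V, ? extends VR> mapper) {
        List<Rec<K, VR>> out = new ArrayList<>(input.size());
        for (Rec<K, V> r : input) {
            // The mapper only sees the value.
            VR newValue = mapper.apply(r.value());
            // The key is copied through unchanged: this is why no repartitioning is needed.
            out.add(new Rec<K, VR>(r.key(), newValue));
        }
        return out;
    }
}
```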

<VR> KStream<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper)
Transform the value of each input record into a new value (with possibly a new type) of the output record.
The provided ValueMapperWithKey is applied to each input record value and computes a new value for it.
Thus, an input record <K,V> can be transformed into an output record <K:V'>.
This is a stateless record-by-record operation (cf. transformValues(ValueTransformerWithKeySupplier, String...) for stateful value transformation).
The example below counts the number of tokens in the key and value strings.

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> inputStream = builder.stream("topic");
// readOnlyKey may be read but must not be modified
KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapperWithKey<String, String, Integer>() {
    public Integer apply(String readOnlyKey, String value) {
        return readOnlyKey.split(" ").length + value.split(" ").length;
    }
});

Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
So, setting a new value preserves data co-location with respect to the key.
Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. map(KeyValueMapper)).
Type Parameters:
VR - the value type of the result stream
Parameters:
mapper - a ValueMapperWithKey that computes a new output value
Returns:
a KStream that contains records with unmodified keys and new values (possibly of a different type)
See Also:
selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper), flatMapValues(ValueMapper), flatMapValues(ValueMapperWithKey), transform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...), transformValues(ValueTransformerWithKeySupplier, String...)
<VR> KStream<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Named named)
ValueMapperWithKey
is applied to each input record value and computes a new value for it.
Thus, an input record <K,V>
can be transformed into an output record <K:V'>
.
This is a stateless record-by-record operation (cf.
transformValues(ValueTransformerWithKeySupplier, String...)
for stateful value transformation).
The example below counts the number of tokens of key and value strings.
KStream<String, String> inputStream = builder.stream("topic");
KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapperWithKey<String, String, Integer> {
Integer apply(String readOnlyKey, String value) {
return readOnlyKey.split(" ").length + value.split(" ").length;
}
});
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
Because only the value is replaced, data co-location with respect to the key is preserved.
Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. map(KeyValueMapper)).
Type Parameters:
VR - the value type of the result stream
Parameters:
mapper - a ValueMapperWithKey that computes a new output value
named - a Named config used to name the processor in the topology
Returns:
a KStream that contains records with unmodified key and new values (possibly of different type)
See Also:
selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper), flatMapValues(ValueMapper), flatMapValues(ValueMapperWithKey), transform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...), transformValues(ValueTransformerWithKeySupplier, String...)
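The key-preservation property of mapValues can be illustrated outside Kafka Streams. The following is a minimal plain-Java model, not the Kafka Streams API. It assumes records are java.util.Map.Entry pairs in a List, and MapValuesModel and mapValuesWithKey are hypothetical names used only for this sketch. The model reuses the token-counting logic from the example in this section and shows that every output record carries its input key unchanged.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of mapValues(ValueMapperWithKey); NOT the Kafka Streams API.
final class MapValuesModel {
    // Applies the mapper to every record; the key is passed read-only and copied unchanged.
    static <K, V, VR> List<Map.Entry<K, VR>> mapValuesWithKey(
            List<Map.Entry<K, V>> records, BiFunction<? super K, ? super V, ? extends VR> mapper) {
        List<Map.Entry<K, VR>> out = new ArrayList<>(records.size());
        for (Map.Entry<K, V> r : records) {
            out.add(new SimpleImmutableEntry<>(r.getKey(), mapper.apply(r.getKey(), r.getValue())));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input = new ArrayList<>();
        input.add(new SimpleImmutableEntry<>("a b", "x y z"));
        input.add(new SimpleImmutableEntry<>("c", "w"));
        // Token counting as in the example: key tokens plus value tokens.
        List<Map.Entry<String, Integer>> output = mapValuesWithKey(input,
                (readOnlyKey, value) -> readOnlyKey.split(" ").length + value.split(" ").length);
        System.out.println(output); // [a b=5, c=2]
    }
}
```

Because the model never touches the key, grouping its output by key gives the same partitioning as grouping its input.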
<KR,VR> KStream<KR,VR> flatMap(KeyValueMapper<? super K,? super V,? extends Iterable<? extends KeyValue<? extends KR,? extends VR>>> mapper)
The provided KeyValueMapper is applied to each input record and computes zero or more output records.
Thus, an input record <K,V> can be transformed into output records <K':V'>, <K'':V''>, ...
This is a stateless record-by-record operation (cf. transform(TransformerSupplier, String...) for stateful record transformation).
The example below splits input records <null:String> containing sentences as values into their words and emits a record <word:1> for each word.
KStream<byte[], String> inputStream = builder.stream("topic");
KStream<String, Integer> outputStream = inputStream.flatMap(
    new KeyValueMapper<byte[], String, Iterable<KeyValue<String, Integer>>>() {
        @Override
        public Iterable<KeyValue<String, Integer>> apply(final byte[] key, final String value) {
            final String[] tokens = value.split(" ");
            final List<KeyValue<String, Integer>> result = new ArrayList<>(tokens.length);
            for (final String token : tokens) {
                // each word becomes the new key, with count 1 as the value
                result.add(new KeyValue<>(token, 1));
            }
            return result;
        }
    });
The provided KeyValueMapper must return an Iterable (e.g., any Collection type) and the return value must not be null.
Flat-mapping records might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. flatMapValues(ValueMapper)).
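A minimal plain-Java model of the word-splitting flatMap example follows. It assumes records are java.util.Map.Entry pairs in a List. FlatMapModel, flatMap and wordsWithOne are hypothetical helpers for this sketch, not Kafka Streams APIs. The model shows that the mapper computes new output keys, which is why a later key-based operator may require data redistribution.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of flatMap(KeyValueMapper); NOT the Kafka Streams API.
final class FlatMapModel {
    // Each input record yields zero or more output records; key and value may both change.
    static <K, V, KR, VR> List<Map.Entry<KR, VR>> flatMap(
            List<Map.Entry<K, V>> records,
            BiFunction<? super K, ? super V, ? extends Iterable<Map.Entry<KR, VR>>> mapper) {
        List<Map.Entry<KR, VR>> out = new ArrayList<>();
        for (Map.Entry<K, V> r : records) {
            Iterable<Map.Entry<KR, VR>> produced = mapper.apply(r.getKey(), r.getValue());
            if (produced == null) {
                // Mirrors the documented contract: the mapper must not return null.
                throw new NullPointerException("mapper returned null");
            }
            for (Map.Entry<KR, VR> kv : produced) {
                out.add(kv);
            }
        }
        return out;
    }

    // Splits a sentence into <word:1> records; the original (possibly null) key is discarded.
    static List<Map.Entry<String, Integer>> wordsWithOne(String sentence) {
        List<Map.Entry<String, Integer>> result = new ArrayList<>();
        for (String token : sentence.split(" ")) {
            result.add(new SimpleImmutableEntry<>(token, 1));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input = new ArrayList<>();
        input.add(new SimpleImmutableEntry<>(null, "hello kafka streams"));
        input.add(new SimpleImmutableEntry<>(null, "hello world"));
        List<Map.Entry<String, Integer>> output = flatMap(input, (key, value) -> wordsWithOne(value));
        // The output keys differ from the input keys, so a downstream key-based
        // operator would need the data redistributed by the new key.
        System.out.println(output); // [hello=1, kafka=1, streams=1, hello=1, world=1]
    }
}
```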
Type Parameters:
KR - the key type of the result stream
VR - the value type of the result stream
Parameters:
mapper - a KeyValueMapper that computes the new output records
Returns:
a KStream that contains more or fewer records with new key and value (possibly of different type)
See Also:
selectKey(KeyValueMapper), map(KeyValueMapper), mapValues(ValueMapper), mapValues(ValueMapperWithKey), flatMapValues(ValueMapper), flatMapValues(ValueMapperWithKey), transform(TransformerSupplier, String...), flatTransform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...), transformValues(ValueTransformerWithKeySupplier, String...), flatTransformValues(ValueTransformerSupplier, String...), flatTransformValues(ValueTransformerWithKeySupplier, String...)
<KR,VR> KStream<KR,VR> flatMap(KeyValueMapper<? super K,? super V,? extends Iterable<? extends KeyValue<? extends KR,? extends VR>>> mapper, Named named)
The provided KeyValueMapper is applied to each input record and computes zero or more output records.
Thus, an input record <K,V> can be transformed into output records <K':V'>, <K'':V''>, ...
This is a stateless record-by-record operation (cf. transform(TransformerSupplier, String...) for stateful record transformation).
The example below splits input records <null:String> containing sentences as values into their words and emits a record <word:1> for each word.
KStream<byte[], String> inputStream = builder.stream("topic");
KStream<String, Integer> outputStream = inputStream.flatMap(
    new KeyValueMapper<byte[], String, Iterable<KeyValue<String, Integer>>>() {
        @Override
        public Iterable<KeyValue<String, Integer>> apply(final byte[] key, final String value) {
            final String[] tokens = value.split(" ");
            final List<KeyValue<String, Integer>> result = new ArrayList<>(tokens.length);
            for (final String token : tokens) {
                // each word becomes the new key, with count 1 as the value
                result.add(new KeyValue<>(token, 1));
            }
            return result;
        }
    });
The provided KeyValueMapper must return an Iterable (e.g., any Collection type) and the return value must not be null.
Flat-mapping records might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. flatMapValues(ValueMapper)).
Type Parameters:
KR - the key type of the result stream
VR - the value type of the result stream
Parameters:
mapper - a KeyValueMapper that computes the new output records
named - a Named config used to name the processor in the topology
Returns:
a KStream that contains more or fewer records with new key and value (possibly of different type)
See Also:
selectKey(KeyValueMapper), map(KeyValueMapper), mapValues(ValueMapper), mapValues(ValueMapperWithKey), flatMapValues(ValueMapper), flatMapValues(ValueMapperWithKey), transform(TransformerSupplier, String...), flatTransform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...), transformValues(ValueTransformerWithKeySupplier, String...), flatTransformValues(ValueTransformerSupplier, String...), flatTransformValues(ValueTransformerWithKeySupplier, String...)
<VR> KStream<K,VR> flatMapValues(ValueMapper<? super V,? extends Iterable<? extends VR>> mapper)
Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream.
Each output record keeps the (unmodified) key of its input record, while the value type can be altered arbitrarily.
The provided ValueMapper is applied to each input record and computes zero or more output values.
Thus, an input record <K,V> can be transformed into output records <K:V'>, <K:V''>, ...
This is a stateless record-by-record operation (cf. transformValues(ValueTransformerSupplier, String...) for stateful value transformation).
The example below splits input records <null:String> containing sentences as values into their words.
KStream<byte[], String> inputStream = builder.stream("topic");
KStream<byte[], String> outputStream = inputStream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    @Override
    public Iterable<String> apply(final String value) {
        // one output value per word; all outputs keep the input key
        return Arrays.asList(value.split(" "));
    }
});
The provided ValueMapper must return an Iterable (e.g., any Collection type) and the return value must not be null.
Splitting a record into multiple records with the same key preserves data co-location with respect to the key.
Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. flatMap(KeyValueMapper)).
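For contrast with flatMap, here is a minimal plain-Java model of flatMapValues under the same assumptions. Records are java.util.Map.Entry pairs, and FlatMapValuesModel is a hypothetical helper, not a Kafka Streams API. Every value produced from an input record is paired with that record's original key, so co-location by key is preserved.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of flatMapValues(ValueMapper); NOT the Kafka Streams API.
final class FlatMapValuesModel {
    // Every value produced from one input record is paired with that record's unchanged key.
    static <K, V, VR> List<Map.Entry<K, VR>> flatMapValues(
            List<Map.Entry<K, V>> records, Function<? super V, ? extends Iterable<? extends VR>> mapper) {
        List<Map.Entry<K, VR>> out = new ArrayList<>();
        for (Map.Entry<K, V> r : records) {
            for (VR value : mapper.apply(r.getValue())) {
                out.add(new SimpleImmutableEntry<>(r.getKey(), value));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input = new ArrayList<>();
        input.add(new SimpleImmutableEntry<>("k1", "hello world"));
        List<Map.Entry<String, String>> output =
                flatMapValues(input, value -> Arrays.asList(value.split(" ")));
        // Both output records keep key "k1".
        System.out.println(output); // [k1=hello, k1=world]
    }
}
```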
Type Parameters:
VR - the value type of the result stream
Parameters:
mapper - a ValueMapper that computes the new output values
Returns:
a KStream that contains more or fewer records with unmodified keys and new values (possibly of a different type)
See Also:
selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper), mapValues(ValueMapper), mapValues(ValueMapperWithKey), transform(TransformerSupplier, String...), flatTransform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...), transformValues(ValueTransformerWithKeySupplier, String...), flatTransformValues(ValueTransformerSupplier, String...), flatTransformValues(ValueTransformerWithKeySupplier, String...)
<VR> KStream<K,VR> flatMapValues(ValueMapper<? super V,? extends Iterable<? extends VR>> mapper, Named named)
Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream.
Each output record keeps the (unmodified) key of its input record, while the value type can be altered arbitrarily.
The provided ValueMapper is applied to each input record and computes zero or more output values.
Thus, an input record <K,V> can be transformed into output records <K:V'>, <K:V''>, ...
This is a stateless record-by-record operation (cf. transformValues(ValueTransformerSupplier, String...) for stateful value transformation).
The example below splits input records <null:String> containing sentences as values into their words.
KStream<byte[], String> inputStream = builder.stream("topic");
KStream<byte[], String> outputStream = inputStream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    @Override
    public Iterable<String> apply(final String value) {
        // one output value per word; all outputs keep the input key
        return Arrays.asList(value.split(" "));
    }
});
The provided ValueMapper must return an Iterable (e.g., any Collection type) and the return value must not be null.
Splitting a record into multiple records with the same key preserves data co-location with respect to the key.
Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. flatMap(KeyValueMapper)).
Type Parameters:
VR - the value type of the result stream
Parameters:
mapper - a ValueMapper that computes the new output values
named - a Named config used to name the processor in the topology
Returns:
a KStream that contains more or fewer records with unmodified keys and new values (possibly of a different type)
See Also:
selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper), mapValues(ValueMapper), mapValues(ValueMapperWithKey), transform(TransformerSupplier, String...), flatTransform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...), transformValues(ValueTransformerWithKeySupplier, String...), flatTransformValues(ValueTransformerSupplier, String...), flatTransformValues(ValueTransformerWithKeySupplier, String...)
<VR> KStream<K,VR> flatMapValues(ValueMapperWithKey<? super K,? super V,? extends Iterable<? extends VR>> mapper)
Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream.
Each output record keeps the (unmodified) key of its input record, while the value type can be altered arbitrarily.
The provided ValueMapperWithKey is applied to each input record and computes zero or more output values.
Thus, an input record <K,V> can be transformed into output records <K:V'>, <K:V''>, ...
This is a stateless record-by-record operation (cf. transformValues(ValueTransformerWithKeySupplier, String...) for stateful value transformation).
The example below splits the sentence values of input records <Integer:String> into their words, but only for records with key 1. The values of all other records are passed through unchanged.
KStream<Integer, String> inputStream = builder.stream("topic");
KStream<Integer, String> outputStream = inputStream.flatMapValues(new ValueMapperWithKey<Integer, String, Iterable<String>>() {
    @Override
    public Iterable<String> apply(final Integer readOnlyKey, final String value) {
        if (readOnlyKey != null && readOnlyKey == 1) {
            // split the sentence into words
            return Arrays.asList(value.split(" "));
        } else {
            // forward the value as a single record
            return Arrays.asList(value);
        }
    }
});
The provided ValueMapperWithKey must return an Iterable (e.g., any Collection type) and the return value must not be null.
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
Because the key is kept, splitting a record into multiple records with the same key preserves data co-location with respect to the key.
Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. flatMap(KeyValueMapper)).
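Below is a minimal plain-Java model of the key-dependent splitting example. It again assumes records are java.util.Map.Entry pairs and uses hypothetical helper names (FlatMapValuesWithKeyModel, splitIfKeyIsOne) rather than Kafka Streams APIs. The mapper may read the key to decide how to split the value, but the model always reuses the input key for the outputs.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of flatMapValues(ValueMapperWithKey); NOT the Kafka Streams API.
final class FlatMapValuesWithKeyModel {
    // The mapper sees the key (read-only) and the value; outputs always reuse the input key.
    static <K, V, VR> List<Map.Entry<K, VR>> flatMapValues(
            List<Map.Entry<K, V>> records,
            BiFunction<? super K, ? super V, ? extends Iterable<? extends VR>> mapper) {
        List<Map.Entry<K, VR>> out = new ArrayList<>();
        for (Map.Entry<K, V> r : records) {
            for (VR value : mapper.apply(r.getKey(), r.getValue())) {
                out.add(new SimpleImmutableEntry<>(r.getKey(), value));
            }
        }
        return out;
    }

    // Same decision as the example: split only values whose key is 1, pass others through.
    static Iterable<String> splitIfKeyIsOne(Integer readOnlyKey, String value) {
        if (readOnlyKey != null && readOnlyKey == 1) {
            return Arrays.asList(value.split(" "));
        }
        return Collections.singletonList(value);
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> input = new ArrayList<>();
        input.add(new SimpleImmutableEntry<>(1, "split me"));
        input.add(new SimpleImmutableEntry<>(2, "keep me"));
        List<Map.Entry<Integer, String>> output =
                flatMapValues(input, FlatMapValuesWithKeyModel::splitIfKeyIsOne);
        System.out.println(output); // [1=split, 1=me, 2=keep me]
    }
}
```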
Type Parameters:
VR - the value type of the result stream
Parameters:
mapper - a ValueMapperWithKey that computes the new output values
Returns:
a KStream that contains more or fewer records with unmodified keys and new values (possibly of a different type)
See Also:
selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper), mapValues(ValueMapper), mapValues(ValueMapperWithKey), transform(TransformerSupplier, String...), flatTransform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...), transformValues(ValueTransformerWithKeySupplier, String...), flatTransformValues(ValueTransformerSupplier, String...), flatTransformValues(ValueTransformerWithKeySupplier, String...)
<VR> KStream<K,VR> flatMapValues(ValueMapperWithKey<? super K,? super V,? extends Iterable<? extends VR>> mapper, Named named)
Create a new KStream by transforming the value of each record in this stream into zero or more values with the same key in the new stream.
Each output record keeps the (unmodified) key of its input record, while the value type can be altered arbitrarily.
The provided ValueMapperWithKey is applied to each input record and computes zero or more output values.
Thus, an input record <K,V> can be transformed into output records <K:V'>, <K:V''>, ...
This is a stateless record-by-record operation (cf. transformValues(ValueTransformerWithKeySupplier, String...) for stateful value transformation).
The example below splits the sentence values of input records <Integer:String> into their words, but only for records with key 1. The values of all other records are passed through unchanged.
KStream<Integer, String> inputStream = builder.stream("topic");
KStream<Integer, String> outputStream = inputStream.flatMapValues(new ValueMapperWithKey<Integer, String, Iterable<String>>() {
    @Override
    public Iterable<String> apply(final Integer readOnlyKey, final String value) {
        if (readOnlyKey != null && readOnlyKey == 1) {
            // split the sentence into words
            return Arrays.asList(value.split(" "));
        } else {
            // forward the value as a single record
            return Arrays.asList(value);
        }
    }
});
The provided ValueMapperWithKey must return an Iterable (e.g., any Collection type) and the return value must not be null.
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
Because the key is kept, splitting a record into multiple records with the same key preserves data co-location with respect to the key.
Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. flatMap(KeyValueMapper)).
Type Parameters:
VR - the value type of the result stream
Parameters:
mapper - a ValueMapperWithKey that computes the new output values
named - a Named config used to name the processor in the topology
Returns:
a KStream that contains more or fewer records with unmodified keys and new values (possibly of a different type)
See Also:
selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper), mapValues(ValueMapper), mapValues(ValueMapperWithKey), transform(TransformerSupplier, String...), flatTransform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...), transformValues(ValueTransformerWithKeySupplier, String...), flatTransformValues(ValueTransformerSupplier, String...), flatTransformValues(ValueTransformerWithKeySupplier, String...)
void print(Printed<K,V> printed)
Print the records of this KStream using the options provided by Printed.
Note that this is mainly for debugging/testing purposes, and it will try to flush after printing each record.
It SHOULD NOT be used in production if performance is a concern.
Parameters:
printed - options for printing
void foreach(ForeachAction<? super K,? super V> action)
Perform an action on each record of this KStream.
This is a stateless record-by-record operation (cf. process(ProcessorSupplier, String...)).
Note that this is a terminal operation that returns void.
Parameters:
action - an action to perform on each record
See Also:
process(ProcessorSupplier, String...)
void foreach(ForeachAction<? super K,? super V> action, Named named)
Perform an action on each record of this KStream.
This is a stateless record-by-record operation (cf. process(ProcessorSupplier, String...)).
Note that this is a terminal operation that returns void.
Parameters:
action - an action to perform on each record
named - a Named config used to name the processor in the topology
See Also:
process(ProcessorSupplier, String...)
KStream<K,V> peek(ForeachAction<? super K,? super V> action)
Perform an action on each record of this KStream.
This is a stateless record-by-record operation (cf. process(ProcessorSupplier, String...)).
Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection) and returns an unchanged stream.
Note that since this operation is stateless, it may execute multiple times for a single record in failure cases.
Parameters:
action - an action to perform on each record
See Also:
process(ProcessorSupplier, String...)
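The terminal versus non-terminal distinction between foreach and peek has a close analogy in java.util.stream. The sketch below is an analogy only, not Kafka Streams code, and PeekForeachAnalogy is a hypothetical class. In it, Stream.peek runs a side effect and passes elements on, whereas Stream.forEach consumes the stream and returns void. The analogy covers only the chaining behavior, not Kafka Streams' processing or failure semantics.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Analogy using java.util.stream, NOT Kafka Streams: peek is non-terminal (side effect,
// elements flow on unchanged), while forEach is terminal and returns void.
final class PeekForeachAnalogy {
    static List<String> peekThenMap(List<String> records, List<String> log) {
        return records.stream()
                .peek(r -> log.add("seen " + r)) // side effect only; element passes through
                .map(String::toUpperCase)        // later operations still see every element
                .collect(Collectors.toList());
    }

    static void foreachOnly(List<String> records, List<String> sink) {
        records.stream().forEach(sink::add); // terminal: nothing is returned to chain on
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        System.out.println(peekThenMap(Arrays.asList("a", "b"), log)); // [A, B]
        System.out.println(log); // [seen a, seen b]
    }
}
```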
KStream<K,V> peek(ForeachAction<? super K,? super V> action, Named named)
Perform an action on each record of this KStream.
This is a stateless record-by-record operation (cf. process(ProcessorSupplier, String...)).
Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection) and returns an unchanged stream.
Note that since this operation is stateless, it may execute multiple times for a single record in failure cases.
Parameters:
action - an action to perform on each record
named - a Named config used to name the processor in the topology
See Also:
process(ProcessorSupplier, String...)
KStream<K,V>[] branch(Predicate<? super K,? super V>... predicates)
Creates an array of KStream from this stream by branching the records in the original stream based on the supplied predicates.
Each record is evaluated against the supplied predicates, and predicates are evaluated in order.
Each stream in the result array corresponds position-wise (index) to the predicate in the supplied predicates.
The branching happens on first match: a record in the original stream is assigned to the corresponding result stream for the first predicate that evaluates to true, and is assigned to this stream only.
A record will be dropped if none of the predicates evaluate to true.
This is a stateless record-by-record operation.
Parameters:
predicates - the ordered list of Predicate instances
Returns:
an array of KStream instances, one per predicate and in the same order as the predicates
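The first-match rule can be modeled in plain Java. This is a sketch, not the Kafka Streams implementation. BranchModel is a hypothetical class, records are plain values in a List, and for brevity the predicates test only the value rather than a key and value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of branch() first-match semantics; NOT the Kafka Streams API.
// For brevity the predicates test only the record value, not the key.
final class BranchModel {
    @SafeVarargs
    static <T> List<List<T>> branch(List<T> records, Predicate<? super T>... predicates) {
        List<List<T>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>()); // one result "stream" per predicate, same index
        }
        for (T value : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(value)) {
                    branches.get(i).add(value);
                    break; // first match wins; later predicates are not evaluated
                }
            }
            // a value that matched no predicate is not added anywhere, i.e. it is dropped
        }
        return branches;
    }

    public static void main(String[] args) {
        List<List<Integer>> result =
                branch(Arrays.asList(5, -2, 4, -3, 0), x -> x > 0, x -> x % 2 == 0);
        System.out.println(result); // [[5, 4], [-2, 0]]
    }
}
```

In the output, 4 satisfies both predicates but appears only in the first branch, and -3 satisfies neither predicate, so it is dropped.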
KStream<K,V>[] branch(Named named, Predicate<? super K,? super V>... predicates)
Creates an array of KStream from this stream by branching the records in the original stream based on the supplied predicates.
Each record is evaluated against the supplied predicates, and predicates are evaluated in order.
Each stream in the result array corresponds position-wise (index) to the predicate in the supplied predicates.
The branching happens on first match: a record in the original stream is assigned to the corresponding result stream for the first predicate that evaluates to true, and is assigned to this stream only.
A record will be dropped if none of the predicates evaluate to true.
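This is a stateless record-by-record operation.
Parameters:
named - a Named config used to name the processor in the topology
predicates - the ordered list of Predicate instances
Returns:
an array of KStream instances, one per predicate and in the same order as the predicates
KStream<K,V> merge(KStream<K,V> stream)
Merge this stream and the given stream into one larger stream.
There is no ordering guarantee between records from this KStream and records from the provided KStream in the merged stream.
Relative order is preserved within each input stream, though (i.e., records within one input stream are processed in order).
Parameters:
stream - a stream which is to be merged into this stream
Returns:
a merged stream containing all records from this and the provided KStream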
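The ordering guarantee of merge can be modeled in plain Java. The names MergeModel, merge and preservesOrder are hypothetical, and a seeded java.util.Random stands in for the arbitrary interleaving of the two inputs. The model does not reflect how Kafka Streams schedules records. It only shows that any interleaving that keeps each input's own order satisfies the contract.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Plain-Java model of merge() ordering; NOT the Kafka Streams API.
final class MergeModel {
    // Interleaves the two inputs arbitrarily (randomly): no order is guaranteed
    // between the inputs, but each input's own order is preserved.
    static <T> List<T> merge(List<T> left, List<T> right, Random random) {
        List<T> out = new ArrayList<>(left.size() + right.size());
        int i = 0;
        int j = 0;
        while (i < left.size() || j < right.size()) {
            boolean takeLeft = j >= right.size() || (i < left.size() && random.nextBoolean());
            out.add(takeLeft ? left.get(i++) : right.get(j++));
        }
        return out;
    }

    // True if the elements of 'sub' appear in 'whole' in the same relative order.
    static <T> boolean preservesOrder(List<T> whole, List<T> sub) {
        int k = 0;
        for (T t : whole) {
            if (k < sub.size() && t.equals(sub.get(k))) {
                k++;
            }
        }
        return k == sub.size();
    }

    public static void main(String[] args) {
        List<String> left = Arrays.asList("a1", "a2", "a3");
        List<String> right = Arrays.asList("b1", "b2");
        List<String> merged = merge(left, right, new Random(42));
        System.out.println(merged);
        System.out.println(preservesOrder(merged, left) && preservesOrder(merged, right)); // true
    }
}
```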
KStream<K,V> merge(KStream<K,V> stream, Named named)
Merge this stream and the given stream into one larger stream.
There is no ordering guarantee between records from this KStream and records from the provided KStream in the merged stream.
Relative order is preserved within each input stream, though (i.e., records within one input stream are processed in order).
Parameters:
stream - a stream which is to be merged into this stream
named - a Named config used to name the processor in the topology
Returns:
a merged stream containing all records from this and the provided KStream
@Deprecated KStream<K,V> through(String topic)
Deprecated. Use repartition() instead.
Materialize this stream to a topic and create a new KStream from the topic using default serializers, deserializers, and the producer's DefaultPartitioner.
The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is started).
This is similar to calling to(someTopicName) and StreamsBuilder#stream(someTopicName).
Note that through() uses a hard-coded timestamp extractor and does not allow customizing it, to ensure correct timestamp propagation.
Parameters:
topic - the topic name
Returns:
a KStream that contains the exact same (and potentially repartitioned) records as this KStream
@Deprecated KStream<K,V> through(String topic, Produced<K,V> produced)
Deprecated. Use repartition(Repartitioned) instead.
Materialize this stream to a topic and create a new KStream from the topic using the Produced instance for configuration of the key serde, value serde, and StreamPartitioner.
The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is started).
This is similar to calling to(someTopicName, Produced.with(keySerde, valueSerde)) and StreamsBuilder#stream(someTopicName, Consumed.with(keySerde, valueSerde)).
Note that through() uses a hard-coded timestamp extractor and does not allow customizing it, to ensure correct timestamp propagation.
Parameters:
topic - the topic name
produced - the options to use when producing to the topic
Returns:
a KStream that contains the exact same (and potentially repartitioned) records as this KStream
KStream<K,V> repartition()
Materialize this stream to an auto-generated repartition topic and create a new KStream from the auto-generated topic using default serializers, deserializers, and the producer's DefaultPartitioner.
The number of partitions is determined based on the partition counts of the upstream topics.
The created topic is considered an internal topic and is meant to be used only by the current Kafka Streams instance.
Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.
The topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
Returns:
a KStream that contains the exact same repartitioned records as this KStream.
KStream<K,V> repartition(Repartitioned<K,V> repartitioned)
Materialize this stream to an auto-generated repartition topic and create a new KStream from the auto-generated topic using the key serde, value serde, StreamPartitioner, number of partitions, and topic name part as defined by Repartitioned.
The created topic is considered an internal topic and is meant to be used only by the current Kafka Streams instance.
Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.
The topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is either provided via Repartitioned.as(String) or an internally generated name, and "-repartition" is a fixed suffix.
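The naming rule can be sketched as a small helper. This is a hypothetical illustration, not a Kafka Streams API, and the class RepartitionTopicName and its method are made up. The format of the internally generated name is not specified here, so the generated name is simply passed in as a parameter.

```java
// Hypothetical helper (not part of Kafka Streams) that assembles a repartition topic
// name following the pattern "${applicationId}-<name>-repartition".
final class RepartitionTopicName {
    static String of(String applicationId, String providedName, String generatedName) {
        // Prefer a name given via Repartitioned.as(String); otherwise use the generated one.
        String name = (providedName != null) ? providedName : generatedName;
        return applicationId + "-" + name + "-repartition";
    }

    public static void main(String[] args) {
        System.out.println(of("my-app", "orders-by-customer", "unused")); // my-app-orders-by-customer-repartition
    }
}
```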
Parameters:
repartitioned - the Repartitioned instance used to specify Serdes, the StreamPartitioner which determines how records are distributed among partitions of the topic, part of the topic name, and the number of partitions for a repartition topic.
Returns:
a KStream that contains the exact same repartitioned records as this KStream.
void to(String topic)
Materialize this stream to a topic using default serializers and the producer's DefaultPartitioner.
The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is started).
Parameters:
topic - the topic name
void to(String topic, Produced<K,V> produced)
Materialize this stream to a topic using the provided Produced instance.
The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is started).
Parameters:
topic - the topic name
produced - the options to use when producing to the topic
void to(TopicNameExtractor<K,V> topicExtractor)
Dynamically materialize this stream to topics using default serializers and the producer's DefaultPartitioner.
The topic name for each record is determined dynamically by the TopicNameExtractor.
Parameters:
topicExtractor - the extractor to determine the name of the Kafka topic to write to for each record
void to(TopicNameExtractor<K,V> topicExtractor, Produced<K,V> produced)
Dynamically materialize this stream to topics using the provided Produced instance.
The topic name for each record is determined dynamically by the TopicNameExtractor.
Parameters:
topicExtractor - the extractor to determine the name of the Kafka topic to write to for each record
produced - the options to use when producing to the topic
KTable<K,V> toTable()
Convert this stream to a KTable.
If a key-changing operator was used before this operation (e.g., selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper) or transform(TransformerSupplier, String...)), an internal repartitioning topic will be created in Kafka.
This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via Topology.describe().
In this case, all data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting KTable is partitioned correctly on its key.
Note that you cannot enable StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG config for this case, because repartition topics are considered transient and cannot be used to recover the result KTable in case of a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of it was a "fact/event" and is now re-interpreted as an update (cf. KStream vs KTable).
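The re-interpretation of events as updates can be sketched in plain Java as a last-write-wins map. This is a simplified model under stated assumptions, not the Kafka Streams implementation. ToTableModel is a hypothetical class and records are java.util.Map.Entry pairs. Repartitioning, caching, changelogging and any special treatment of null values are deliberately left out.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the toTable() re-interpretation; NOT the Kafka Streams API.
// Simplified: no repartitioning, caching, changelogging, or null-value handling.
final class ToTableModel {
    static <K, V> Map<K, V> toTable(List<Map.Entry<K, V>> stream) {
        Map<K, V> table = new LinkedHashMap<>();
        for (Map.Entry<K, V> rec : stream) {
            table.put(rec.getKey(), rec.getValue()); // each event becomes an update: last write wins
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> stream = new ArrayList<>();
        stream.add(new SimpleImmutableEntry<>("alice", 1));
        stream.add(new SimpleImmutableEntry<>("bob", 2));
        stream.add(new SimpleImmutableEntry<>("alice", 3));
        System.out.println(toTable(stream)); // {alice=3, bob=2}
    }
}
```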
Returns:
a KTable that contains the same records as this KStream
KTable<K,V> toTable(Named named)
Convert this stream to a KTable.
If a key-changing operator was used before this operation (e.g., selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper) or transform(TransformerSupplier, String...)), an internal repartitioning topic will be created in Kafka.
This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via Topology.describe().
In this case, all data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting KTable is partitioned correctly on its key.
Note that you cannot enable StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG config for this case, because repartition topics are considered transient and cannot be used to recover the result KTable in case of a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of it was a "fact/event" and is now re-interpreted as an update (cf. KStream vs KTable).
Parameters:
named - a Named config used to name the processor in the topology
Returns:
a KTable that contains the same records as this KStream
KTable<K,V> toTable(Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Convert this stream to a KTable.
If a key-changing operator was used before this operation (e.g., selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper) or transform(TransformerSupplier, String...)), an internal repartitioning topic will be created in Kafka.
This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via Topology.describe().
In this case, all data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting KTable is partitioned correctly on its key.
Note that you cannot enable StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG config for this case, because repartition topics are considered transient and cannot be used to recover the result KTable in case of a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of it was a "fact/event" and is now re-interpreted as an update (cf. KStream vs KTable).
Parameters:
materialized - an instance of Materialized used to describe how the state store of the resulting table should be materialized.
Returns:
a KTable that contains the same records as this KStream
KTable<K,V> toTable(Named named, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Convert this stream to a KTable.
If a key-changing operator was used before this operation (e.g., selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper) or transform(TransformerSupplier, String...)), an internal repartitioning topic will be created in Kafka.
This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via Topology.describe().
In this case, all data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting KTable is partitioned correctly on its key.
Note that you cannot enable StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG config for this case, because repartition topics are considered transient and cannot be used to recover the result KTable in case of a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of it was a "fact/event" and is now re-interpreted as an update (cf. KStream vs KTable).
Parameters:
named - a Named config used to name the processor in the topology
materialized - an instance of Materialized used to describe how the state store of the resulting table should be materialized.
Returns:
a KTable that contains the same records as this KStream
<KR> KGroupedStream<KR,V> groupBy(KeyValueMapper<? super K,? super V,KR> keySelector)
Group the records of this KStream on a new key that is selected using the provided KeyValueMapper and default serializers and deserializers.
A KGroupedStream can be further grouped with other streams to form a CogroupedKStream.
Grouping a stream on the record key is required before an aggregation operator can be applied to the data (cf. KGroupedStream).
The KeyValueMapper selects a new key (which may or may not be of the same type) while preserving the original values.
If the new record key is null, the record will not be included in the resulting KGroupedStream.
Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a later operator depends on the newly selected key.
This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via Topology.describe().
All data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting KGroupedStream is partitioned on the new key.
This operation is equivalent to calling selectKey(KeyValueMapper) followed by groupByKey().
If the key type is changed, it is recommended to use groupBy(KeyValueMapper, Grouped) instead.
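The combined effect of selecting a new key, dropping records whose new key is null, and grouping can be modeled in plain Java. GroupByModel is a hypothetical class, records are assumed to be java.util.Map.Entry pairs, and the repartitioning step is not modeled. In Kafka Streams the grouped data would then be consumed by an aggregation rather than collected into lists.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of groupBy(KeyValueMapper); NOT the Kafka Streams API.
final class GroupByModel {
    // Selects a new key per record (values are kept as-is), skips records whose new key is null,
    // and collects values per new key, mimicking selectKey(...) followed by groupByKey().
    static <K, V, KR> Map<KR, List<V>> groupBy(
            List<Map.Entry<K, V>> records, BiFunction<? super K, ? super V, ? extends KR> keySelector) {
        Map<KR, List<V>> groups = new LinkedHashMap<>();
        for (Map.Entry<K, V> r : records) {
            KR newKey = keySelector.apply(r.getKey(), r.getValue());
            if (newKey == null) {
                continue; // records with a null new key are not included
            }
            groups.computeIfAbsent(newKey, k -> new ArrayList<>()).add(r.getValue());
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> input = new ArrayList<>(Arrays.asList(
                new SimpleImmutableEntry<>(1, "apple"),
                new SimpleImmutableEntry<>(2, "avocado"),
                new SimpleImmutableEntry<>(3, "banana"),
                new SimpleImmutableEntry<>(4, "")));
        // New key: first letter of the value, or null for empty values.
        Map<String, List<String>> grouped =
                groupBy(input, (key, value) -> value.isEmpty() ? null : value.substring(0, 1));
        System.out.println(grouped); // {a=[apple, avocado], b=[banana]}
    }
}
```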
Type Parameters:
KR - the key type of the result KGroupedStream
Parameters:
keySelector - a KeyValueMapper that computes a new key for grouping
Returns:
a KGroupedStream that contains the grouped records of the original KStream
@Deprecated <KR> KGroupedStream<KR,V> groupBy(KeyValueMapper<? super K,? super V,KR> keySelector, Serialized<KR,V> serialized)
Deprecated. Use groupBy(KeyValueMapper, Grouped) instead.
Group the records of this KStream on a new key that is selected using the provided KeyValueMapper and Serdes as specified by Serialized.
A KGroupedStream can be further grouped with other streams to form a CogroupedKStream.
Grouping a stream on the record key is required before an aggregation operator can be applied to the data (cf. KGroupedStream).
The KeyValueMapper selects a new key (which may or may not be of the same type) while preserving the original values.
If the new record key is null, the record will not be included in the resulting KGroupedStream.
Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a later operator depends on the newly selected key.
This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via Topology.describe().
All data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting KGroupedStream is partitioned on the new key.
This operation is equivalent to calling selectKey(KeyValueMapper) followed by groupByKey().
Type Parameters:
KR - the key type of the result KGroupedStream
Parameters:
keySelector - a KeyValueMapper that computes a new key for grouping
Returns:
a KGroupedStream that contains the grouped records of the original KStream
<KR> KGroupedStream<KR,V> groupBy(KeyValueMapper<? super K,? super V,KR> keySelector, Grouped<KR,V> grouped)
Group the records of this KStream on a new key that is selected using the provided KeyValueMapper and Serdes as specified by Grouped.
KGroupedStream can be further grouped with other streams to form a CogroupedKStream.
Grouping a stream on the record key is required before an aggregation operator can be applied to the data (cf. KGroupedStream).
The KeyValueMapper selects a new key (which may or may not be of the same type) while preserving the original values.
If the new record key is null, the record will not be included in the resulting KGroupedStream.
Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a later operator depends on the newly selected key.
This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is either provided via Grouped.as(String) or an internally generated name, and "-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via Topology.describe().
All data of this stream will be redistributed through the repartitioning topic by writing all records to it and rereading all records from it, such that the resulting KGroupedStream is partitioned on the new key.
This operation is equivalent to calling selectKey(KeyValueMapper) followed by groupByKey().
Type Parameters:
KR - the key type of the result KGroupedStream
Parameters:
keySelector - a KeyValueMapper that computes a new key for grouping
grouped - the Grouped instance used to specify Serdes and part of the name for a repartition topic if repartitioning is required
Returns:
a KGroupedStream that contains the grouped records of the original KStream
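To make the re-keying contract concrete, it can be modeled in memory with plain JDK collections. This is a sketch under stated assumptions, not Kafka Streams code: GroupByModel and its Pair record are hypothetical names, a java.util.function.BiFunction stands in for KeyValueMapper, and repartitioning, serdes, and distributed execution are deliberately left out. Records require Java 16 or later.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// In-memory model of the groupBy contract; not Kafka Streams code.
final class GroupByModel {
    private GroupByModel() {}

    // Minimal key/value pair standing in for a stream record.
    record Pair<K, V>(K key, V value) {}

    // Selects a new key per record, keeps the original value, skips records whose
    // new key is null, and collects the values per new key in encounter order.
    static <K, V, KR> Map<KR, List<V>> groupBy(List<Pair<K, V>> records,
                                               BiFunction<? super K, ? super V, ? extends KR> keySelector) {
        Map<KR, List<V>> groups = new LinkedHashMap<>();
        for (Pair<K, V> r : records) {
            KR newKey = keySelector.apply(r.key(), r.value());
            if (newKey == null) {
                continue; // a null new key drops the record, as described above
            }
            groups.computeIfAbsent(newKey, k -> new ArrayList<>()).add(r.value());
        }
        return groups;
    }
}
```

For instance, re-keying user-keyed records ("u1", "book-1"), ("u2", "cd-7"), ("u3", "book-2"), ("u4", "misc") by the prefix before "-" (and null when there is none) yields "book" -> [book-1, book-2] and "cd" -> [cd-7], while "misc" is dropped. In Kafka Streams the result is a KGroupedStream that still has to be aggregated; the Map only visualizes which values end up under which key.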
KGroupedStream<K,V> groupByKey()
Group the records by their current key into a KGroupedStream while preserving the original values and using the default serializers and deserializers.
KGroupedStream can be further grouped with other streams to form a CogroupedKStream.
Grouping a stream on the record key is required before an aggregation operator can be applied to the data (cf. KGroupedStream).
If a record key is null, the record will not be included in the resulting KGroupedStream.
If a key-changing operator was used before this operation (e.g., selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper), or transform(TransformerSupplier, String...)), an internal repartitioning topic may need to be created in Kafka if a later operator depends on the newly selected key.
This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via Topology.describe().
In this case, all data of this stream will be redistributed through the repartitioning topic by writing all records to it and rereading all records from it, such that the resulting KGroupedStream is partitioned correctly on its key.
If the last key-changing operator changed the key type, it is recommended to use groupByKey(org.apache.kafka.streams.kstream.Grouped) instead.
Returns:
a KGroupedStream that contains the grouped records of the original KStream
See Also:
groupBy(KeyValueMapper)
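Why a key change can force a repartition is easier to see with a simplified partitioning model. PartitionModel is a hypothetical class, not Kafka code. Kafka's default producer partitioner hashes the serialized key bytes with murmur2; this sketch substitutes String.hashCode(), so its partition numbers will not match a real cluster. Only the principle carries over: the partition is a function of the key, so a new key can belong on a different partition.

```java
// Simplified model of key-based partitioning; not Kafka code.
// String.hashCode() stands in for Kafka's murmur2 hash of the serialized key.
final class PartitionModel {
    private PartitionModel() {}

    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // True if a record partitioned by oldKey sits on the wrong partition once it is
    // keyed by newKey, i.e., it would have to be moved through a repartition topic.
    static boolean needsMove(String oldKey, String newKey, int numPartitions) {
        return partitionFor(oldKey, numPartitions) != partitionFor(newKey, numPartitions);
    }
}
```

With two partitions, "a" (hash code 97) maps to partition 1 and "b" (hash code 98) to partition 0, so re-keying a record from "a" to "b" moves it, whereas keeping the same key never does.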
@Deprecated KGroupedStream<K,V> groupByKey(Serialized<K,V> serialized)
Deprecated. Use groupByKey(Grouped) instead.
Group the records by their current key into a KGroupedStream while preserving the original values and using the serializers as defined by Serialized.
KGroupedStream can be further grouped with other streams to form a CogroupedKStream.
Grouping a stream on the record key is required before an aggregation operator can be applied to the data (cf. KGroupedStream).
If a record key is null, the record will not be included in the resulting KGroupedStream.
If a key-changing operator was used before this operation (e.g., selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper), or transform(TransformerSupplier, String...)), an internal repartitioning topic may need to be created in Kafka if a later operator depends on the newly selected key.
This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via Topology.describe().
In this case, all data of this stream will be redistributed through the repartitioning topic by writing all records to it and rereading all records from it, such that the resulting KGroupedStream is partitioned correctly on its key.
Returns:
a KGroupedStream that contains the grouped records of the original KStream
See Also:
groupBy(KeyValueMapper)
KGroupedStream<K,V> groupByKey(Grouped<K,V> grouped)
Group the records by their current key into a KGroupedStream while preserving the original values and using the serializers as defined by Grouped.
KGroupedStream can be further grouped with other streams to form a CogroupedKStream.
Grouping a stream on the record key is required before an aggregation operator can be applied to the data (cf. KGroupedStream).
If a record key is null, the record will not be included in the resulting KGroupedStream.
If a key-changing operator was used before this operation (e.g., selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper), or transform(TransformerSupplier, String...)), an internal repartitioning topic may need to be created in Kafka if a later operator depends on the newly selected key.
This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is either provided via Grouped.as(String) or an internally generated name, and "-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via Topology.describe().
In this case, all data of this stream will be redistributed through the repartitioning topic by writing all records to it and rereading all records from it, such that the resulting KGroupedStream is partitioned correctly on its key.
Parameters:
grouped - the Grouped instance used to specify Serdes and part of the name for a repartition topic if repartitioning is required
Returns:
a KGroupedStream that contains the grouped records of the original KStream
See Also:
groupBy(KeyValueMapper)
<VO,VR> KStream<K,VR> join(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows)
Join records of this stream with another KStream's records using windowed inner equi join with default serializers and deserializers.
The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key.
Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.
For each pair of records meeting both join predicates, the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record.
The key of the result record is the same as for both joining input records.
If an input record key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream.
Example (assuming all input records belong to the correct windows):
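| this | other | result |
|---|---|---|
| <K1:A> | | |
| <K2:B> | <K2:b> | <K2:ValueJoiner(B,b)> |
| | <K3:c> | |

Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.
If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before doing the join and specify the "correct" number of partitions via the Repartitioned parameter.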
Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
Repartitioning can happen for one or both of the joining KStreams.
In this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it and rereading all records from it, such that the join input KStream is partitioned correctly on its key.
Both of the joining KStreams will be materialized in local state stores with auto-generated store names.
For failure and recovery, each store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<storeName>" is an internally generated name, and "-changelog" is a fixed suffix.
You can retrieve all generated internal topic names via Topology.describe().
Type Parameters:
VO - the value type of the other stream
VR - the value type of the result stream
Parameters:
otherStream - the KStream to be joined with this stream
joiner - a ValueJoiner that computes the join result for a pair of matching records
windows - the specification of the JoinWindows
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key and within the joining window intervals
See Also:
leftJoin(KStream, ValueJoiner, JoinWindows), outerJoin(KStream, ValueJoiner, JoinWindows)
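The inner-join semantics described above can be approximated by a batch model over complete, in-memory inputs. This is a sketch under stated assumptions, not Kafka Streams code: WindowedJoinModel, Rec, and KV are hypothetical names, a BiFunction stands in for ValueJoiner, the window is assumed symmetric (timestamps at most windowMs apart, similar to the window JoinWindows.of(Duration) defines), and arrival order, grace periods, and store retention are ignored. Records require Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Batch model of a windowed inner equi join; not Kafka Streams code.
// Window assumption: symmetric, i.e. |thisTs - otherTs| <= windowMs.
final class WindowedJoinModel {
    private WindowedJoinModel() {}

    // Input record with an event timestamp in milliseconds.
    record Rec<K, V>(K key, V value, long timestamp) {}

    // Output record: the join key plus the joiner's result.
    record KV<K, V>(K key, V value) {}

    static <K, V, VO, VR> List<KV<K, VR>> innerJoin(List<Rec<K, V>> thisStream,
                                                    List<Rec<K, VO>> otherStream,
                                                    long windowMs,
                                                    BiFunction<? super V, ? super VO, ? extends VR> joiner) {
        List<KV<K, VR>> out = new ArrayList<>();
        for (Rec<K, V> t : thisStream) {
            if (t.key() == null || t.value() == null) {
                continue; // null key or value: the record does not take part in the join
            }
            for (Rec<K, VO> o : otherStream) {
                if (o.key() == null || o.value() == null) {
                    continue;
                }
                boolean sameKey = t.key().equals(o.key());
                boolean inWindow = Math.abs(t.timestamp() - o.timestamp()) <= windowMs;
                if (sameKey && inWindow) {
                    // The result keeps the join key; the joiner computes the new value.
                    out.add(new KV<>(t.key(), joiner.apply(t.value(), o.value())));
                }
            }
        }
        return out;
    }
}
```

Feeding it inputs modeled on the example table (this: K1:A at 0, K2:B at 10; other: K2:b at 12, K3:c at 11) with windowMs = 5 and the joiner (v, vo) -> v + "+" + vo yields a single result, K2 with value "B+b". Moving K2:b to timestamp 100 leaves the result empty, because the window predicate fails.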
@Deprecated <VO,VR> KStream<K,VR> join(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, Joined<K,V,VO> joined)
Deprecated. Use join(KStream, ValueJoiner, JoinWindows, StreamJoined) instead.
Join records of this stream with another KStream's records using windowed inner equi join using the Joined instance for configuration of the key serde, this stream's value serde, and the other stream's value serde.
The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key.
Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.
For each pair of records meeting both join predicates, the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record.
The key of the result record is the same as for both joining input records.
If an input record key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream.
Example (assuming all input records belong to the correct windows):
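| this | other | result |
|---|---|---|
| <K1:A> | | |
| <K2:B> | <K2:b> | <K2:ValueJoiner(B,b)> |
| | <K3:c> | |

Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.
If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before doing the join and specify the "correct" number of partitions via the Repartitioned parameter.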
Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
Repartitioning can happen for one or both of the joining KStreams.
In this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it and rereading all records from it, such that the join input KStream is partitioned correctly on its key.
Both of the joining KStreams will be materialized in local state stores with auto-generated store names.
For failure and recovery, each store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<storeName>" is an internally generated name, and "-changelog" is a fixed suffix.
You can retrieve all generated internal topic names via Topology.describe().
Type Parameters:
VO - the value type of the other stream
VR - the value type of the result stream
Parameters:
otherStream - the KStream to be joined with this stream
joiner - a ValueJoiner that computes the join result for a pair of matching records
windows - the specification of the JoinWindows
joined - a Joined instance that defines the serdes to be used to serialize/deserialize inputs and outputs of the joined streams
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key and within the joining window intervals
See Also:
leftJoin(KStream, ValueJoiner, JoinWindows, Joined), outerJoin(KStream, ValueJoiner, JoinWindows, Joined)
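The partition-count requirement for the join inputs can be illustrated with a simplified partition function. CoPartitioningModel is a hypothetical class, not Kafka code. Kafka's default producer partitioner hashes the serialized key bytes with murmur2, whereas this sketch uses String.hashCode(), so its partition numbers are not those of a real cluster. The point it shows holds regardless: with the same partitioner, a key maps to the same partition number in both inputs only when both have the same partition count, and Kafka Streams processes partition p of each co-partitioned input in the same task.

```java
// Simplified model of why co-partitioning matters; not Kafka code.
// String.hashCode() stands in for Kafka's murmur2 hash of the serialized key.
final class CoPartitioningModel {
    private CoPartitioningModel() {}

    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // A key's records from both inputs end up on the same partition number (and thus
    // can meet in one task) only if both topics place the key on the same partition.
    static boolean coLocated(String key, int thisPartitions, int otherPartitions) {
        return partitionFor(key, thisPartitions) == partitionFor(key, otherPartitions);
    }
}
```

For instance, the key "b" (hash code 98) maps to partition 0 of a two-partition topic but to partition 2 of a three-partition topic, so coLocated("b", 2, 3) is false, while coLocated("b", 4, 4) is true.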
<VO,VR> KStream<K,VR> join(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, StreamJoined<K,V,VO> streamJoined)
Join records of this stream with another KStream's records using windowed inner equi join using the StreamJoined instance for configuration of the key serde, this stream's value serde, the other stream's value serde, and the state stores used.
The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key.
Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.
For each pair of records meeting both join predicates, the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record.
The key of the result record is the same as for both joining input records.
If an input record key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream.
Example (assuming all input records belong to the correct windows):
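| this | other | result |
|---|---|---|
| <K1:A> | | |
| <K2:B> | <K2:b> | <K2:ValueJoiner(B,b)> |
| | <K3:c> | |

Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.
If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before doing the join and specify the "correct" number of partitions via the Repartitioned parameter.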
Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
Repartitioning can happen for one or both of the joining KStreams.
In this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it and rereading all records from it, such that the join input KStream is partitioned correctly on its key.
Both of the joining KStreams will be materialized in local state stores with auto-generated store names, unless a name is provided via the StreamJoined instance (e.g., StreamJoined.withStoreName(String)).
For failure and recovery, each store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<storeName>" is the (internally generated or configured) store name, and "-changelog" is a fixed suffix.
You can retrieve all generated internal topic names via Topology.describe().
Type Parameters:
VO - the value type of the other stream
VR - the value type of the result stream
Parameters:
otherStream - the KStream to be joined with this stream
joiner - a ValueJoiner that computes the join result for a pair of matching records
windows - the specification of the JoinWindows
streamJoined - a StreamJoined instance used to configure serdes and join stores
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key and within the joining window intervals
See Also:
leftJoin(KStream, ValueJoiner, JoinWindows, StreamJoined), outerJoin(KStream, ValueJoiner, JoinWindows, StreamJoined)
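The two naming patterns above (repartition and changelog topics) can be applied mechanically once the underlying names are known. JoinInternalTopics is a hypothetical helper, not part of Kafka Streams, and the store names in the usage note are placeholders; the actual store names are auto-generated or derived from the StreamJoined configuration, so Topology.describe() remains the place to look them up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not Kafka Streams API) that applies the naming patterns
// "${applicationId}-<name>-repartition" and "${applicationId}-<storeName>-changelog".
final class JoinInternalTopics {
    private JoinInternalTopics() {}

    static List<String> internalTopicNames(String applicationId,
                                           List<String> repartitionNames,
                                           List<String> storeNames) {
        if (applicationId == null || applicationId.isEmpty()) {
            throw new IllegalArgumentException("applicationId must not be empty");
        }
        List<String> topics = new ArrayList<>();
        for (String name : repartitionNames) {
            // Present only if one or both inputs had to be repartitioned.
            topics.add(applicationId + "-" + name + "-repartition");
        }
        for (String storeName : storeNames) {
            // One changelog topic backs each join store.
            topics.add(applicationId + "-" + storeName + "-changelog");
        }
        return topics;
    }
}
```

With no repartitioning and two placeholder stores, internalTopicNames("orders-app", List.of(), List.of("this-store", "other-store")) returns ["orders-app-this-store-changelog", "orders-app-other-store-changelog"].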
<VO,VR> KStream<K,VR> leftJoin(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows)
Join records of this stream with another KStream's records using windowed left equi join with default serializers and deserializers.
In contrast to inner-join, all records from this stream will produce at least one output record (cf. below).
The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key.
Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.
For each pair of records meeting both join predicates, the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record.
The key of the result record is the same as for both joining input records.
Furthermore, for each input record of this KStream that does not satisfy the join predicate, the provided ValueJoiner will be called with a null value for the other stream.
If an input record key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream.
Example (assuming all input records belong to the correct windows):
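| this | other | result |
|---|---|---|
| <K1:A> | | <K1:ValueJoiner(A,null)> |
| <K2:B> | <K2:b> | <K2:ValueJoiner(B,b)> |
| | <K3:c> | |

Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.
If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before doing the join and specify the "correct" number of partitions via the Repartitioned parameter.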
Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
Repartitioning can happen for one or both of the joining KStreams.
In this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it and rereading all records from it, such that the join input KStream is partitioned correctly on its key.
Both of the joining KStreams will be materialized in local state stores with auto-generated store names.
For failure and recovery, each store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<storeName>" is an internally generated name, and "-changelog" is a fixed suffix.
You can retrieve all generated internal topic names via Topology.describe().
Type Parameters:
VO - the value type of the other stream
VR - the value type of the result stream
Parameters:
otherStream - the KStream to be joined with this stream
joiner - a ValueJoiner that computes the join result for a pair of matching records
windows - the specification of the JoinWindows
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key and within the joining window intervals, plus one for each non-matching record of this KStream
See Also:
join(KStream, ValueJoiner, JoinWindows), outerJoin(KStream, ValueJoiner, JoinWindows)
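The left-join rule (every record of this stream yields at least one result) can be modeled the same way over complete inputs. This is a sketch under stated assumptions, not Kafka Streams code: LeftJoinModel, Rec, and KV are hypothetical names, a BiFunction stands in for ValueJoiner, the window is assumed symmetric, and the timing of when Kafka Streams actually emits the null-padded results is not modeled. Records require Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Batch model of a windowed left equi join; not Kafka Streams code.
final class LeftJoinModel {
    private LeftJoinModel() {}

    record Rec<K, V>(K key, V value, long timestamp) {}

    record KV<K, V>(K key, V value) {}

    static <K, V, VO, VR> List<KV<K, VR>> leftJoin(List<Rec<K, V>> thisStream,
                                                   List<Rec<K, VO>> otherStream,
                                                   long windowMs,
                                                   BiFunction<? super V, ? super VO, ? extends VR> joiner) {
        List<KV<K, VR>> out = new ArrayList<>();
        for (Rec<K, V> t : thisStream) {
            if (t.key() == null || t.value() == null) {
                continue; // excluded from the join, so it produces no output either
            }
            boolean matched = false;
            for (Rec<K, VO> o : otherStream) {
                if (o.key() == null || o.value() == null) {
                    continue;
                }
                if (t.key().equals(o.key()) && Math.abs(t.timestamp() - o.timestamp()) <= windowMs) {
                    out.add(new KV<>(t.key(), joiner.apply(t.value(), o.value())));
                    matched = true;
                }
            }
            if (!matched) {
                // No partner within the window: call the joiner with null for the other side.
                out.add(new KV<>(t.key(), joiner.apply(t.value(), null)));
            }
        }
        return out;
    }
}
```

With inputs modeled on the example table (this: K1:A at 0, K2:B at 10; other: K2:b at 12, K3:c at 11), windowMs = 5, and the joiner (v, vo) -> v + "+" + vo, the result is K1 -> "A+null" and K2 -> "B+b"; K3:c from the other stream produces nothing.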
@Deprecated <VO,VR> KStream<K,VR> leftJoin(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, Joined<K,V,VO> joined)
Deprecated. Use leftJoin(KStream, ValueJoiner, JoinWindows, StreamJoined) instead.
Join records of this stream with another KStream's records using windowed left equi join using the Joined instance for configuration of the key serde, this stream's value serde, and the other stream's value serde.
In contrast to inner-join, all records from this stream will produce at least one output record (cf. below).
The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key.
Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.
For each pair of records meeting both join predicates, the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record.
The key of the result record is the same as for both joining input records.
Furthermore, for each input record of this KStream that does not satisfy the join predicate, the provided ValueJoiner will be called with a null value for the other stream.
If an input record key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream.
Example (assuming all input records belong to the correct windows):
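| this | other | result |
|---|---|---|
| <K1:A> | | <K1:ValueJoiner(A,null)> |
| <K2:B> | <K2:b> | <K2:ValueJoiner(B,b)> |
| | <K3:c> | |

Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.
If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before doing the join and specify the "correct" number of partitions via the Repartitioned parameter.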
Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
Repartitioning can happen for one or both of the joining KStreams.
In this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it and rereading all records from it, such that the join input KStream is partitioned correctly on its key.
Both of the joining KStreams will be materialized in local state stores with auto-generated store names.
For failure and recovery, each store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<storeName>" is an internally generated name, and "-changelog" is a fixed suffix.
You can retrieve all generated internal topic names via Topology.describe().
Type Parameters:
VO - the value type of the other stream
VR - the value type of the result stream
Parameters:
otherStream - the KStream to be joined with this stream
joiner - a ValueJoiner that computes the join result for a pair of matching records
windows - the specification of the JoinWindows
joined - a Joined instance that defines the serdes to be used to serialize/deserialize inputs and outputs of the joined streams
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key and within the joining window intervals, plus one for each non-matching record of this KStream
See Also:
join(KStream, ValueJoiner, JoinWindows, Joined), outerJoin(KStream, ValueJoiner, JoinWindows, Joined)
<VO,VR> KStream<K,VR> leftJoin(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, StreamJoined<K,V,VO> streamJoined)
Join records of this stream with another KStream's records using windowed left equi join using the StreamJoined instance for configuration of the key serde, this stream's value serde, the other stream's value serde, and the state stores used.
In contrast to inner-join, all records from this stream will produce at least one output record (cf. below).
The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key.
Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.
For each pair of records meeting both join predicates, the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record.
The key of the result record is the same as for both joining input records.
Furthermore, for each input record of this KStream that does not satisfy the join predicate, the provided ValueJoiner will be called with a null value for the other stream.
If an input record key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream.
Example (assuming all input records belong to the correct windows):
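| this | other | result |
|---|---|---|
| <K1:A> | | <K1:ValueJoiner(A,null)> |
| <K2:B> | <K2:b> | <K2:ValueJoiner(B,b)> |
| | <K3:c> | |

Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.
If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before doing the join and specify the "correct" number of partitions via the Repartitioned parameter.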
Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
Repartitioning can happen for one or both of the joining KStreams.
In this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it and rereading all records from it, such that the join input KStream is partitioned correctly on its key.
Both of the joining KStreams will be materialized in local state stores with auto-generated store names, unless a name is provided via the StreamJoined instance (e.g., StreamJoined.withStoreName(String)).
For failure and recovery, each store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<storeName>" is the (internally generated or configured) store name, and "-changelog" is a fixed suffix.
You can retrieve all generated internal topic names via Topology.describe().
Type Parameters:
VO - the value type of the other stream
VR - the value type of the result stream
Parameters:
otherStream - the KStream to be joined with this stream
joiner - a ValueJoiner that computes the join result for a pair of matching records
windows - the specification of the JoinWindows
streamJoined - a StreamJoined instance to configure serdes and state stores
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key and within the joining window intervals, plus one for each non-matching record of this KStream
See Also:
join(KStream, ValueJoiner, JoinWindows, StreamJoined), outerJoin(KStream, ValueJoiner, JoinWindows, StreamJoined)
<VO,VR> KStream<K,VR> outerJoin(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows)
Join records of this stream with another KStream's records using windowed outer equi join with default serializers and deserializers.
In contrast to inner-join or left-join, all records from both streams will produce at least one output record (cf. below).
The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key.
Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.
For each pair of records meeting both join predicates, the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record.
The key of the result record is the same as for both joining input records.
Furthermore, for each input record of either KStream that does not satisfy the join predicate, the provided ValueJoiner will be called with null in place of the missing value: null for the other stream's value when the record comes from this stream, and null for this stream's value when it comes from the other stream.
If an input record key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream.
Example (assuming all input records belong to the correct windows):
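| this | other | result |
|---|---|---|
| <K1:A> | | <K1:ValueJoiner(A,null)> |
| <K2:B> | <K2:b> | <K2:ValueJoiner(null,b)>, <K2:ValueJoiner(B,b)> |
| | <K3:c> | <K3:ValueJoiner(null,c)> |

Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.
If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before doing the join and specify the "correct" number of partitions via the Repartitioned parameter.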
Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix.
Repartitioning can happen for one or both of the joining KStreams.
In this case, all data of the stream will be redistributed through the repartitioning topic by writing all records to it and rereading all records from it, such that the join input KStream is partitioned correctly on its key.
Both of the joining KStreams will be materialized in local state stores with auto-generated store names.
For failure and recovery, each store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<storeName>" is an internally generated name, and "-changelog" is a fixed suffix.
You can retrieve all generated internal topic names via Topology.describe().
Type Parameters:
VO - the value type of the other stream
VR - the value type of the result stream
Parameters:
otherStream - the KStream to be joined with this stream
joiner - a ValueJoiner that computes the join result for a pair of matching records
windows - the specification of the JoinWindows
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key and within the joining window intervals, plus one for each non-matching record of both KStreams
See Also:
join(KStream, ValueJoiner, JoinWindows), leftJoin(KStream, ValueJoiner, JoinWindows)
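Extending the batch model to an outer join means also padding unmatched records of the other stream. This is a sketch under stated assumptions, not Kafka Streams code: OuterJoinModel, Rec, and KV are hypothetical names, a BiFunction stands in for ValueJoiner, the window is assumed symmetric, and emission timing is not modeled. Records require Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Batch model of a windowed outer equi join; not Kafka Streams code.
final class OuterJoinModel {
    private OuterJoinModel() {}

    record Rec<K, V>(K key, V value, long timestamp) {}

    record KV<K, V>(K key, V value) {}

    static <K, V, VO, VR> List<KV<K, VR>> outerJoin(List<Rec<K, V>> thisStream,
                                                    List<Rec<K, VO>> otherStream,
                                                    long windowMs,
                                                    BiFunction<? super V, ? super VO, ? extends VR> joiner) {
        List<KV<K, VR>> out = new ArrayList<>();
        boolean[] otherMatched = new boolean[otherStream.size()];
        for (Rec<K, V> t : thisStream) {
            if (t.key() == null || t.value() == null) {
                continue; // null key or value: excluded from the join
            }
            boolean matched = false;
            for (int i = 0; i < otherStream.size(); i++) {
                Rec<K, VO> o = otherStream.get(i);
                if (o.key() == null || o.value() == null) {
                    continue;
                }
                if (t.key().equals(o.key()) && Math.abs(t.timestamp() - o.timestamp()) <= windowMs) {
                    out.add(new KV<>(t.key(), joiner.apply(t.value(), o.value())));
                    matched = true;
                    otherMatched[i] = true;
                }
            }
            if (!matched) {
                out.add(new KV<>(t.key(), joiner.apply(t.value(), null))); // null for the other side
            }
        }
        // Records of the other stream that never found a partner: null for this side.
        for (int i = 0; i < otherStream.size(); i++) {
            Rec<K, VO> o = otherStream.get(i);
            if (!otherMatched[i] && o.key() != null && o.value() != null) {
                out.add(new KV<>(o.key(), joiner.apply(null, o.value())));
            }
        }
        return out;
    }
}
```

With inputs modeled on the example table (this: K1:A at 0, K2:B at 10; other: K2:b at 12, K3:c at 11), windowMs = 5, and the joiner (v, vo) -> v + "+" + vo, the model yields K1 -> "A+null", K2 -> "B+b", and K3 -> "null+c". Because it sees complete inputs, it does not reproduce order-dependent intermediate results such as <K2:ValueJoiner(null,b)> in the example table.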
@Deprecated <VO,VR> KStream<K,VR> outerJoin(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, Joined<K,V,VO> joined)
Deprecated. Use outerJoin(KStream, ValueJoiner, JoinWindows, StreamJoined) instead.
Join records of this stream with another KStream's records using windowed outer equi join using the Joined instance for configuration of the key serde, this stream's value serde, and the other stream's value serde.
In contrast to inner-join or left-join, all records from both streams will produce at least one output record (cf. below).
The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key.
Furthermore, two records are only joined if their timestamps are close to each other as defined by the given JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.
For each pair of records meeting both join predicates, the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record.
The key of the result record is the same as for both joining input records.
Furthermore, for each input record of either KStream that does not satisfy the join predicate, the provided ValueJoiner will be called with null in place of the missing value: null for the other stream's value when the record comes from this stream, and null for this stream's value when it comes from the other stream.
If an input record key or value is null, the record will not be included in the join operation and thus no output record will be added to the resulting KStream.
Example (assuming all input records belong to the correct windows):
| this | other | result |
|---|---|---|
| <K1:A> | | <K1:ValueJoiner(A,null)> |
| <K2:B> | <K2:b> | <K2:ValueJoiner(null,b)> <K2:ValueJoiner(B,b)> |
| | <K3:c> | <K3:ValueJoiner(null,c)> |
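The example table above can be reproduced with a small plain-Java model. This is a sketch under stated assumptions, not the Kafka Streams implementation: `OuterJoinModel` and `Rec` are hypothetical names, a `BiFunction` stands in for `ValueJoiner`, the window is assumed symmetric (timestamps at most `windowMs` apart, as with a window built via `JoinWindows.of`), records are processed in arrival order, and a null-padded result is emitted eagerly when a record arrives without a partner. That eager behavior matches the table (the K2 row corresponds to `b` arriving before `B`); newer Kafka Streams versions may instead delay null-padded results until the window closes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical plain-Java model of a windowed outer equi join; not Kafka Streams code.
final class OuterJoinModel {

    // One input record; fromThis == true means it belongs to "this" stream.
    static final class Rec {
        final boolean fromThis;
        final String key;
        final String value;
        final long ts;

        Rec(boolean fromThis, String key, String value, long ts) {
            this.fromThis = fromThis;
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    // Processes records in arrival order and returns the emitted results as "<key:value>" strings.
    static List<String> outerJoin(List<Rec> arrivals, long windowMs,
                                  BiFunction<String, String, String> joiner) {
        List<Rec> bufferedThis = new ArrayList<>();
        List<Rec> bufferedOther = new ArrayList<>();
        List<String> out = new ArrayList<>();
        for (Rec r : arrivals) {
            if (r.key == null || r.value == null) {
                continue; // records with a null key or value do not take part in the join
            }
            List<Rec> opposite = r.fromThis ? bufferedOther : bufferedThis;
            boolean matched = false;
            for (Rec o : opposite) {
                // both join predicates: equal keys and timestamps within the window
                if (o.key.equals(r.key) && Math.abs(o.ts - r.ts) <= windowMs) {
                    matched = true;
                    String joined = r.fromThis ? joiner.apply(r.value, o.value) : joiner.apply(o.value, r.value);
                    out.add("<" + r.key + ":" + joined + ">");
                }
            }
            if (!matched) {
                // no partner yet: call the joiner with null for the missing side
                String joined = r.fromThis ? joiner.apply(r.value, null) : joiner.apply(null, r.value);
                out.add("<" + r.key + ":" + joined + ">");
            }
            (r.fromThis ? bufferedThis : bufferedOther).add(r);
        }
        return out;
    }
}
```

Feeding it `<K1:A>`, `<K2:b>`, `<K2:B>`, `<K3:c>` (in that order, all close in time) yields the four results in the table's result column.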
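Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.
If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before
doing the join and specify the "correct" number of partitions via the Repartitioned parameter.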
Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is
user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated
name, and "-repartition" is a fixed suffix.
Repartitioning can happen for one or both of the joining KStreams.
In this case, all data of the stream will be redistributed through the repartitioning topic by writing all
records to it and rereading all records from it, such that the join input KStream is partitioned
correctly on its key.
Both of the joining KStreams will be materialized in local state stores with auto-generated store names.
For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is user-specified
in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<storeName>" is an internally generated name, and "-changelog" is a fixed suffix.
You can retrieve all generated internal topic names via Topology.describe().
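To make the two naming patterns above concrete, the following sketch only assembles them as strings. `InternalTopicNames` is a hypothetical helper, and the application ID and names passed in are placeholders. In a real application the middle part is generated by Kafka Streams, so Topology.describe() remains the way to see the actual names.

```java
// Hypothetical helper that spells out the internal topic naming patterns described above.
final class InternalTopicNames {

    // "${applicationId}-<name>-repartition"
    static String repartitionTopic(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    // "${applicationId}-<storeName>-changelog"
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }
}
```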
Type Parameters:
VO - the value type of the other stream
VR - the value type of the result stream
Parameters:
otherStream - the KStream to be joined with this stream
joiner - a ValueJoiner that computes the join result for a pair of matching records
windows - the specification of the JoinWindows
joined - a Joined instance that defines the serdes to be used to serialize/deserialize inputs and outputs of the joined streams
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each
matched record pair with the same key within the join window, plus one for each non-matching record of either KStream
See Also:
join(KStream, ValueJoiner, JoinWindows, Joined), leftJoin(KStream, ValueJoiner, JoinWindows, Joined)
<VO,VR> KStream<K,VR> outerJoin(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, StreamJoined<K,V,VO> streamJoined)
Join records of this stream with another KStream's records using windowed outer equi join, with the
StreamJoined instance configuring the key serde, this stream's value serde, the other stream's value serde,
and the used state stores.
In contrast to inner-join or left-join, all records from both streams will produce at
least one output record (cf. below).
The join is computed on the records' key with join attribute thisKStream.key == otherKStream.key.
Furthermore, two records are only joined if their timestamps are close to each other as defined by the given
JoinWindows, i.e., the window defines an additional join predicate on the record timestamps.
For each pair of records meeting both join predicates, the provided ValueJoiner will be called to compute
a value (with arbitrary type) for the result record.
The key of the result record is the same as for both joining input records.
Furthermore, for each input record of either KStream that does not satisfy the join predicates, the provided
ValueJoiner will be called with a null value for the missing side, i.e., ValueJoiner(thisValue, null) for a record
of this stream and ValueJoiner(null, otherValue) for a record of the other stream.
If an input record key or value is null, the record will not be included in the join operation and thus no
output record will be added to the resulting KStream.
Example (assuming all input records belong to the correct windows):
| this | other | result |
|---|---|---|
| <K1:A> | | <K1:ValueJoiner(A,null)> |
| <K2:B> | <K2:b> | <K2:ValueJoiner(null,b)> <K2:ValueJoiner(B,b)> |
| | <K3:c> | <K3:ValueJoiner(null,c)> |
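Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.
If this is not the case, you would need to call repartition(Repartitioned) (for one input stream) before
doing the join and specify the "correct" number of partitions via the Repartitioned parameter.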
Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is
user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated
name, and "-repartition" is a fixed suffix.
Repartitioning can happen for one or both of the joining KStreams.
In this case, all data of the stream will be redistributed through the repartitioning topic by writing all
records to it and rereading all records from it, such that the join input KStream is partitioned
correctly on its key.
Both of the joining KStreams will be materialized in local state stores with auto-generated store names,
unless a store name is provided via the StreamJoined instance.
For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is user-specified
in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<storeName>" is an internally generated name, and "-changelog" is a fixed suffix.
You can retrieve all generated internal topic names via Topology.describe().
Type Parameters:
VO - the value type of the other stream
VR - the value type of the result stream
Parameters:
otherStream - the KStream to be joined with this stream
joiner - a ValueJoiner that computes the join result for a pair of matching records
windows - the specification of the JoinWindows
streamJoined - a StreamJoined instance to configure serdes and state stores
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner, one for each
matched record pair with the same key within the join window, plus one for each non-matching record of either KStream
See Also:
join(KStream, ValueJoiner, JoinWindows, StreamJoined), leftJoin(KStream, ValueJoiner, JoinWindows, StreamJoined)
<VT,VR> KStream<K,VR> join(KTable<K,VT> table, ValueJoiner<? super V,? super VT,? extends VR> joiner)
Join records of this stream with KTable's records using non-windowed inner equi join with default
serializers and deserializers.
The join is a primary key table lookup join with join attribute stream.key == table.key.
"Table lookup join" means that results are only computed if KStream records are processed.
This is done by performing a lookup for matching records in the current (i.e., processing time) internal
KTable state.
In contrast, processing KTable input records will only update the internal KTable state and
will not produce any result records.
For each KStream record that finds a corresponding record in KTable, the provided
ValueJoiner will be called to compute a value (with arbitrary type) for the result record.
The key of the result record is the same as for both joining input records.
If a KStream input record key or value is null, the record will not be included in the join
operation and thus no output record will be added to the resulting KStream.
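The lookup semantics described above can be sketched in plain Java. This is a hypothetical model (`StreamTableJoinModel` is an illustrative name), not Kafka Streams code: a `HashMap` stands in for the internal KTable state, a `BiFunction` stands in for `ValueJoiner`, table updates only change the map, and only stream records can produce output. Treating a null table value as a delete is an assumption that mirrors KTable tombstones.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical plain-Java model of a KStream-KTable inner lookup join; not Kafka Streams code.
final class StreamTableJoinModel {

    private final Map<String, String> tableState = new HashMap<>();
    private final BiFunction<String, String, String> joiner;
    final List<String> results = new ArrayList<>();

    StreamTableJoinModel(BiFunction<String, String, String> joiner) {
        this.joiner = joiner;
    }

    // A KTable update only changes the state; it never produces a result record.
    void onTableRecord(String key, String value) {
        if (value == null) {
            tableState.remove(key); // assumed tombstone handling: a null value deletes the key
        } else {
            tableState.put(key, value);
        }
    }

    // A KStream record looks up the current table state for its key.
    void onStreamRecord(String key, String value) {
        if (key == null || value == null) {
            return; // not included in the join
        }
        String tableValue = tableState.get(key);
        if (tableValue != null) {
            results.add("<" + key + ":" + joiner.apply(value, tableValue) + ">");
        }
    }
}
```

Replaying the example's events in order (stream `<K1:A>`, table `<K1:b>`, stream `<K1:C>`) leaves a single result, `<K1:ValueJoiner(C,b)>`.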
Example:
| KStream | KTable | state | result |
|---|---|---|---|
| <K1:A> | | | |
| | <K1:b> | <K1:b> | |
| <K1:C> | | <K1:b> | <K1:ValueJoiner(C,b)> |
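Both inputs (or to be more precise, their underlying source topics) need to have the same number of partitions.
If this is not the case, you would need to call repartition(Repartitioned) for this KStream
before doing the join, specifying the same number of partitions via the Repartitioned parameter as the given
KTable.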
Furthermore, both inputs need to be co-partitioned on the join key (i.e., use the same partitioner);
cf. join(GlobalKTable, KeyValueMapper, ValueJoiner).
If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is
user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated
name, and "-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via Topology.describe().
Repartitioning can happen only for this KStream but not for the provided KTable.
In this case, all data of the stream will be redistributed through the repartitioning topic by writing all
records to it and rereading all records from it, such that the join input KStream is partitioned
correctly on its key.
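The partition-count requirement above can be illustrated with a toy partitioner. The sketch uses `Math.floorMod(key.hashCode(), numPartitions)` as a stand-in. This is an assumption for illustration only: Kafka's default partitioner hashes the serialized key with murmur2 instead. The effect is the same, though. The same key can map to different partition numbers when the partition counts differ, so the stream record and the table record for that key could be handled by different tasks and would not meet in a local lookup.

```java
// Toy stand-in for a partitioner; Kafka's default partitioner uses murmur2 on the serialized key instead.
final class ToyPartitioner {

    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // True if the key lands on the same partition number in both topics.
    static boolean coLocated(String key, int streamPartitions, int tablePartitions) {
        return partitionFor(key, streamPartitions) == partitionFor(key, tablePartitions);
    }
}
```

For example, `"K1"` maps to partition 1 of 3 but to partition 2 of 4. With equal partition counts (and the same partitioner), the mapping is identical for every key, which is the property co-partitioning relies on.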
Type Parameters:
VT - the value type of the table
VR - the value type of the result stream
Parameters:
table - the KTable to be joined with this stream
joiner - a ValueJoiner that computes the join result for a pair of matching records
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner,
one for each matched record pair with the same key
See Also:
leftJoin(KTable, ValueJoiner), join(GlobalKTable, KeyValueMapper, ValueJoiner)
<VT,VR> KStream<K,VR> join(KTable<K,VT> table, ValueJoiner<? super V,? super VT,? extends VR> joiner, Joined<K,V,VT> joined)
Join records of this stream with KTable's records using non-windowed inner equi join, with the
serializers and deserializers configured via the given Joined instance.
The join is a primary key table lookup join with join attribute stream.key == table.key.
"Table lookup join" means that results are only computed if KStream records are processed.
This is done by performing a lookup for matching records in the current (i.e., processing time) internal
KTable state.
In contrast, processing KTable input records will only update the internal KTable state and
will not produce any result records.
For each KStream record that finds a corresponding record in KTable, the provided
ValueJoiner will be called to compute a value (with arbitrary type) for the result record.
The key of the result record is the same as for both joining input records.
If a KStream input record key or value is null, the record will not be included in the join
operation and thus no output record will be added to the resulting KStream.
Example:
| KStream | KTable | state | result |
|---|---|---|---|
| <K1:A> | | | |
| | <K1:b> | <K1:b> | |
| <K1:C> | | <K1:b> | <K1:ValueJoiner(C,b)> |
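Both inputs (or to be more precise, their underlying source topics) need to have the same number of partitions.
If this is not the case, you would need to call repartition(Repartitioned) for this KStream
before doing the join, specifying the same number of partitions via the Repartitioned parameter as the given
KTable.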
Furthermore, both inputs need to be co-partitioned on the join key (i.e., use the same partitioner);
cf. join(GlobalKTable, KeyValueMapper, ValueJoiner).
If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is
user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated
name, and "-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via Topology.describe().
Repartitioning can happen only for this KStream but not for the provided KTable.
In this case, all data of the stream will be redistributed through the repartitioning topic by writing all
records to it and rereading all records from it, such that the join input KStream is partitioned
correctly on its key.
Type Parameters:
VT - the value type of the table
VR - the value type of the result stream
Parameters:
table - the KTable to be joined with this stream
joiner - a ValueJoiner that computes the join result for a pair of matching records
joined - a Joined instance that defines the serdes to be used to serialize/deserialize inputs of the joined streams
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner,
one for each matched record pair with the same key
See Also:
leftJoin(KTable, ValueJoiner, Joined), join(GlobalKTable, KeyValueMapper, ValueJoiner)
<VT,VR> KStream<K,VR> leftJoin(KTable<K,VT> table, ValueJoiner<? super V,? super VT,? extends VR> joiner)
Join records of this stream with KTable's records using non-windowed left equi join with default
serializers and deserializers.
In contrast to inner-join, all records from this stream will produce an
output record (cf. below).
The join is a primary key table lookup join with join attribute stream.key == table.key.
"Table lookup join" means that results are only computed if KStream records are processed.
This is done by performing a lookup for matching records in the current (i.e., processing time) internal
KTable state.
In contrast, processing KTable input records will only update the internal KTable state and
will not produce any result records.
For each KStream record, whether or not it finds a corresponding record in KTable, the provided
ValueJoiner will be called to compute a value (with arbitrary type) for the result record.
If no KTable record was found during lookup, a null value will be provided to ValueJoiner.
The key of the result record is the same as for both joining input records.
If a KStream input record key or value is null, the record will not be included in the join
operation and thus no output record will be added to the resulting KStream.
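The only difference from the inner lookup join is that every valid stream record produces output, with null standing in for a missing table value. A minimal plain-Java sketch under stated assumptions follows. `StreamTableLeftJoinModel` is a hypothetical name, each event is an ordered `{source, key, value}` triple, a `HashMap` stands in for the KTable state, and a `BiFunction` stands in for `ValueJoiner`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical plain-Java model of a KStream-KTable left lookup join; not Kafka Streams code.
final class StreamTableLeftJoinModel {

    // Each event is {source, key, value} with source "stream" or "table", processed in order.
    static List<String> leftJoin(List<String[]> events, BiFunction<String, String, String> joiner) {
        Map<String, String> tableState = new HashMap<>();
        List<String> results = new ArrayList<>();
        for (String[] e : events) {
            String source = e[0], key = e[1], value = e[2];
            if (source.equals("table")) {
                tableState.put(key, value); // table updates never emit results
            } else if (key != null && value != null) {
                // every valid stream record emits; get() returns null when no table record exists,
                // so the joiner then receives null as its second argument
                results.add("<" + key + ":" + joiner.apply(value, tableState.get(key)) + ">");
            }
        }
        return results;
    }
}
```

With the example's events, this yields `<K1:ValueJoiner(A,null)>` followed by `<K1:ValueJoiner(C,b)>`.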
Example:
| KStream | KTable | state | result |
|---|---|---|---|
| <K1:A> | | | <K1:ValueJoiner(A,null)> |
| | <K1:b> | <K1:b> | |
| <K1:C> | | <K1:b> | <K1:ValueJoiner(C,b)> |
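Both inputs (or to be more precise, their underlying source topics) need to have the same number of partitions.
If this is not the case, you would need to call repartition(Repartitioned) for this KStream
before doing the join, specifying the same number of partitions via the Repartitioned parameter as the given
KTable.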
Furthermore, both inputs need to be co-partitioned on the join key (i.e., use the same partitioner);
cf. join(GlobalKTable, KeyValueMapper, ValueJoiner).
If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is
user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated
name, and "-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via Topology.describe().
Repartitioning can happen only for this KStream but not for the provided KTable.
In this case, all data of the stream will be redistributed through the repartitioning topic by writing all
records to it and rereading all records from it, such that the join input KStream is partitioned
correctly on its key.
Type Parameters:
VT - the value type of the table
VR - the value type of the result stream
Parameters:
table - the KTable to be joined with this stream
joiner - a ValueJoiner that computes the join result for a pair of matching records
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner,
one output for each input KStream record
See Also:
join(KTable, ValueJoiner), leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
<VT,VR> KStream<K,VR> leftJoin(KTable<K,VT> table, ValueJoiner<? super V,? super VT,? extends VR> joiner, Joined<K,V,VT> joined)
Join records of this stream with KTable's records using non-windowed left equi join, with the
serializers and deserializers configured via the given Joined instance.
In contrast to inner-join, all records from this stream will produce an
output record (cf. below).
The join is a primary key table lookup join with join attribute stream.key == table.key.
"Table lookup join" means that results are only computed if KStream records are processed.
This is done by performing a lookup for matching records in the current (i.e., processing time) internal
KTable state.
In contrast, processing KTable input records will only update the internal KTable state and
will not produce any result records.
For each KStream record, whether or not it finds a corresponding record in KTable, the provided
ValueJoiner will be called to compute a value (with arbitrary type) for the result record.
If no KTable record was found during lookup, a null value will be provided to ValueJoiner.
The key of the result record is the same as for both joining input records.
If a KStream input record key or value is null, the record will not be included in the join
operation and thus no output record will be added to the resulting KStream.
Example:
| KStream | KTable | state | result |
|---|---|---|---|
| <K1:A> | | | <K1:ValueJoiner(A,null)> |
| | <K1:b> | <K1:b> | |
| <K1:C> | | <K1:b> | <K1:ValueJoiner(C,b)> |
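Both inputs (or to be more precise, their underlying source topics) need to have the same number of partitions.
If this is not the case, you would need to call repartition(Repartitioned) for this KStream
before doing the join, specifying the same number of partitions via the Repartitioned parameter as the given
KTable.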
Furthermore, both inputs need to be co-partitioned on the join key (i.e., use the same partitioner);
cf. join(GlobalKTable, KeyValueMapper, ValueJoiner).
If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is
user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated
name, and "-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via Topology.describe().
Repartitioning can happen only for this KStream but not for the provided KTable.
In this case, all data of the stream will be redistributed through the repartitioning topic by writing all
records to it and rereading all records from it, such that the join input KStream is partitioned
correctly on its key.
Type Parameters:
VT - the value type of the table
VR - the value type of the result stream
Parameters:
table - the KTable to be joined with this stream
joiner - a ValueJoiner that computes the join result for a pair of matching records
joined - a Joined instance that defines the serdes to be used to serialize/deserialize inputs and outputs of the joined streams
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner,
one output for each input KStream record
See Also:
join(KTable, ValueJoiner, Joined), leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
<GK,GV,RV> KStream<K,RV> join(GlobalKTable<GK,GV> globalTable, KeyValueMapper<? super K,? super V,? extends GK> keySelector, ValueJoiner<? super V,? super GV,? extends RV> joiner)
Join records of this stream with GlobalKTable's records using non-windowed inner equi join.
The join is a primary key table lookup join with join attribute
keySelector.apply(stream.key, stream.value) == table.key.
"Table lookup join" means that results are only computed if KStream records are processed.
This is done by performing a lookup for matching records in the current internal GlobalKTable state.
In contrast, processing GlobalKTable input records will only update the internal GlobalKTable
state and will not produce any result records.
For each KStream record that finds a corresponding record in GlobalKTable, the provided
ValueJoiner will be called to compute a value (with arbitrary type) for the result record.
The key of the result record is the same as the key of this KStream.
If a KStream input record key or value is null, the record will not be included in the join
operation and thus no output record will be added to the resulting KStream.
If keySelector returns null, implying no match exists, no output record will be added to the
resulting KStream.
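Because the lookup key is computed by keySelector from the stream record's key and value, a GlobalKTable join can look up by a field of the value while the result keeps the stream key. A minimal plain-Java sketch follows, assuming hypothetical orders keyed by order ID whose value is a product ID, joined against a product table. `GlobalTableJoinModel` is an illustrative name. `BiFunction` stands in for both `KeyValueMapper` and `ValueJoiner`, and a `Map` stands in for the GlobalKTable state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical plain-Java model of a KStream-GlobalKTable inner join; not Kafka Streams code.
final class GlobalTableJoinModel {

    // streamRecords are {key, value} pairs; globalTable holds the current GlobalKTable state.
    static List<String> join(List<String[]> streamRecords,
                             Map<String, String> globalTable,
                             BiFunction<String, String, String> keySelector,
                             BiFunction<String, String, String> joiner) {
        List<String> out = new ArrayList<>();
        for (String[] rec : streamRecords) {
            String key = rec[0];
            String value = rec[1];
            if (key == null || value == null) {
                continue; // not included in the join
            }
            String tableKey = keySelector.apply(key, value); // may be derived from the value
            if (tableKey == null) {
                continue; // null means no match: no output
            }
            String tableValue = globalTable.get(tableKey);
            if (tableValue != null) {
                // the result keeps the key of the stream record, not the table key
                out.add("<" + key + ":" + joiner.apply(value, tableValue) + ">");
            }
        }
        return out;
    }
}
```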
Type Parameters:
GK - the key type of the GlobalKTable
GV - the value type of the GlobalKTable
RV - the value type of the resulting KStream
Parameters:
globalTable - the GlobalKTable to be joined with this stream
keySelector - instance of KeyValueMapper used to map from the (key, value) of this stream
to the key of the GlobalKTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner,
one output for each input KStream record that finds a matching GlobalKTable record
See Also:
leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
<GK,GV,RV> KStream<K,RV> join(GlobalKTable<GK,GV> globalTable, KeyValueMapper<? super K,? super V,? extends GK> keySelector, ValueJoiner<? super V,? super GV,? extends RV> joiner, Named named)
Join records of this stream with GlobalKTable's records using non-windowed inner equi join.
The join is a primary key table lookup join with join attribute
keySelector.apply(stream.key, stream.value) == table.key.
"Table lookup join" means that results are only computed if KStream records are processed.
This is done by performing a lookup for matching records in the current internal GlobalKTable state.
In contrast, processing GlobalKTable input records will only update the internal GlobalKTable
state and will not produce any result records.
For each KStream record that finds a corresponding record in GlobalKTable, the provided
ValueJoiner will be called to compute a value (with arbitrary type) for the result record.
The key of the result record is the same as the key of this KStream.
If a KStream input record key or value is null, the record will not be included in the join
operation and thus no output record will be added to the resulting KStream.
If keySelector returns null, implying no match exists, no output record will be added to the
resulting KStream.
Type Parameters:
GK - the key type of the GlobalKTable
GV - the value type of the GlobalKTable
RV - the value type of the resulting KStream
Parameters:
globalTable - the GlobalKTable to be joined with this stream
keySelector - instance of KeyValueMapper used to map from the (key, value) of this stream
to the key of the GlobalKTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
named - a Named config used to name the processor in the topology
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner,
one output for each input KStream record that finds a matching GlobalKTable record
See Also:
leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
<GK,GV,RV> KStream<K,RV> leftJoin(GlobalKTable<GK,GV> globalTable, KeyValueMapper<? super K,? super V,? extends GK> keySelector, ValueJoiner<? super V,? super GV,? extends RV> valueJoiner)
Join records of this stream with GlobalKTable's records using non-windowed left equi join.
In contrast to inner-join, all records from this stream
will produce an output record (cf. below).
The join is a primary key table lookup join with join attribute
keySelector.apply(stream.key, stream.value) == table.key.
"Table lookup join" means that results are only computed if KStream records are processed.
This is done by performing a lookup for matching records in the current internal GlobalKTable state.
In contrast, processing GlobalKTable input records will only update the internal GlobalKTable
state and will not produce any result records.
For each KStream record, whether or not it finds a corresponding record in GlobalKTable, the
provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record.
The key of the result record is the same as the key of this KStream.
If a KStream input record key or value is null, the record will not be included in the join
operation and thus no output record will be added to the resulting KStream.
If keySelector returns null, implying no match exists, a null value will be
provided to ValueJoiner.
If no GlobalKTable record was found during lookup, a null value will be provided to
ValueJoiner.
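The three cases above (a match, a null lookup key, and a failed lookup) can be traced for a single record with the following sketch. `GlobalTableLeftJoinModel` is a hypothetical name, `BiFunction` stands in for `KeyValueMapper` and `ValueJoiner`, and a `Map` stands in for the GlobalKTable state. Because the model uses a null return to mean "no output record", it assumes the joiner itself never returns null.

```java
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical plain-Java model of how a KStream-GlobalKTable left join treats one stream record.
final class GlobalTableLeftJoinModel {

    // Returns the joined value, or null if the record is dropped (null key or value).
    // Assumes valueJoiner never returns null, so null unambiguously means "no output".
    static String leftJoinOne(String key, String value,
                              Map<String, String> globalTable,
                              BiFunction<String, String, String> keySelector,
                              BiFunction<String, String, String> valueJoiner) {
        if (key == null || value == null) {
            return null; // not included in the join, no output record
        }
        String tableKey = keySelector.apply(key, value);
        // a null table key and a failed lookup both hand null to the joiner
        String tableValue = tableKey == null ? null : globalTable.get(tableKey);
        return valueJoiner.apply(value, tableValue);
    }
}
```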
Type Parameters:
GK - the key type of the GlobalKTable
GV - the value type of the GlobalKTable
RV - the value type of the resulting KStream
Parameters:
globalTable - the GlobalKTable to be joined with this stream
keySelector - instance of KeyValueMapper used to map from the (key, value) of this stream
to the key of the GlobalKTable
valueJoiner - a ValueJoiner that computes the join result for a pair of matching records
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner,
one output for each input KStream record
See Also:
join(GlobalKTable, KeyValueMapper, ValueJoiner)
<GK,GV,RV> KStream<K,RV> leftJoin(GlobalKTable<GK,GV> globalTable, KeyValueMapper<? super K,? super V,? extends GK> keySelector, ValueJoiner<? super V,? super GV,? extends RV> valueJoiner, Named named)
Join records of this stream with GlobalKTable's records using non-windowed left equi join.
In contrast to inner-join, all records from this stream
will produce an output record (cf. below).
The join is a primary key table lookup join with join attribute
keySelector.apply(stream.key, stream.value) == table.key.
"Table lookup join" means that results are only computed if KStream records are processed.
This is done by performing a lookup for matching records in the current internal GlobalKTable state.
In contrast, processing GlobalKTable input records will only update the internal GlobalKTable
state and will not produce any result records.
For each KStream record, whether or not it finds a corresponding record in GlobalKTable, the
provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record.
The key of the result record is the same as the key of this KStream.
If a KStream input record key or value is null, the record will not be included in the join
operation and thus no output record will be added to the resulting KStream.
If keySelector returns null, implying no match exists, a null value will be
provided to ValueJoiner.
If no GlobalKTable record was found during lookup, a null value will be provided to
ValueJoiner.
Type Parameters:
GK - the key type of the GlobalKTable
GV - the value type of the GlobalKTable
RV - the value type of the resulting KStream
Parameters:
globalTable - the GlobalKTable to be joined with this stream
keySelector - instance of KeyValueMapper used to map from the (key, value) of this stream
to the key of the GlobalKTable
valueJoiner - a ValueJoiner that computes the join result for a pair of matching records
named - a Named config used to name the processor in the topology
Returns:
a KStream that contains join-records for each key and values computed by the given ValueJoiner,
one output for each input KStream record
See Also:
join(GlobalKTable, KeyValueMapper, ValueJoiner)
<K1,V1> KStream<K1,V1> transform(TransformerSupplier<? super K,? super V,KeyValue<K1,V1>> transformerSupplier, String... stateStoreNames)
Transform each record of the input stream into zero or one record in the output stream (both key and value type can be altered arbitrarily).
A Transformer (provided by the given TransformerSupplier) is applied to each input record and
returns zero or one output record.
Thus, an input record <K:V> can be transformed into an output record <K':V'>.
Attaching a state store makes this a stateful record-by-record operation (cf. map()).
If you choose not to attach one, this operation is similar to the stateless map()
but allows access to the ProcessorContext and record metadata.
This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI.
Furthermore, via Punctuator#punctuate(), the processing progress can be observed and additional periodic actions can be performed.
In order for the transformer to use state stores, the stores must be added to the topology and connected to the
transformer using at least one of two strategies (global state stores do not need to be connected; read-only access
to them is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...)
(or, in the DSL, StreamsBuilder.addStateStore(StoreBuilder) as in the following example), and specify the store
names via stateStoreNames so they will be connected to the transformer.
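// create store (assumes builder is a StreamsBuilder and inputStream a KStream<String, String>)
StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
        Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
                Serdes.String(),
                Serdes.String());
// add store
builder.addStateStore(keyValueStoreBuilder);
KStream<String, String> outputStream = inputStream.transform(
        new TransformerSupplier<String, String, KeyValue<String, String>>() {
            @Override
            public Transformer<String, String, KeyValue<String, String>> get() {
                return new MyTransformer(); // return a new instance on every call
            }
        },
        "myTransformState");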
The second strategy is for the given TransformerSupplier to implement ConnectedStoreProvider.stores(),
which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.
class MyTransformerSupplier implements TransformerSupplier<String, String, KeyValue<String, String>> {
    // supply transformer
    @Override
    public Transformer<String, String, KeyValue<String, String>> get() {
        return new MyTransformer();
    }
    // provide store(s) that will be added and connected to the associated transformer
    // the store name from the builder ("myTransformState") is used to access the store later via the ProcessorContext
    @Override
    public Set<StoreBuilder<?>> stores() {
        StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
                        Serdes.String(),
                        Serdes.String());
        return Collections.singleton(keyValueStoreBuilder);
    }
}
...
KStream<String, String> outputStream = inputStream.transform(new MyTransformerSupplier());
With either strategy, within the Transformer, the state is obtained via the ProcessorContext.
To trigger periodic actions via punctuate(), a schedule must be registered.
The Transformer must return a KeyValue type in transform().
The return value of Transformer#transform() may be null, in which case no record is emitted.
class MyTransformer implements Transformer<String, String, KeyValue<String, String>> {
    private ProcessorContext context;
    private KeyValueStore<String, String> state;
    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.state = (KeyValueStore<String, String>) context.getStateStore("myTransformState");
        // punctuate each second; can access this.state
        context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // periodic action; can access this.state
        });
    }
    @Override
    public KeyValue<String, String> transform(String key, String value) {
        // can access this.state
        return new KeyValue<>(key, value); // can emit a single value via return -- can also be null
    }
    @Override
    public void close() {
        // can access this.state
    }
}
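The zero-or-one contract of transform() (return a pair to emit one record, return null to emit nothing), combined with per-key state, can be mimicked without Kafka. The sketch below is a hypothetical stand-in rather than Kafka Streams code: `DedupTransformerModel` is an illustrative name, a `HashMap` plays the role of the attached key-value store, and `Map.Entry` plays the role of `KeyValue`. It emits the first value seen per key, with the key changed to upper case to show that the key may change, and it drops later repeats by returning null.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical model of a stateful, zero-or-one transformer; not Kafka Streams code.
final class DedupTransformerModel {

    // Stands in for a key-value state store attached to the transformer.
    private final Map<String, String> state = new HashMap<>();

    // Mirrors Transformer#transform(): returns one output pair, or null to emit nothing.
    Map.Entry<String, String> transform(String key, String value) {
        if (state.containsKey(key)) {
            return null; // duplicate key: no record is emitted
        }
        state.put(key, value);
        // the output key may differ from the input key, like <K:V> -> <K':V'>
        return new SimpleEntry<>(key.toUpperCase(Locale.ROOT), value);
    }

    // Drives the model over an input sequence and collects the non-null outputs.
    static List<String> run(List<String[]> input) {
        DedupTransformerModel t = new DedupTransformerModel();
        List<String> out = new ArrayList<>();
        for (String[] rec : input) {
            Map.Entry<String, String> kv = t.transform(rec[0], rec[1]);
            if (kv != null) {
                out.add("<" + kv.getKey() + ":" + kv.getValue() + ">");
            }
        }
        return out;
    }
}
```

For the input `<a:1>`, `<b:2>`, `<a:3>`, only `<A:1>` and `<B:2>` are emitted.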
Even if any upstream operation was key-changing, no auto-repartition is triggered.
If repartitioning is required, a call to repartition() should be performed before transform().
Transforming records might result in an internal data redistribution if a key-based operator (like an aggregation
or join) is applied to the result KStream (cf. transformValues()).
Note that it is possible to emit multiple records for each input record by using context#forward() in
Transformer#transform() and Punctuator#punctuate().
Be aware that a mismatch between the types of the emitted records and the type of the stream would only be
detected at runtime.
To ensure type-safety at compile-time, context#forward() should not be used in Transformer#transform() and
Punctuator#punctuate().
If in Transformer#transform() multiple records need to be emitted for each input record, it is recommended to use
flatTransform().
Type Parameters:
K1 - the key type of the new stream
V1 - the value type of the new stream
Parameters:
transformerSupplier - an instance of TransformerSupplier that generates a Transformer
stateStoreNames - the names of the state stores used by the processor; not required if the supplier
implements ConnectedStoreProvider.stores()
Returns:
a KStream that contains more or fewer records with new key and value (possibly of different type)
See Also:
map(KeyValueMapper), flatTransform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...),
transformValues(ValueTransformerWithKeySupplier, String...), process(ProcessorSupplier, String...)
<K1,V1> KStream<K1,V1> transform(TransformerSupplier<? super K,? super V,KeyValue<K1,V1>> transformerSupplier, Named named, String... stateStoreNames)
Transform each record of the input stream into zero or one record in the output stream (both key and value type can be altered arbitrarily).
A Transformer (provided by the given TransformerSupplier) is applied to each input record and
returns zero or one output record.
Thus, an input record <K:V> can be transformed into an output record <K':V'>.
Attaching a state store makes this a stateful record-by-record operation (cf. map()).
If you choose not to attach one, this operation is similar to the stateless map()
but allows access to the ProcessorContext and record metadata.
This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI.
Furthermore, via Punctuator#punctuate(), the processing progress can be observed and additional periodic actions can be performed.
In order for the transformer to use state stores, the stores must be added to the topology and connected to the
transformer using at least one of two strategies (global state stores do not need to be connected; read-only access
to them is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...)
(or, in the DSL, StreamsBuilder.addStateStore(StoreBuilder) as in the following example), and specify the store
names via stateStoreNames so they will be connected to the transformer.
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
Serdes.String(),
Serdes.String());
// add store
builder.addStateStore(keyValueStoreBuilder);
KStream outputStream = inputStream.transform(new TransformerSupplier() {
public Transformer get() {
return new MyTransformer();
}
}, "myTransformState");
The second strategy is for the given TransformerSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.
class MyTransformerSupplier implements TransformerSupplier<String, String, KeyValue<String, String>> {
// supply a new transformer instance on every call
public Transformer<String, String, KeyValue<String, String>> get() {
return new MyTransformer<>();
}
// provide store(s) that will be added and connected to the associated transformer
// the store name from the builder ("myTransformState") is used to access the store later via the ProcessorContext
public Set<StoreBuilder<?>> stores() {
StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
Serdes.String(),
Serdes.String());
return Collections.singleton(keyValueStoreBuilder);
}
}
...
KStream outputStream = inputStream.transform(new MyTransformerSupplier());
With either strategy, within the Transformer, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered. The Transformer must return a KeyValue type in transform(). The return value of Transformer#transform() may be null, in which case no record is emitted.
class MyTransformer<K, V> implements Transformer<K, V, KeyValue<K, V>> {
private ProcessorContext context;
private KeyValueStore<String, String> state;
public void init(ProcessorContext context) {
this.context = context;
this.state = (KeyValueStore<String, String>) context.getStateStore("myTransformState");
// punctuate each second; the punctuator can also access this.state
context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> { /* periodic action */ });
}
public KeyValue<K, V> transform(K key, V value) {
// can access this.state
return KeyValue.pair(key, value); // emit a single record via return; may also be null (no record)
}
public void close() {
// can access this.state
}
}
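The zero-or-one contract above can be exercised without a Kafka cluster. The sketch below is a plain-Java model, not Kafka API: the hypothetical `DedupTransformModel` uses a `HashMap` in place of the "myTransformState" store and returns `null` for a key it has already seen, which under the contract described above means no record is emitted. Deduplication is only an illustrative choice of stateful logic.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of a stateful, zero-or-one-output transform (no Kafka classes).
// The HashMap stands in for the "myTransformState" key-value store.
final class DedupTransformModel {
    private final Map<String, String> state = new HashMap<>();

    // Mirrors Transformer#transform(): return one record, or null to emit nothing.
    Map.Entry<String, String> transform(String key, String value) {
        if (state.containsKey(key)) {
            return null; // key seen before: drop this record
        }
        state.put(key, value);
        return new SimpleImmutableEntry<>(key, value);
    }

    // Mirrors how the caller treats the return value: a null result produces no output record.
    List<Map.Entry<String, String>> run(List<Map.Entry<String, String>> input) {
        List<Map.Entry<String, String>> output = new ArrayList<>();
        for (Map.Entry<String, String> record : input) {
            Map.Entry<String, String> result = transform(record.getKey(), record.getValue());
            if (result != null) {
                output.add(result);
            }
        }
        return output;
    }
}
```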
Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before transform().
Transforming records might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. transformValues()).
Note that it is possible to emit multiple records for each input record by using context#forward() in Transformer#transform() and Punctuator#punctuate(). Be aware that a mismatch between the types of the emitted records and the type of the stream would only be detected at runtime. To ensure type-safety at compile-time, context#forward() should not be used in Transformer#transform() and Punctuator#punctuate(). If multiple records need to be emitted for each input record in Transformer#transform(), it is recommended to use flatTransform() instead.
Type Parameters:
K1 - the key type of the new stream
V1 - the value type of the new stream
Parameters:
transformerSupplier - an instance of TransformerSupplier that generates a Transformer
named - a Named config used to name the processor in the topology
stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
Returns:
a KStream that contains more or fewer records with new key and value (possibly of different type)
See Also:
map(KeyValueMapper), flatTransform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...), transformValues(ValueTransformerWithKeySupplier, String...), process(ProcessorSupplier, String...)
<K1,V1> KStream<K1,V1> flatTransform(TransformerSupplier<? super K,? super V,Iterable<KeyValue<K1,V1>>> transformerSupplier, String... stateStoreNames)
A Transformer (provided by the given TransformerSupplier) is applied to each input record and returns zero or more output records. Thus, an input record <K,V> can be transformed into output records <K':V'>, <K'':V''>, and so on. Attaching a state store makes this a stateful record-by-record operation (cf. flatMap()). If you choose not to attach one, this operation is similar to the stateless flatMap() but allows access to the ProcessorContext and record metadata. Furthermore, via Punctuator#punctuate(), the processing progress can be observed and additional periodic actions can be performed.
In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...) (with the DSL, StreamsBuilder.addStateStore(StoreBuilder) as in the example below), and specify the store names via stateStoreNames so they will be connected to the transformer.
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
Serdes.String(),
Serdes.String());
// add store
builder.addStateStore(keyValueStoreBuilder);
KStream outputStream = inputStream.flatTransform(new TransformerSupplier() {
public Transformer get() {
return new MyTransformer();
}
}, "myTransformState");
The second strategy is for the given TransformerSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.
class MyTransformerSupplier implements TransformerSupplier<String, String, Iterable<KeyValue<String, String>>> {
// supply a new transformer instance on every call
public Transformer<String, String, Iterable<KeyValue<String, String>>> get() {
return new MyTransformer<>();
}
// provide store(s) that will be added and connected to the associated transformer
// the store name from the builder ("myTransformState") is used to access the store later via the ProcessorContext
public Set<StoreBuilder<?>> stores() {
StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
Serdes.String(),
Serdes.String());
return Collections.singleton(keyValueStoreBuilder);
}
}
...
KStream outputStream = inputStream.flatTransform(new MyTransformerSupplier());
With either strategy, within the Transformer, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered. The Transformer must return an Iterable type (e.g., any Collection type) in transform(). The return value of Transformer#transform() may be null, which is equivalent to returning an empty Iterable, i.e., no records are emitted.
class MyTransformer<K, V> implements Transformer<K, V, Iterable<KeyValue<K, V>>> {
private ProcessorContext context;
private KeyValueStore<String, String> state;
public void init(ProcessorContext context) {
this.context = context;
this.state = (KeyValueStore<String, String>) context.getStateStore("myTransformState");
// punctuate each second; the punctuator can also access this.state
context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> { /* periodic action */ });
}
public Iterable<KeyValue<K, V>> transform(K key, V value) {
// can access this.state
List<KeyValue<K, V>> result = new ArrayList<>();
for (int i = 0; i < 3; i++) {
result.add(KeyValue.pair(key, value));
}
return result; // emits a list of key-value pairs via return
}
public void close() {
// can access this.state
}
}
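The flattening rule just described, including the equivalence of a null return and an empty Iterable, can be modeled in plain Java. `FlatTransformModel` below is a hypothetical helper (no Kafka classes) that plays the role of the DSL: it calls a per-record function and concatenates whatever that function returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical plain-Java model of how flatTransform() consumes transform() results
// (no Kafka classes): each input yields an Iterable of outputs, and a null
// Iterable is treated exactly like an empty one.
final class FlatTransformModel {
    private FlatTransformModel() {}

    static <I, O> List<O> flatTransform(List<I> input, Function<I, Iterable<O>> transform) {
        List<O> output = new ArrayList<>();
        for (I record : input) {
            Iterable<O> results = transform.apply(record);
            if (results == null) {
                continue; // same as an empty Iterable: nothing emitted for this record
            }
            for (O result : results) {
                output.add(result);
            }
        }
        return output;
    }
}
```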
Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before flatTransform().
Transforming records might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. transformValues()).
Note that it is possible to emit records by using context#forward() in Transformer#transform() and Punctuator#punctuate(). Be aware that a mismatch between the types of the emitted records and the type of the stream would only be detected at runtime. To ensure type-safety at compile-time, context#forward() should not be used in Transformer#transform() and Punctuator#punctuate().
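The runtime-only detection mentioned above is the same effect plain Java shows whenever values travel through an untyped path. The sketch below is an analogy, not Kafka code: the hypothetical `UntypedForwarder` accepts any value, much as forwarding through the context bypasses the stream's declared value type, and a wrong type only surfaces when a downstream step reads the value as the type it expects.

```java
import java.util.ArrayList;
import java.util.List;

// Analogy only (no Kafka classes): a hypothetical sink that, like forwarding via
// the processor context, accepts values of any type without compile-time checks.
final class UntypedForwarder {
    private final List<Object> forwarded = new ArrayList<>();

    void forward(Object value) {
        forwarded.add(value); // compiles for any type: no check against the declared value type
    }

    // A downstream step that assumes the declared value type (Integer here).
    int sumAsIntegers() {
        int sum = 0;
        for (Object value : forwarded) {
            sum += (Integer) value; // a wrong type fails only here, at runtime
        }
        return sum;
    }
}
```

Forwarding `1` and `"two"` compiles without complaint; the `ClassCastException` appears only when `sumAsIntegers()` runs.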
Type Parameters:
K1 - the key type of the new stream
V1 - the value type of the new stream
Parameters:
transformerSupplier - an instance of TransformerSupplier that generates a Transformer
stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
Returns:
a KStream that contains more or fewer records with new key and value (possibly of different type)
See Also:
flatMap(KeyValueMapper), transform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...), transformValues(ValueTransformerWithKeySupplier, String...), process(ProcessorSupplier, String...)
<K1,V1> KStream<K1,V1> flatTransform(TransformerSupplier<? super K,? super V,Iterable<KeyValue<K1,V1>>> transformerSupplier, Named named, String... stateStoreNames)
A Transformer (provided by the given TransformerSupplier) is applied to each input record and returns zero or more output records. Thus, an input record <K,V> can be transformed into output records <K':V'>, <K'':V''>, and so on. Attaching a state store makes this a stateful record-by-record operation (cf. flatMap()). If you choose not to attach one, this operation is similar to the stateless flatMap() but allows access to the ProcessorContext and record metadata. Furthermore, via Punctuator#punctuate(), the processing progress can be observed and additional periodic actions can be performed.
In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...) (with the DSL, StreamsBuilder.addStateStore(StoreBuilder) as in the example below), and specify the store names via stateStoreNames so they will be connected to the transformer.
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
Serdes.String(),
Serdes.String());
// add store
builder.addStateStore(keyValueStoreBuilder);
KStream outputStream = inputStream.flatTransform(new TransformerSupplier() {
public Transformer get() {
return new MyTransformer();
}
}, "myTransformState");
The second strategy is for the given TransformerSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.
class MyTransformerSupplier implements TransformerSupplier<String, String, Iterable<KeyValue<String, String>>> {
// supply a new transformer instance on every call
public Transformer<String, String, Iterable<KeyValue<String, String>>> get() {
return new MyTransformer<>();
}
// provide store(s) that will be added and connected to the associated transformer
// the store name from the builder ("myTransformState") is used to access the store later via the ProcessorContext
public Set<StoreBuilder<?>> stores() {
StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
Serdes.String(),
Serdes.String());
return Collections.singleton(keyValueStoreBuilder);
}
}
...
KStream outputStream = inputStream.flatTransform(new MyTransformerSupplier());
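Both strategies end with the store registered in the topology and connected to the processor by name. The following plain-Java sketch models only that bookkeeping; `TopologyModel`, `addProcessor`, and `storesOf` are hypothetical names, not Kafka API. In the model, naming a store that was never added is rejected; Kafka Streams performs a comparable check when the topology is built, but its exact exception and timing are not modeled here.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical bookkeeping model of the two store-connection strategies (no Kafka classes).
final class TopologyModel {
    private final Set<String> registeredStores = new HashSet<>();
    private final Map<String, Set<String>> connections = new HashMap<>();

    // Strategy 1, step 1: register a store explicitly (cf. addStateStore).
    void addStateStore(String storeName) {
        registeredStores.add(storeName);
    }

    // suppliedStores models ConnectedStoreProvider.stores() (strategy 2): registered and connected automatically.
    // stateStoreNames models the varargs parameter (strategy 1): each name must already be registered.
    void addProcessor(String processorName, Set<String> suppliedStores, String... stateStoreNames) {
        Set<String> connected = new HashSet<>(suppliedStores);
        registeredStores.addAll(suppliedStores);
        for (String store : stateStoreNames) {
            if (!registeredStores.contains(store)) {
                throw new IllegalStateException("store not added yet: " + store);
            }
            connected.add(store);
        }
        connections.put(processorName, connected);
    }

    Set<String> storesOf(String processorName) {
        return connections.getOrDefault(processorName, Collections.emptySet());
    }
}
```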
With either strategy, within the Transformer, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered. The Transformer must return an Iterable type (e.g., any Collection type) in transform(). The return value of Transformer#transform() may be null, which is equivalent to returning an empty Iterable, i.e., no records are emitted.
class MyTransformer<K, V> implements Transformer<K, V, Iterable<KeyValue<K, V>>> {
private ProcessorContext context;
private KeyValueStore<String, String> state;
public void init(ProcessorContext context) {
this.context = context;
this.state = (KeyValueStore<String, String>) context.getStateStore("myTransformState");
// punctuate each second; the punctuator can also access this.state
context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> { /* periodic action */ });
}
public Iterable<KeyValue<K, V>> transform(K key, V value) {
// can access this.state
List<KeyValue<K, V>> result = new ArrayList<>();
for (int i = 0; i < 3; i++) {
result.add(KeyValue.pair(key, value));
}
return result; // emits a list of key-value pairs via return
}
public void close() {
// can access this.state
}
}
Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before flatTransform().
Transforming records might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. transformValues()).
Note that it is possible to emit records by using context#forward() in Transformer#transform() and Punctuator#punctuate(). Be aware that a mismatch between the types of the emitted records and the type of the stream would only be detected at runtime. To ensure type-safety at compile-time, context#forward() should not be used in Transformer#transform() and Punctuator#punctuate().
Type Parameters:
K1 - the key type of the new stream
V1 - the value type of the new stream
Parameters:
transformerSupplier - an instance of TransformerSupplier that generates a Transformer
named - a Named config used to name the processor in the topology
stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
Returns:
a KStream that contains more or fewer records with new key and value (possibly of different type)
See Also:
flatMap(KeyValueMapper), transform(TransformerSupplier, String...), transformValues(ValueTransformerSupplier, String...), transformValues(ValueTransformerWithKeySupplier, String...), process(ProcessorSupplier, String...)
<VR> KStream<K,VR> transformValues(ValueTransformerSupplier<? super V,? extends VR> valueTransformerSupplier, String... stateStoreNames)
A ValueTransformer (provided by the given ValueTransformerSupplier) is applied to each input record value and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K:V'>. Attaching a state store makes this a stateful record-by-record operation (cf. mapValues()). If you choose not to attach one, this operation is similar to the stateless mapValues() but allows access to the ProcessorContext and record metadata. Furthermore, via Punctuator.punctuate(long), the processing progress can be observed and additional periodic actions can be performed.
In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...) (with the DSL, StreamsBuilder.addStateStore(StoreBuilder) as in the example below), and specify the store names via stateStoreNames so they will be connected to the transformer.
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
Serdes.String(),
Serdes.String());
// add store
builder.addStateStore(keyValueStoreBuilder);
KStream outputStream = inputStream.transformValues(new ValueTransformerSupplier() {
public ValueTransformer get() {
return new MyValueTransformer();
}
}, "myValueTransformState");
The second strategy is for the given ValueTransformerSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.
// NewValueType stands for the result value type VR
class MyValueTransformerSupplier implements ValueTransformerSupplier<String, NewValueType> {
// supply a new transformer instance on every call
public ValueTransformer<String, NewValueType> get() {
return new MyValueTransformer<>();
}
// provide store(s) that will be added and connected to the associated transformer
// the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
public Set<StoreBuilder<?>> stores() {
StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
Serdes.String(),
Serdes.String());
return Collections.singleton(keyValueStoreBuilder);
}
}
...
KStream outputStream = inputStream.transformValues(new MyValueTransformerSupplier());
With either strategy, within the ValueTransformer, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered. The ValueTransformer must return the new value in transform(). In contrast to transform(), no additional KeyValue pairs can be emitted via ProcessorContext.forward(). A StreamsException is thrown if the ValueTransformer tries to emit a KeyValue pair.
// NewValueType stands for the result value type VR
class MyValueTransformer<V> implements ValueTransformer<V, NewValueType> {
private KeyValueStore<String, String> state;
public void init(ProcessorContext context) {
this.state = (KeyValueStore<String, String>) context.getStateStore("myValueTransformState");
// punctuate each second; the punctuator can also access this.state
context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> { /* periodic action */ });
}
public NewValueType transform(V value) {
// can access this.state
return new NewValueType(); // or null
}
public void close() {
// can access this.state
}
}
Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before transformValues().
Setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. transform(TransformerSupplier, String...)).
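To contrast a value-only transformation with a key-changing one, the plain-Java sketch below (the hypothetical `CoLocationModel`, no Kafka classes) counts how many records would change partition. It assumes a simplified partitioner, `floorMod(key.hashCode(), numPartitions)`; Kafka's default partitioner hashes the serialized key instead, so only the principle carries over.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.UnaryOperator;

// Hypothetical plain-Java model of co-location (no Kafka classes).
final class CoLocationModel {
    private CoLocationModel() {}

    // Simplified partitioner (assumption); Kafka's default hashes the serialized key.
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    // Counts how many records would land in a different partition after the transformation.
    static <K, V> long movedRecords(List<Map.Entry<K, V>> records,
                                    UnaryOperator<Map.Entry<K, V>> transformation,
                                    int numPartitions) {
        return records.stream()
                .filter(r -> partitionFor(r.getKey(), numPartitions)
                        != partitionFor(transformation.apply(r).getKey(), numPartitions))
                .count();
    }

    // A value-only transformation in the style of transformValues(): the key is copied unchanged.
    static <K, V> UnaryOperator<Map.Entry<K, V>> valuesOnly(UnaryOperator<V> f) {
        return r -> new SimpleImmutableEntry<>(r.getKey(), f.apply(r.getValue()));
    }
}
```

With a value-only transformation the count is always zero because the key never changes; a transformation that rewrites keys can move records, which is why key-based operators after transform() may need a redistribution.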
Type Parameters:
VR - the value type of the result stream
Parameters:
valueTransformerSupplier - an instance of ValueTransformerSupplier that generates a ValueTransformer
stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
Returns:
a KStream that contains records with unmodified key and new values (possibly of different type)
See Also:
mapValues(ValueMapper), mapValues(ValueMapperWithKey), transform(TransformerSupplier, String...)
<VR> KStream<K,VR> transformValues(ValueTransformerSupplier<? super V,? extends VR> valueTransformerSupplier, Named named, String... stateStoreNames)
A ValueTransformer (provided by the given ValueTransformerSupplier) is applied to each input record value and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K:V'>. Attaching a state store makes this a stateful record-by-record operation (cf. mapValues()). If you choose not to attach one, this operation is similar to the stateless mapValues() but allows access to the ProcessorContext and record metadata. This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. Furthermore, via Punctuator.punctuate(long), the processing progress can be observed and additional periodic actions can be performed.
In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...) (with the DSL, StreamsBuilder.addStateStore(StoreBuilder) as in the example below), and specify the store names via stateStoreNames so they will be connected to the transformer.
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
Serdes.String(),
Serdes.String());
// add store
builder.addStateStore(keyValueStoreBuilder);
KStream outputStream = inputStream.transformValues(new ValueTransformerSupplier() {
public ValueTransformer get() {
return new MyValueTransformer();
}
}, "myValueTransformState");
The second strategy is for the given ValueTransformerSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.
// NewValueType stands for the result value type VR
class MyValueTransformerSupplier implements ValueTransformerSupplier<String, NewValueType> {
// supply a new transformer instance on every call
public ValueTransformer<String, NewValueType> get() {
return new MyValueTransformer<>();
}
// provide store(s) that will be added and connected to the associated transformer
// the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
public Set<StoreBuilder<?>> stores() {
StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
Serdes.String(),
Serdes.String());
return Collections.singleton(keyValueStoreBuilder);
}
}
...
KStream outputStream = inputStream.transformValues(new MyValueTransformerSupplier());
With either strategy, within the ValueTransformer, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered. The ValueTransformer must return the new value in transform(). In contrast to transform(), no additional KeyValue pairs can be emitted via ProcessorContext.forward(). A StreamsException is thrown if the ValueTransformer tries to emit a KeyValue pair.
// NewValueType stands for the result value type VR
class MyValueTransformer<V> implements ValueTransformer<V, NewValueType> {
private KeyValueStore<String, String> state;
public void init(ProcessorContext context) {
this.state = (KeyValueStore<String, String>) context.getStateStore("myValueTransformState");
// punctuate each second; the punctuator can also access this.state
context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> { /* periodic action */ });
}
public NewValueType transform(V value) {
// can access this.state
return new NewValueType(); // or null
}
public void close() {
// can access this.state
}
}
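Because a ValueTransformer only receives the value, any state it keeps cannot be organized by key unless the key is also carried in the value. The following plain-Java model (the hypothetical `SequenceNumberModel`, no Kafka classes) keeps a simple counter as its state and appends it to each value. In Kafka Streams each stream task obtains its own transformer instance from the supplier, so a counter like this would be per task rather than global.

```java
// Hypothetical plain-Java model of a stateful, value-only transformer (no Kafka classes).
// The counter stands in for a state store; it is shared by all keys this instance sees.
final class SequenceNumberModel {
    private long seen = 0;

    // Mirrors ValueTransformer#transform(V): receives only the value and returns only the
    // new value; the DSL would pair it with the original, unchanged key.
    String transform(String value) {
        seen++;
        return value + "#" + seen;
    }
}
```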
Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before transformValues().
Setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. transform(TransformerSupplier, String...)).
Type Parameters:
VR - the value type of the result stream
Parameters:
valueTransformerSupplier - an instance of ValueTransformerSupplier that generates a ValueTransformer
named - a Named config used to name the processor in the topology
stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
Returns:
a KStream that contains records with unmodified key and new values (possibly of different type)
See Also:
mapValues(ValueMapper), mapValues(ValueMapperWithKey), transform(TransformerSupplier, String...)
<VR> KStream<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> valueTransformerSupplier, String... stateStoreNames)
A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K:V'>. Attaching a state store makes this a stateful record-by-record operation (cf. mapValues()). If you choose not to attach one, this operation is similar to the stateless mapValues() but allows access to the ProcessorContext and record metadata. This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. Furthermore, via Punctuator.punctuate(long), the processing progress can be observed and additional periodic actions can be performed.
In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...) (with the DSL, StreamsBuilder.addStateStore(StoreBuilder) as in the example below), and specify the store names via stateStoreNames so they will be connected to the transformer.
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
Serdes.String(),
Serdes.String());
// add store
builder.addStateStore(keyValueStoreBuilder);
KStream outputStream = inputStream.transformValues(new ValueTransformerWithKeySupplier() {
public ValueTransformerWithKey get() {
return new MyValueTransformerWithKey();
}
}, "myValueTransformState");
The second strategy is for the given ValueTransformerWithKeySupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.
// NewValueType stands for the result value type VR
class MyValueTransformerWithKeySupplier implements ValueTransformerWithKeySupplier<String, String, NewValueType> {
// supply a new transformer instance on every call
public ValueTransformerWithKey<String, String, NewValueType> get() {
return new MyValueTransformerWithKey<>();
}
// provide store(s) that will be added and connected to the associated transformer
// the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
public Set<StoreBuilder<?>> stores() {
StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
Serdes.String(),
Serdes.String());
return Collections.singleton(keyValueStoreBuilder);
}
}
...
KStream outputStream = inputStream.transformValues(new MyValueTransformerWithKeySupplier());
With either strategy, within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered. The ValueTransformerWithKey must return the new value in transform(). In contrast to transform() and flatTransform(), no additional KeyValue pairs can be emitted via ProcessorContext.forward(). A StreamsException is thrown if the ValueTransformerWithKey tries to emit a KeyValue pair.
// NewValueType stands for the result value type VR
class MyValueTransformerWithKey<K, V> implements ValueTransformerWithKey<K, V, NewValueType> {
private KeyValueStore<String, String> state;
public void init(ProcessorContext context) {
this.state = (KeyValueStore<String, String>) context.getStateStore("myValueTransformState");
// punctuate each second; the punctuator can also access this.state
context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> { /* periodic action */ });
}
public NewValueType transform(K readOnlyKey, V value) {
// can access this.state and use the read-only key
return new NewValueType(readOnlyKey); // or null
}
public void close() {
// can access this.state
}
}
Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before transformValues().
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Because the key stays unchanged, setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. transform(TransformerSupplier, String...)).
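A plain-Java model of the read-only-key pattern: the hypothetical `PerKeyCountModel` (no Kafka classes) reads the key to maintain a per-key count in a `HashMap` standing in for a state store, and returns only the new value, so the key itself is never replaced. Note that `final` on the parameter only prevents reassignment; it does not stop mutation of a mutable key object.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of a ValueTransformerWithKey-style step (no Kafka classes):
// the key is read (to keep a per-key count) but never replaced, so the output
// record keeps the input key and stays co-located.
final class PerKeyCountModel {
    private final Map<String, Long> counts = new HashMap<>(); // stands in for a key-value store

    // Mirrors transform(readOnlyKey, value): returns only the new value.
    String transform(final String readOnlyKey, final String value) {
        long count = counts.merge(readOnlyKey, 1L, Long::sum);
        return value + " (seen " + count + "x for " + readOnlyKey + ")";
    }
}
```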
Type Parameters:
VR - the value type of the result stream
Parameters:
valueTransformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a ValueTransformerWithKey
stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
Returns:
a KStream that contains records with unmodified key and new values (possibly of different type)
See Also:
mapValues(ValueMapper), mapValues(ValueMapperWithKey), transform(TransformerSupplier, String...)
<VR> KStream<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> valueTransformerSupplier, Named named, String... stateStoreNames)
A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K:V'>. Attaching a state store makes this a stateful record-by-record operation (cf. mapValues()). If you choose not to attach one, this operation is similar to the stateless mapValues() but allows access to the ProcessorContext and record metadata. This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. Furthermore, via Punctuator.punctuate(long), the processing progress can be observed and additional periodic actions can be performed.
In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...) (with the DSL, StreamsBuilder.addStateStore(StoreBuilder) as in the example below), and specify the store names via stateStoreNames so they will be connected to the transformer.
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
Serdes.String(),
Serdes.String());
// add store
builder.addStateStore(keyValueStoreBuilder);
KStream outputStream = inputStream.transformValues(new ValueTransformerWithKeySupplier() {
public ValueTransformerWithKey get() {
return new MyValueTransformerWithKey();
}
}, "myValueTransformState");
The second strategy is for the given ValueTransformerWithKeySupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.
// NewValueType stands for the result value type VR
class MyValueTransformerWithKeySupplier implements ValueTransformerWithKeySupplier<String, String, NewValueType> {
// supply a new transformer instance on every call
public ValueTransformerWithKey<String, String, NewValueType> get() {
return new MyValueTransformerWithKey<>();
}
// provide store(s) that will be added and connected to the associated transformer
// the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
public Set<StoreBuilder<?>> stores() {
StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
Serdes.String(),
Serdes.String());
return Collections.singleton(keyValueStoreBuilder);
}
}
...
KStream outputStream = inputStream.transformValues(new MyValueTransformerWithKeySupplier());
With either strategy, within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered. The ValueTransformerWithKey must return the new value in transform(). In contrast to transform() and flatTransform(), no additional KeyValue pairs can be emitted via ProcessorContext.forward(). A StreamsException is thrown if the ValueTransformerWithKey tries to emit a KeyValue pair.
class MyValueTransformerWithKey implements ValueTransformerWithKey<K, V, NewValueType> {
    private StateStore state;
    @Override
    public void init(ProcessorContext context) {
        this.state = context.getStateStore("myValueTransformState");
        // punctuate each second, can access this.state
        context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // periodic action
        });
    }
    @Override
    public NewValueType transform(K readOnlyKey, V value) {
        // can access this.state and use read-only key
        return new NewValueType(readOnlyKey); // or null
    }
    @Override
    public void close() {
        // can access this.state
    }
}
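To make this contract concrete without a Kafka dependency, here is a plain-Java model of a stateful value transformer. ToyContext and ToyValueTransformerWithKey are hypothetical stand-ins for ProcessorContext and ValueTransformerWithKey, a HashMap plays the role of the state store, and IllegalStateException stands in for the StreamsException thrown when forward() is called. The transformer counts records per key and returns the value tagged with that count; the driver copies the input key onto every output record.

```java
import java.util.*;

// Simplified model of the transformValues() contract (not Kafka code).
class TransformValuesSketch {

    // Hypothetical stand-in for ProcessorContext.forward().
    interface ToyContext {
        void forward(Object key, Object value);
    }

    // Hypothetical stand-in for ValueTransformerWithKey.
    interface ToyValueTransformerWithKey<K, V, VR> {
        void init(ToyContext context);
        VR transform(K readOnlyKey, V value);
    }

    static final class Rec {
        final String key;
        final String value;
        Rec(String key, String value) { this.key = key; this.value = value; }
        @Override public String toString() { return key + ":" + value; }
    }

    // Stateful example: returns "<value>#<n>", where n counts records seen so far for the key.
    static final class CountingTransformer implements ToyValueTransformerWithKey<String, String, String> {
        private final Map<String, Integer> state = new HashMap<>(); // stands in for a KeyValueStore
        private ToyContext context;

        @Override public void init(ToyContext context) { this.context = context; }

        @Override public String transform(String readOnlyKey, String value) {
            int n = state.merge(readOnlyKey, 1, Integer::sum);
            return value + "#" + n;
        }

        // Attempting to emit an extra record, which the driver rejects.
        void tryForward() { context.forward("extra-key", "extra-value"); }
    }

    // Drives the transformer: exactly one output per input, always with the input key.
    static List<Rec> run(ToyValueTransformerWithKey<String, String, String> transformer, List<Rec> input) {
        transformer.init((k, v) -> {
            throw new IllegalStateException("forward() is not allowed here");
        });
        List<Rec> out = new ArrayList<>();
        for (Rec r : input) {
            out.add(new Rec(r.key, transformer.transform(r.key, r.value)));
        }
        return out;
    }
}
```

For the input a:x, b:y, a:z this model yields a:x#1, b:y#1, a:z#2, and a call to tryForward() fails.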
Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before transformValues().
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Because only the value changes, data co-location with respect to the key is preserved. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. transform(TransformerSupplier, String...)).
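The co-location argument can be checked with a toy partitioner. This sketch assumes partition = floorMod(key.hashCode(), n). Kafka's default partitioner hashes the serialized key instead, but the only property used here is that equal keys map to equal partitions. A value-only step leaves every record where a key-based operator expects it, whereas a key-changing step can strand records in the wrong partition, which is the situation repartition() addresses.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Toy illustration (not Kafka code) of why value-only steps keep co-location.
class CoLocationSketch {

    // Assumed partitioner; only "same key -> same partition" matters for the argument.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Records were written to partitions by their original key. After applying rekey,
    // count how many now sit in a partition other than the one their new key maps to.
    static int misplaced(List<String> originalKeys, UnaryOperator<String> rekey, int numPartitions) {
        int count = 0;
        for (String key : originalKeys) {
            int current = partitionFor(key, numPartitions);                 // where the record lives
            int expected = partitionFor(rekey.apply(key), numPartitions);   // where a key-based operator looks
            if (current != expected) {
                count++;
            }
        }
        return count;
    }
}
```

With two partitions, keeping the keys "a" and "c" unchanged misplaces nothing, while re-keying them to "b" and "d" misplaces both records.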
Type Parameters:
VR - the value type of the result stream
Parameters:
valueTransformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a ValueTransformerWithKey
named - a Named config used to name the processor in the topology
stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
Returns:
a KStream that contains records with unmodified key and new values (possibly of different type)
See Also:
mapValues(ValueMapper), mapValues(ValueMapperWithKey), transform(TransformerSupplier, String...)
<VR> KStream<K,VR> flatTransformValues(ValueTransformerSupplier<? super V,Iterable<VR>> valueTransformerSupplier, String... stateStoreNames)
A ValueTransformer (provided by the given ValueTransformerSupplier) is applied to each input record value and computes zero or more new values. Each new value is emitted as a record with the key of the input record; thus, an input record <K,V> can be transformed into output records <K:V'>, <K:V''>, and so on.
Attaching a state store makes this a stateful record-by-record operation (cf. flatMapValues()). If you choose not to attach one, this operation is similar to the stateless flatMapValues() but allows access to the ProcessorContext and record metadata. This essentially mixes the Processor API into the DSL and provides all the functionality of the PAPI. Furthermore, via Punctuator#punctuate() the processing progress can be observed and additional periodic actions can be performed.
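The practical difference from a plain mapper is context access. The following plain-Java sketch assumes a hypothetical ToyRecordContext holding the topic, partition and offset of the current record (the real ProcessorContext exposes these via topic(), partition() and offset()). The transformer tags each value with that metadata, which a ValueMapper cannot do because it only receives the value.

```java
import java.util.Collections;

// Toy model (not Kafka's API) of record-metadata access inside a value transformer.
class MetadataSketch {

    // Hypothetical stand-in for the record-metadata part of ProcessorContext;
    // the driver updates it before each transform() call.
    static final class ToyRecordContext {
        String topic;
        int partition;
        long offset;
    }

    static final class OffsetTaggingTransformer {
        private ToyRecordContext context;

        void init(ToyRecordContext context) {
            this.context = context; // kept for use in transform()
        }

        // Returns a one-element Iterable, as flatTransformValues() expects.
        Iterable<String> transform(String value) {
            return Collections.singletonList(
                    value + "@" + context.topic + "-" + context.partition + ":" + context.offset);
        }
    }
}
```

Because the context object is shared and updated per record, each call sees the metadata of the record currently being processed.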
In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the transformer.
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
        Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                Serdes.String(),
                Serdes.String());
// add store
builder.addStateStore(keyValueStoreBuilder);
KStream outputStream = inputStream.flatTransformValues(new ValueTransformerSupplier() {
    public ValueTransformer get() {
        return new MyValueTransformer();
    }
}, "myValueTransformState");
The second strategy is for the given ValueTransformerSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.
class MyValueTransformerSupplier implements ValueTransformerSupplier {
    // supply transformer (a ValueTransformer, matching the supplier type)
    @Override
    public ValueTransformer get() {
        return new MyValueTransformer();
    }
    // provide store(s) that will be added and connected to the associated transformer
    // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
    @Override
    public Set<StoreBuilder<?>> stores() {
        StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                        Serdes.String(),
                        Serdes.String());
        return Collections.singleton(keyValueStoreBuilder);
    }
}
...
// pass the supplier, not the transformer
KStream outputStream = inputStream.flatTransformValues(new MyValueTransformerSupplier());
With either strategy, within the ValueTransformer, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.
The ValueTransformer must return an Iterable type (e.g., any Collection type) in transform(). If the return value of ValueTransformer#transform() is an empty Iterable or null, no records are emitted.
In contrast to transform() and flatTransform(), no additional KeyValue pairs can be emitted via ProcessorContext.forward(). A StreamsException is thrown if the ValueTransformer tries to emit a KeyValue pair.
class MyValueTransformer implements ValueTransformer<V, Iterable<NewValueType>> {
    private StateStore state;
    @Override
    public void init(ProcessorContext context) {
        this.state = context.getStateStore("myValueTransformState");
        // punctuate each second, can access this.state
        context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // periodic action
        });
    }
    @Override
    public Iterable<NewValueType> transform(V value) {
        // can access this.state
        List<NewValueType> result = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            result.add(new NewValueType(value));
        }
        return result; // zero or more values; an empty list emits nothing
    }
    @Override
    public void close() {
        // can access this.state
    }
}
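The emission rule for flatTransformValues() (one output record per returned element, each with the input key, and nothing for an empty Iterable or null) can be sketched in plain Java. The static helper below is a hypothetical model of the behavior described above, not the Kafka implementation.

```java
import java.util.*;
import java.util.function.Function;

// Plain-Java model of flatTransformValues() emission (not Kafka code).
class FlatTransformValuesSketch {

    static <K, V, VR> List<Map.Entry<K, VR>> flatTransformValues(List<Map.Entry<K, V>> input,
                                                                  Function<V, Iterable<VR>> transform) {
        List<Map.Entry<K, VR>> out = new ArrayList<>();
        for (Map.Entry<K, V> record : input) {
            Iterable<VR> values = transform.apply(record.getValue());
            if (values == null) {
                continue; // null: no records for this input
            }
            for (VR value : values) { // empty Iterable: loop body never runs
                out.add(new AbstractMap.SimpleImmutableEntry<>(record.getKey(), value)); // key is unchanged
            }
        }
        return out;
    }
}
```

Splitting "a b" under key k1 yields two records keyed k1, while inputs mapped to null or to an empty list produce no output at all.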
Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before flatTransformValues().
Setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. flatTransform()).
Type Parameters:
VR - the value type of the result stream
Parameters:
valueTransformerSupplier - an instance of ValueTransformerSupplier that generates a ValueTransformer
stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
Returns:
a KStream that contains more or fewer records with unmodified key and new values (possibly of different type)
See Also:
mapValues(ValueMapper), mapValues(ValueMapperWithKey), transform(TransformerSupplier, String...), flatTransform(TransformerSupplier, String...)
<VR> KStream<K,VR> flatTransformValues(ValueTransformerSupplier<? super V,Iterable<VR>> valueTransformerSupplier, Named named, String... stateStoreNames)
A ValueTransformer (provided by the given ValueTransformerSupplier) is applied to each input record value and computes zero or more new values. Each new value is emitted as a record with the key of the input record; thus, an input record <K,V> can be transformed into output records <K:V'>, <K:V''>, and so on.
Attaching a state store makes this a stateful record-by-record operation (cf. flatMapValues()). If you choose not to attach one, this operation is similar to the stateless flatMapValues() but allows access to the ProcessorContext and record metadata. This essentially mixes the Processor API into the DSL and provides all the functionality of the PAPI. Furthermore, via Punctuator#punctuate() the processing progress can be observed and additional periodic actions can be performed.
In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the transformer.
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
        Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                Serdes.String(),
                Serdes.String());
// add store
builder.addStateStore(keyValueStoreBuilder);
KStream outputStream = inputStream.flatTransformValues(new ValueTransformerSupplier() {
    public ValueTransformer get() {
        return new MyValueTransformer();
    }
}, "myValueTransformState");
The second strategy is for the given ValueTransformerSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.
class MyValueTransformerSupplier implements ValueTransformerSupplier {
    // supply transformer (a ValueTransformer, matching the supplier type)
    @Override
    public ValueTransformer get() {
        return new MyValueTransformer();
    }
    // provide store(s) that will be added and connected to the associated transformer
    // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
    @Override
    public Set<StoreBuilder<?>> stores() {
        StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                        Serdes.String(),
                        Serdes.String());
        return Collections.singleton(keyValueStoreBuilder);
    }
}
...
// pass the supplier, not the transformer
KStream outputStream = inputStream.flatTransformValues(new MyValueTransformerSupplier());
With either strategy, within the ValueTransformer, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.
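A registered wall-clock schedule can be pictured as a callback that fires once the clock passes the next interval boundary. This is a toy, not Kafka's scheduler: ToySchedule and ToyPunctuator are hypothetical, the clock is advanced by hand, and the choice to fire once and skip intervals missed during a long pause is an assumption of this sketch rather than a documented Kafka guarantee.

```java
// Toy model of a registered punctuation schedule (not Kafka's scheduler).
class PunctuationSketch {

    // Hypothetical stand-in for Punctuator.
    interface ToyPunctuator {
        void punctuate(long timestamp);
    }

    static final class ToySchedule {
        private final long intervalMs;
        private final ToyPunctuator punctuator;
        private long nextFireMs;

        ToySchedule(long startMs, long intervalMs, ToyPunctuator punctuator) {
            this.intervalMs = intervalMs;
            this.punctuator = punctuator;
            this.nextFireMs = startMs + intervalMs;
        }

        // Called whenever the simulated wall clock moves forward.
        void advanceTo(long nowMs) {
            if (nowMs < nextFireMs) {
                return; // not due yet
            }
            punctuator.punctuate(nowMs);
            // Sketch assumption: fire once, then jump past any intervals missed during a pause.
            long missedIntervals = (nowMs - nextFireMs) / intervalMs;
            nextFireMs += (missedIntervals + 1) * intervalMs;
        }
    }
}
```

With a one-second interval starting at 0, advancing the clock to 500, 1000, 1500, 3500 and 4000 ms fires the callback at 1000, 3500 and 4000.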
The ValueTransformer must return an Iterable type (e.g., any Collection type) in transform(). If the return value of ValueTransformer#transform() is an empty Iterable or null, no records are emitted.
In contrast to transform() and flatTransform(), no additional KeyValue pairs can be emitted via ProcessorContext.forward(). A StreamsException is thrown if the ValueTransformer tries to emit a KeyValue pair.
class MyValueTransformer implements ValueTransformer<V, Iterable<NewValueType>> {
    private StateStore state;
    @Override
    public void init(ProcessorContext context) {
        this.state = context.getStateStore("myValueTransformState");
        // punctuate each second, can access this.state
        context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // periodic action
        });
    }
    @Override
    public Iterable<NewValueType> transform(V value) {
        // can access this.state
        List<NewValueType> result = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            result.add(new NewValueType(value));
        }
        return result; // zero or more values; an empty list emits nothing
    }
    @Override
    public void close() {
        // can access this.state
    }
}
Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before flatTransformValues().
Setting a new value preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. flatTransform()).
Type Parameters:
VR - the value type of the result stream
Parameters:
valueTransformerSupplier - an instance of ValueTransformerSupplier that generates a ValueTransformer
named - a Named config used to name the processor in the topology
stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
Returns:
a KStream that contains more or fewer records with unmodified key and new values (possibly of different type)
See Also:
mapValues(ValueMapper), mapValues(ValueMapperWithKey), transform(TransformerSupplier, String...), flatTransform(TransformerSupplier, String...)
<VR> KStream<K,VR> flatTransformValues(ValueTransformerWithKeySupplier<? super K,? super V,Iterable<VR>> valueTransformerSupplier, String... stateStoreNames)
A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes zero or more new values. Each new value is emitted as a record with the key of the input record; thus, an input record <K,V> can be transformed into output records <K:V'>, <K:V''>, and so on.
Attaching a state store makes this a stateful record-by-record operation (cf. flatMapValues()). If you choose not to attach one, this operation is similar to the stateless flatMapValues() but allows access to the ProcessorContext and record metadata. This essentially mixes the Processor API into the DSL and provides all the functionality of the PAPI. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed.
In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the transformer.
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
        Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                Serdes.String(),
                Serdes.String());
// add store
builder.addStateStore(keyValueStoreBuilder);
KStream outputStream = inputStream.flatTransformValues(new ValueTransformerWithKeySupplier() {
    public ValueTransformerWithKey get() {
        return new MyValueTransformerWithKey();
    }
}, "myValueTransformState");
The second strategy is for the given ValueTransformerWithKeySupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.
class MyValueTransformerWithKeySupplier implements ValueTransformerWithKeySupplier {
    // supply transformer (interface methods must be public)
    @Override
    public ValueTransformerWithKey get() {
        return new MyValueTransformerWithKey();
    }
    // provide store(s) that will be added and connected to the associated transformer
    // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
    @Override
    public Set<StoreBuilder<?>> stores() {
        StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                        Serdes.String(),
                        Serdes.String());
        return Collections.singleton(keyValueStoreBuilder);
    }
}
...
// pass the supplier, not the transformer
KStream outputStream = inputStream.flatTransformValues(new MyValueTransformerWithKeySupplier());
With either strategy, within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.
The ValueTransformerWithKey must return an Iterable type (e.g., any Collection type) in transform(). If the return value of ValueTransformerWithKey#transform() is an empty Iterable or null, no records are emitted.
In contrast to transform() and flatTransform(), no additional KeyValue pairs can be emitted via ProcessorContext.forward(). A StreamsException is thrown if the ValueTransformerWithKey tries to emit a KeyValue pair.
class MyValueTransformerWithKey implements ValueTransformerWithKey<K, V, Iterable<NewValueType>> {
    private StateStore state;
    @Override
    public void init(ProcessorContext context) {
        this.state = context.getStateStore("myValueTransformState");
        // punctuate each second, can access this.state
        context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // periodic action
        });
    }
    @Override
    public Iterable<NewValueType> transform(K readOnlyKey, V value) {
        // can access this.state and use read-only key
        List<NewValueType> result = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            result.add(new NewValueType(readOnlyKey));
        }
        return result; // zero or more values; an empty list emits nothing
    }
    @Override
    public void close() {
        // can access this.state
    }
}
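A typical use of this keyed, stateful variant is per-key deduplication: the transformer returns a one-element list for a value not yet seen under its key and an empty list otherwise, so duplicates simply produce no output. In this plain-Java sketch a HashMap stands in for the key-value store, and DedupPerKeySketch with its run() driver are hypothetical names.

```java
import java.util.*;

// Hypothetical per-key dedup transformer; a HashMap stands in for the state store.
class DedupPerKeySketch {

    private final Map<String, Set<String>> seen = new HashMap<>();

    // Zero or one value per record: an empty list means "emit nothing".
    Iterable<String> transform(String readOnlyKey, String value) {
        Set<String> valuesForKey = seen.computeIfAbsent(readOnlyKey, k -> new HashSet<>());
        return valuesForKey.add(value)
                ? Collections.singletonList(value)
                : Collections.<String>emptyList();
    }

    // Applies transform() the way flatTransformValues() does:
    // one "key:value" output per returned element, key unchanged.
    List<String> run(List<String[]> records) {
        List<String> out = new ArrayList<>();
        for (String[] record : records) {
            for (String v : transform(record[0], record[1])) {
                out.add(record[0] + ":" + v);
            }
        }
        return out;
    }
}
```

The read-only key is used only to select the per-key state, never modified, so the output stays co-located with the input.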
Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before flatTransformValues().
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Because only the value changes, data co-location with respect to the key is preserved. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. flatTransform()).
Type Parameters:
VR - the value type of the result stream
Parameters:
valueTransformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a ValueTransformerWithKey
stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
Returns:
a KStream that contains more or fewer records with unmodified key and new values (possibly of different type)
See Also:
mapValues(ValueMapper), mapValues(ValueMapperWithKey), transform(TransformerSupplier, String...), flatTransform(TransformerSupplier, String...)
<VR> KStream<K,VR> flatTransformValues(ValueTransformerWithKeySupplier<? super K,? super V,Iterable<VR>> valueTransformerSupplier, Named named, String... stateStoreNames)
A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes zero or more new values. Each new value is emitted as a record with the key of the input record; thus, an input record <K,V> can be transformed into output records <K:V'>, <K:V''>, and so on.
Attaching a state store makes this a stateful record-by-record operation (cf. flatMapValues()). If you choose not to attach one, this operation is similar to the stateless flatMapValues() but allows access to the ProcessorContext and record metadata. This essentially mixes the Processor API into the DSL and provides all the functionality of the PAPI. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed.
In order for the transformer to use state stores, the stores must be added to the topology and connected to the transformer using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the transformer.
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
        Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                Serdes.String(),
                Serdes.String());
// add store
builder.addStateStore(keyValueStoreBuilder);
KStream outputStream = inputStream.flatTransformValues(new ValueTransformerWithKeySupplier() {
    public ValueTransformerWithKey get() {
        return new MyValueTransformerWithKey();
    }
}, "myValueTransformState");
The second strategy is for the given ValueTransformerWithKeySupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the transformer.
class MyValueTransformerWithKeySupplier implements ValueTransformerWithKeySupplier {
    // supply transformer (interface methods must be public)
    @Override
    public ValueTransformerWithKey get() {
        return new MyValueTransformerWithKey();
    }
    // provide store(s) that will be added and connected to the associated transformer
    // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
    @Override
    public Set<StoreBuilder<?>> stores() {
        StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                        Serdes.String(),
                        Serdes.String());
        return Collections.singleton(keyValueStoreBuilder);
    }
}
...
// pass the supplier, not the transformer
KStream outputStream = inputStream.flatTransformValues(new MyValueTransformerWithKeySupplier());
With either strategy, within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.
The ValueTransformerWithKey must return an Iterable type (e.g., any Collection type) in transform(). If the return value of ValueTransformerWithKey#transform() is an empty Iterable or null, no records are emitted.
In contrast to transform() and flatTransform(), no additional KeyValue pairs can be emitted via ProcessorContext.forward(). A StreamsException is thrown if the ValueTransformerWithKey tries to emit a KeyValue pair.
class MyValueTransformerWithKey implements ValueTransformerWithKey<K, V, Iterable<NewValueType>> {
    private StateStore state;
    @Override
    public void init(ProcessorContext context) {
        this.state = context.getStateStore("myValueTransformState");
        // punctuate each second, can access this.state
        context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // periodic action
        });
    }
    @Override
    public Iterable<NewValueType> transform(K readOnlyKey, V value) {
        // can access this.state and use read-only key
        List<NewValueType> result = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            result.add(new NewValueType(readOnlyKey));
        }
        return result; // zero or more values; an empty list emits nothing
    }
    @Override
    public void close() {
        // can access this.state
    }
}
Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before flatTransformValues().
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Because only the value changes, data co-location with respect to the key is preserved. Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) is applied to the result KStream (cf. flatTransform()).
Type Parameters:
VR - the value type of the result stream
Parameters:
valueTransformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a ValueTransformerWithKey
named - a Named config used to name the processor in the topology
stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
Returns:
a KStream that contains more or fewer records with unmodified key and new values (possibly of different type)
See Also:
mapValues(ValueMapper), mapValues(ValueMapperWithKey), transform(TransformerSupplier, String...), flatTransform(TransformerSupplier, String...)
void process(ProcessorSupplier<? super K,? super V> processorSupplier, String... stateStoreNames)
Process all records in this stream, one record at a time, by applying a Processor (provided by the given ProcessorSupplier). Attaching a state store makes this a stateful record-by-record operation (cf. foreach(ForeachAction)). If you choose not to attach one, this operation is similar to the stateless foreach(ForeachAction) but allows access to the ProcessorContext and record metadata. This essentially mixes the Processor API into the DSL and provides all the functionality of the PAPI. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed.
Note that this is a terminal operation that returns void.
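Because process() is terminal, a processor's effects show up only in its state (or in external systems), never in a returned stream. The toy model below makes that explicit by giving its driver a void return type. ToyProcessor and CountPerKeyProcessor are hypothetical types, and a TreeMap stands in for a store such as "myProcessorState".

```java
import java.util.*;

// Toy model of a terminal process() step (not Kafka code).
class TerminalProcessSketch {

    // Hypothetical stand-in for Processor.process(key, value).
    interface ToyProcessor<K, V> {
        void process(K key, V value);
    }

    static final class CountPerKeyProcessor implements ToyProcessor<String, String> {
        final Map<String, Long> counts = new TreeMap<>(); // stands in for "myProcessorState"

        @Override
        public void process(String key, String value) {
            counts.merge(key, 1L, Long::sum);
        }
    }

    static Map.Entry<String, String> rec(String key, String value) {
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    // Returns void, like the process() overloads documented here: nothing can be chained after it.
    static <K, V> void process(List<Map.Entry<K, V>> stream, ToyProcessor<K, V> processor) {
        for (Map.Entry<K, V> record : stream) {
            processor.process(record.getKey(), record.getValue());
        }
    }
}
```

After processing a:x, b:y and a:z, the only observable result is the processor's state, here {a=2, b=1}.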
In order for the processor to use state stores, the stores must be added to the topology and connected to the processor using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the processor.
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
        Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
                Serdes.String(),
                Serdes.String());
// add store
builder.addStateStore(keyValueStoreBuilder);
// process() is terminal and returns void, so there is no result stream to assign
inputStream.process(new ProcessorSupplier() {
    public Processor get() {
        return new MyProcessor();
    }
}, "myProcessorState");
The second strategy is for the given ProcessorSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the processor.
class MyProcessorSupplier implements ProcessorSupplier {
    // supply processor (interface methods must be public)
    @Override
    public Processor get() {
        return new MyProcessor();
    }
    // provide store(s) that will be added and connected to the associated processor
    // the store name from the builder ("myProcessorState") is used to access the store later via the ProcessorContext
    @Override
    public Set<StoreBuilder<?>> stores() {
        StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
                        Serdes.String(),
                        Serdes.String());
        return Collections.singleton(keyValueStoreBuilder);
    }
}
...
// process() returns void
inputStream.process(new MyProcessorSupplier());
With either strategy, within the Processor, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.
class MyProcessor implements Processor<K, V> {
    private StateStore state;
    @Override
    public void init(ProcessorContext context) {
        this.state = context.getStateStore("myProcessorState");
        // punctuate each second, can access this.state
        context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // periodic action
        });
    }
    @Override
    public void process(K key, V value) {
        // can access this.state
    }
    @Override
    public void close() {
        // can access this.state
    }
}
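The MyProcessor pattern (update state in process(), act periodically in a punctuation callback) is often used to buffer and flush aggregates. The following plain-Java sketch calls punctuate() by hand instead of registering a real schedule; the sink list and the class name BufferAndFlushSketch are hypothetical, and a TreeMap stands in for the state store.

```java
import java.util.*;

// Hypothetical buffer-and-flush processor; punctuate() is invoked manually here.
class BufferAndFlushSketch {

    private final Map<String, Integer> buffer = new TreeMap<>(); // sorted for deterministic output
    final List<String> sink = new ArrayList<>();                 // where flushed results go

    // Per-record work only updates state: sum of value lengths per key.
    void process(String key, String value) {
        buffer.merge(key, value.length(), Integer::sum);
    }

    // What a scheduled Punctuator would do: emit the aggregates, then clear the state.
    void punctuate(long timestamp) {
        for (Map.Entry<String, Integer> e : buffer.entrySet()) {
            sink.add(timestamp + " " + e.getKey() + "=" + e.getValue());
        }
        buffer.clear();
    }
}
```

A punctuation that finds an empty buffer emits nothing, so idle periods cost only the callback itself.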
Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before process().
Parameters:
processorSupplier - an instance of ProcessorSupplier that generates a Processor
stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
See Also:
foreach(ForeachAction), transform(TransformerSupplier, String...)
void process(ProcessorSupplier<? super K,? super V> processorSupplier, Named named, String... stateStoreNames)
Process all records in this stream, one record at a time, by applying a Processor (provided by the given ProcessorSupplier). Attaching a state store makes this a stateful record-by-record operation (cf. foreach(ForeachAction)). If you choose not to attach one, this operation is similar to the stateless foreach(ForeachAction) but allows access to the ProcessorContext and record metadata. This essentially mixes the Processor API into the DSL and provides all the functionality of the PAPI. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed.
Note that this is a terminal operation that returns void.
In order for the processor to use state stores, the stores must be added to the topology and connected to the processor using at least one of two strategies (though it's not required to connect global state stores; read-only access to global state stores is available by default).
The first strategy is to manually add the StoreBuilders via Topology.addStateStore(StoreBuilder, String...), and specify the store names via stateStoreNames so they will be connected to the processor.
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
        Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
                Serdes.String(),
                Serdes.String());
// add store
builder.addStateStore(keyValueStoreBuilder);
// process() is terminal and returns void, so there is no result stream to assign
inputStream.process(new ProcessorSupplier() {
    public Processor get() {
        return new MyProcessor();
    }
}, "myProcessorState");
The second strategy is for the given ProcessorSupplier to implement ConnectedStoreProvider.stores(), which provides the StoreBuilders to be automatically added to the topology and connected to the processor.
class MyProcessorSupplier implements ProcessorSupplier {
    // supply processor (interface methods must be public)
    @Override
    public Processor get() {
        return new MyProcessor();
    }
    // provide store(s) that will be added and connected to the associated processor
    // the store name from the builder ("myProcessorState") is used to access the store later via the ProcessorContext
    @Override
    public Set<StoreBuilder<?>> stores() {
        StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
                        Serdes.String(),
                        Serdes.String());
        return Collections.singleton(keyValueStoreBuilder);
    }
}
...
// process() returns void
inputStream.process(new MyProcessorSupplier());
With either strategy, within the Processor, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.
class MyProcessor implements Processor<K, V> {
    private StateStore state;
    @Override
    public void init(ProcessorContext context) {
        this.state = context.getStateStore("myProcessorState");
        // punctuate each second, can access this.state
        context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // periodic action
        });
    }
    @Override
    public void process(K key, V value) {
        // can access this.state
    }
    @Override
    public void close() {
        // can access this.state
    }
}
Even if any upstream operation was key-changing, no auto-repartition is triggered. If repartitioning is required, a call to repartition() should be performed before process().
Parameters:
processorSupplier - an instance of ProcessorSupplier that generates a Processor
named - a Named config used to name the processor in the topology
stateStoreNames - the names of the state stores used by the processor; not required if the supplier implements ConnectedStoreProvider.stores()
See Also:
foreach(ForeachAction), transform(TransformerSupplier, String...)