K
- Type of primary keysV
- Type of value changes@InterfaceStability.Unstable
public interface KTable<K,V>
KTable
is an abstraction of a changelog stream from a primary-keyed table.
Each record in this changelog stream is an update on the primary-keyed table with the record key as the primary key.
A KTable
is either defined from a single Kafka topic
that is
consumed message by message or the result of a KTable
transformation.
An aggregation of a KStream
also yields a KTable
.
A KTable
can be transformed record by record, joined with another KTable
or KStream
, or
can be re-partitioned and aggregated into a new KTable
.
Some KTable
s have an internal state (a ReadOnlyKeyValueStore
) and are therefore queryable via the
interactive queries API.
For example:
final KTable table = ...
...
final KafkaStreams streams = ...;
streams.start()
...
final String queryableStoreName = table.getStoreName(); // returns null if KTable is not queryable
ReadOnlyKeyValueStore view = streams.store(queryableStoreName, QueryableStoreTypes.keyValueStore());
view.get(key);
Modifier and Type | Method and Description |
---|---|
KTable<K,V> |
filter(Predicate<? super K,? super V> predicate)
Create a new
KTable that consists of all records of this KTable which satisfy the given
predicate. |
KTable<K,V> |
filterNot(Predicate<? super K,? super V> predicate)
Create a new
KTable that consists all records of this KTable which do not satisfy the
given predicate. |
void |
foreach(ForeachAction<? super K,? super V> action)
Perform an action on each update record of this
KTable . |
String |
getStoreName()
Get the name of the local state store used for materializing this
KTable . |
<KR,VR> KGroupedTable<KR,VR> |
groupBy(KeyValueMapper<? super K,? super V,KeyValue<KR,VR>> selector)
Re-groups the records of this
KTable using the provided KeyValueMapper and default serializers
and deserializers. |
<KR,VR> KGroupedTable<KR,VR> |
groupBy(KeyValueMapper<? super K,? super V,KeyValue<KR,VR>> selector,
Serde<KR> keySerde,
Serde<VR> valueSerde)
Re-groups the records of this
KTable using the provided KeyValueMapper . |
<VO,VR> KTable<K,VR> |
join(KTable<K,VO> other,
ValueJoiner<? super V,? super VO,? extends VR> joiner)
Join records of this
KTable with another KTable 's records using non-windowed inner equi join. |
<VO,VR> KTable<K,VR> |
leftJoin(KTable<K,VO> other,
ValueJoiner<? super V,? super VO,? extends VR> joiner)
Join records of this
KTable (left input) with another KTable 's (right input) records using
non-windowed left equi join. |
<VR> KTable<K,VR> |
mapValues(ValueMapper<? super V,? extends VR> mapper)
Create a new
KTable by transforming the value of each record in this KTable into a new value
(with possible new type)in the new KTable . |
<VO,VR> KTable<K,VR> |
outerJoin(KTable<K,VO> other,
ValueJoiner<? super V,? super VO,? extends VR> joiner)
Join records of this
KTable (left input) with another KTable 's (right input) records using
non-windowed outer equi join. |
void |
print()
Print the update records of this
KTable to System.out . |
void |
print(Serde<K> keySerde,
Serde<V> valSerde)
Print the update records of this
KTable to System.out . |
void |
print(Serde<K> keySerde,
Serde<V> valSerde,
String streamName)
Print the update records of this
KTable to System.out . |
void |
print(String streamName)
Print the update records of this
KTable to System.out . |
KTable<K,V> |
through(Serde<K> keySerde,
Serde<V> valSerde,
StreamPartitioner<? super K,? super V> partitioner,
String topic,
String storeName)
Materialize this changelog stream to a topic and creates a new
KTable from the topic using a customizable
StreamPartitioner to determine the distribution of records to partitions. |
KTable<K,V> |
through(Serde<K> keySerde,
Serde<V> valSerde,
String topic,
String storeName)
Materialize this changelog stream to a topic and creates a new
KTable from the topic. |
KTable<K,V> |
through(StreamPartitioner<? super K,? super V> partitioner,
String topic,
String storeName)
Materialize this changelog stream to a topic and creates a new
KTable from the topic using default
serializers and deserializers and a customizable StreamPartitioner to determine the distribution of
records to partitions. |
KTable<K,V> |
through(String topic,
String storeName)
Materialize this changelog stream to a topic and creates a new
KTable from the topic using default
serializers and deserializers and producer's DefaultPartitioner . |
void |
to(Serde<K> keySerde,
Serde<V> valSerde,
StreamPartitioner<? super K,? super V> partitioner,
String topic)
Materialize this changelog stream to a topic using a customizable
StreamPartitioner to determine the
distribution of records to partitions. |
void |
to(Serde<K> keySerde,
Serde<V> valSerde,
String topic)
Materialize this changelog stream to a topic.
|
void |
to(StreamPartitioner<? super K,? super V> partitioner,
String topic)
Materialize this changelog stream to a topic using default serializers and deserializers and a customizable
StreamPartitioner to determine the distribution of records to partitions. |
void |
to(String topic)
Materialize this changelog stream to a topic using default serializers and deserializers and producer's
DefaultPartitioner . |
KStream<K,V> |
toStream()
Convert this changelog stream to a
KStream . |
<KR> KStream<KR,V> |
toStream(KeyValueMapper<? super K,? super V,? extends KR> mapper)
Convert this changelog stream to a
KStream using the given KeyValueMapper to select the new key. |
void |
writeAsText(String filePath)
Write the update records of this
KTable to a file at the given path. |
void |
writeAsText(String filePath,
Serde<K> keySerde,
Serde<V> valSerde)
Write the update records of this
KTable to a file at the given path. |
void |
writeAsText(String filePath,
String streamName)
Write the update records of this
KTable to a file at the given path. |
void |
writeAsText(String filePath,
String streamName,
Serde<K> keySerde,
Serde<V> valSerde)
Write the update records of this
KTable to a file at the given path. |
KTable<K,V> filter(Predicate<? super K,? super V> predicate)
KTable
that consists of all records of this KTable
which satisfy the given
predicate.
All records that do not satisfy the predicate are dropped.
For each KTable
update the filter is evaluated on the update record to produce an update record for the
result KTable
.
This is a stateless record-by-record operation.
Note that filter
for a changelog stream works different to record stream filters
, because records
with null
values (so-called tombstone records)
have delete semantics.
Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
directly if required (i.e., if there is anything to be deleted).
Furthermore, for each record that gets dropped (i.e., dot not satisfied the given predicate) a tombstone record
is forwarded.
predicate
- a filter Predicate
that is applied to each recordKTable
that contains only those records that satisfy the given predicatefilterNot(Predicate)
KTable<K,V> filterNot(Predicate<? super K,? super V> predicate)
KTable
that consists all records of this KTable
which do not satisfy the
given predicate.
All records that do satisfy the predicate are dropped.
For each KTable
update the filter is evaluated on the update record to produce an update record for the
result KTable
.
This is a stateless record-by-record operation.
Note that filterNot
for a changelog stream works different to record stream filters
, because records
with null
values (so-called tombstone records)
have delete semantics.
Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
directly if required (i.e., if there is anything to be deleted).
Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is
forwarded.
predicate
- a filter Predicate
that is applied to each recordKTable
that contains only those records that do not satisfy the given predicatefilter(Predicate)
<VR> KTable<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper)
KTable
by transforming the value of each record in this KTable
into a new value
(with possible new type)in the new KTable
.
For each KTable
update the provided ValueMapper
is applied to the value of the update record and
computes a new value for it, resulting in an update record for the result KTable
.
Thus, an input record <K,V>
can be transformed into an output record <K:V'>
.
This is a stateless record-by-record operation.
The example below counts the number of token of the value string.
KTable<String, String> inputTable = builder.table("topic");
KTable<String, Integer> outputTable = inputTable.mapValue(new ValueMapper<String, Integer> {
Integer apply(String value) {
return value.split(" ").length;
}
});
This operation preserves data co-location with respect to the key.
Thus, no internal data redistribution is required if a key based operator (like a join) is applied to
the result KTable
.
Note that mapValues
for a changelog stream works different to record stream filters
, because records
with null
values (so-called tombstone records)
have delete semantics.
Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to
delete the corresponding record in the result KTable
.
VR
- the value type of the result KTable
mapper
- a ValueMapper
that computes a new output valueKTable
that contains records with unmodified keys and new values (possibly of different type)void print()
KTable
to System.out
.
This function will use the generated name of the parent processor node to label the key/value pairs printed to
the console.
The provided serde will be used to deserialize the key or value in case the type is byte[]
before calling
toString()
on the deserialized object.
Implementors will need to override toString()
for keys and values that are not of type String
,
Integer
etc. to get meaningful information.
Note that print()
is not applied to the internal state store and only called for each new KTable
update record.
void print(String streamName)
KTable
to System.out
.
This function will use the given name to label the key/value pairs printed to the console.
The provided serde will be used to deserialize the key or value in case the type is byte[]
before calling
toString()
on the deserialized object.
Implementors will need to override toString()
for keys and values that are not of type String
,
Integer
etc. to get meaningful information.
Note that print()
is not applied to the internal state store and only called for each new KTable
update record.
streamName
- the name used to label the key/value pairs printed to the consolevoid print(Serde<K> keySerde, Serde<V> valSerde)
KTable
to System.out
.
This function will use the generated name of the parent processor node to label the key/value pairs printed to
the console.
The provided serde will be used to deserialize the key or value in case the type is byte[]
before calling
toString()
on the deserialized object.
Implementors will need to override toString()
for keys and values that are not of type String
,
Integer
etc. to get meaningful information.
Note that print()
is not applied to the internal state store and only called for each new KTable
update record.
keySerde
- key serde used to deserialize key if type is byte[]
,valSerde
- value serde used to deserialize value if type is byte[]
,void print(Serde<K> keySerde, Serde<V> valSerde, String streamName)
KTable
to System.out
.
This function will use the given name to label the key/value pairs printed to the console.
The provided serde will be used to deserialize the key or value in case the type is byte[]
before calling
toString()
on the deserialized object.
Implementors will need to override toString()
for keys and values that are not of type String
,
Integer
etc. to get meaningful information.
Note that print()
is not applied to the internal state store and only called for each new KTable
update record.
keySerde
- key serde used to deserialize key if type is byte[]
,valSerde
- value serde used to deserialize value if type is byte[]
,streamName
- the name used to label the key/value pairs printed to the consolevoid writeAsText(String filePath)
KTable
to a file at the given path.
This function will use the generated name of the parent processor node to label the key/value pairs printed to
the file.
The default serde will be used to deserialize the key or value in case the type is byte[]
before calling
toString()
on the deserialized object.
Implementors will need to override toString()
for keys and values that are not of type String
,
Integer
etc. to get meaningful information.
Note that writeAsText()
is not applied to the internal state store and only called for each new
KTable
update record.
filePath
- name of file to write tovoid writeAsText(String filePath, String streamName)
KTable
to a file at the given path.
This function will use the given name to label the key/value printed to the file.
The default serde will be used to deserialize the key or value in case the type is byte[]
before calling
toString()
on the deserialized object.
Implementors will need to override toString()
for keys and values that are not of type String
,
Integer
etc. to get meaningful information.
Note that writeAsText()
is not applied to the internal state store and only called for each new
KTable
update record.
filePath
- name of file to write tostreamName
- the name used to label the key/value pairs printed out to the consolevoid writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde)
KTable
to a file at the given path.
This function will use the generated name of the parent processor node to label the key/value pairs printed to
the file.
The provided serde will be used to deserialize the key or value in case the type is byte[]
before calling
toString()
on the deserialized object.
Implementors will need to override toString()
for keys and values that are not of type String
,
Integer
etc. to get meaningful information.
Note that writeAsText()
is not applied to the internal state store and only called for each new
KTable
update record.
filePath
- name of file to write tokeySerde
- key serde used to deserialize key if type is byte[]
,valSerde
- value serde used to deserialize value if type is byte[]
,void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde)
KTable
to a file at the given path.
This function will use the given name to label the key/value printed to the file.
The default serde will be used to deserialize the key or value in case the type is byte[]
before calling
toString()
on the deserialized object.
Implementors will need to override toString()
for keys and values that are not of type String
,
Integer
etc. to get meaningful information.
Note that writeAsText()
is not applied to the internal state store and only called for each new
KTable
update record.
filePath
- name of file to write tostreamName
- the name used to label the key/value pairs printed to the consolekeySerde
- key serde used to deserialize key if type is byte[]
,valSerde
- value serde used to deserialize value if type is byte[]
,void foreach(ForeachAction<? super K,? super V> action)
KTable
.
Note that this is a terminal operation that returns void.
Note that foreach()
is not applied to the internal state store and only called for each new
KTable
update record.
action
- an action to perform on each recordKStream<K,V> toStream()
KStream
.
Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of
this changelog stream is no longer treated as an update record (cf. KStream
vs KTable
).
KStream
that contains the same records as this KTable
<KR> KStream<KR,V> toStream(KeyValueMapper<? super K,? super V,? extends KR> mapper)
KStream
using the given KeyValueMapper
to select the new key.
For example, you can compute the new key as the length of the value string.
KTable<String, String> table = builder.table("topic");
KTable<Integer, String> keyedStream = table.toStream(new KeyValueMapper<String, String, Integer> {
Integer apply(String key, String value) {
return value.length();
}
});
Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation or
join) is applied to the result KStream
.
This operation is equivalent to calling
table.
toStream
().
selectKey(KeyValueMapper)
.
Note that toStream()
is a logical operation and only changes the "interpretation" of the stream, i.e.,
each record of this changelog stream is no longer treated as an update record (cf. KStream
vs KTable
).
KR
- the new key type of the result streammapper
- a KeyValueMapper
that computes a new key for each recordKStream
that contains the same records as this KTable
KTable<K,V> through(String topic, String storeName)
KTable
from the topic using default
serializers and deserializers and producer's DefaultPartitioner
.
The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
started).
This is equivalent to calling #to(someTopicName)
and
KStreamBuilder#table(someTopicName, queryableStoreName)
.
The resulting KTable
will be materialized in a local state store with the given store name (cf.
KStreamBuilder.table(String, String)
)
topic
- the topic namestoreName
- the state store name used for the result KTable
KTable
that contains the exact same (and potentially repartitioned) records as this KTable
KTable<K,V> through(StreamPartitioner<? super K,? super V> partitioner, String topic, String storeName)
KTable
from the topic using default
serializers and deserializers and a customizable StreamPartitioner
to determine the distribution of
records to partitions.
The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
started).
This is equivalent to calling #to(partitioner, someTopicName)
and
KStreamBuilder#table(someTopicName, queryableStoreName)
.
The resulting KTable
will be materialized in a local state store with the given store name (cf.
KStreamBuilder.table(String, String)
)
partitioner
- the function used to determine how records are distributed among partitions of the topic,
if not specified producer's DefaultPartitioner
will be usedtopic
- the topic namestoreName
- the state store name used for the result KTable
KTable
that contains the exact same (and potentially repartitioned) records as this KTable
KTable<K,V> through(Serde<K> keySerde, Serde<V> valSerde, String topic, String storeName)
KTable
from the topic.
The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
started).
If keySerde
provides a WindowedSerializer
for the key WindowedStreamPartitioner
is
used—otherwise producer's DefaultPartitioner
is used.
This is equivalent to calling #to(keySerde, valueSerde, someTopicName)
and
KStreamBuilder#table(someTopicName, queryableStoreName)
.
The resulting KTable
will be materialized in a local state store with the given store name (cf.
KStreamBuilder.table(String, String)
)
keySerde
- key serde used to send key-value pairs,
if not specified the default key serde defined in the configuration will be usedvalSerde
- value serde used to send key-value pairs,
if not specified the default value serde defined in the configuration will be usedtopic
- the topic namestoreName
- the state store name used for the result KTable
KTable
that contains the exact same (and potentially repartitioned) records as this KTable
KTable<K,V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<? super K,? super V> partitioner, String topic, String storeName)
KTable
from the topic using a customizable
StreamPartitioner
to determine the distribution of records to partitions.
The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
started).
This is equivalent to calling #to(keySerde, valueSerde, partitioner, someTopicName)
and
KStreamBuilder#table(someTopicName, queryableStoreName)
.
The resulting KTable
will be materialized in a local state store with the given store name (cf.
KStreamBuilder.table(String, String)
)
keySerde
- key serde used to send key-value pairs,
if not specified the default key serde defined in the configuration will be usedvalSerde
- value serde used to send key-value pairs,
if not specified the default value serde defined in the configuration will be usedpartitioner
- the function used to determine how records are distributed among partitions of the topic,
if not specified and keySerde
provides a WindowedSerializer
for the key
WindowedStreamPartitioner
will be used—otherwise DefaultPartitioner
will
be usedtopic
- the topic namestoreName
- the state store name used for the result KTable
KTable
that contains the exact same (and potentially repartitioned) records as this KTable
void to(String topic)
DefaultPartitioner
.
The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
started).topic
- the topic namevoid to(StreamPartitioner<? super K,? super V> partitioner, String topic)
StreamPartitioner
to determine the distribution of records to partitions.
The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
started).partitioner
- the function used to determine how records are distributed among partitions of the topic,
if not specified producer's DefaultPartitioner
will be usedtopic
- the topic namevoid to(Serde<K> keySerde, Serde<V> valSerde, String topic)
If keySerde
provides a WindowedSerializer
for the key WindowedStreamPartitioner
is
used—otherwise producer's DefaultPartitioner
is used.
keySerde
- key serde used to send key-value pairs,
if not specified the default key serde defined in the configuration will be usedvalSerde
- value serde used to send key-value pairs,
if not specified the default value serde defined in the configuration will be usedtopic
- the topic namevoid to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<? super K,? super V> partitioner, String topic)
StreamPartitioner
to determine the
distribution of records to partitions.
The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
started).keySerde
- key serde used to send key-value pairs,
if not specified the default key serde defined in the configuration will be usedvalSerde
- value serde used to send key-value pairs,
if not specified the default value serde defined in the configuration will be usedpartitioner
- the function used to determine how records are distributed among partitions of the topic,
if not specified and keySerde
provides a WindowedSerializer
for the key
WindowedStreamPartitioner
will be used—otherwise DefaultPartitioner
will
be usedtopic
- the topic name<KR,VR> KGroupedTable<KR,VR> groupBy(KeyValueMapper<? super K,? super V,KeyValue<KR,VR>> selector)
KTable
using the provided KeyValueMapper
and default serializers
and deserializers.
Each KeyValue
pair of this KTable
is mapped to a new KeyValue
pair by applying the
provided KeyValueMapper
.
Re-grouping a KTable
is required before an aggregation operator can be applied to the data
(cf. KGroupedTable
).
The KeyValueMapper
selects a new key and value (with should both have unmodified type).
If the new record key is null
the record will not be included in the resulting KGroupedTable
Because a new key is selected, an internal repartitioning topic will be created in Kafka.
This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
StreamsConfig
via parameter APPLICATION_ID_CONFIG
, "XXX" is
an internally generated name, and "-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via KafkaStreams.toString()
.
All data of this KTable
will be redistributed through the repartitioning topic by writing all update
records to and rereading all update records from it, such that the resulting KGroupedTable
is partitioned
on the new key.
If the key or value type is changed, it is recommended to use groupBy(KeyValueMapper, Serde, Serde)
instead.
KR
- the key type of the result KGroupedTable
VR
- the value type of the result KGroupedTable
selector
- a KeyValueMapper
that computes a new grouping key and value to be aggregatedKGroupedTable
that contains the re-grouped records of the original KTable
<KR,VR> KGroupedTable<KR,VR> groupBy(KeyValueMapper<? super K,? super V,KeyValue<KR,VR>> selector, Serde<KR> keySerde, Serde<VR> valueSerde)
KTable
using the provided KeyValueMapper
.
Each KeyValue
pair of this KTable
is mapped to a new KeyValue
pair by applying the
provided KeyValueMapper
.
Re-grouping a KTable
is required before an aggregation operator can be applied to the data
(cf. KGroupedTable
).
The KeyValueMapper
selects a new key and value (both with potentially different type).
If the new record key is null
the record will not be included in the resulting KGroupedTable
Because a new key is selected, an internal repartitioning topic will be created in Kafka.
This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
StreamsConfig
via parameter APPLICATION_ID_CONFIG
, "XXX" is
an internally generated name, and "-repartition" is a fixed suffix.
You can retrieve all generated internal topic names via KafkaStreams.toString()
.
All data of this KTable
will be redistributed through the repartitioning topic by writing all update
records to and rereading all update records from it, such that the resulting KGroupedTable
is partitioned
on the new key.
KR
- the key type of the result KGroupedTable
VR
- the value type of the result KGroupedTable
selector
- a KeyValueMapper
that computes a new grouping key and value to be aggregatedkeySerde
- key serdes for materializing this stream,
if not specified the default serdes defined in the configs will be usedvalueSerde
- value serdes for materializing this stream,
if not specified the default serdes defined in the configs will be usedKGroupedTable
that contains the re-grouped records of the original KTable
<VO,VR> KTable<K,VR> join(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner)
KTable
with another KTable
's records using non-windowed inner equi join.
The join is a primary key join with join attribute thisKTable.key == otherKTable.key
.
The result is an ever updating KTable
that represents the current (i.e., processing time) result
of the join.
The join is computed by (1) updating the internal state of one KTable
and (2) performing a lookup for a
matching record in the current (i.e., processing time) internal state of the other KTable
.
This happens in a symmetric way, i.e., for each update of either this
or the other
input
KTable
the result gets updated.
For each KTable
record that finds a corresponding record in the other KTable
the provided
ValueJoiner
will be called to compute a value (with arbitrary type) for the result record.
The key of the result record is the same as for both joining input records.
Note that records
with null
values (so-called tombstone records) have delete semantics.
Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded
directly to delete a record in the result KTable
if required (i.e., if there is anything to be deleted).
Input records with null
key will be dropped and no join computation is performed.
Example:
thisKTable | thisState | otherKTable | otherState | result update record |
---|---|---|---|---|
<K1:A> | <K1:A> | |||
<K1:A> | <K1:b> | <K1:b> | <K1:ValueJoiner(A,b)> | |
<K1:C> | <K1:C> | <K1:b> | <K1:ValueJoiner(C,b)> | |
<K1:C> | <K1:null> | <K1:null> |
VO
- the value type of the other KTable
VR
- the value type of the result KTable
other
- the other KTable
to be joined with this KTable
joiner
- a ValueJoiner
that computes the join result for a pair of matching recordsKTable
that contains join-records for each key and values computed by the given
ValueJoiner
, one for each matched record-pair with the same keyleftJoin(KTable, ValueJoiner)
,
outerJoin(KTable, ValueJoiner)
<VO,VR> KTable<K,VR> leftJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner)
KTable
(left input) with another KTable
's (right input) records using
non-windowed left equi join.
The join is a primary key join with join attribute thisKTable.key == otherKTable.key
.
In contrast to inner-join
, all records from left KTable
will produce
an output record (cf. below).
The result is an ever updating KTable
that represents the current (i.e., processing time) result
of the join.
The join is computed by (1) updating the internal state of one KTable
and (2) performing a lookup for a
matching record in the current (i.e., processing time) internal state of the other KTable
.
This happens in a symmetric way, i.e., for each update of either this
or the other
input
KTable
the result gets updated.
For each KTable
record that finds a corresponding record in the other KTable
's state the
provided ValueJoiner
will be called to compute a value (with arbitrary type) for the result record.
Additionally, for each record of left KTable
that does not find a corresponding record in the
right KTable
's state the provided ValueJoiner
will be called with rightValue =
null
to compute a value (with arbitrary type) for the result record.
The key of the result record is the same as for both joining input records.
Note that records
with null
values (so-called tombstone records) have delete semantics.
For example, for left input tombstones the provided value-joiner is not called but a tombstone record is
forwarded directly to delete a record in the result KTable
if required (i.e., if there is anything to be
deleted).
Input records with null
key will be dropped and no join computation is performed.
Example:
thisKTable | thisState | otherKTable | otherState | result update record |
---|---|---|---|---|
<K1:A> | <K1:A> | <K1:ValueJoiner(A,null)> | ||
<K1:A> | <K1:b> | <K1:b> | <K1:ValueJoiner(A,b)> | |
<K1:null> | <K1:b> | <K1:null> | ||
<K1:null> |
VO
- the value type of the other KTable
VR
- the value type of the result KTable
other
- the other KTable
to be joined with this KTable
joiner
- a ValueJoiner
that computes the join result for a pair of matching recordsKTable
that contains join-records for each key and values computed by the given
ValueJoiner
, one for each matched record-pair with the same key plus one for each non-matching record of
left KTable
join(KTable, ValueJoiner)
,
outerJoin(KTable, ValueJoiner)
<VO,VR> KTable<K,VR> outerJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner)
KTable
(left input) with another KTable
's (right input) records using
non-windowed outer equi join.
The join is a primary key join with join attribute thisKTable.key == otherKTable.key
.
In contrast to inner-join
or left-join
,
all records from both input KTable
s will produce an output record (cf. below).
The result is an ever updating KTable
that represents the current (i.e., processing time) result
of the join.
The join is computed by (1) updating the internal state of one KTable
and (2) performing a lookup for a
matching record in the current (i.e., processing time) internal state of the other KTable
.
This happens in a symmetric way, i.e., for each update of either this
or the other
input
KTable
the result gets updated.
For each KTable
record that finds a corresponding record in the other KTable
's state the
provided ValueJoiner
will be called to compute a value (with arbitrary type) for the result record.
Additionally, for each record that does not find a corresponding record in the corresponding other
KTable
's state the provided ValueJoiner
will be called with null
value for the
corresponding other value to compute a value (with arbitrary type) for the result record.
The key of the result record is the same as for both joining input records.
Note that records
with null
values (so-called tombstone records) have delete semantics.
Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly
to delete a record in the result KTable
if required (i.e., if there is anything to be deleted).
Input records with null
key will be dropped and no join computation is performed.
Example:
thisKTable | thisState | otherKTable | otherState | result update record |
---|---|---|---|---|
<K1:A> | <K1:A> | <K1:ValueJoiner(A,null)> | ||
<K1:A> | <K1:b> | <K1:b> | <K1:ValueJoiner(A,b)> | |
<K1:null> | <K1:b> | <K1:ValueJoiner(null,b)> | ||
<K1:null> | <K1:null> |
VO
- the value type of the other KTable
VR
- the value type of the result KTable
other
- the other KTable
to be joined with this KTable
joiner
- a ValueJoiner
that computes the join result for a pair of matching recordsKTable
that contains join-records for each key and values computed by the given
ValueJoiner
, one for each matched record-pair with the same key plus one for each non-matching record of
both KTable
sjoin(KTable, ValueJoiner)
,
leftJoin(KTable, ValueJoiner)
String getStoreName()
KTable
.null
if this KTable
is not materialized