Interface KTable<K,V>
- Type Parameters:
K - Type of primary keys
V - Type of value changes
KTable is an abstraction of a changelog stream from a primary-keyed table. Each record in this changelog stream is an update on the primary-keyed table with the record key as the primary key.
A KTable is either defined from a single Kafka topic that is consumed message by message or the result of a KTable transformation. An aggregation of a KStream also yields a KTable.
A KTable can be transformed record by record, joined with another KTable or KStream, or can be re-partitioned and aggregated into a new KTable.
Some KTables have an internal state (a ReadOnlyKeyValueStore) and are therefore queryable via the interactive queries API.
For example:
final KTable table = ...;
...
final KafkaStreams streams = ...;
streams.start();
...
final String queryableStoreName = table.queryableStoreName(); // returns null if KTable is not queryable
final StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams =
    StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
final ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> view = streams.store(storeQueryParams);
view.get(key);
Records from the source topic that have null keys are dropped.
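The changelog semantics described above can be modelled without Kafka at all. The following is a minimal plain-Java sketch, not the Kafka Streams implementation: the class ChangelogTableModel and its materialize method are hypothetical names, records are modelled as Map.Entry pairs, and a null value is assumed to act as a tombstone that deletes the row (the delete semantics described in the method details). Records with null keys are skipped, mirroring the rule above.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChangelogTableModel {

    // Replays changelog records in order and returns the resulting table contents.
    public static <K, V> Map<K, V> materialize(List<Map.Entry<K, V>> changelog) {
        Map<K, V> table = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : changelog) {
            if (record.getKey() == null) {
                continue; // records with null keys are dropped
            }
            if (record.getValue() == null) {
                table.remove(record.getKey()); // tombstone: delete the row (assumed model)
            } else {
                table.put(record.getKey(), record.getValue()); // upsert on the primary key
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> changelog = new ArrayList<>();
        changelog.add(new SimpleEntry<>("alice", 1));
        changelog.add(new SimpleEntry<>("bob", 2));
        changelog.add(new SimpleEntry<>("alice", 3));  // overwrites alice's earlier value
        changelog.add(new SimpleEntry<>("bob", null)); // tombstone removes bob
        changelog.add(new SimpleEntry<>(null, 9));     // null key is ignored
        System.out.println(materialize(changelog));    // prints {alice=3}
    }
}
```

Only the latest value per key survives, which is the difference between a table view and a plain record stream.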
-
Method Summary
- filter(Predicate<? super K, ? super V> predicate)
  Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with default serializers, deserializers, and state store.
- filter(Predicate<? super K, ? super V> predicate, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
  Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance.
- filter(Predicate<? super K, ? super V> predicate, Named named)
  Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with default serializers, deserializers, and state store.
- filter(Predicate<? super K, ? super V> predicate, Named named, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
  Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance.
- filterNot(Predicate<? super K, ? super V> predicate)
  Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with default serializers, deserializers, and state store.
- filterNot(Predicate<? super K, ? super V> predicate, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
  Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance.
- filterNot(Predicate<? super K, ? super V> predicate, Named named)
  Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with default serializers, deserializers, and state store.
- filterNot(Predicate<? super K, ? super V> predicate, Named named, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
  Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance.
- <KR, VR> KGroupedTable<KR, VR> groupBy(KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector)
  Re-groups the records of this KTable using the provided KeyValueMapper and default serializers and deserializers.
- <KR, VR> KGroupedTable<KR, VR> groupBy(KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector, Grouped<KR, VR> grouped)
  Re-groups the records of this KTable using the provided KeyValueMapper and Serdes as specified by Grouped.
- join(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner)
  Join records of this KTable with another KTable's records using non-windowed inner equi join, with default serializers, deserializers, and state store.
- join(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
  Join records of this KTable with another KTable's records using non-windowed inner equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store.
- join(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Named named)
  Join records of this KTable with another KTable's records using non-windowed inner equi join, with default serializers, deserializers, and state store.
- join(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Named named, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
  Join records of this KTable with another KTable's records using non-windowed inner equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store.
- join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner)
  Join records of this KTable with another KTable using non-windowed inner join.
- join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
  Join records of this KTable with another KTable using non-windowed inner join.
- join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, Named named)
  Deprecated. Since 3.1, removal planned for 4.0.
- join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, Named named, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
  Deprecated. Since 3.1, removal planned for 4.0.
- join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, TableJoined<K, KO> tableJoined)
  Join records of this KTable with another KTable using non-windowed inner join, using the TableJoined instance for optional configurations including partitioners when the tables being joined use non-default partitioning, and also the base name for components of the join.
- join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, TableJoined<K, KO> tableJoined, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
  Join records of this KTable with another KTable using non-windowed inner join, using the TableJoined instance for optional configurations including partitioners when the tables being joined use non-default partitioning, and also the base name for components of the join.
- leftJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner)
  Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join, with default serializers, deserializers, and state store.
- leftJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
  Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store.
- leftJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Named named)
  Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join, with default serializers, deserializers, and state store.
- leftJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Named named, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
  Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store.
- leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner)
  Join records of this KTable with another KTable using non-windowed left join.
- leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
  Join records of this KTable with another KTable using non-windowed left join.
- leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, Named named)
  Deprecated. Since 3.1, removal planned for 4.0.
- leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, Named named, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
  Deprecated. Since 3.1, removal planned for 4.0.
- leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, TableJoined<K, KO> tableJoined)
  Join records of this KTable with another KTable using non-windowed left join, using the TableJoined instance for optional configurations including partitioners when the tables being joined use non-default partitioning, and also the base name for components of the join.
- leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, TableJoined<K, KO> tableJoined, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
  Join records of this KTable with another KTable using non-windowed left join, using the TableJoined instance for optional configurations including partitioners when the tables being joined use non-default partitioning, and also the base name for components of the join.
- mapValues(ValueMapper<? super V, ? extends VR> mapper)
  Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store.
- mapValues(ValueMapper<? super V, ? extends VR> mapper, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
  Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance.
- mapValues(ValueMapper<? super V, ? extends VR> mapper, Named named)
  Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store.
- mapValues(ValueMapper<? super V, ? extends VR> mapper, Named named, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
  Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance.
- mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper)
  Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store.
- mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
  Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance.
- mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper, Named named)
  Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store.
- mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper, Named named, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
  Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance.
- outerJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner)
  Join records of this KTable (left input) with another KTable's (right input) records using non-windowed outer equi join, with default serializers, deserializers, and state store.
- outerJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
  Join records of this KTable (left input) with another KTable's (right input) records using non-windowed outer equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store.
- outerJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Named named)
  Join records of this KTable (left input) with another KTable's (right input) records using non-windowed outer equi join, with default serializers, deserializers, and state store.
- outerJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Named named, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
  Join records of this KTable (left input) with another KTable's (right input) records using non-windowed outer equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store.
- queryableStoreName()
  Get the name of the local state store that can be used to query this KTable.
- suppress(Suppressed<? super K> suppressed)
  Suppress some updates from this changelog stream, determined by the supplied Suppressed configuration.
- toStream()
  Convert this changelog stream to a KStream.
- toStream(KeyValueMapper<? super K, ? super V, ? extends KR> mapper)
  Convert this changelog stream to a KStream using the given KeyValueMapper to select the new key.
- toStream(KeyValueMapper<? super K, ? super V, ? extends KR> mapper, Named named)
  Convert this changelog stream to a KStream using the given KeyValueMapper to select the new key.
- toStream(Named named)
  Convert this changelog stream to a KStream.
- transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier, String... stateStoreNames)
  Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with default serializers, deserializers, and state store.
- transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized, String... stateStoreNames)
  Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance.
- transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized, Named named, String... stateStoreNames)
  Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance.
- transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier, Named named, String... stateStoreNames)
  Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with default serializers, deserializers, and state store.
-
Method Details
-
filter
Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with default serializers, deserializers, and state store. All records that do not satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.
Note that filter for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record is forwarded.
- Parameters:
predicate - a filter Predicate that is applied to each record
- Returns:
a KTable that contains only those records that satisfy the given predicate
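To make the tombstone rule concrete, here is a small plain-Java model of the update records a changelog filter might emit. It is a sketch under assumptions, not the Kafka Streams implementation: KTableFilterModel and its filter method are hypothetical names, the predicate is a java.util.function.BiPredicate, and the "if required" check for tombstones is simplified so that a tombstone is always forwarded.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

public class KTableFilterModel {

    // Returns the update records a changelog filter would emit for the given input updates.
    public static <K, V> List<Map.Entry<K, V>> filter(List<Map.Entry<K, V>> updates,
                                                      BiPredicate<K, V> predicate) {
        List<Map.Entry<K, V>> output = new ArrayList<>();
        for (Map.Entry<K, V> update : updates) {
            K key = update.getKey();
            V value = update.getValue();
            if (value != null && predicate.test(key, value)) {
                output.add(new SimpleEntry<>(key, value)); // record passes: forward it unchanged
            } else {
                // Input tombstone (predicate not evaluated) or dropped record:
                // emit a tombstone so the key is deleted from the result table.
                output.add(new SimpleEntry<>(key, null));
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> updates = new ArrayList<>();
        updates.add(new SimpleEntry<>("a", 5));    // satisfies the predicate
        updates.add(new SimpleEntry<>("a", -1));   // fails the predicate: tombstone for "a"
        updates.add(new SimpleEntry<>("b", null)); // tombstone is passed through
        for (Map.Entry<String, Integer> r : filter(updates, (k, v) -> v > 0)) {
            System.out.println(r.getKey() + " -> " + r.getValue());
        }
    }
}
```

The second update shows why a dropped record cannot simply be ignored: without the tombstone, the result table would keep the stale value 5 for key "a".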
-
filter
Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with default serializers, deserializers, and state store. All records that do not satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.
Note that filter for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record is forwarded.
-
filter
KTable<K,V> filter(Predicate<? super K, ? super V> predicate, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. All records that do not satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.
Note that filter for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record is forwarded.
To query the local ReadOnlyKeyValueStore it must be obtained via KafkaStreams#store(...):
KafkaStreams streams = ...; // filtering words
StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams =
    StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(storeQueryParams);
K key = "some-word";
ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).
- Parameters:
predicate - a filter Predicate that is applied to each record
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
a KTable that contains only those records that satisfy the given predicate
-
filter
KTable<K,V> filter(Predicate<? super K, ? super V> predicate, Named named, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. All records that do not satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.
Note that filter for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record is forwarded.
To query the local ReadOnlyKeyValueStore it must be obtained via KafkaStreams#store(...):
KafkaStreams streams = ...; // filtering words
StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams =
    StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(storeQueryParams);
K key = "some-word";
ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).
- Parameters:
predicate - a filter Predicate that is applied to each record
named - a Named config used to name the processor in the topology
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
a KTable that contains only those records that satisfy the given predicate
-
filterNot
Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with default serializers, deserializers, and state store. All records that do satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.
Note that filterNot for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is forwarded.
- Parameters:
predicate - a filter Predicate that is applied to each record
- Returns:
a KTable that contains only those records that do not satisfy the given predicate
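The effect of filterNot on the contents of the result table can be illustrated with a plain-Java model. This is only a sketch under assumptions, not the library's code: KTableFilterNotModel and resultTable are hypothetical names, and the result table is modelled as an in-memory map in which a forwarded tombstone removes the key.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

public class KTableFilterNotModel {

    // Applies the updates in order and returns what the filterNot result table would contain.
    public static <K, V> Map<K, V> resultTable(List<Map.Entry<K, V>> updates,
                                               BiPredicate<K, V> predicate) {
        Map<K, V> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> update : updates) {
            K key = update.getKey();
            V value = update.getValue();
            // Keep only non-tombstone records that do NOT satisfy the predicate.
            boolean keep = value != null && !predicate.test(key, value);
            if (keep) {
                result.put(key, value);
            } else {
                result.remove(key); // tombstone or dropped record deletes the key
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> updates = new ArrayList<>();
        updates.add(new SimpleEntry<>("x", 10));
        updates.add(new SimpleEntry<>("y", 200));  // satisfies predicate: dropped
        updates.add(new SimpleEntry<>("x", 300));  // satisfies predicate: deletes "x"
        updates.add(new SimpleEntry<>("z", 7));
        updates.add(new SimpleEntry<>("z", null)); // tombstone: deletes "z"
        updates.add(new SimpleEntry<>("y", 50));
        System.out.println(resultTable(updates, (k, v) -> v > 100)); // prints {y=50}
    }
}
```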
-
filterNot
Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with default serializers, deserializers, and state store. All records that do satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.
Note that filterNot for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is forwarded.
-
filterNot
KTable<K,V> filterNot(Predicate<? super K, ? super V> predicate, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. All records that do satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.
Note that filterNot for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is forwarded.
To query the local ReadOnlyKeyValueStore it must be obtained via KafkaStreams#store(...):
KafkaStreams streams = ...; // filtering words
StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams =
    StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(storeQueryParams);
K key = "some-word";
ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).
- Parameters:
predicate - a filter Predicate that is applied to each record
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
a KTable that contains only those records that do not satisfy the given predicate
-
filterNot
KTable<K,V> filterNot(Predicate<? super K, ? super V> predicate, Named named, Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. All records that do satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.
Note that filterNot for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is forwarded.
To query the local ReadOnlyKeyValueStore it must be obtained via KafkaStreams#store(...):
KafkaStreams streams = ...; // filtering words
StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams =
    StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(storeQueryParams);
K key = "some-word";
ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).
- Parameters:
predicate - a filter Predicate that is applied to each record
named - a Named config used to name the processor in the topology
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
a KTable that contains only those records that do not satisfy the given predicate
-
mapValues
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. For each KTable update the provided ValueMapper is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K,V'>. This is a stateless record-by-record operation.
The example below counts the number of tokens in the value string.
KTable<String, String> inputTable = builder.table("topic");
KTable<String, Integer> outputTable = inputTable.mapValues(value -> value.split(" ").length);
This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like a join) is applied to the result KTable.
Note that mapValues for a changelog stream works differently than mapValues on a record stream, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
- Type Parameters:
VR - the value type of the result KTable
- Parameters:
mapper - a ValueMapper that computes a new output value
- Returns:
a KTable that contains records with unmodified keys and new values (possibly of different type)
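The tombstone rule for mapValues can be checked with a few lines of plain Java. This is a hedged sketch rather than the library's implementation: MapValuesModel and mapValue are hypothetical names, java.util.function.Function stands in for ValueMapper, and a null value models a tombstone.

```java
import java.util.function.Function;

public class MapValuesModel {

    // Applies the mapper to one update record's value; a null value (tombstone) bypasses the mapper.
    public static <V, VR> VR mapValue(V value, Function<? super V, ? extends VR> mapper) {
        return value == null ? null : mapper.apply(value);
    }

    public static void main(String[] args) {
        // Same token-counting mapper as in the example above.
        Function<String, Integer> tokenCount = value -> value.split(" ").length;

        Integer mapped = mapValue("hello kafka streams", tokenCount);
        System.out.println(mapped); // prints 3

        String tombstone = null;
        Integer forwarded = mapValue(tombstone, tokenCount);
        System.out.println(forwarded); // prints null: the mapper was never called
    }
}
```

Because the mapper is skipped for tombstones, it does not need its own null check for the value.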
-
mapValues
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. For each KTable update, the provided ValueMapper is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K,V'>. This is a stateless record-by-record operation.
The example below counts the number of tokens in the value string.
KTable<String, String> inputTable = builder.table("topic");
KTable<String, Integer> outputTable = inputTable.mapValues(value -> value.split(" ").length, Named.as("countTokenValue"));
This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like a join) is applied to the result KTable.
Note that mapValues for a changelog stream works differently than mapValues for a record stream (KStream), because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated; the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
- Type Parameters:
VR - the value type of the result KTable
- Parameters:
mapper - a ValueMapper that computes a new output value
named - a Named config used to name the processor in the topology
- Returns:
a KTable that contains records with unmodified keys and new values (possibly of different type)
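The tombstone handling described above can be illustrated without a Kafka cluster. The following is a minimal plain-Java sketch, not the Kafka Streams API: the class TombstoneAwareMapValues and its methods mapValues and materialize are hypothetical names, and a changelog is modeled as a list of key-value entries in which a null value marks a delete. It shows that the mapper is skipped for such records while the delete still reaches the result table.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of the tombstone rule of KTable#mapValues; NOT the Kafka Streams API.
// All names in this class are hypothetical.
class TombstoneAwareMapValues {

    // Applies `mapper` to every non-null value; null values (tombstones) are forwarded untouched.
    static <K, V, VR> List<Map.Entry<K, VR>> mapValues(
            List<Map.Entry<K, V>> changelog,
            Function<? super V, ? extends VR> mapper) {
        List<Map.Entry<K, VR>> out = new ArrayList<>();
        for (Map.Entry<K, V> update : changelog) {
            V value = update.getValue();
            // Tombstone: the mapper is not evaluated, the delete is passed on as-is.
            VR mapped = (value == null) ? null : mapper.apply(value);
            out.add(new SimpleEntry<>(update.getKey(), mapped));
        }
        return out;
    }

    // Replays a changelog into a table: non-null values upsert, null values delete.
    static <K, V> Map<K, V> materialize(List<Map.Entry<K, V>> changelog) {
        Map<K, V> table = new LinkedHashMap<>();
        for (Map.Entry<K, V> update : changelog) {
            if (update.getValue() == null) {
                table.remove(update.getKey());
            } else {
                table.put(update.getKey(), update.getValue());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> changelog = new ArrayList<>();
        changelog.add(new SimpleEntry<>("a", "hello kafka streams"));
        changelog.add(new SimpleEntry<>("b", "two tokens"));
        changelog.add(new SimpleEntry<>("a", null)); // tombstone for key "a"

        // Same token-counting mapper as in the example above.
        List<Map.Entry<String, Integer>> mapped =
            mapValues(changelog, value -> value.split(" ").length);
        System.out.println(materialize(mapped)); // prints {b=2}
    }
}
```

Key "a" is first mapped to 3 and then removed by its tombstone, so only key "b" remains in the replayed result.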
-
mapValues
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. For each KTable update, the provided ValueMapperWithKey is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K,V'>. This is a stateless record-by-record operation.
The example below counts the number of tokens in the value and key strings.
KTable<String, String> inputTable = builder.table("topic");
KTable<String, Integer> outputTable = inputTable.mapValues((readOnlyKey, value) -> readOnlyKey.split(" ").length + value.split(" ").length);
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like a join) is applied to the result KTable.
Note that mapValues for a changelog stream works differently than mapValues for a record stream (KStream), because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated; the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
- Type Parameters:
VR - the value type of the result KTable
- Parameters:
mapper - a ValueMapperWithKey that computes a new output value
- Returns:
a KTable that contains records with unmodified keys and new values (possibly of different type)
-
mapValues
<VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper, Named named)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. For each KTable update, the provided ValueMapperWithKey is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K,V'>. This is a stateless record-by-record operation.
The example below counts the number of tokens in the value and key strings.
KTable<String, String> inputTable = builder.table("topic");
KTable<String, Integer> outputTable = inputTable.mapValues((readOnlyKey, value) -> readOnlyKey.split(" ").length + value.split(" ").length, Named.as("countTokenValueAndKey"));
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like a join) is applied to the result KTable.
Note that mapValues for a changelog stream works differently than mapValues for a record stream (KStream), because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated; the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
- Type Parameters:
VR - the value type of the result KTable
- Parameters:
mapper - a ValueMapperWithKey that computes a new output value
named - a Named config used to name the processor in the topology
- Returns:
a KTable that contains records with unmodified keys and new values (possibly of different type)
-
mapValues
<VR> KTable<K,VR> mapValues(ValueMapper<? super V, ? extends VR> mapper, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. For each KTable update, the provided ValueMapper is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K,V'>. This is a stateless record-by-record operation.
The example below counts the number of tokens in the value string.
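KTable<String, String> inputTable = builder.table("topic");
KTable<String, Integer> outputTable = inputTable.mapValues(
    new ValueMapper<String, Integer>() {
        public Integer apply(String value) {
            return value.split(" ").length;
        }
    },
    // the store name "outputTable" is illustrative
    Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("outputTable"));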
To query the local KeyValueStore representing outputTable above, it must be obtained via KafkaStreams#store(...). For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).
This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like a join) is applied to the result KTable.
Note that mapValues for a changelog stream works differently than mapValues for a record stream (KStream), because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated; the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
- Type Parameters:
VR - the value type of the result KTable
- Parameters:
mapper - a ValueMapper that computes a new output value
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
a KTable that contains records with unmodified keys and new values (possibly of different type)
-
mapValues
<VR> KTable<K,VR> mapValues(ValueMapper<? super V, ? extends VR> mapper, Named named, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. For each KTable update, the provided ValueMapper is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K,V'>. This is a stateless record-by-record operation.
The example below counts the number of tokens in the value string.
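KTable<String, String> inputTable = builder.table("topic");
KTable<String, Integer> outputTable = inputTable.mapValues(
    new ValueMapper<String, Integer>() {
        public Integer apply(String value) {
            return value.split(" ").length;
        }
    },
    // the processor name and store name are illustrative
    Named.as("countTokenValue"),
    Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("outputTable"));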
To query the local KeyValueStore representing outputTable above, it must be obtained via KafkaStreams#store(...). For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).
This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like a join) is applied to the result KTable.
Note that mapValues for a changelog stream works differently than mapValues for a record stream (KStream), because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated; the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
- Type Parameters:
VR - the value type of the result KTable
- Parameters:
mapper - a ValueMapper that computes a new output value
named - a Named config used to name the processor in the topology
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
a KTable that contains records with unmodified keys and new values (possibly of different type)
-
mapValues
<VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. For each KTable update, the provided ValueMapperWithKey is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K,V'>. This is a stateless record-by-record operation.
The example below counts the number of tokens in the value and key strings.
KTable<String, String> inputTable = builder.table("topic");
KTable<String, Integer> outputTable = inputTable.mapValues(
    new ValueMapperWithKey<String, String, Integer>() {
        public Integer apply(String readOnlyKey, String value) {
            return readOnlyKey.split(" ").length + value.split(" ").length;
        }
    },
    // the store name "outputTable" is illustrative
    Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("outputTable"));
To query the local KeyValueStore representing outputTable above, it must be obtained via KafkaStreams#store(...). For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like a join) is applied to the result KTable.
Note that mapValues for a changelog stream works differently than mapValues for a record stream (KStream), because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated; the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
- Type Parameters:
VR - the value type of the result KTable
- Parameters:
mapper - a ValueMapperWithKey that computes a new output value
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
a KTable that contains records with unmodified keys and new values (possibly of different type)
-
mapValues
<VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper, Named named, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. For each KTable update, the provided ValueMapperWithKey is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K,V'>. This is a stateless record-by-record operation.
The example below counts the number of tokens in the value and key strings.
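KTable<String, String> inputTable = builder.table("topic");
KTable<String, Integer> outputTable = inputTable.mapValues(
    new ValueMapperWithKey<String, String, Integer>() {
        public Integer apply(String readOnlyKey, String value) {
            return readOnlyKey.split(" ").length + value.split(" ").length;
        }
    },
    // the processor name and store name are illustrative
    Named.as("countTokenValueAndKey"),
    Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("outputTable"));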
To query the local KeyValueStore representing outputTable above, it must be obtained via KafkaStreams#store(...). For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like a join) is applied to the result KTable.
Note that mapValues for a changelog stream works differently than mapValues for a record stream (KStream), because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated; the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
- Type Parameters:
VR - the value type of the result KTable
- Parameters:
mapper - a ValueMapperWithKey that computes a new output value
named - a Named config used to name the processor in the topology
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
a KTable that contains records with unmodified keys and new values (possibly of different type)
-
toStream
Convert this changelog stream to a KStream.
Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of this changelog stream is no longer treated as an updated record (cf. KStream vs KTable).
- Returns:
a KStream that contains the same records as this KTable
-
toStream
-
toStream
Convert this changelog stream to a KStream using the given KeyValueMapper to select the new key.
For example, you can compute the new key as the length of the value string.
KTable<String, String> table = builder.table("topic");
KStream<Integer, String> keyedStream = table.toStream(new KeyValueMapper<String, String, Integer>() {
    public Integer apply(String key, String value) {
        return value.length();
    }
});
Setting a new key might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream.
This operation is equivalent to calling table.toStream().selectKey(KeyValueMapper).
Note that toStream() is a logical operation and only changes the "interpretation" of the stream, i.e., each record of this changelog stream is no longer treated as an updated record (cf. KStream vs KTable).
- Type Parameters:
KR - the new key type of the result stream
- Parameters:
mapper - a KeyValueMapper that computes a new key for each record
- Returns:
a KStream that contains the same records as this KTable
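The re-keying described above can be sketched in plain Java. This is a minimal model, not the Kafka Streams API: the class ToStreamWithNewKey and its toStream method are hypothetical, the changelog is modeled as a list of entries, and all values are assumed to be non-null. It shows that every changelog record becomes one stream record with a new key, so two updates for the same table key yield two independent stream records.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of table.toStream(mapper); NOT the Kafka Streams API.
// All names in this class are hypothetical.
class ToStreamWithNewKey {

    // Re-keys every changelog record; values pass through unchanged.
    static <K, V, KR> List<Map.Entry<KR, V>> toStream(
            List<Map.Entry<K, V>> changelog,
            BiFunction<? super K, ? super V, ? extends KR> mapper) {
        List<Map.Entry<KR, V>> stream = new ArrayList<>();
        for (Map.Entry<K, V> update : changelog) {
            KR newKey = mapper.apply(update.getKey(), update.getValue());
            stream.add(new SimpleEntry<>(newKey, update.getValue()));
        }
        return stream;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> changelog = new ArrayList<>();
        changelog.add(new SimpleEntry<>("k1", "kafka"));
        changelog.add(new SimpleEntry<>("k2", "streams"));
        changelog.add(new SimpleEntry<>("k1", "ktable")); // an update of k1 in table terms

        // New key = length of the value string, as in the example above.
        List<Map.Entry<Integer, String>> stream =
            toStream(changelog, (key, value) -> value.length());
        System.out.println(stream); // prints [5=kafka, 7=streams, 6=ktable]
    }
}
```

Because the new keys differ from the original ones, records that shared a partition by old key may no longer be co-located, which is why a later key-based operator can trigger redistribution.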
-
toStream
Convert this changelog stream to a KStream using the given KeyValueMapper to select the new key.
For example, you can compute the new key as the length of the value string.
KTable<String, String> table = builder.table("topic");
KStream<Integer, String> keyedStream = table.toStream(new KeyValueMapper<String, String, Integer>() {
    public Integer apply(String key, String value) {
        return value.length();
    }
});
Setting a new key might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream.
This operation is equivalent to calling table.toStream().selectKey(KeyValueMapper).
Note that toStream() is a logical operation and only changes the "interpretation" of the stream, i.e., each record of this changelog stream is no longer treated as an updated record (cf. KStream vs KTable).
- Type Parameters:
KR - the new key type of the result stream
- Parameters:
mapper - a KeyValueMapper that computes a new key for each record
named - a Named config used to name the processor in the topology
- Returns:
a KStream that contains the same records as this KTable
-
suppress
Suppress some updates from this changelog stream, determined by the supplied Suppressed configuration. This controls what updates downstream table and stream operations will receive.
Note that suppress() cannot be applied to versioned KTables.
- Parameters:
suppressed - Configuration object determining what, if any, updates to suppress
- Returns:
A new KTable with the desired suppression characteristics.
-
transformValues
<VR> KTable<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier, String... stateStoreNames)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with default serializers, deserializers, and state store. A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K,V'>. This is similar to mapValues(ValueMapperWithKey), but more flexible, allowing access to additional state stores and to the ProcessorContext. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed.
If the downstream topology uses aggregation functions (e.g., KGroupedTable.reduce(Reducer, Reducer, Materialized), KGroupedTable.aggregate(Initializer, Aggregator, Aggregator, Materialized), etc.), care must be taken when dealing with state (either held in state stores or transformer instances) to ensure correct aggregate results. In contrast, if the resulting KTable is materialized (cf. transformValues(ValueTransformerWithKeySupplier, Materialized, String...)), such concerns are handled for you.
In order to assign a state, the state must be created and registered beforehand:
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), Serdes.String(), Serdes.String());
// register store
builder.addStateStore(keyValueStoreBuilder);
KTable outputTable = inputTable.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
Within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.
new ValueTransformerWithKeySupplier() {
    public ValueTransformerWithKey get() {
        return new ValueTransformerWithKey() {
            private KeyValueStore<String, String> state;
            public void init(ProcessorContext context) {
                this.state = (KeyValueStore<String, String>) context.getStateStore("myValueTransformState");
                // punctuate each 1000ms, can access this.state
                context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
            }
            public NewValueType transform(K readOnlyKey, V value) {
                // can access this.state and use read-only key
                return new NewValueType(readOnlyKey); // or null
            }
            public void close() {
                // can access this.state
            }
        };
    }
}
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Setting a new value preserves data co-location with respect to the key.
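The init/transform/close life cycle above can be modeled in plain Java to show why such a transformer is stateful. This is a sketch under stated assumptions, not the Kafka Streams API: the interface ValueTransformerModel and the class UpdateCounter are hypothetical, and a map of named in-memory maps stands in for the ProcessorContext and its registered state stores. The transformer tags each value with the number of updates seen so far for its key.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of a stateful value transformer; NOT the Kafka Streams API.
// All names in this class are hypothetical.
class StatefulTransformSketch {

    // Mirrors the init/transform/close life cycle only.
    interface ValueTransformerModel<K, V, VR> {
        void init(Map<String, Map<String, Long>> stateStores); // stands in for ProcessorContext
        VR transform(K readOnlyKey, V value);
        void close();
    }

    // Emits "<value>#<n>", where n counts the updates seen so far for the key.
    static class UpdateCounter implements ValueTransformerModel<String, String, String> {
        private Map<String, Long> state;

        @Override
        public void init(Map<String, Map<String, Long>> stateStores) {
            // Look the store up by its registered name.
            this.state = stateStores.get("myValueTransformState");
        }

        @Override
        public String transform(String readOnlyKey, String value) {
            // The key is only read; the per-key counter lives in the state store.
            long seen = state.merge(readOnlyKey, 1L, Long::sum);
            return value + "#" + seen;
        }

        @Override
        public void close() {
            // Nothing to release in this in-memory model.
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, Long>> stores = new HashMap<>();
        stores.put("myValueTransformState", new HashMap<>());

        UpdateCounter transformer = new UpdateCounter();
        transformer.init(stores);
        System.out.println(transformer.transform("a", "x")); // prints x#1
        System.out.println(transformer.transform("a", "y")); // prints y#2
        System.out.println(transformer.transform("b", "z")); // prints z#1
        transformer.close();
    }
}
```

The output for a record depends on earlier records with the same key, which is the kind of state that needs care when aggregations run downstream.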
- Type Parameters:
VR - the value type of the result table
- Parameters:
transformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a ValueTransformerWithKey. At least one transformer instance will be created per streaming task. Transformers do not need to be thread-safe.
stateStoreNames - the names of the state stores used by the processor
- Returns:
a KTable that contains records with unmodified key and new values (possibly of different type)
- See Also:
-
transformValues
<VR> KTable<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier, Named named, String... stateStoreNames)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with default serializers, deserializers, and state store. A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K,V'>. This is similar to mapValues(ValueMapperWithKey), but more flexible, allowing access to additional state stores and to the ProcessorContext. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed.
If the downstream topology uses aggregation functions (e.g., KGroupedTable.reduce(Reducer, Reducer, Materialized), KGroupedTable.aggregate(Initializer, Aggregator, Aggregator, Materialized), etc.), care must be taken when dealing with state (either held in state stores or transformer instances) to ensure correct aggregate results. In contrast, if the resulting KTable is materialized (cf. transformValues(ValueTransformerWithKeySupplier, Materialized, String...)), such concerns are handled for you.
In order to assign a state, the state must be created and registered beforehand:
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), Serdes.String(), Serdes.String());
// register store
builder.addStateStore(keyValueStoreBuilder);
KTable outputTable = inputTable.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
Within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.
new ValueTransformerWithKeySupplier() {
    public ValueTransformerWithKey get() {
        return new ValueTransformerWithKey() {
            private KeyValueStore<String, String> state;
            public void init(ProcessorContext context) {
                this.state = (KeyValueStore<String, String>) context.getStateStore("myValueTransformState");
                // punctuate each 1000ms, can access this.state
                context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
            }
            public NewValueType transform(K readOnlyKey, V value) {
                // can access this.state and use read-only key
                return new NewValueType(readOnlyKey); // or null
            }
            public void close() {
                // can access this.state
            }
        };
    }
}
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Setting a new value preserves data co-location with respect to the key.
- Type Parameters:
VR - the value type of the result table
- Parameters:
transformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a ValueTransformerWithKey. At least one transformer instance will be created per streaming task. Transformers do not need to be thread-safe.
named - a Named config used to name the processor in the topology
stateStoreNames - the names of the state stores used by the processor
- Returns:
a KTable that contains records with unmodified key and new values (possibly of different type)
- See Also:
-
transformValues
<VR> KTable<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized, String... stateStoreNames)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes a new value for it. This is similar to mapValues(ValueMapperWithKey), but more flexible, allowing stateful, rather than stateless, record-by-record operation, access to additional state stores, and access to the ProcessorContext. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed. The resulting KTable is materialized into another state store (in addition to the provided state stores) as specified by the user via the Materialized parameter, and is queryable through its given name.
In order to assign a state, the state must be created and registered beforehand:
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), Serdes.String(), Serdes.String());
// register store
builder.addStateStore(keyValueStoreBuilder);
KTable outputTable = inputTable.transformValues(
    new ValueTransformerWithKeySupplier() { ... },
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("outputTable")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.String()),
    "myValueTransformState");
Within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.
new ValueTransformerWithKeySupplier() {
    public ValueTransformerWithKey get() {
        return new ValueTransformerWithKey() {
            private KeyValueStore<String, String> state;
            public void init(ProcessorContext context) {
                this.state = (KeyValueStore<String, String>) context.getStateStore("myValueTransformState");
                // punctuate each 1000ms, can access this.state
                context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
            }
            public NewValueType transform(K readOnlyKey, V value) {
                // can access this.state and use read-only key
                return new NewValueType(readOnlyKey); // or null
            }
            public void close() {
                // can access this.state
            }
        };
    }
}
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Setting a new value preserves data co-location with respect to the key.
- Type Parameters:
VR - the value type of the result table
- Parameters:
transformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a ValueTransformerWithKey. At least one transformer instance will be created per streaming task. Transformers do not need to be thread-safe.
materialized - an instance of Materialized used to describe how the state store of the resulting table should be materialized. Cannot be null
stateStoreNames - the names of the state stores used by the processor
- Returns:
a KTable that contains records with unmodified key and new values (possibly of different type)
- See Also:
-
transformValues
<VR> KTable<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized, Named named, String... stateStoreNames)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes a new value for it. This is similar to mapValues(ValueMapperWithKey), but more flexible, allowing stateful, rather than stateless, record-by-record operation, access to additional state stores, and access to the ProcessorContext. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed. The resulting KTable is materialized into another state store (in addition to the provided state stores) as specified by the user via the Materialized parameter, and is queryable through its given name.
In order to assign a state, the state must be created and registered beforehand:
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), Serdes.String(), Serdes.String());
// register store
builder.addStateStore(keyValueStoreBuilder);
KTable outputTable = inputTable.transformValues(
    new ValueTransformerWithKeySupplier() { ... },
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("outputTable")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.String()),
    "myValueTransformState");
Within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.
new ValueTransformerWithKeySupplier() {
    public ValueTransformerWithKey get() {
        return new ValueTransformerWithKey() {
            private KeyValueStore<String, String> state;
            public void init(ProcessorContext context) {
                this.state = (KeyValueStore<String, String>) context.getStateStore("myValueTransformState");
                // punctuate each 1000ms, can access this.state
                context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
            }
            public NewValueType transform(K readOnlyKey, V value) {
                // can access this.state and use read-only key
                return new NewValueType(readOnlyKey); // or null
            }
            public void close() {
                // can access this.state
            }
        };
    }
}
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Setting a new value preserves data co-location with respect to the key.
- Type Parameters:
VR - the value type of the result table
- Parameters:
transformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a ValueTransformerWithKey. At least one transformer instance will be created per streaming task. Transformers do not need to be thread-safe.
materialized - an instance of Materialized used to describe how the state store of the resulting table should be materialized. Cannot be null
named - a Named config used to name the processor in the topology
stateStoreNames - the names of the state stores used by the processor
- Returns:
a KTable that contains records with unmodified key and new values (possibly of different type)
- See Also:
-
groupBy
Re-groups the records of this KTable using the provided KeyValueMapper and default serializers and deserializers. Each KeyValue pair of this KTable is mapped to a new KeyValue pair by applying the provided KeyValueMapper. Re-grouping a KTable is required before an aggregation operator can be applied to the data (cf. KGroupedTable). The KeyValueMapper selects a new key and value (which should both have unmodified type). If the new record key is null, the record will not be included in the resulting KGroupedTable.
Because a new key is selected, an internal repartitioning topic will be created in Kafka. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. You can retrieve all generated internal topic names via Topology.describe().
All data of this KTable will be redistributed through the repartitioning topic by writing all update records to and rereading all updated records from it, such that the resulting KGroupedTable is partitioned on the new key.
If the key or value type is changed, it is recommended to use groupBy(KeyValueMapper, Grouped) instead.
- Type Parameters:
KR - the key type of the result KGroupedTable
VR - the value type of the result KGroupedTable
- Parameters:
selector - a KeyValueMapper that computes a new grouping key and value to be aggregated
- Returns:
a KGroupedTable that contains the re-grouped records of the original KTable
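The re-keying rule described for groupBy can be modelled without Kafka. The following is a minimal plain-Java sketch, not the Kafka Streams implementation: the class GroupBySketch, the method regroup, and the sample data are hypothetical names introduced only for illustration. It maps every table entry to a new key/value pair and skips entries whose new key is null.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical in-memory model of KTable re-grouping; it does not use Kafka Streams.
class GroupBySketch {

    // Applies the selector to every entry and collects the new values under the new key.
    static <K, V, KR, VR> Map<KR, List<VR>> regroup(
            Map<K, V> table,
            BiFunction<K, V, Map.Entry<KR, VR>> selector) {
        Map<KR, List<VR>> grouped = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : table.entrySet()) {
            Map.Entry<KR, VR> mapped = selector.apply(entry.getKey(), entry.getValue());
            if (mapped.getKey() == null) {
                continue; // a null new key excludes the record from the grouped result
            }
            grouped.computeIfAbsent(mapped.getKey(), k -> new ArrayList<>()).add(mapped.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        Map<String, String> userRegion = new LinkedHashMap<>();
        userRegion.put("alice", "eu");
        userRegion.put("bob", "us");
        userRegion.put("carol", "eu");
        // Re-key by region; the user name becomes the value.
        Map<String, List<String>> byRegion =
                regroup(userRegion, (user, region) -> new SimpleEntry<>(region, user));
        System.out.println(byRegion);
    }
}
```

In the real API the grouped records then feed an aggregation on the KGroupedTable; the list collected here only stands in for that step.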
-
groupBy
<KR,VR> KGroupedTable<KR,VR> groupBy(KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector, Grouped<KR, VR> grouped)
Re-groups the records of this KTable using the provided KeyValueMapper and Serdes as specified by Grouped. Each KeyValue pair of this KTable is mapped to a new KeyValue pair by applying the provided KeyValueMapper. Re-grouping a KTable is required before an aggregation operator can be applied to the data (cf. KGroupedTable). The KeyValueMapper selects a new key and value (where both could be the same type or a new type). If the new record key is null, the record will not be included in the resulting KGroupedTable.
Because a new key is selected, an internal repartitioning topic will be created in Kafka. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is either provided via Grouped.as(String) or an internally generated name.
You can retrieve all generated internal topic names via Topology.describe().
All data of this KTable will be redistributed through the repartitioning topic by writing all update records to and rereading all updated records from it, such that the resulting KGroupedTable is partitioned on the new key.
- Type Parameters:
KR - the key type of the result KGroupedTable
VR - the value type of the result KGroupedTable
- Parameters:
selector - a KeyValueMapper that computes a new grouping key and value to be aggregated
grouped - the Grouped instance used to specify Serdes and the name for a repartition topic if repartitioning is required.
- Returns:
a KGroupedTable that contains the re-grouped records of the original KTable
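The naming pattern "${applicationId}-<name>-repartition" can be spelled out as a one-line helper. This is only an illustration of the pattern: Kafka Streams builds the topic name internally, and the class RepartitionTopicNameSketch, the method repartitionTopicName, and the sample values "orders-app" and "by-region" are hypothetical.

```java
// Hypothetical helper that only illustrates the documented naming pattern.
class RepartitionTopicNameSketch {

    // applicationId: the value configured under APPLICATION_ID_CONFIG.
    // name: the name given via Grouped.as(String), or an internally generated one.
    static String repartitionTopicName(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    public static void main(String[] args) {
        System.out.println(repartitionTopicName("orders-app", "by-region"));
    }
}
```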
-
join
<VO,VR> KTable<K,VR> join(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner)
Join records of this KTable with another KTable's records using non-windowed inner equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.
The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.
For each KTable record that finds a corresponding record in the other KTable the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.
Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).
Input records with null key will be dropped and no join computation is performed.
Example:
| thisKTable | thisState | otherKTable | otherState | result updated record |
| <K1:A> | <K1:A> | | | |
| | <K1:A> | <K1:b> | <K1:b> | <K1:ValueJoiner(A,b)> |
| <K1:C> | <K1:C> | | <K1:b> | <K1:ValueJoiner(C,b)> |
| | <K1:C> | <K1:null> | | <K1:null> |
- Type Parameters:
VO - the value type of the other KTable
VR - the value type of the result KTable
- Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
- Returns:
a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key
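The inner-join example can be replayed with a small in-memory model. This is a sketch under stated assumptions, not the Kafka Streams implementation: the class InnerJoinSketch and its methods updateThis and updateOther are hypothetical, keys and values are plain strings, and a null value stands for a tombstone.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

// Hypothetical in-memory model of the documented inner-join semantics.
class InnerJoinSketch {
    private final Map<String, String> thisState = new HashMap<>();
    private final Map<String, String> otherState = new HashMap<>();
    private final Map<String, String> result = new HashMap<>();
    private final BiFunction<String, String, String> joiner;

    InnerJoinSketch(BiFunction<String, String, String> joiner) {
        this.joiner = joiner;
    }

    // Each update returns the emitted result record, or empty if nothing is emitted.
    Optional<String> updateThis(String key, String value) {
        return update(thisState, key, value);
    }

    Optional<String> updateOther(String key, String value) {
        return update(otherState, key, value);
    }

    private Optional<String> update(Map<String, String> state, String key, String value) {
        if (key == null) {
            return Optional.empty(); // records with a null key are dropped
        }
        if (value == null) {
            state.remove(key); // tombstone: delete from the input state
        } else {
            state.put(key, value);
        }
        String left = thisState.get(key);
        String right = otherState.get(key);
        if (left != null && right != null) {
            String joined = joiner.apply(left, right);
            result.put(key, joined);
            return Optional.of("<" + key + ":" + joined + ">");
        }
        // No match: forward a tombstone only if there is a result record to delete.
        if (result.remove(key) != null) {
            return Optional.of("<" + key + ":null>");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        InnerJoinSketch join = new InnerJoinSketch((a, b) -> "ValueJoiner(" + a + "," + b + ")");
        System.out.println(join.updateThis("K1", "A"));
        System.out.println(join.updateOther("K1", "b"));
        System.out.println(join.updateThis("K1", "C"));
        System.out.println(join.updateOther("K1", null));
    }
}
```

The four calls in main correspond to the four rows of the example, in order.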
-
join
<VO,VR> KTable<K,VR> join(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Named named)
Join records of this KTable with another KTable's records using non-windowed inner equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.
The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.
For each KTable record that finds a corresponding record in the other KTable the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.
Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).
Input records with null key will be dropped and no join computation is performed.
Example:
| thisKTable | thisState | otherKTable | otherState | result updated record |
| <K1:A> | <K1:A> | | | |
| | <K1:A> | <K1:b> | <K1:b> | <K1:ValueJoiner(A,b)> |
| <K1:C> | <K1:C> | | <K1:b> | <K1:ValueJoiner(C,b)> |
| | <K1:C> | <K1:null> | | <K1:null> |
- Type Parameters:
VO - the value type of the other KTable
VR - the value type of the result KTable
- Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
named - a Named config used to name the processor in the topology
- Returns:
a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key
-
join
<VO,VR> KTable<K,VR> join(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Join records of this KTable with another KTable's records using non-windowed inner equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.
The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.
For each KTable record that finds a corresponding record in the other KTable the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.
Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).
Input records with null key will be dropped and no join computation is performed.
Example:
| thisKTable | thisState | otherKTable | otherState | result updated record |
| <K1:A> | <K1:A> | | | |
| | <K1:A> | <K1:b> | <K1:b> | <K1:ValueJoiner(A,b)> |
| <K1:C> | <K1:C> | | <K1:b> | <K1:ValueJoiner(C,b)> |
| | <K1:C> | <K1:null> | | <K1:null> |
- Type Parameters:
VO - the value type of the other KTable
VR - the value type of the result KTable
- Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
materialized - an instance of Materialized used to describe how the state store should be materialized. Cannot be null
- Returns:
a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key
-
join
<VO,VR> KTable<K,VR> join(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Named named, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Join records of this KTable with another KTable's records using non-windowed inner equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.
The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.
For each KTable record that finds a corresponding record in the other KTable the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.
Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).
Input records with null key will be dropped and no join computation is performed.
Example:
| thisKTable | thisState | otherKTable | otherState | result updated record |
| <K1:A> | <K1:A> | | | |
| | <K1:A> | <K1:b> | <K1:b> | <K1:ValueJoiner(A,b)> |
| <K1:C> | <K1:C> | | <K1:b> | <K1:ValueJoiner(C,b)> |
| | <K1:C> | <K1:null> | | <K1:null> |
- Type Parameters:
VO - the value type of the other KTable
VR - the value type of the result KTable
- Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
named - a Named config used to name the processor in the topology
materialized - an instance of Materialized used to describe how the state store should be materialized. Cannot be null
- Returns:
a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key
-
leftJoin
<VO,VR> KTable<K,VR> leftJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner)
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join, all records from left KTable will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.
The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.
For each KTable record that finds a corresponding record in the other KTable's state the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record of left KTable that does not find a corresponding record in the right KTable's state the provided ValueJoiner will be called with rightValue = null to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.
Note that records with null values (so-called tombstone records) have delete semantics. For example, for left input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).
Input records with null key will be dropped and no join computation is performed.
Example:
| thisKTable | thisState | otherKTable | otherState | result updated record |
| <K1:A> | <K1:A> | | | <K1:ValueJoiner(A,null)> |
| | <K1:A> | <K1:b> | <K1:b> | <K1:ValueJoiner(A,b)> |
| <K1:null> | | | <K1:b> | <K1:null> |
| | | <K1:null> | | |
- Type Parameters:
VO - the value type of the other KTable
VR - the value type of the result KTable
- Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
- Returns:
a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of left KTable
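The left-join rule reduces to a small function of the current state of both tables for one key. The following plain-Java sketch is an assumption-labelled model, not the Kafka Streams implementation: the class LeftJoinSketch and the method currentResult are hypothetical, and null stands for "no record for this key".

```java
import java.util.function.BiFunction;

// Hypothetical model of the documented left-join result for a single key.
class LeftJoinSketch {

    // Returns the current result value, or null if the key is absent from the result table.
    static <V, VO, VR> VR currentResult(V left, VO right, BiFunction<V, VO, VR> joiner) {
        if (left == null) {
            return null; // no left record: the joiner is not called and any earlier result is deleted
        }
        return joiner.apply(left, right); // right may be null when the right table has no match
    }

    public static void main(String[] args) {
        BiFunction<String, String, String> joiner = (a, b) -> "ValueJoiner(" + a + "," + b + ")";
        System.out.println(currentResult("A", null, joiner));
        System.out.println(currentResult("A", "b", joiner));
        System.out.println(currentResult(null, "b", joiner));
    }
}
```

Because the joiner can receive a null right value, a ValueJoiner used with leftJoin has to handle that case explicitly.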
-
leftJoin
<VO,VR> KTable<K,VR> leftJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Named named)
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join, all records from left KTable will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.
The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.
For each KTable record that finds a corresponding record in the other KTable's state the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record of left KTable that does not find a corresponding record in the right KTable's state the provided ValueJoiner will be called with rightValue = null to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.
Note that records with null values (so-called tombstone records) have delete semantics. For example, for left input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).
Input records with null key will be dropped and no join computation is performed.
Example:
| thisKTable | thisState | otherKTable | otherState | result updated record |
| <K1:A> | <K1:A> | | | <K1:ValueJoiner(A,null)> |
| | <K1:A> | <K1:b> | <K1:b> | <K1:ValueJoiner(A,b)> |
| <K1:null> | | | <K1:b> | <K1:null> |
| | | <K1:null> | | |
- Type Parameters:
VO - the value type of the other KTable
VR - the value type of the result KTable
- Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
named - a Named config used to name the processor in the topology
- Returns:
a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of left KTable
-
leftJoin
<VO,VR> KTable<K,VR> leftJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join, all records from left KTable will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.
The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.
For each KTable record that finds a corresponding record in the other KTable's state the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record of left KTable that does not find a corresponding record in the right KTable's state the provided ValueJoiner will be called with rightValue = null to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.
Note that records with null values (so-called tombstone records) have delete semantics. For example, for left input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).
Input records with null key will be dropped and no join computation is performed.
Example:
| thisKTable | thisState | otherKTable | otherState | result updated record |
| <K1:A> | <K1:A> | | | <K1:ValueJoiner(A,null)> |
| | <K1:A> | <K1:b> | <K1:b> | <K1:ValueJoiner(A,b)> |
| <K1:null> | | | <K1:b> | <K1:null> |
| | | <K1:null> | | |
- Type Parameters:
VO - the value type of the other KTable
VR - the value type of the result KTable
- Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
materialized - an instance of Materialized used to describe how the state store should be materialized. Cannot be null
- Returns:
a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of left KTable
-
leftJoin
<VO,VR> KTable<K,VR> leftJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Named named, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join, all records from left KTable will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.
The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.
For each KTable record that finds a corresponding record in the other KTable's state the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record of left KTable that does not find a corresponding record in the right KTable's state the provided ValueJoiner will be called with rightValue = null to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.
Note that records with null values (so-called tombstone records) have delete semantics. For example, for left input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).
Input records with null key will be dropped and no join computation is performed.
Example:
| thisKTable | thisState | otherKTable | otherState | result updated record |
| <K1:A> | <K1:A> | | | <K1:ValueJoiner(A,null)> |
| | <K1:A> | <K1:b> | <K1:b> | <K1:ValueJoiner(A,b)> |
| <K1:null> | | | <K1:b> | <K1:null> |
| | | <K1:null> | | |
- Type Parameters:
VO - the value type of the other KTable
VR - the value type of the result KTable
- Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
named - a Named config used to name the processor in the topology
materialized - an instance of Materialized used to describe how the state store should be materialized. Cannot be null
- Returns:
a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of left KTable
-
outerJoin
<VO,VR> KTable<K,VR> outerJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner)
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed outer equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join or left-join, all records from both input KTables will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.
The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.
For each KTable record that finds a corresponding record in the other KTable's state the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record that does not find a corresponding record in the corresponding other KTable's state the provided ValueJoiner will be called with null value for the corresponding other value to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.
Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).
Input records with null key will be dropped and no join computation is performed.
Example:
| thisKTable | thisState | otherKTable | otherState | result updated record |
| <K1:A> | <K1:A> | | | <K1:ValueJoiner(A,null)> |
| | <K1:A> | <K1:b> | <K1:b> | <K1:ValueJoiner(A,b)> |
| <K1:null> | | | <K1:b> | <K1:ValueJoiner(null,b)> |
| | | <K1:null> | | <K1:null> |
- Type Parameters:
VO - the value type of the other KTable
VR - the value type of the result KTable
- Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
- Returns:
a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of both KTables
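The outer-join example can be summarised as a function of the current state of both tables for one key. This is a plain-Java sketch that follows the example rows, not the Kafka Streams implementation: the class OuterJoinSketch and the method currentResult are hypothetical, and null stands for "no record for this key".

```java
import java.util.function.BiFunction;

// Hypothetical model of the documented outer-join result for a single key.
class OuterJoinSketch {

    // Returns the current result value, or null if the key is absent from the result table.
    static <V, VO, VR> VR currentResult(V left, VO right, BiFunction<V, VO, VR> joiner) {
        if (left == null && right == null) {
            return null; // neither table holds the key: the result record is deleted
        }
        return joiner.apply(left, right); // either argument may be null, but not both
    }

    public static void main(String[] args) {
        BiFunction<String, String, String> joiner = (a, b) -> "ValueJoiner(" + a + "," + b + ")";
        System.out.println(currentResult("A", null, joiner));
        System.out.println(currentResult("A", "b", joiner));
        System.out.println(currentResult(null, "b", joiner));
        System.out.println(currentResult(null, null, joiner));
    }
}
```

The four calls in main mirror the four rows of the example; a ValueJoiner used with outerJoin has to tolerate a null on either side.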
-
outerJoin
<VO,VR> KTable<K,VR> outerJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Named named)
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed outer equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join or left-join, all records from both input KTables will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.
The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.
For each KTable record that finds a corresponding record in the other KTable's state the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record that does not find a corresponding record in the corresponding other KTable's state the provided ValueJoiner will be called with null value for the corresponding other value to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.
Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).
Input records with null key will be dropped and no join computation is performed.
Example:
| thisKTable | thisState | otherKTable | otherState | result updated record |
| <K1:A> | <K1:A> | | | <K1:ValueJoiner(A,null)> |
| | <K1:A> | <K1:b> | <K1:b> | <K1:ValueJoiner(A,b)> |
| <K1:null> | | | <K1:b> | <K1:ValueJoiner(null,b)> |
| | | <K1:null> | | <K1:null> |
- Type Parameters:
VO - the value type of the other KTable
VR - the value type of the result KTable
- Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
named - a Named config used to name the processor in the topology
- Returns:
a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of both KTables
-
outerJoin
<VO,VR> KTable<K,VR> outerJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed outer equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join or left-join, all records from both input KTables will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.
The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.
For each KTable record that finds a corresponding record in the other KTable's state the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record that does not find a corresponding record in the corresponding other KTable's state the provided ValueJoiner will be called with null value for the corresponding other value to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.
Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).
Input records with null key will be dropped and no join computation is performed.
Example:
| thisKTable | thisState | otherKTable | otherState | result updated record |
| <K1:A> | <K1:A> | | | <K1:ValueJoiner(A,null)> |
| | <K1:A> | <K1:b> | <K1:b> | <K1:ValueJoiner(A,b)> |
| <K1:null> | | | <K1:b> | <K1:ValueJoiner(null,b)> |
| | | <K1:null> | | <K1:null> |
- Type Parameters:
VO - the value type of the other KTable
VR - the value type of the result KTable
- Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
materialized - an instance of Materialized used to describe how the state store should be materialized. Cannot be null
- Returns:
a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of both KTables
-
outerJoin
<VO,VR> KTable<K,VR> outerJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Named named, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized) Join records of thisKTable
(left input) with anotherKTable
's (right input) records using non-windowed outer equi join, with theMaterialized
instance for configuration of thekey serde
,the result table's value serde
, andstate store
. The join is a primary key join with join attributethisKTable.key == otherKTable.key
. In contrast toinner-join
orleft-join
, all records from both inputKTable
s will produce an output record (cf. below). The result is an ever updatingKTable
that represents the current (i.e., processing time) result of the join.The join is computed by (1) updating the internal state of one
KTable
and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the otherKTable
. This happens in a symmetric way, i.e., for each update of eitherthis
or theother
inputKTable
the result gets updated.For each
KTable
record that finds a corresponding record in the otherKTable
's state the providedValueJoiner
will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record that does not find a corresponding record in the corresponding otherKTable
's state the providedValueJoiner
will be called withnull
value for the corresponding other value to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.Note that
records
withnull
values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the resultKTable
if required (i.e., if there is anything to be deleted).Input records with
null
key will be dropped and no join computation is performed.Example:
thisKTable thisState otherKTable otherState result updated record <K1:A> <K1:A> <K1:ValueJoiner(A,null)> <K1:A> <K1:b> <K1:b> <K1:ValueJoiner(A,b)> <K1:null> <K1:b> <K1:ValueJoiner(null,b)> <K1:null> <K1:null> - Type Parameters:
VO - the value type of the other KTable
VR - the value type of the result KTable
- Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
named - a Named config used to name the processor in the topology
materialized - an instance of Materialized used to describe how the state store should be materialized. Cannot be null
- Returns:
- a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of both KTables
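The outer-join update rules can be modelled in a few lines of plain Java. The sketch below is an illustration only, not Kafka Streams code: the class `OuterJoinSketch`, its methods, and the string-based record format are hypothetical names chosen for this example. It follows the example table: a tombstone on one side while the other side still holds a value results in a joiner call with null for the deleted side, and a tombstone is forwarded only once neither side holds a value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** Plain-Java model of the outer-join update rules; this is not Kafka Streams code. */
class OuterJoinSketch {
    private final Map<String, String> thisState = new HashMap<>();
    private final Map<String, String> otherState = new HashMap<>();
    private final BiFunction<String, String, String> joiner;
    final List<String> results = new ArrayList<>();

    OuterJoinSketch(BiFunction<String, String, String> joiner) {
        this.joiner = joiner;
    }

    void updateThis(String key, String value) {
        update(thisState, key, value);
    }

    void updateOther(String key, String value) {
        update(otherState, key, value);
    }

    private void update(Map<String, String> state, String key, String value) {
        if (key == null) {
            return; // records with a null key are dropped
        }
        // Remember whether a result existed before this update was applied.
        boolean hadResult = thisState.containsKey(key) || otherState.containsKey(key);
        if (value == null) {
            state.remove(key); // tombstone: delete from this side's state
        } else {
            state.put(key, value);
        }
        String left = thisState.get(key);
        String right = otherState.get(key);
        if (left != null || right != null) {
            // At least one side has a value: call the joiner, passing null for the missing side.
            results.add("<" + key + ":" + joiner.apply(left, right) + ">");
        } else if (hadResult) {
            // Both sides are empty now: forward a tombstone only if there is something to delete.
            results.add("<" + key + ":null>");
        }
    }

    /** Replays the four updates from the example table and returns the emitted result records. */
    static List<String> demo() {
        OuterJoinSketch join = new OuterJoinSketch((a, b) -> "ValueJoiner(" + a + "," + b + ")");
        join.updateThis("K1", "A");
        join.updateOther("K1", "b");
        join.updateThis("K1", null);
        join.updateOther("K1", null);
        return join.results;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Running `main` prints the four entries of the "result updated record" column in order.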
-
join
<VR, KO, VO> KTable<K, VR> join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner)
Join records of this KTable with another KTable using non-windowed inner join.
This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.
- Type Parameters:
VR - the value type of the result KTable
KO - the key type of the other KTable
VO - the value type of the other KTable
- Parameters:
other - the other KTable to be joined with this KTable. Keyed by KO.
foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.
joiner - a ValueJoiner that computes the join result for a pair of matching records
- Returns:
- a KTable that contains the result of joining this table with other
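To make the foreign-key inner-join semantics concrete, here is a minimal plain-Java sketch that computes the join result for a snapshot of both tables. It is a model of the behaviour described above, not Kafka Streams code: `ForeignKeyInnerJoinSketch`, the order and customer data, and the convention that an empty string means "no foreign key" are all assumptions made for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Plain-Java model of a foreign-key inner join result; this is not Kafka Streams code. */
class ForeignKeyInnerJoinSketch {

    /** Computes the join result for the current contents of both tables. */
    static <K, V, KO, VO, VR> Map<K, VR> join(Map<K, V> thisTable,
                                              Map<KO, VO> otherTable,
                                              Function<V, KO> foreignKeyExtractor,
                                              BiFunction<V, VO, VR> joiner) {
        Map<K, VR> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : thisTable.entrySet()) {
            KO foreignKey = foreignKeyExtractor.apply(entry.getValue());
            if (foreignKey == null) {
                continue; // a null foreign key means the update is ignored
            }
            VO otherValue = otherTable.get(foreignKey);
            if (otherValue == null) {
                continue; // inner join: no matching record, so no result record
            }
            // The result keeps the primary key of this table.
            result.put(entry.getKey(), joiner.apply(entry.getValue(), otherValue));
        }
        return result;
    }

    /** Hypothetical data: order id -> customer id, joined with customer id -> customer name. */
    static Map<String, String> demo() {
        Map<String, String> orders = new LinkedHashMap<>();
        orders.put("o1", "c1");
        orders.put("o2", "c1"); // several orders may reference the same customer
        orders.put("o3", "c9"); // no such customer
        orders.put("o4", "");   // no foreign key at all
        Map<String, String> customers = Map.of("c1", "Alice");
        return join(orders, customers,
                customerId -> customerId.isEmpty() ? null : customerId,
                (customerId, name) -> name);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch only `o1` and `o2` appear in the result, both joined with the same customer record, which illustrates the many-to-one nature of a foreign-key join.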
-
join
@Deprecated <VR, KO, VO> KTable<K, VR> join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, Named named)
Deprecated. Since 3.1, removal planned for 4.0. Use join(KTable, Function, ValueJoiner, TableJoined) instead.
Join records of this KTable with another KTable using non-windowed inner join.
This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.
- Type Parameters:
VR - the value type of the result KTable
KO - the key type of the other KTable
VO - the value type of the other KTable
- Parameters:
other - the other KTable to be joined with this KTable. Keyed by KO.
foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.
joiner - a ValueJoiner that computes the join result for a pair of matching records
named - a Named config used to name the processor in the topology
- Returns:
- a KTable that contains the result of joining this table with other
-
join
<VR, KO, VO> KTable<K, VR> join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, TableJoined<K, KO> tableJoined)
Join records of this KTable with another KTable using non-windowed inner join, using the TableJoined instance for optional configurations including partitioners when the tables being joined use non-default partitioning, and also the base name for components of the join.
This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.
- Type Parameters:
VR - the value type of the result KTable
KO - the key type of the other KTable
VO - the value type of the other KTable
- Parameters:
other - the other KTable to be joined with this KTable. Keyed by KO.
foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.
joiner - a ValueJoiner that computes the join result for a pair of matching records
tableJoined - a TableJoined used to configure partitioners and names of internal topics and stores
- Returns:
- a KTable that contains the result of joining this table with other
-
join
<VR, KO, VO> KTable<K, VR> join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Join records of this KTable with another KTable using non-windowed inner join.
This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.
- Type Parameters:
VR - the value type of the result KTable
KO - the key type of the other KTable
VO - the value type of the other KTable
- Parameters:
other - the other KTable to be joined with this KTable. Keyed by KO.
foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.
joiner - a ValueJoiner that computes the join result for a pair of matching records
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
- a KTable that contains the result of joining this table with other
-
join
@Deprecated <VR, KO, VO> KTable<K, VR> join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, Named named, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Deprecated. Since 3.1, removal planned for 4.0. Use join(KTable, Function, ValueJoiner, TableJoined, Materialized) instead.
Join records of this KTable with another KTable using non-windowed inner join.
This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.
- Type Parameters:
VR - the value type of the result KTable
KO - the key type of the other KTable
VO - the value type of the other KTable
- Parameters:
other - the other KTable to be joined with this KTable. Keyed by KO.
foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.
joiner - a ValueJoiner that computes the join result for a pair of matching records
named - a Named config used to name the processor in the topology
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
- a KTable that contains the result of joining this table with other
-
join
<VR, KO, VO> KTable<K, VR> join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, TableJoined<K, KO> tableJoined, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Join records of this KTable with another KTable using non-windowed inner join, using the TableJoined instance for optional configurations including partitioners when the tables being joined use non-default partitioning, and also the base name for components of the join.
This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.
- Type Parameters:
VR - the value type of the result KTable
KO - the key type of the other KTable
VO - the value type of the other KTable
- Parameters:
other - the other KTable to be joined with this KTable. Keyed by KO.
foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.
joiner - a ValueJoiner that computes the join result for a pair of matching records
tableJoined - a TableJoined used to configure partitioners and names of internal topics and stores
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
- a KTable that contains the result of joining this table with other
-
leftJoin
<VR, KO, VO> KTable<K, VR> leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner)
Join records of this KTable with another KTable using non-windowed left join.
This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.
- Type Parameters:
VR - the value type of the result KTable
KO - the key type of the other KTable
VO - the value type of the other KTable
- Parameters:
other - the other KTable to be joined with this KTable. Keyed by KO.
foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the extract is null, then the right hand side of the result will be null.
joiner - a ValueJoiner that computes the join result for a pair of matching records
- Returns:
- a KTable that contains the result of joining this table with other
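The difference from the inner join is easiest to see on a snapshot of both tables. The following plain-Java sketch models the left-join behaviour described above and is not Kafka Streams code: `ForeignKeyLeftJoinSketch`, the sample data, and the use of an empty string to stand for "no foreign key" are assumptions made for illustration. It also assumes that a record whose foreign key has no match in the other table is joined with null, as is usual for a left join.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Plain-Java model of a foreign-key left join result; this is not Kafka Streams code. */
class ForeignKeyLeftJoinSketch {

    /** Computes the left-join result for the current contents of both tables. */
    static <K, V, KO, VO, VR> Map<K, VR> leftJoin(Map<K, V> thisTable,
                                                  Map<KO, VO> otherTable,
                                                  Function<V, KO> foreignKeyExtractor,
                                                  BiFunction<V, VO, VR> joiner) {
        Map<K, VR> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : thisTable.entrySet()) {
            KO foreignKey = foreignKeyExtractor.apply(entry.getValue());
            // A null foreign key or a missing match both leave the right-hand side null.
            VO otherValue = foreignKey == null ? null : otherTable.get(foreignKey);
            result.put(entry.getKey(), joiner.apply(entry.getValue(), otherValue));
        }
        return result;
    }

    /** Hypothetical data: order id -> customer id, joined with customer id -> customer name. */
    static Map<String, String> demo() {
        Map<String, String> orders = new LinkedHashMap<>();
        orders.put("o1", "c1");
        orders.put("o2", "c9"); // no such customer
        orders.put("o3", "");   // no foreign key at all
        Map<String, String> customers = Map.of("c1", "Alice");
        return leftJoin(orders, customers,
                customerId -> customerId.isEmpty() ? null : customerId,
                (customerId, name) -> name == null ? "unknown" : name);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Every record of this table produces a result here; the joiner is responsible for handling a null right-hand value.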
-
leftJoin
@Deprecated <VR, KO, VO> KTable<K, VR> leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, Named named)
Deprecated. Since 3.1, removal planned for 4.0. Use leftJoin(KTable, Function, ValueJoiner, TableJoined) instead.
Join records of this KTable with another KTable using non-windowed left join.
This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.
- Type Parameters:
VR - the value type of the result KTable
KO - the key type of the other KTable
VO - the value type of the other KTable
- Parameters:
other - the other KTable to be joined with this KTable. Keyed by KO.
foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the extract is null, then the right hand side of the result will be null.
joiner - a ValueJoiner that computes the join result for a pair of matching records
named - a Named config used to name the processor in the topology
- Returns:
- a KTable that contains the result of joining this table with other
-
leftJoin
<VR, KO, VO> KTable<K, VR> leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, TableJoined<K, KO> tableJoined)
Join records of this KTable with another KTable using non-windowed left join, using the TableJoined instance for optional configurations including partitioners when the tables being joined use non-default partitioning, and also the base name for components of the join.
This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.
- Type Parameters:
VR - the value type of the result KTable
KO - the key type of the other KTable
VO - the value type of the other KTable
- Parameters:
other - the other KTable to be joined with this KTable. Keyed by KO.
foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the extract is null, then the right hand side of the result will be null.
joiner - a ValueJoiner that computes the join result for a pair of matching records
tableJoined - a TableJoined used to configure partitioners and names of internal topics and stores
- Returns:
- a KTable that contains the result of joining this table with other
-
leftJoin
<VR, KO, VO> KTable<K, VR> leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Join records of this KTable with another KTable using non-windowed left join.
This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.
- Type Parameters:
VR - the value type of the result KTable
KO - the key type of the other KTable
VO - the value type of the other KTable
- Parameters:
other - the other KTable to be joined with this KTable. Keyed by KO.
foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the extract is null, then the right hand side of the result will be null.
joiner - a ValueJoiner that computes the join result for a pair of matching records
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
- a KTable that contains the result of joining this table with other
-
leftJoin
@Deprecated <VR, KO, VO> KTable<K, VR> leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, Named named, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Deprecated. Since 3.1, removal planned for 4.0. Use leftJoin(KTable, Function, ValueJoiner, TableJoined, Materialized) instead.
Join records of this KTable with another KTable using non-windowed left join.
This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.
- Type Parameters:
VR - the value type of the result KTable
KO - the key type of the other KTable
VO - the value type of the other KTable
- Parameters:
other - the other KTable to be joined with this KTable. Keyed by KO.
foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the extract is null, then the right hand side of the result will be null.
joiner - a ValueJoiner that computes the join result for a pair of matching records
named - a Named config used to name the processor in the topology
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
- a KTable that contains the result of joining this table with other
-
leftJoin
<VR, KO, VO> KTable<K, VR> leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner, TableJoined<K, KO> tableJoined, Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)
Join records of this KTable with another KTable using non-windowed left join, using the TableJoined instance for optional configurations including partitioners when the tables being joined use non-default partitioning, and also the base name for components of the join.
This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.
- Type Parameters:
VR - the value type of the result KTable
KO - the key type of the other KTable
VO - the value type of the other KTable
- Parameters:
other - the other KTable to be joined with this KTable. Keyed by KO.
foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the extract is null, then the right hand side of the result will be null.
joiner - a ValueJoiner that computes the join result for a pair of matching records
tableJoined - a TableJoined used to configure partitioners and names of internal topics and stores
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
- a KTable that contains the result of joining this table with other
-
queryableStoreName
String queryableStoreName()
Get the name of the local state store that can be used to query this KTable.
- Returns:
- the underlying state store name, or null if this KTable cannot be queried.
-