Interface KTable<K,V>

Type Parameters:
    K - Type of primary keys
    V - Type of value changes

public interface KTable<K,V>

KTable is an abstraction of a changelog stream from a primary-keyed table. Each record in this changelog stream is an update on the primary-keyed table with the record key as the primary key.

A KTable is either defined from a single Kafka topic that is consumed message by message or the result of a KTable transformation. An aggregation of a KStream also yields a KTable.

A KTable can be transformed record by record, joined with another KTable or KStream, or re-partitioned and aggregated into a new KTable.

Some KTables have an internal state (a ReadOnlyKeyValueStore) and are therefore queryable via the interactive queries API. For example:

    final KTable table = ...
    ...
    final KafkaStreams streams = ...;
    streams.start();
    ...
    final String queryableStoreName = table.queryableStoreName(); // returns null if KTable is not queryable
    ReadOnlyKeyValueStore view = streams.store(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
    view.get(key);

Records from the source topic that have null keys are dropped.

See Also:
    KStream, KGroupedTable, GlobalKTable, StreamsBuilder.table(String)
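The table/changelog duality described above can be sketched without Kafka. Below is a minimal, hypothetical plain-Java simulation (not part of the Kafka Streams API): each update record upserts into a keyed table, and a record with a null value (a tombstone) deletes its key.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangelogDemo {
    // Apply a changelog stream to a table: non-null values upsert,
    // null values (tombstones) delete the key.
    static Map<String, Integer> materialize(List<Map.Entry<String, Integer>> changelog) {
        Map<String, Integer> table = new HashMap<>();
        for (Map.Entry<String, Integer> update : changelog) {
            if (update.getValue() == null) {
                table.remove(update.getKey());                 // tombstone: delete semantics
            } else {
                table.put(update.getKey(), update.getValue()); // upsert on the primary key
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> changelog = List.of(
            new SimpleEntry<String, Integer>("a", 1),
            new SimpleEntry<String, Integer>("a", 2),   // second update wins for key "a"
            new SimpleEntry<String, Integer>("b", 7),
            new SimpleEntry<String, Integer>("b", null) // tombstone deletes "b"
        );
        System.out.println(materialize(changelog)); // prints {a=2}
    }
}
```

This is only an illustration of the update semantics; an actual KTable is partitioned, fault tolerant, and backed by a state store.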
-
-
Method Summary
All Methods · Instance Methods · Abstract Methods · Deprecated Methods
(Bytes abbreviates org.apache.kafka.common.utils.Bytes throughout.)

KTable<K,V> filter(Predicate<? super K,? super V> predicate)
KTable<K,V> filter(Predicate<? super K,? super V> predicate, Named named)
KTable<K,V> filter(Predicate<? super K,? super V> predicate, Materialized<K,V,KeyValueStore<Bytes,byte[]>> materialized)
KTable<K,V> filter(Predicate<? super K,? super V> predicate, Named named, Materialized<K,V,KeyValueStore<Bytes,byte[]>> materialized)
    Create a new KTable that consists of all records of this KTable which satisfy the given predicate. Overloads without Materialized use default serializers, deserializers, and state store; overloads with Materialized take the key serde, value serde, and the underlying materialized state storage from the Materialized instance.

KTable<K,V> filterNot(Predicate<? super K,? super V> predicate)
KTable<K,V> filterNot(Predicate<? super K,? super V> predicate, Named named)
KTable<K,V> filterNot(Predicate<? super K,? super V> predicate, Materialized<K,V,KeyValueStore<Bytes,byte[]>> materialized)
KTable<K,V> filterNot(Predicate<? super K,? super V> predicate, Named named, Materialized<K,V,KeyValueStore<Bytes,byte[]>> materialized)
    Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate.

<KR,VR> KGroupedTable<KR,VR> groupBy(KeyValueMapper<? super K,? super V,KeyValue<KR,VR>> selector)
    Re-groups the records of this KTable using the provided KeyValueMapper and default serializers and deserializers.
<KR,VR> KGroupedTable<KR,VR> groupBy(KeyValueMapper<? super K,? super V,KeyValue<KR,VR>> selector, Grouped<KR,VR> grouped)
    Re-groups the records of this KTable using the provided KeyValueMapper and Serdes as specified by Grouped.
<KR,VR> KGroupedTable<KR,VR> groupBy(KeyValueMapper<? super K,? super V,KeyValue<KR,VR>> selector, Serialized<KR,VR> serialized)
    Deprecated since 2.1.

<VO,VR> KTable<K,VR> join(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner)
<VO,VR> KTable<K,VR> join(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Named named)
<VO,VR> KTable<K,VR> join(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Materialized<K,VR,KeyValueStore<Bytes,byte[]>> materialized)
<VO,VR> KTable<K,VR> join(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Named named, Materialized<K,VR,KeyValueStore<Bytes,byte[]>> materialized)
    Join records of this KTable with another KTable's records using non-windowed inner equi join.

<VR,KO,VO> KTable<K,VR> join(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner)
<VR,KO,VO> KTable<K,VR> join(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Named named)
<VR,KO,VO> KTable<K,VR> join(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Materialized<K,VR,KeyValueStore<Bytes,byte[]>> materialized)
<VR,KO,VO> KTable<K,VR> join(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Named named, Materialized<K,VR,KeyValueStore<Bytes,byte[]>> materialized)
    Join records of this KTable with another KTable using non-windowed inner join.

<VO,VR> KTable<K,VR> leftJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner)
<VO,VR> KTable<K,VR> leftJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Named named)
<VO,VR> KTable<K,VR> leftJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Materialized<K,VR,KeyValueStore<Bytes,byte[]>> materialized)
<VO,VR> KTable<K,VR> leftJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Named named, Materialized<K,VR,KeyValueStore<Bytes,byte[]>> materialized)
    Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join.

<VR,KO,VO> KTable<K,VR> leftJoin(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner)
<VR,KO,VO> KTable<K,VR> leftJoin(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Named named)
<VR,KO,VO> KTable<K,VR> leftJoin(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Materialized<K,VR,KeyValueStore<Bytes,byte[]>> materialized)
<VR,KO,VO> KTable<K,VR> leftJoin(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Named named, Materialized<K,VR,KeyValueStore<Bytes,byte[]>> materialized)
    Join records of this KTable with another KTable using non-windowed left join.

<VR> KTable<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper)
<VR> KTable<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper, Named named)
<VR> KTable<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper, Materialized<K,VR,KeyValueStore<Bytes,byte[]>> materialized)
<VR> KTable<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper, Named named, Materialized<K,VR,KeyValueStore<Bytes,byte[]>> materialized)
<VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper)
<VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Named named)
<VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Materialized<K,VR,KeyValueStore<Bytes,byte[]>> materialized)
<VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Named named, Materialized<K,VR,KeyValueStore<Bytes,byte[]>> materialized)
    Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable.

<VO,VR> KTable<K,VR> outerJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner)
<VO,VR> KTable<K,VR> outerJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Named named)
<VO,VR> KTable<K,VR> outerJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Materialized<K,VR,KeyValueStore<Bytes,byte[]>> materialized)
<VO,VR> KTable<K,VR> outerJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Named named, Materialized<K,VR,KeyValueStore<Bytes,byte[]>> materialized)
    Join records of this KTable (left input) with another KTable's (right input) records using non-windowed outer equi join.

String queryableStoreName()
    Get the name of the local state store that can be used to query this KTable.

KTable<K,V> suppress(Suppressed<? super K> suppressed)
    Suppress some updates from this changelog stream, determined by the supplied Suppressed configuration.

KStream<K,V> toStream()
KStream<K,V> toStream(Named named)
    Convert this changelog stream to a KStream.
<KR> KStream<KR,V> toStream(KeyValueMapper<? super K,? super V,? extends KR> mapper)
<KR> KStream<KR,V> toStream(KeyValueMapper<? super K,? super V,? extends KR> mapper, Named named)
    Convert this changelog stream to a KStream using the given KeyValueMapper to select the new key.

<VR> KTable<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> transformerSupplier, String... stateStoreNames)
<VR> KTable<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> transformerSupplier, Named named, String... stateStoreNames)
<VR> KTable<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> transformerSupplier, Materialized<K,VR,KeyValueStore<Bytes,byte[]>> materialized, String... stateStoreNames)
<VR> KTable<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> transformerSupplier, Materialized<K,VR,KeyValueStore<Bytes,byte[]>> materialized, Named named, String... stateStoreNames)
    Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type).
-
-
-
Method Detail
-
filter
KTable<K,V> filter(Predicate<? super K,? super V> predicate)
Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with default serializers, deserializers, and state store. All records that do not satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.

Note that filter for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record is forwarded.

Parameters:
    predicate - a filter Predicate that is applied to each record
Returns:
    a KTable that contains only those records that satisfy the given predicate
See Also:
    filterNot(Predicate)
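The tombstone behavior described above can be sketched in plain Java. This is a hypothetical, simplified simulation of the per-record logic (not the Kafka implementation; in particular, the real operator may suppress a tombstone when there is nothing to delete): tombstones bypass the predicate, and dropped records are forwarded as tombstones so downstream state gets cleaned up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

public class FilterSemantics {
    record Update(String key, Integer value) {} // value == null means tombstone

    // Simplified per-record logic of KTable#filter: forward tombstones unchanged,
    // turn records that fail the predicate into tombstones.
    static List<Update> filterChangelog(List<Update> updates, BiPredicate<String, Integer> predicate) {
        List<Update> out = new ArrayList<>();
        for (Update u : updates) {
            if (u.value() == null) {
                out.add(u);                         // tombstone: predicate not evaluated
            } else if (predicate.test(u.key(), u.value())) {
                out.add(u);                         // record satisfies predicate
            } else {
                out.add(new Update(u.key(), null)); // dropped record becomes a tombstone
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Update> out = filterChangelog(
            List.of(new Update("a", 5), new Update("b", 1), new Update("a", null)),
            (k, v) -> v >= 3);
        // "a"->5 passes; "b"->1 is replaced by a tombstone; the "a" tombstone is forwarded
        out.forEach(System.out::println);
    }
}
```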
-
filter
KTable<K,V> filter(Predicate<? super K,? super V> predicate, Named named)
Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with default serializers, deserializers, and state store. All records that do not satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.

Note that filter for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record is forwarded.

Parameters:
    predicate - a filter Predicate that is applied to each record
    named - a Named config used to name the processor in the topology
Returns:
    a KTable that contains only those records that satisfy the given predicate
See Also:
    filterNot(Predicate)
-
filter
KTable<K,V> filter(Predicate<? super K,? super V> predicate, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. All records that do not satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.

Note that filter for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record is forwarded.

To query the local ReadOnlyKeyValueStore it must be obtained via KafkaStreams#store(...):

    KafkaStreams streams = ... // filtering words
    ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
    K key = "some-word";
    ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)

For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).

Parameters:
    predicate - a filter Predicate that is applied to each record
    materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
Returns:
    a KTable that contains only those records that satisfy the given predicate
See Also:
    filterNot(Predicate, Materialized)
-
filter
KTable<K,V> filter(Predicate<? super K,? super V> predicate, Named named, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. All records that do not satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.

Note that filter for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record is forwarded.

To query the local ReadOnlyKeyValueStore it must be obtained via KafkaStreams#store(...):

    KafkaStreams streams = ... // filtering words
    ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
    K key = "some-word";
    ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)

For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).

Parameters:
    predicate - a filter Predicate that is applied to each record
    named - a Named config used to name the processor in the topology
    materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
Returns:
    a KTable that contains only those records that satisfy the given predicate
See Also:
    filterNot(Predicate, Materialized)
-
filterNot
KTable<K,V> filterNot(Predicate<? super K,? super V> predicate)
Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with default serializers, deserializers, and state store. All records that do satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.

Note that filterNot for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is forwarded.

Parameters:
    predicate - a filter Predicate that is applied to each record
Returns:
    a KTable that contains only those records that do not satisfy the given predicate
See Also:
    filter(Predicate)
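Note the inverted drop condition relative to filter: here a record that does satisfy the predicate is the one replaced by a tombstone. A small, hypothetical plain-Java sketch of that per-record rule (simplified; not the Kafka implementation), where the returned value is what would be forwarded to the result table and null denotes a tombstone:

```java
import java.util.function.BiPredicate;

public class FilterNotSemantics {
    // Simplified per-record logic of KTable#filterNot: tombstones pass through
    // without evaluating the predicate; records that DO satisfy the predicate
    // are dropped and replaced by tombstones.
    static Integer process(String key, Integer value, BiPredicate<String, Integer> predicate) {
        if (value == null) return null;                   // tombstone: predicate not evaluated
        return predicate.test(key, value) ? null : value; // satisfying records become tombstones
    }

    public static void main(String[] args) {
        BiPredicate<String, Integer> isEven = (k, v) -> v % 2 == 0;
        System.out.println(process("a", 3, isEven)); // 3 (kept: does not satisfy predicate)
        System.out.println(process("a", 4, isEven)); // null (dropped via tombstone)
    }
}
```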
-
filterNot
KTable<K,V> filterNot(Predicate<? super K,? super V> predicate, Named named)
Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with default serializers, deserializers, and state store. All records that do satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.

Note that filterNot for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is forwarded.

Parameters:
    predicate - a filter Predicate that is applied to each record
    named - a Named config used to name the processor in the topology
Returns:
    a KTable that contains only those records that do not satisfy the given predicate
See Also:
    filter(Predicate)
-
filterNot
KTable<K,V> filterNot(Predicate<? super K,? super V> predicate, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. All records that do satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.

Note that filterNot for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is forwarded.

To query the local ReadOnlyKeyValueStore it must be obtained via KafkaStreams#store(...):

    KafkaStreams streams = ... // filtering words
    ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
    K key = "some-word";
    ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)

For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).

Parameters:
    predicate - a filter Predicate that is applied to each record
    materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
Returns:
    a KTable that contains only those records that do not satisfy the given predicate
See Also:
    filter(Predicate, Materialized)
-
filterNot
KTable<K,V> filterNot(Predicate<? super K,? super V> predicate, Named named, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. All records that do satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.

Note that filterNot for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is forwarded.

To query the local ReadOnlyKeyValueStore it must be obtained via KafkaStreams#store(...):

    KafkaStreams streams = ... // filtering words
    ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
    K key = "some-word";
    ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)

For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).

Parameters:
    predicate - a filter Predicate that is applied to each record
    named - a Named config used to name the processor in the topology
    materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
Returns:
    a KTable that contains only those records that do not satisfy the given predicate
See Also:
    filter(Predicate, Materialized)
-
mapValues
<VR> KTable<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. For each KTable update the provided ValueMapper is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K,V'>. This is a stateless record-by-record operation.

The example below counts the number of tokens of the value string.

    KTable<String, String> inputTable = builder.table("topic");
    KTable<String, Integer> outputTable = inputTable.mapValues(value -> value.split(" ").length);

This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like a join) is applied to the result KTable.

Note that mapValues for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.

Type Parameters:
    VR - the value type of the result KTable
Parameters:
    mapper - a ValueMapper that computes a new output value
Returns:
    a KTable that contains records with unmodified keys and new values (possibly of different type)
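The tombstone rule above means the mapper never sees a null value. A hypothetical, simplified plain-Java sketch of that per-record behavior (not the Kafka implementation):

```java
import java.util.function.Function;

public class MapValuesSemantics {
    // Simplified per-record logic of KTable#mapValues: tombstones are forwarded
    // as-is so the corresponding result-table entry is deleted; the mapper is
    // only applied to non-null values.
    static <V, VR> VR mapValue(V value, Function<V, VR> mapper) {
        if (value == null) return null; // tombstone: mapper not evaluated
        return mapper.apply(value);
    }

    public static void main(String[] args) {
        Function<String, Integer> tokenCount = s -> s.split(" ").length;
        System.out.println(mapValue("hello streams world", tokenCount)); // 3
        System.out.println(mapValue(null, tokenCount));                  // null, no NullPointerException
    }
}
```

Because the operator handles tombstones itself, user-supplied mappers do not need null checks for deleted records.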
-
mapValues
<VR> KTable<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper, Named named)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. For each KTable update the provided ValueMapper is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K,V'>. This is a stateless record-by-record operation.

The example below counts the number of tokens of the value string.

    KTable<String, String> inputTable = builder.table("topic");
    KTable<String, Integer> outputTable = inputTable.mapValues(value -> value.split(" ").length, Named.as("countTokenValue"));

This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like a join) is applied to the result KTable.

Note that mapValues for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.

Type Parameters:
    VR - the value type of the result KTable
Parameters:
    mapper - a ValueMapper that computes a new output value
    named - a Named config used to name the processor in the topology
Returns:
    a KTable that contains records with unmodified keys and new values (possibly of different type)
-
mapValues
<VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. For each KTable update the provided ValueMapperWithKey is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation.

The example below counts the number of tokens in the key and value strings.

 KTable<String, String> inputTable = builder.table("topic");
 KTable<String, Integer> outputTable = inputTable.mapValues(
     (readOnlyKey, value) -> readOnlyKey.split(" ").length + value.split(" ").length);

Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.

This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like a join) is applied to the result
KTable.

Note that mapValues for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
- Type Parameters:
VR - the value type of the result KTable
- Parameters:
mapper - a ValueMapperWithKey that computes a new output value
- Returns:
- a KTable that contains records with unmodified keys and new values (possibly of different type)
-
mapValues
<VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Named named)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. For each KTable update the provided ValueMapperWithKey is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation.

The example below counts the number of tokens in the key and value strings.

 KTable<String, String> inputTable = builder.table("topic");
 KTable<String, Integer> outputTable = inputTable.mapValues(
     (readOnlyKey, value) -> readOnlyKey.split(" ").length + value.split(" ").length,
     Named.as("countTokenValueAndKey"));

Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.

This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like a join) is applied to the result
KTable.

Note that mapValues for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
- Type Parameters:
VR - the value type of the result KTable
- Parameters:
mapper - a ValueMapperWithKey that computes a new output value
named - a Named config used to name the processor in the topology
- Returns:
- a KTable that contains records with unmodified keys and new values (possibly of different type)
-
mapValues
<VR> KTable<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. For each KTable update the provided ValueMapper is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation.

The example below counts the number of tokens in the value string.

 KTable<String, String> inputTable = builder.table("topic");
 KTable<String, Integer> outputTable = inputTable.mapValues(
     new ValueMapper<String, Integer>() {
         Integer apply(String value) {
             return value.split(" ").length;
         }
     });

To query the local KeyValueStore representing outputTable above, it must be obtained via KafkaStreams#store(...). For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).

This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like a join) is applied to the result
KTable.

Note that mapValues for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
- Type Parameters:
VR - the value type of the result KTable
- Parameters:
mapper - a ValueMapper that computes a new output value
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
- a KTable that contains records with unmodified keys and new values (possibly of different type)
-
mapValues
<VR> KTable<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. For each KTable update the provided ValueMapper is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation.

The example below counts the number of tokens in the value string.

 KTable<String, String> inputTable = builder.table("topic");
 KTable<String, Integer> outputTable = inputTable.mapValues(
     new ValueMapper<String, Integer>() {
         Integer apply(String value) {
             return value.split(" ").length;
         }
     });

To query the local KeyValueStore representing outputTable above, it must be obtained via KafkaStreams#store(...). For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).

This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like a join) is applied to the result
KTable.

Note that mapValues for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
- Type Parameters:
VR - the value type of the result KTable
- Parameters:
mapper - a ValueMapper that computes a new output value
named - a Named config used to name the processor in the topology
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
- a KTable that contains records with unmodified keys and new values (possibly of different type)
-
mapValues
<VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. For each KTable update the provided ValueMapperWithKey is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation.

The example below counts the number of tokens in the key and value strings.

 KTable<String, String> inputTable = builder.table("topic");
 KTable<String, Integer> outputTable = inputTable.mapValues(
     new ValueMapperWithKey<String, String, Integer>() {
         Integer apply(String readOnlyKey, String value) {
             return readOnlyKey.split(" ").length + value.split(" ").length;
         }
     });

To query the local KeyValueStore representing outputTable above, it must be obtained via KafkaStreams.store(StoreQueryParameters). For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).

Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.

This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like a join) is applied to the result
KTable.

Note that mapValues for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
- Type Parameters:
VR - the value type of the result KTable
- Parameters:
mapper - a ValueMapperWithKey that computes a new output value
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
- a KTable that contains records with unmodified keys and new values (possibly of different type)
-
mapValues
<VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. For each KTable update the provided ValueMapperWithKey is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation.

The example below counts the number of tokens in the key and value strings.

 KTable<String, String> inputTable = builder.table("topic");
 KTable<String, Integer> outputTable = inputTable.mapValues(
     new ValueMapperWithKey<String, String, Integer>() {
         Integer apply(String readOnlyKey, String value) {
             return readOnlyKey.split(" ").length + value.split(" ").length;
         }
     });

To query the local KeyValueStore representing outputTable above, it must be obtained via KafkaStreams#store(...). For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).

Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.

This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like a join) is applied to the result
KTable.

Note that mapValues for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
- Type Parameters:
VR - the value type of the result KTable
- Parameters:
mapper - a ValueMapperWithKey that computes a new output value
named - a Named config used to name the processor in the topology
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
- a KTable that contains records with unmodified keys and new values (possibly of different type)
-
toStream
KStream<K,V> toStream()
Convert this changelog stream to a KStream.

Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of this changelog stream is no longer treated as an updated record (cf. KStream vs KTable).
- Returns:
- a KStream that contains the same records as this KTable
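The "interpretation" change can be illustrated with plain collections. This is a conceptual sketch, not Streams internals: the same records, read once as a table (updates, last write wins per key) and once as a stream (independent facts, nothing replaced).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TableVsStream {
    // KTable view: each record is an update; later values replace earlier ones per key.
    static Map<String, String> asTable(List<Map.Entry<String, String>> changelog) {
        Map<String, String> table = new LinkedHashMap<>();
        changelog.forEach(r -> table.put(r.getKey(), r.getValue()));
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> changelog = List.of(
                Map.entry("k", "v1"),
                Map.entry("k", "v2"));
        // toStream() keeps every record; the table view keeps only the latest per key
        System.out.println(changelog.size());          // 2
        System.out.println(asTable(changelog).size()); // 1
    }
}
```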
-
toStream
<KR> KStream<KR,V> toStream(KeyValueMapper<? super K,? super V,? extends KR> mapper)
Convert this changelog stream to a KStream using the given KeyValueMapper to select the new key.

For example, you can compute the new key as the length of the value string.

 KTable<String, String> table = builder.table("topic");
 KStream<Integer, String> keyedStream = table.toStream(
     new KeyValueMapper<String, String, Integer>() {
         Integer apply(String key, String value) {
             return value.length();
         }
     });

Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation or join) is applied to the result KStream.

This operation is equivalent to calling table.toStream().selectKey(KeyValueMapper).

Note that toStream() is a logical operation and only changes the "interpretation" of the stream, i.e., each record of this changelog stream is no longer treated as an updated record (cf. KStream vs KTable).
- Type Parameters:
KR - the new key type of the result stream
- Parameters:
mapper - a KeyValueMapper that computes a new key for each record
- Returns:
- a KStream that contains the same records as this KTable
-
toStream
<KR> KStream<KR,V> toStream(KeyValueMapper<? super K,? super V,? extends KR> mapper, Named named)
Convert this changelog stream to a KStream using the given KeyValueMapper to select the new key.

For example, you can compute the new key as the length of the value string.

 KTable<String, String> table = builder.table("topic");
 KStream<Integer, String> keyedStream = table.toStream(
     new KeyValueMapper<String, String, Integer>() {
         Integer apply(String key, String value) {
             return value.length();
         }
     });

Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation or join) is applied to the result KStream.

This operation is equivalent to calling table.toStream().selectKey(KeyValueMapper).

Note that toStream() is a logical operation and only changes the "interpretation" of the stream, i.e., each record of this changelog stream is no longer treated as an updated record (cf. KStream vs KTable).
- Type Parameters:
KR - the new key type of the result stream
- Parameters:
mapper - a KeyValueMapper that computes a new key for each record
named - a Named config used to name the processor in the topology
- Returns:
- a KStream that contains the same records as this KTable
-
suppress
KTable<K,V> suppress(Suppressed<? super K> suppressed)
Suppress some updates from this changelog stream, determined by the supplied Suppressed configuration. This controls what updates downstream table and stream operations will receive.
- Parameters:
suppressed - Configuration object determining what, if any, updates to suppress
- Returns:
- A new KTable with the desired suppression characteristics.
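Conceptually, suppression buffers intermediate updates and emits only the survivors. The hypothetical sketch below (plain Java, not the Suppressed implementation; class and method names are invented for illustration) mimics the "emit only the final result per key" effect: repeated updates overwrite each other in a buffer, and only the latest value per key is released on flush.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SuppressSketch {
    // Buffer updates; repeated updates for the same key overwrite each other,
    // so intermediate results are suppressed until the buffer is flushed.
    private final Map<String, String> buffer = new LinkedHashMap<>();

    void update(String key, String value) {
        buffer.put(key, value);
    }

    // Emit only the latest buffered value per key, then reset the buffer.
    List<Map.Entry<String, String>> flush() {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        buffer.forEach((k, v) -> out.add(Map.entry(k, v)));
        buffer.clear();
        return out;
    }

    public static void main(String[] args) {
        SuppressSketch s = new SuppressSketch();
        s.update("k", "v1");
        s.update("k", "v2"); // overwrites the buffered intermediate update
        System.out.println(s.flush()); // [k=v2]
    }
}
```

The real Suppressed configurations decide *when* to flush (e.g. by time or buffer size); this sketch only shows the buffering idea.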
-
transformValues
<VR> KTable<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> transformerSupplier, String... stateStoreNames)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with default serializers, deserializers, and state store. A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is similar to mapValues(ValueMapperWithKey), but more flexible, allowing access to additional state-stores, and access to the ProcessorContext. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed.

If the downstream topology uses aggregation functions (e.g.
KGroupedTable.reduce(org.apache.kafka.streams.kstream.Reducer<V>, org.apache.kafka.streams.kstream.Reducer<V>, org.apache.kafka.streams.kstream.Materialized<K, V, org.apache.kafka.streams.state.KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>>),KGroupedTable.aggregate(org.apache.kafka.streams.kstream.Initializer<VR>, org.apache.kafka.streams.kstream.Aggregator<? super K, ? super V, VR>, org.apache.kafka.streams.kstream.Aggregator<? super K, ? super V, VR>, org.apache.kafka.streams.kstream.Materialized<K, VR, org.apache.kafka.streams.state.KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>>), etc), care must be taken when dealing with state, (either held in state-stores or transformer instances), to ensure correct aggregate results. In contrast, if the resulting KTable is materialized, (cf.transformValues(ValueTransformerWithKeySupplier, Materialized, String...)), such concerns are handled for you.In order to assign a state, the state must be created and registered beforehand:
 // create store
 StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                 Serdes.String(),
                 Serdes.String());
 // register store
 builder.addStateStore(keyValueStoreBuilder);

 KTable outputTable = inputTable.transformValues(
     new ValueTransformerWithKeySupplier() { ... },
     "myValueTransformState");

Within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.

 new ValueTransformerWithKeySupplier() {
     ValueTransformerWithKey get() {
         return new ValueTransformerWithKey() {
             private KeyValueStore<String, String> state;

             void init(ProcessorContext context) {
                 this.state = (KeyValueStore<String, String>) context.getStateStore("myValueTransformState");
                 // punctuate each second; can access this.state
                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
             }

             NewValueType transform(K readOnlyKey, V value) {
                 // can access this.state and use read-only key
                 return new NewValueType(readOnlyKey); // or null
             }

             void close() {
                 // can access this.state
             }
         };
     }
 }

Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Setting a new value preserves data co-location with respect to the key.
- Type Parameters:
VR - the value type of the result table
- Parameters:
transformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a ValueTransformerWithKey. At least one transformer instance will be created per streaming task. Transformers do not need to be thread-safe.
stateStoreNames - the names of the state stores used by the processor
- Returns:
- a KTable that contains records with unmodified key and new values (possibly of different type)
- See Also:
mapValues(ValueMapper), mapValues(ValueMapperWithKey)
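The essential difference from mapValues is that the transformer may keep per-key state between invocations. A plain-Java stand-in for a ValueTransformerWithKey (hypothetical class, using a Map in place of a registered state store) makes the idea concrete:

```java
import java.util.HashMap;
import java.util.Map;

public class StatefulTransformSketch {
    // Stand-in for the "state store": counts how many updates each key has seen.
    private final Map<String, Integer> state = new HashMap<>();

    // Analogous to ValueTransformerWithKey#transform: the key is read-only,
    // the returned value may depend on accumulated state.
    String transform(String readOnlyKey, String value) {
        int seen = state.merge(readOnlyKey, 1, Integer::sum);
        return value + " (update #" + seen + ")";
    }

    public static void main(String[] args) {
        StatefulTransformSketch t = new StatefulTransformSketch();
        System.out.println(t.transform("k", "v1")); // v1 (update #1)
        System.out.println(t.transform("k", "v2")); // v2 (update #2)
    }
}
```

In real Streams code the state lives in a registered StateStore (fault-tolerant and restored on rebalance), which is why the store must be created and registered before use, as shown above.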
-
transformValues
<VR> KTable<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> transformerSupplier, Named named, String... stateStoreNames)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with default serializers, deserializers, and state store. A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is similar to mapValues(ValueMapperWithKey), but more flexible, allowing access to additional state-stores, and access to the ProcessorContext. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed.

If the downstream topology uses aggregation functions (e.g.
KGroupedTable.reduce(org.apache.kafka.streams.kstream.Reducer<V>, org.apache.kafka.streams.kstream.Reducer<V>, org.apache.kafka.streams.kstream.Materialized<K, V, org.apache.kafka.streams.state.KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>>),KGroupedTable.aggregate(org.apache.kafka.streams.kstream.Initializer<VR>, org.apache.kafka.streams.kstream.Aggregator<? super K, ? super V, VR>, org.apache.kafka.streams.kstream.Aggregator<? super K, ? super V, VR>, org.apache.kafka.streams.kstream.Materialized<K, VR, org.apache.kafka.streams.state.KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>>), etc), care must be taken when dealing with state, (either held in state-stores or transformer instances), to ensure correct aggregate results. In contrast, if the resulting KTable is materialized, (cf.transformValues(ValueTransformerWithKeySupplier, Materialized, String...)), such concerns are handled for you.In order to assign a state, the state must be created and registered beforehand:
 // create store
 StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                 Serdes.String(),
                 Serdes.String());
 // register store
 builder.addStateStore(keyValueStoreBuilder);

 KTable outputTable = inputTable.transformValues(
     new ValueTransformerWithKeySupplier() { ... },
     "myValueTransformState");

Within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.

 new ValueTransformerWithKeySupplier() {
     ValueTransformerWithKey get() {
         return new ValueTransformerWithKey() {
             private KeyValueStore<String, String> state;

             void init(ProcessorContext context) {
                 this.state = (KeyValueStore<String, String>) context.getStateStore("myValueTransformState");
                 // punctuate each second; can access this.state
                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
             }

             NewValueType transform(K readOnlyKey, V value) {
                 // can access this.state and use read-only key
                 return new NewValueType(readOnlyKey); // or null
             }

             void close() {
                 // can access this.state
             }
         };
     }
 }

Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Setting a new value preserves data co-location with respect to the key.
- Type Parameters:
VR - the value type of the result table
- Parameters:
transformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a ValueTransformerWithKey. At least one transformer instance will be created per streaming task. Transformers do not need to be thread-safe.
named - a Named config used to name the processor in the topology
stateStoreNames - the names of the state stores used by the processor
- Returns:
- a KTable that contains records with unmodified key and new values (possibly of different type)
- See Also:
mapValues(ValueMapper), mapValues(ValueMapperWithKey)
-
transformValues
<VR> KTable<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> transformerSupplier, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized, String... stateStoreNames)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes a new value for it. This is similar to mapValues(ValueMapperWithKey), but more flexible, allowing stateful, rather than stateless, record-by-record operation, access to additional state-stores, and access to the ProcessorContext. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed. The resulting KTable is materialized into another state store (additional to the provided state store names) as specified by the user via the Materialized parameter, and is queryable through its given name.

In order to assign a state, the state must be created and registered beforehand:
 // create store
 StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                 Serdes.String(),
                 Serdes.String());
 // register store
 builder.addStateStore(keyValueStoreBuilder);

 KTable outputTable = inputTable.transformValues(
     new ValueTransformerWithKeySupplier() { ... },
     Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("outputTable")
             .withKeySerde(Serdes.String())
             .withValueSerde(Serdes.String()),
     "myValueTransformState");

Within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.

 new ValueTransformerWithKeySupplier() {
     ValueTransformerWithKey get() {
         return new ValueTransformerWithKey() {
             private KeyValueStore<String, String> state;

             void init(ProcessorContext context) {
                 this.state = (KeyValueStore<String, String>) context.getStateStore("myValueTransformState");
                 // punctuate each second; can access this.state
                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
             }

             NewValueType transform(K readOnlyKey, V value) {
                 // can access this.state and use read-only key
                 return new NewValueType(readOnlyKey); // or null
             }

             void close() {
                 // can access this.state
             }
         };
     }
 }

Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Setting a new value preserves data co-location with respect to the key.
- Type Parameters:
VR - the value type of the result table
- Parameters:
transformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a ValueTransformerWithKey. At least one transformer instance will be created per streaming task. Transformers do not need to be thread-safe.
materialized - an instance of Materialized used to describe how the state store of the resulting table should be materialized. Cannot be null
stateStoreNames - the names of the state stores used by the processor
- Returns:
- a KTable that contains records with unmodified key and new values (possibly of different type)
- See Also:
mapValues(ValueMapper), mapValues(ValueMapperWithKey)
-
transformValues
<VR> KTable<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> transformerSupplier, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized, Named named, String... stateStoreNames)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes a new value for it. This is similar to mapValues(ValueMapperWithKey), but more flexible, allowing stateful, rather than stateless, record-by-record operation, access to additional state-stores, and access to the ProcessorContext. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed. The resulting KTable is materialized into another state store (additional to the provided state store names) as specified by the user via the Materialized parameter, and is queryable through its given name.

In order to assign a state, the state must be created and registered beforehand:
 // create store
 StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                 Serdes.String(),
                 Serdes.String());
 // register store
 builder.addStateStore(keyValueStoreBuilder);

 KTable outputTable = inputTable.transformValues(
     new ValueTransformerWithKeySupplier() { ... },
     Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("outputTable")
             .withKeySerde(Serdes.String())
             .withValueSerde(Serdes.String()),
     "myValueTransformState");

Within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.

 new ValueTransformerWithKeySupplier() {
     ValueTransformerWithKey get() {
         return new ValueTransformerWithKey() {
             private KeyValueStore<String, String> state;

             void init(ProcessorContext context) {
                 this.state = (KeyValueStore<String, String>) context.getStateStore("myValueTransformState");
                 // punctuate each second; can access this.state
                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
             }

             NewValueType transform(K readOnlyKey, V value) {
                 // can access this.state and use read-only key
                 return new NewValueType(readOnlyKey); // or null
             }

             void close() {
                 // can access this.state
             }
         };
     }
 }

Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Setting a new value preserves data co-location with respect to the key.
- Type Parameters:
VR - the value type of the result table
- Parameters:
transformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a ValueTransformerWithKey. At least one transformer instance will be created per streaming task. Transformers do not need to be thread-safe.
materialized - an instance of Materialized used to describe how the state store of the resulting table should be materialized. Cannot be null
named - a Named config used to name the processor in the topology
stateStoreNames - the names of the state stores used by the processor
- Returns:
- a KTable that contains records with unmodified key and new values (possibly of different type)
- See Also:
mapValues(ValueMapper), mapValues(ValueMapperWithKey)
-
groupBy
<KR,VR> KGroupedTable<KR,VR> groupBy(KeyValueMapper<? super K,? super V,KeyValue<KR,VR>> selector)
Re-groups the records of this KTable using the provided KeyValueMapper and default serializers and deserializers. Each KeyValue pair of this KTable is mapped to a new KeyValue pair by applying the provided KeyValueMapper. Re-grouping a KTable is required before an aggregation operator can be applied to the data (cf. KGroupedTable). The KeyValueMapper selects a new key and value (which should both have unmodified types). If the new record key is null the record will not be included in the resulting KGroupedTable.

Because a new key is selected, an internal repartitioning topic will be created in Kafka. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. You can retrieve all generated internal topic names via Topology.describe().

All data of this KTable will be redistributed through the repartitioning topic by writing all update records to and rereading all updated records from it, such that the resulting KGroupedTable is partitioned on the new key.

If the key or value type is changed, it is recommended to use groupBy(KeyValueMapper, Grouped) instead.
- Type Parameters:
KR - the key type of the result KGroupedTable
VR - the value type of the result KGroupedTable
- Parameters:
selector - a KeyValueMapper that computes a new grouping key and value to be aggregated
- Returns:
- a KGroupedTable that contains the re-grouped records of the original KTable
-
groupBy
@Deprecated <KR,VR> KGroupedTable<KR,VR> groupBy(KeyValueMapper<? super K,? super V,KeyValue<KR,VR>> selector, Serialized<KR,VR> serialized)
Deprecated.Since 2.1. UsegroupBy(KeyValueMapper, Grouped)instead. Re-groups the records of thisKTableusing the providedKeyValueMapperandSerdes as specified bySerialized. EachKeyValuepair of thisKTableis mapped to a newKeyValuepair by applying the providedKeyValueMapper. Re-grouping aKTableis required before an aggregation operator can be applied to the data (cf.KGroupedTable). TheKeyValueMapperselects a new key and value (where both may be the same type or a new type). If the new record key isnull, the record will not be included in the resultingKGroupedTable. Because a new key is selected, an internal repartitioning topic will be created in Kafka. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
StreamsConfigvia parameterAPPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. You can retrieve all generated internal topic names viaTopology.describe().All data of this
KTablewill be redistributed through the repartitioning topic by writing all update records to and rereading all updated records from it, such that the resultingKGroupedTableis partitioned on the new key.- Type Parameters:
KR- the key type of the resultKGroupedTableVR- the value type of the resultKGroupedTable- Parameters:
selector- aKeyValueMapperthat computes a new grouping key and value to be aggregatedserialized- theSerializedinstance used to specifySerdes- Returns:
- a
KGroupedTablethat contains the re-grouped records of the originalKTable
-
groupBy
<KR,VR> KGroupedTable<KR,VR> groupBy(KeyValueMapper<? super K,? super V,KeyValue<KR,VR>> selector, Grouped<KR,VR> grouped)
Re-groups the records of thisKTableusing the providedKeyValueMapperandSerdes as specified byGrouped. EachKeyValuepair of thisKTableis mapped to a newKeyValuepair by applying the providedKeyValueMapper. Re-grouping aKTableis required before an aggregation operator can be applied to the data (cf.KGroupedTable). TheKeyValueMapperselects a new key and value (where both could be the same type or a new type). If the new record key isnull, the record will not be included in the resultingKGroupedTable. Because a new key is selected, an internal repartitioning topic will be created in Kafka. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
StreamsConfigvia parameterAPPLICATION_ID_CONFIG, "<name>" is either provided viaGrouped.as(String)or an internally generated name.You can retrieve all generated internal topic names via
Topology.describe().All data of this
KTablewill be redistributed through the repartitioning topic by writing all update records to and rereading all updated records from it, such that the resultingKGroupedTableis partitioned on the new key.- Type Parameters:
KR- the key type of the resultKGroupedTableVR- the value type of the resultKGroupedTable- Parameters:
selector- aKeyValueMapperthat computes a new grouping key and value to be aggregatedgrouped- theGroupedinstance used to specifySerdesand the name for a repartition topic if repartitioning is required.- Returns:
- a
KGroupedTablethat contains the re-grouped records of the originalKTable
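As a concrete sketch of this re-grouping flow (the topic names "user-regions" and "users-per-region", the user/region domain, and the serde choices are illustrative assumptions, and TopologyTestDriver from kafka-streams-test-utils is used only to exercise the topology locally), the following re-groups a userId-to-region table by region and counts users per region:

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class GroupByCountExample {

    // Builds: user-regions (userId -> region) --groupBy(region)--> count per region.
    // The groupBy step creates an internal repartition topic so that all
    // records for one region land on the same task before count() runs.
    static Topology topology() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> userRegions = builder.table("user-regions");
        KTable<String, Long> usersPerRegion = userRegions
            .groupBy((userId, region) -> KeyValue.pair(region, userId),
                     Grouped.with(Serdes.String(), Serdes.String()))
            .count();
        usersPerRegion.toStream().to("users-per-region",
            Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    // Runs the topology against TopologyTestDriver and returns the final counts.
    static Map<String, Long> run() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "group-by-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        try (TopologyTestDriver driver = new TopologyTestDriver(topology(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                "user-regions", new StringSerializer(), new StringSerializer());
            in.pipeInput("alice", "europe");
            in.pipeInput("bob", "europe");
            in.pipeInput("carol", "asia");
            return driver.createOutputTopic("users-per-region",
                new StringDeserializer(), new LongDeserializer()).readKeyValuesToMap();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the source is a table (not a stream), a later update such as piping ("alice", "asia") would both decrement the "europe" count and increment the "asia" count.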
-
join
<VO,VR> KTable<K,VR> join(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner)
Join records of thisKTablewith anotherKTable's records using non-windowed inner equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attributethisKTable.key == otherKTable.key. The result is an ever updatingKTablethat represents the current (i.e., processing time) result of the join.The join is computed by (1) updating the internal state of one
KTableand (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the otherKTable. This happens in a symmetric way, i.e., for each update of eitherthisor theotherinputKTablethe result gets updated.For each
KTablerecord that finds a corresponding record in the otherKTablethe providedValueJoinerwill be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.Note that
recordswithnullvalues (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the resultKTableif required (i.e., if there is anything to be deleted).Input records with
nullkey will be dropped and no join computation is performed.Example:
Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.

thisKTable | thisState | otherKTable | otherState | result updated record
<K1:A>     | <K1:A>    |             |            |
           | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
<K1:C>     | <K1:C>    |             | <K1:b>     | <K1:ValueJoiner(C,b)>
           | <K1:C>    | <K1:null>   |            | <K1:null>

- Type Parameters:
VO- the value type of the otherKTableVR- the value type of the resultKTable- Parameters:
other- the otherKTableto be joined with thisKTablejoiner- aValueJoinerthat computes the join result for a pair of matching records- Returns:
- a
KTablethat contains join-records for each key and values computed by the givenValueJoiner, one for each matched record-pair with the same key - See Also:
leftJoin(KTable, ValueJoiner),outerJoin(KTable, ValueJoiner)
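The update sequence from the example table above can be reproduced with a hedged sketch (the topic names "left-topic"/"right-topic"/"joined-topic" and the string-concatenating ValueJoiner are illustrative assumptions; TopologyTestDriver from kafka-streams-test-utils drives the topology locally):

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KTable;

public class KTableInnerJoinExample {

    static Topology topology() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> left = builder.table("left-topic");
        KTable<String, String> right = builder.table("right-topic");
        // Inner join: the joiner only fires once both sides have a value for the key.
        left.join(right, (l, r) -> l + "+" + r)
            .toStream().to("joined-topic");
        return builder.build();
    }

    // Pipes the <K1:A>, <K1:b>, <K1:C> sequence and returns the latest join result per key.
    static Map<String, String> run() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "inner-join-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        try (TopologyTestDriver driver = new TopologyTestDriver(topology(), props)) {
            TestInputTopic<String, String> leftIn = driver.createInputTopic(
                "left-topic", new StringSerializer(), new StringSerializer());
            TestInputTopic<String, String> rightIn = driver.createInputTopic(
                "right-topic", new StringSerializer(), new StringSerializer());
            leftIn.pipeInput("K1", "A");   // no output yet: no matching right-side record
            rightIn.pipeInput("K1", "b");  // match -> <K1:A+b>
            leftIn.pipeInput("K1", "C");   // left update -> <K1:C+b>
            return driver.createOutputTopic("joined-topic",
                new StringDeserializer(), new StringDeserializer()).readKeyValuesToMap();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```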
-
join
<VO,VR> KTable<K,VR> join(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Named named)
Join records of thisKTablewith anotherKTable's records using non-windowed inner equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attributethisKTable.key == otherKTable.key. The result is an ever updatingKTablethat represents the current (i.e., processing time) result of the join.The join is computed by (1) updating the internal state of one
KTableand (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the otherKTable. This happens in a symmetric way, i.e., for each update of eitherthisor theotherinputKTablethe result gets updated.For each
KTablerecord that finds a corresponding record in the otherKTablethe providedValueJoinerwill be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.Note that
recordswithnullvalues (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the resultKTableif required (i.e., if there is anything to be deleted).Input records with
nullkey will be dropped and no join computation is performed.Example:
Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.

thisKTable | thisState | otherKTable | otherState | result updated record
<K1:A>     | <K1:A>    |             |            |
           | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
<K1:C>     | <K1:C>    |             | <K1:b>     | <K1:ValueJoiner(C,b)>
           | <K1:C>    | <K1:null>   |            | <K1:null>

- Type Parameters:
VO- the value type of the otherKTableVR- the value type of the resultKTable- Parameters:
other- the otherKTableto be joined with thisKTablejoiner- aValueJoinerthat computes the join result for a pair of matching recordsnamed- aNamedconfig used to name the processor in the topology- Returns:
- a
KTablethat contains join-records for each key and values computed by the givenValueJoiner, one for each matched record-pair with the same key - See Also:
leftJoin(KTable, ValueJoiner),outerJoin(KTable, ValueJoiner)
-
join
<VO,VR> KTable<K,VR> join(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Join records of thisKTablewith anotherKTable's records using non-windowed inner equi join, with theMaterializedinstance for configuration of thekey serde,the result table's value serde, andstate store. The join is a primary key join with join attributethisKTable.key == otherKTable.key. The result is an ever updatingKTablethat represents the current (i.e., processing time) result of the join.The join is computed by (1) updating the internal state of one
KTableand (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the otherKTable. This happens in a symmetric way, i.e., for each update of eitherthisor theotherinputKTablethe result gets updated.For each
KTablerecord that finds a corresponding record in the otherKTablethe providedValueJoinerwill be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.Note that
recordswithnullvalues (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the resultKTableif required (i.e., if there is anything to be deleted).Input records with
nullkey will be dropped and no join computation is performed.Example:
Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.

thisKTable | thisState | otherKTable | otherState | result updated record
<K1:A>     | <K1:A>    |             |            |
           | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
<K1:C>     | <K1:C>    |             | <K1:b>     | <K1:ValueJoiner(C,b)>
           | <K1:C>    | <K1:null>   |            | <K1:null>

- Type Parameters:
VO- the value type of the otherKTableVR- the value type of the resultKTable- Parameters:
other- the otherKTableto be joined with thisKTablejoiner- aValueJoinerthat computes the join result for a pair of matching recordsmaterialized- an instance ofMaterializedused to describe how the state store should be materialized. Cannot benull- Returns:
- a
KTablethat contains join-records for each key and values computed by the givenValueJoiner, one for each matched record-pair with the same key - See Also:
leftJoin(KTable, ValueJoiner, Materialized),outerJoin(KTable, ValueJoiner, Materialized)
-
join
<VO,VR> KTable<K,VR> join(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Join records of thisKTablewith anotherKTable's records using non-windowed inner equi join, with theMaterializedinstance for configuration of thekey serde,the result table's value serde, andstate store. The join is a primary key join with join attributethisKTable.key == otherKTable.key. The result is an ever updatingKTablethat represents the current (i.e., processing time) result of the join.The join is computed by (1) updating the internal state of one
KTableand (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the otherKTable. This happens in a symmetric way, i.e., for each update of eitherthisor theotherinputKTablethe result gets updated.For each
KTablerecord that finds a corresponding record in the otherKTablethe providedValueJoinerwill be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.Note that
recordswithnullvalues (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the resultKTableif required (i.e., if there is anything to be deleted).Input records with
nullkey will be dropped and no join computation is performed.Example:
Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.

thisKTable | thisState | otherKTable | otherState | result updated record
<K1:A>     | <K1:A>    |             |            |
           | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
<K1:C>     | <K1:C>    |             | <K1:b>     | <K1:ValueJoiner(C,b)>
           | <K1:C>    | <K1:null>   |            | <K1:null>

- Type Parameters:
VO- the value type of the otherKTableVR- the value type of the resultKTable- Parameters:
other- the otherKTableto be joined with thisKTablejoiner- aValueJoinerthat computes the join result for a pair of matching recordsnamed- aNamedconfig used to name the processor in the topologymaterialized- an instance ofMaterializedused to describe how the state store should be materialized. Cannot benull- Returns:
- a
KTablethat contains join-records for each key and values computed by the givenValueJoiner, one for each matched record-pair with the same key - See Also:
leftJoin(KTable, ValueJoiner, Materialized),outerJoin(KTable, ValueJoiner, Materialized)
-
leftJoin
<VO,VR> KTable<K,VR> leftJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner)
Join records of thisKTable(left input) with anotherKTable's (right input) records using non-windowed left equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attributethisKTable.key == otherKTable.key. In contrast toinner-join, all records from leftKTablewill produce an output record (cf. below). The result is an ever updatingKTablethat represents the current (i.e., processing time) result of the join.The join is computed by (1) updating the internal state of one
KTableand (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the otherKTable. This happens in a symmetric way, i.e., for each update of eitherthisor theotherinputKTablethe result gets updated.For each
KTablerecord that finds a corresponding record in the otherKTable's state the providedValueJoinerwill be called to compute a value (with arbitrary type) for the result record. Additionally, for each record of leftKTablethat does not find a corresponding record in the rightKTable's state the providedValueJoinerwill be called withrightValue = nullto compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.Note that
recordswithnullvalues (so-called tombstone records) have delete semantics. For example, for left input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the resultKTableif required (i.e., if there is anything to be deleted).Input records with
nullkey will be dropped and no join computation is performed.Example:
Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.

thisKTable | thisState | otherKTable | otherState | result updated record
<K1:A>     | <K1:A>    |             |            | <K1:ValueJoiner(A,null)>
           | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
<K1:null>  |           |             | <K1:b>     | <K1:null>
           |           | <K1:null>   |            |

- Type Parameters:
VO- the value type of the otherKTableVR- the value type of the resultKTable- Parameters:
other- the otherKTableto be joined with thisKTablejoiner- aValueJoinerthat computes the join result for a pair of matching records- Returns:
- a
KTablethat contains join-records for each key and values computed by the givenValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of leftKTable - See Also:
join(KTable, ValueJoiner),outerJoin(KTable, ValueJoiner)
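The left-join contract (every left-side update produces output, with a null right value when unmatched) can be sketched as follows; topic names and the string-concatenating ValueJoiner are illustrative assumptions, and TopologyTestDriver from kafka-streams-test-utils is used to observe each emitted update in order:

```java
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KTable;

public class KTableLeftJoinExample {

    static Topology topology() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> left = builder.table("left-topic");
        KTable<String, String> right = builder.table("right-topic");
        // Left join: the joiner fires for every left update; r is null if unmatched.
        left.leftJoin(right, (l, r) -> l + "+" + (r == null ? "null" : r))
            .toStream().to("joined-topic");
        return builder.build();
    }

    // Returns every emitted join result, in order.
    static List<KeyValue<String, String>> run() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "left-join-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        try (TopologyTestDriver driver = new TopologyTestDriver(topology(), props)) {
            TestInputTopic<String, String> leftIn = driver.createInputTopic(
                "left-topic", new StringSerializer(), new StringSerializer());
            TestInputTopic<String, String> rightIn = driver.createInputTopic(
                "right-topic", new StringSerializer(), new StringSerializer());
            leftIn.pipeInput("K1", "A");   // unmatched left -> joiner called with (A, null)
            rightIn.pipeInput("K1", "b");  // match -> joiner called with (A, b)
            return driver.createOutputTopic("joined-topic",
                new StringDeserializer(), new StringDeserializer()).readKeyValuesToList();
        }
    }

    public static void main(String[] args) {
        run().forEach(kv -> System.out.println(kv.key + " -> " + kv.value));
    }
}
```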
-
leftJoin
<VO,VR> KTable<K,VR> leftJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Named named)
Join records of thisKTable(left input) with anotherKTable's (right input) records using non-windowed left equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attributethisKTable.key == otherKTable.key. In contrast toinner-join, all records from leftKTablewill produce an output record (cf. below). The result is an ever updatingKTablethat represents the current (i.e., processing time) result of the join.The join is computed by (1) updating the internal state of one
KTableand (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the otherKTable. This happens in a symmetric way, i.e., for each update of eitherthisor theotherinputKTablethe result gets updated.For each
KTablerecord that finds a corresponding record in the otherKTable's state the providedValueJoinerwill be called to compute a value (with arbitrary type) for the result record. Additionally, for each record of leftKTablethat does not find a corresponding record in the rightKTable's state the providedValueJoinerwill be called withrightValue = nullto compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.Note that
recordswithnullvalues (so-called tombstone records) have delete semantics. For example, for left input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the resultKTableif required (i.e., if there is anything to be deleted).Input records with
nullkey will be dropped and no join computation is performed.Example:
Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.

thisKTable | thisState | otherKTable | otherState | result updated record
<K1:A>     | <K1:A>    |             |            | <K1:ValueJoiner(A,null)>
           | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
<K1:null>  |           |             | <K1:b>     | <K1:null>
           |           | <K1:null>   |            |

- Type Parameters:
VO- the value type of the otherKTableVR- the value type of the resultKTable- Parameters:
other- the otherKTableto be joined with thisKTablejoiner- aValueJoinerthat computes the join result for a pair of matching recordsnamed- aNamedconfig used to name the processor in the topology- Returns:
- a
KTablethat contains join-records for each key and values computed by the givenValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of leftKTable - See Also:
join(KTable, ValueJoiner),outerJoin(KTable, ValueJoiner)
-
leftJoin
<VO,VR> KTable<K,VR> leftJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Join records of thisKTable(left input) with anotherKTable's (right input) records using non-windowed left equi join, with theMaterializedinstance for configuration of thekey serde,the result table's value serde, andstate store. The join is a primary key join with join attributethisKTable.key == otherKTable.key. In contrast toinner-join, all records from leftKTablewill produce an output record (cf. below). The result is an ever updatingKTablethat represents the current (i.e., processing time) result of the join.The join is computed by (1) updating the internal state of one
KTableand (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the otherKTable. This happens in a symmetric way, i.e., for each update of eitherthisor theotherinputKTablethe result gets updated.For each
KTablerecord that finds a corresponding record in the otherKTable's state the providedValueJoinerwill be called to compute a value (with arbitrary type) for the result record. Additionally, for each record of leftKTablethat does not find a corresponding record in the rightKTable's state the providedValueJoinerwill be called withrightValue = nullto compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.Note that
recordswithnullvalues (so-called tombstone records) have delete semantics. For example, for left input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the resultKTableif required (i.e., if there is anything to be deleted).Input records with
nullkey will be dropped and no join computation is performed.Example:
Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.

thisKTable | thisState | otherKTable | otherState | result updated record
<K1:A>     | <K1:A>    |             |            | <K1:ValueJoiner(A,null)>
           | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
<K1:null>  |           |             | <K1:b>     | <K1:null>
           |           | <K1:null>   |            |

- Type Parameters:
VO- the value type of the otherKTableVR- the value type of the resultKTable- Parameters:
other- the otherKTableto be joined with thisKTablejoiner- aValueJoinerthat computes the join result for a pair of matching recordsmaterialized- an instance ofMaterializedused to describe how the state store should be materialized. Cannot benull- Returns:
- a
KTablethat contains join-records for each key and values computed by the givenValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of leftKTable - See Also:
join(KTable, ValueJoiner, Materialized),outerJoin(KTable, ValueJoiner, Materialized)
-
leftJoin
<VO,VR> KTable<K,VR> leftJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Join records of thisKTable(left input) with anotherKTable's (right input) records using non-windowed left equi join, with theMaterializedinstance for configuration of thekey serde,the result table's value serde, andstate store. The join is a primary key join with join attributethisKTable.key == otherKTable.key. In contrast toinner-join, all records from leftKTablewill produce an output record (cf. below). The result is an ever updatingKTablethat represents the current (i.e., processing time) result of the join.The join is computed by (1) updating the internal state of one
KTableand (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the otherKTable. This happens in a symmetric way, i.e., for each update of eitherthisor theotherinputKTablethe result gets updated.For each
KTablerecord that finds a corresponding record in the otherKTable's state the providedValueJoinerwill be called to compute a value (with arbitrary type) for the result record. Additionally, for each record of leftKTablethat does not find a corresponding record in the rightKTable's state the providedValueJoinerwill be called withrightValue = nullto compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.Note that
recordswithnullvalues (so-called tombstone records) have delete semantics. For example, for left input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the resultKTableif required (i.e., if there is anything to be deleted).Input records with
nullkey will be dropped and no join computation is performed.Example:
Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.

thisKTable | thisState | otherKTable | otherState | result updated record
<K1:A>     | <K1:A>    |             |            | <K1:ValueJoiner(A,null)>
           | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
<K1:null>  |           |             | <K1:b>     | <K1:null>
           |           | <K1:null>   |            |

- Type Parameters:
VO- the value type of the otherKTableVR- the value type of the resultKTable- Parameters:
other- the otherKTableto be joined with thisKTablejoiner- aValueJoinerthat computes the join result for a pair of matching recordsnamed- aNamedconfig used to name the processor in the topologymaterialized- an instance ofMaterializedused to describe how the state store should be materialized. Cannot benull- Returns:
- a
KTablethat contains join-records for each key and values computed by the givenValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of leftKTable - See Also:
join(KTable, ValueJoiner, Materialized),outerJoin(KTable, ValueJoiner, Materialized)
-
outerJoin
<VO,VR> KTable<K,VR> outerJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner)
Join records of thisKTable(left input) with anotherKTable's (right input) records using non-windowed outer equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attributethisKTable.key == otherKTable.key. In contrast toinner-joinorleft-join, all records from both inputKTables will produce an output record (cf. below). The result is an ever updatingKTablethat represents the current (i.e., processing time) result of the join.The join is computed by (1) updating the internal state of one
KTableand (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the otherKTable. This happens in a symmetric way, i.e., for each update of eitherthisor theotherinputKTablethe result gets updated.For each
KTablerecord that finds a corresponding record in the otherKTable's state the providedValueJoinerwill be called to compute a value (with arbitrary type) for the result record. Additionally, for each record that does not find a corresponding record in the corresponding otherKTable's state the providedValueJoinerwill be called withnullvalue for the corresponding other value to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.Note that
recordswithnullvalues (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the resultKTableif required (i.e., if there is anything to be deleted).Input records with
nullkey will be dropped and no join computation is performed.Example:
Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.

thisKTable | thisState | otherKTable | otherState | result updated record
<K1:A>     | <K1:A>    |             |            | <K1:ValueJoiner(A,null)>
           | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
<K1:null>  |           |             | <K1:b>     | <K1:ValueJoiner(null,b)>
           |           | <K1:null>   |            | <K1:null>

- Type Parameters:
VO- the value type of the otherKTableVR- the value type of the resultKTable- Parameters:
other- the otherKTableto be joined with thisKTablejoiner- aValueJoinerthat computes the join result for a pair of matching records- Returns:
- a
KTablethat contains join-records for each key and values computed by the givenValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of bothKTables - See Also:
join(KTable, ValueJoiner),leftJoin(KTable, ValueJoiner)
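The distinguishing outer-join behavior, that an unmatched right-side record also produces output with a null left value, can be sketched as follows (topic names and the string-concatenating ValueJoiner are illustrative assumptions; TopologyTestDriver from kafka-streams-test-utils observes the emitted updates):

```java
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KTable;

public class KTableOuterJoinExample {

    static Topology topology() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> left = builder.table("left-topic");
        KTable<String, String> right = builder.table("right-topic");
        // Outer join: updates on either side fire the joiner; the missing side is null.
        left.outerJoin(right,
                (l, r) -> (l == null ? "null" : l) + "+" + (r == null ? "null" : r))
            .toStream().to("joined-topic");
        return builder.build();
    }

    // Returns every emitted join result, in order.
    static List<KeyValue<String, String>> run() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "outer-join-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        try (TopologyTestDriver driver = new TopologyTestDriver(topology(), props)) {
            TestInputTopic<String, String> leftIn = driver.createInputTopic(
                "left-topic", new StringSerializer(), new StringSerializer());
            TestInputTopic<String, String> rightIn = driver.createInputTopic(
                "right-topic", new StringSerializer(), new StringSerializer());
            rightIn.pipeInput("K1", "b");  // unmatched right -> joiner called with (null, b)
            leftIn.pipeInput("K1", "A");   // match -> joiner called with (A, b)
            return driver.createOutputTopic("joined-topic",
                new StringDeserializer(), new StringDeserializer()).readKeyValuesToList();
        }
    }

    public static void main(String[] args) {
        run().forEach(kv -> System.out.println(kv.key + " -> " + kv.value));
    }
}
```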
-
outerJoin
<VO,VR> KTable<K,VR> outerJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Named named)
Join records of thisKTable(left input) with anotherKTable's (right input) records using non-windowed outer equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attributethisKTable.key == otherKTable.key. In contrast toinner-joinorleft-join, all records from both inputKTables will produce an output record (cf. below). The result is an ever updatingKTablethat represents the current (i.e., processing time) result of the join.The join is computed by (1) updating the internal state of one
KTableand (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the otherKTable. This happens in a symmetric way, i.e., for each update of eitherthisor theotherinputKTablethe result gets updated.For each
KTablerecord that finds a corresponding record in the otherKTable's state the providedValueJoinerwill be called to compute a value (with arbitrary type) for the result record. Additionally, for each record that does not find a corresponding record in the corresponding otherKTable's state the providedValueJoinerwill be called withnullvalue for the corresponding other value to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.Note that
recordswithnullvalues (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the resultKTableif required (i.e., if there is anything to be deleted).Input records with
nullkey will be dropped and no join computation is performed.Example:
Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.

thisKTable | thisState | otherKTable | otherState | result updated record
<K1:A>     | <K1:A>    |             |            | <K1:ValueJoiner(A,null)>
           | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
<K1:null>  |           |             | <K1:b>     | <K1:ValueJoiner(null,b)>
           |           | <K1:null>   |            | <K1:null>

- Type Parameters:
VO- the value type of the otherKTableVR- the value type of the resultKTable- Parameters:
other- the otherKTableto be joined with thisKTablejoiner- aValueJoinerthat computes the join result for a pair of matching recordsnamed- aNamedconfig used to name the processor in the topology- Returns:
- a
KTablethat contains join-records for each key and values computed by the givenValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of bothKTables - See Also:
join(KTable, ValueJoiner),leftJoin(KTable, ValueJoiner)
-
outerJoin
<VO,VR> KTable<K,VR> outerJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Join records of thisKTable(left input) with anotherKTable's (right input) records using non-windowed outer equi join, with theMaterializedinstance for configuration of thekey serde,the result table's value serde, andstate store. The join is a primary key join with join attributethisKTable.key == otherKTable.key. In contrast toinner-joinorleft-join, all records from both inputKTables will produce an output record (cf. below). The result is an ever updatingKTablethat represents the current (i.e., processing time) result of the join.The join is computed by (1) updating the internal state of one
KTableand (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the otherKTable. This happens in a symmetric way, i.e., for each update of eitherthisor theotherinputKTablethe result gets updated.For each
KTablerecord that finds a corresponding record in the otherKTable's state the providedValueJoinerwill be called to compute a value (with arbitrary type) for the result record. Additionally, for each record that does not find a corresponding record in the corresponding otherKTable's state the providedValueJoinerwill be called withnullvalue for the corresponding other value to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.Note that
recordswithnullvalues (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the resultKTableif required (i.e., if there is anything to be deleted).Input records with
nullkey will be dropped and no join computation is performed.Example:
Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.

thisKTable | thisState | otherKTable | otherState | result updated record
<K1:A>     | <K1:A>    |             |            | <K1:ValueJoiner(A,null)>
           | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
<K1:null>  |           |             | <K1:b>     | <K1:ValueJoiner(null,b)>
           |           | <K1:null>   |            | <K1:null>

- Type Parameters:
VO- the value type of the otherKTableVR- the value type of the resultKTable- Parameters:
other- the otherKTableto be joined with thisKTablejoiner- aValueJoinerthat computes the join result for a pair of matching recordsmaterialized- an instance ofMaterializedused to describe how the state store should be materialized. Cannot benull- Returns:
- a
KTablethat contains join-records for each key and values computed by the givenValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of bothKTables - See Also:
join(KTable, ValueJoiner),leftJoin(KTable, ValueJoiner)
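The update sequence in the table above can be sketched in plain Java, with two HashMaps standing in for the tables' internal state and a BiFunction playing the role of the ValueJoiner. This is an illustrative simulation of the documented semantics, not Kafka Streams API; all names (OuterJoinSketch, updateLeft, updateRight) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java sketch of the outer-join update semantics described above.
public class OuterJoinSketch {
    static final Map<String, String> thisState = new HashMap<>();
    static final Map<String, String> otherState = new HashMap<>();
    static final BiFunction<String, String, String> joiner =
        (l, r) -> "ValueJoiner(" + l + "," + r + ")";

    // Apply an update to the left table and compute the output record.
    // A null value is a tombstone: it deletes from the state, and a tombstone
    // is forwarded (joiner not called) only if nothing remains on either side.
    static String updateLeft(String key, String value) {
        if (value == null) thisState.remove(key); else thisState.put(key, value);
        String other = otherState.get(key);
        if (value == null && other == null) return null; // tombstone forwarded
        return joiner.apply(value, other);
    }

    // Symmetric update for the right table.
    static String updateRight(String key, String value) {
        if (value == null) otherState.remove(key); else otherState.put(key, value);
        String self = thisState.get(key);
        if (value == null && self == null) return null;
        return joiner.apply(self, value);
    }

    public static void main(String[] args) {
        System.out.println(updateLeft("K1", "A"));    // ValueJoiner(A,null)
        System.out.println(updateRight("K1", "b"));   // ValueJoiner(A,b)
        System.out.println(updateLeft("K1", null));   // ValueJoiner(null,b)
        System.out.println(updateRight("K1", null));  // null (tombstone)
    }
}
```

Each call reproduces one row of the example table: the outer join emits an output for every update, substituting null for the missing side, and only the final tombstone-on-empty-state case forwards a tombstone.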
-
outerJoin
<VO,VR> KTable<K,VR> outerJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Join records of this KTable (left input) with another KTable's (right input) records using a non-windowed outer equi-join, with the Materialized instance for configuration of the key serde, the result table's value serde, and the state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to an inner join or left join, all records from both input KTables will produce an output record (cf. below). The result is an ever-updating KTable that represents the current (i.e., processing time) result of the join.

The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.

For each KTable record that finds a corresponding record in the other KTable's state, the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record that does not find a corresponding record in the other KTable's state, the provided ValueJoiner will be called with a null value for the missing side to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.

Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called; instead, a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).

Input records with a null key are dropped and no join computation is performed.

Example: Both input streams (or, to be more precise, their underlying source topics) need to have the same number of partitions.

thisKTable | thisState | otherKTable | otherState | result updated record
<K1:A>     | <K1:A>    |             |            | <K1:ValueJoiner(A,null)>
           | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
<K1:null>  |           |             | <K1:b>     | <K1:ValueJoiner(null,b)>
           |           | <K1:null>   |            | <K1:null>

- Type Parameters:
VO - the value type of the other KTable
VR - the value type of the result KTable
- Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
named - a Named config used to name the processor in the topology
materialized - an instance of Materialized used to describe how the state store should be materialized. Cannot be null
- Returns:
- a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of both KTables
- See Also:
join(KTable, ValueJoiner), leftJoin(KTable, ValueJoiner)
-
join
<VR,KO,VO> KTable<K,VR> join(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner)
Join records of this KTable with another KTable using a non-windowed inner join.

This is a foreign-key join, where the joining key is determined by the foreignKeyExtractor.

- Type Parameters:
VR - the value type of the result KTable
KO - the key type of the other KTable
VO - the value type of the other KTable
- Parameters:
other - the other KTable to be joined with this KTable. Keyed by KO.
foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.
joiner - a ValueJoiner that computes the join result for a pair of matching records
- Returns:
- a KTable that contains the result of joining this table with other
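The distinguishing feature of this overload is that the join key is extracted from this table's value rather than its primary key. A plain-Java sketch of that lookup, using a Map in place of the other table's state; the value format "customerId|item" and all names here are hypothetical, not Kafka API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java sketch of the foreign-key inner-join lookup.
public class FkJoinSketch {
    // The "other" table, keyed by the foreign key KO (e.g. customerId -> name).
    static final Map<String, String> customers = new HashMap<>();

    // foreignKeyExtractor: pull the customerId out of an order value "custId|item".
    static final Function<String, String> foreignKeyExtractor =
        v -> v.split("\\|")[0];

    static final BiFunction<String, String, String> joiner =
        (order, customer) -> order + " for " + customer;

    // Inner-join semantics: a null extracted key drops the update as invalid,
    // and a missing right-side record produces no output.
    static String join(String orderValue) {
        String fk = foreignKeyExtractor.apply(orderValue);
        if (fk == null) return null;          // invalid update, ignored
        String customer = customers.get(fk);
        if (customer == null) return null;    // no match: inner join emits nothing
        return joiner.apply(orderValue, customer);
    }

    public static void main(String[] args) {
        customers.put("c1", "Alice");
        System.out.println(join("c1|book"));  // c1|book for Alice
        System.out.println(join("c2|pen"));   // null: no matching customer
    }
}
```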
-
join
<VR,KO,VO> KTable<K,VR> join(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Named named)
Join records of thisKTablewith anotherKTableusing non-windowed inner join.This is a foreign key join, where the joining key is determined by the
foreignKeyExtractor.- Type Parameters:
VR- the value type of the resultKTableKO- the key type of the otherKTableVO- the value type of the otherKTable- Parameters:
other- the otherKTableto be joined with thisKTable. Keyed by KO.foreignKeyExtractor- aFunctionthat extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.joiner- aValueJoinerthat computes the join result for a pair of matching recordsnamed- aNamedconfig used to name the processor in the topology- Returns:
- a
KTablethat contains the result of joining this table withother
-
join
<VR,KO,VO> KTable<K,VR> join(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Join records of thisKTablewith anotherKTableusing non-windowed inner join.This is a foreign key join, where the joining key is determined by the
foreignKeyExtractor.- Type Parameters:
VR- the value type of the resultKTableKO- the key type of the otherKTableVO- the value type of the otherKTable- Parameters:
other- the otherKTableto be joined with thisKTable. Keyed by KO.foreignKeyExtractor- aFunctionthat extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.joiner- aValueJoinerthat computes the join result for a pair of matching recordsmaterialized- aMaterializedthat describes how theStateStorefor the resultingKTableshould be materialized. Cannot benull- Returns:
- a
KTablethat contains the result of joining this table withother
-
join
<VR,KO,VO> KTable<K,VR> join(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Join records of thisKTablewith anotherKTableusing non-windowed inner join.This is a foreign key join, where the joining key is determined by the
foreignKeyExtractor.- Type Parameters:
VR- the value type of the resultKTableKO- the key type of the otherKTableVO- the value type of the otherKTable- Parameters:
other- the otherKTableto be joined with thisKTable. Keyed by KO.foreignKeyExtractor- aFunctionthat extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.joiner- aValueJoinerthat computes the join result for a pair of matching recordsnamed- aNamedconfig used to name the processor in the topologymaterialized- aMaterializedthat describes how theStateStorefor the resultingKTableshould be materialized. Cannot benull- Returns:
- a
KTablethat contains the result of joining this table withother
-
leftJoin
<VR,KO,VO> KTable<K,VR> leftJoin(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner)
Join records of thisKTablewith anotherKTableusing non-windowed left join.This is a foreign key join, where the joining key is determined by the
foreignKeyExtractor.- Type Parameters:
VR- the value type of the resultKTableKO- the key type of the otherKTableVO- the value type of the otherKTable- Parameters:
other- the otherKTableto be joined with thisKTable. Keyed by KO.foreignKeyExtractor- aFunctionthat extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.joiner- aValueJoinerthat computes the join result for a pair of matching records- Returns:
- a
KTablethat contains only those records that satisfy the given predicate
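The left join differs from the foreign-key inner join in exactly one respect: a left-side record whose lookup finds no match still produces output, with null passed as the right-side value. A plain-Java sketch of that difference, using the same hypothetical "customerId|item" value format as before; none of these names are Kafka API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java sketch of the foreign-key LEFT join: the joiner is called even
// when the right-side lookup misses, receiving null for the missing value.
public class FkLeftJoinSketch {
    static final Map<String, String> customers = new HashMap<>();

    static final Function<String, String> foreignKeyExtractor =
        v -> v.split("\\|")[0];

    static final BiFunction<String, String, String> joiner =
        (order, customer) -> order + " for " + customer;

    static String leftJoin(String orderValue) {
        String fk = foreignKeyExtractor.apply(orderValue);
        if (fk == null) return null;  // invalid update, ignored (same as inner join)
        // Left join: call the joiner even when the lookup misses.
        return joiner.apply(orderValue, customers.get(fk));
    }

    public static void main(String[] args) {
        System.out.println(leftJoin("c9|pen"));   // c9|pen for null
        customers.put("c1", "Alice");
        System.out.println(leftJoin("c1|book"));  // c1|book for Alice
    }
}
```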
-
leftJoin
<VR,KO,VO> KTable<K,VR> leftJoin(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Named named)
Join records of thisKTablewith anotherKTableusing non-windowed left join.This is a foreign key join, where the joining key is determined by the
foreignKeyExtractor.- Type Parameters:
VR- the value type of the resultKTableKO- the key type of the otherKTableVO- the value type of the otherKTable- Parameters:
other- the otherKTableto be joined with thisKTable. Keyed by KO.foreignKeyExtractor- aFunctionthat extracts the key (KO) from this table's value (V) If the result is null, the update is ignored as invalid.joiner- aValueJoinerthat computes the join result for a pair of matching recordsnamed- aNamedconfig used to name the processor in the topology- Returns:
- a
KTablethat contains the result of joining this table withother
-
leftJoin
<VR,KO,VO> KTable<K,VR> leftJoin(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Join records of thisKTablewith anotherKTableusing non-windowed left join.This is a foreign key join, where the joining key is determined by the
foreignKeyExtractor.- Type Parameters:
VR- the value type of the resultKTableKO- the key type of the otherKTableVO- the value type of the otherKTable- Parameters:
other- the otherKTableto be joined with thisKTable. Keyed by KO.foreignKeyExtractor- aFunctionthat extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.joiner- aValueJoinerthat computes the join result for a pair of matching recordsmaterialized- aMaterializedthat describes how theStateStorefor the resultingKTableshould be materialized. Cannot benull- Returns:
- a
KTablethat contains the result of joining this table withother
-
leftJoin
<VR,KO,VO> KTable<K,VR> leftJoin(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Join records of thisKTablewith anotherKTableusing non-windowed left join.This is a foreign key join, where the joining key is determined by the
foreignKeyExtractor.- Type Parameters:
VR- the value type of the resultKTableKO- the key type of the otherKTableVO- the value type of the otherKTable- Parameters:
other- the otherKTableto be joined with thisKTable. Keyed by KO.foreignKeyExtractor- aFunctionthat extracts the key (KO) from this table's value (V) If the result is null, the update is ignored as invalid.joiner- aValueJoinerthat computes the join result for a pair of matching recordsnamed- aNamedconfig used to name the processor in the topologymaterialized- aMaterializedthat describes how theStateStorefor the resultingKTableshould be materialized. Cannot benull- Returns:
- a
KTablethat contains the result of joining this table withother
-
queryableStoreName
String queryableStoreName()
Get the name of the local state store that can be used to query this KTable.
- Returns:
- the underlying state store name, or null if this KTable cannot be queried.
-
-