Interface KTable<K,V>

Type Parameters:
K - Type of primary keys
V - Type of value changes

public interface KTable<K,V>
KTable is an abstraction of a changelog stream from a primary-keyed table. Each record in this changelog stream is an update on the primary-keyed table with the record key as the primary key.

A KTable is either defined from a single Kafka topic that is consumed message by message or the result of a KTable transformation. An aggregation of a KStream also yields a KTable.

A KTable can be transformed record by record, joined with another KTable or KStream, or can be re-partitioned and aggregated into a new KTable.

Some KTables have an internal state (a ReadOnlyKeyValueStore) and are therefore queryable via the interactive queries API. For example:


     final KTable<K, V> table = ...
     ...
     final KafkaStreams streams = ...;
     streams.start();
     ...
     final String queryableStoreName = table.queryableStoreName(); // returns null if KTable is not queryable
     final ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> view = streams.store(
         StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore()));
     view.get(key);

Records from the source topic that have null keys are dropped.
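The changelog interpretation described above can be illustrated with a small plain-Java model. This is only a sketch and not part of the Kafka Streams API: the class ChangelogModel and its apply method are hypothetical names, and an ordinary Map stands in for the table. It assumes the semantics stated in this document, namely that each record is an update for its key, that a null value (tombstone) deletes the key, and that records with null keys are dropped.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of changelog semantics; not part of the Kafka Streams API.
final class ChangelogModel {
    // Applies one changelog record to the table: a null value deletes the key,
    // any other value inserts or overwrites it.
    static void apply(Map<String, String> table, String key, String value) {
        if (key == null) {
            return; // records with null keys are dropped
        }
        if (value == null) {
            table.remove(key); // tombstone
        } else {
            table.put(key, value);
        }
    }

    public static void main(String[] args) {
        Map<String, String> table = new LinkedHashMap<>();
        apply(table, "alice", "Berlin");
        apply(table, "bob", "Lima");
        apply(table, "alice", "Rome"); // update: overwrites the previous value
        apply(table, "bob", null);     // tombstone: deletes the key
        System.out.println(table);     // prints {alice=Rome}
    }
}
```

Four changelog records leave a table with a single row, because only the latest update per key is retained.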

  • Method Details

    • filter

      KTable<K,V> filter(Predicate<? super K,? super V> predicate)
      Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with default serializers, deserializers, and state store. All records that do not satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.

      Note that filter for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record is forwarded.
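      The tombstone handling described above can be sketched as a plain-Java model. This is an illustration only, not Kafka Streams code: TableFilterModel, filterUpdate, and upsert are hypothetical names, and null stands for a tombstone. The model maps each input update to the update forwarded to the result table; it does not attempt to reproduce when the runtime skips a tombstone that is not required.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiPredicate;

// Plain-Java model of the filter semantics described above; not Kafka Streams API.
final class TableFilterModel {
    // Maps one input update to the update forwarded to the result table.
    // A null return value stands for a tombstone.
    static <K, V> V filterUpdate(BiPredicate<K, V> predicate, K key, V value) {
        if (value == null) {
            return null; // input tombstone: the predicate is not evaluated
        }
        return predicate.test(key, value) ? value : null; // dropped record becomes a tombstone
    }

    // Applies an update to a table, treating null as a delete.
    static <K, V> void upsert(Map<K, V> table, K key, V value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }

    public static void main(String[] args) {
        BiPredicate<String, Integer> atLeastTen = (k, v) -> v >= 10;
        Map<String, Integer> result = new LinkedHashMap<>();
        upsert(result, "a", filterUpdate(atLeastTen, "a", 15)); // passes: a=15
        upsert(result, "b", filterUpdate(atLeastTen, "b", 3));  // fails: tombstone, nothing to delete
        upsert(result, "a", filterUpdate(atLeastTen, "a", 7));  // now fails: tombstone deletes a
        upsert(result, "c", filterUpdate(atLeastTen, "c", 42)); // passes: c=42
        System.out.println(result); // prints {c=42}
    }
}
```

      The third update shows why a dropped record must become a tombstone: without it, the stale row a=15 would remain in the result table.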

      Parameters:
      predicate - a filter Predicate that is applied to each record
      Returns:
      a KTable that contains only those records that satisfy the given predicate
    • filter

      KTable<K,V> filter(Predicate<? super K,? super V> predicate, Named named)
      Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with default serializers, deserializers, and state store. All records that do not satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.

      Note that filter for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record is forwarded.

      Parameters:
      predicate - a filter Predicate that is applied to each record
      named - a Named config used to name the processor in the topology
      Returns:
      a KTable that contains only those records that satisfy the given predicate
    • filter

      KTable<K,V> filter(Predicate<? super K,? super V> predicate, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. All records that do not satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.

      Note that filter for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record is forwarded.

      To query the local ReadOnlyKeyValueStore, obtain it via KafkaStreams#store(...):

      
       KafkaStreams streams = ... // filtering words
       ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(
           StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore()));
       K key = "some-word";
       ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is partitioned across all running Kafka Streams instances)
       
      For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).

      Parameters:
      predicate - a filter Predicate that is applied to each record
      materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
      Returns:
      a KTable that contains only those records that satisfy the given predicate
    • filter

      KTable<K,V> filter(Predicate<? super K,? super V> predicate, Named named, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. All records that do not satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.

      Note that filter for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record is forwarded.

      To query the local ReadOnlyKeyValueStore, obtain it via KafkaStreams#store(...):

      
       KafkaStreams streams = ... // filtering words
       ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(
           StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore()));
       K key = "some-word";
       ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is partitioned across all running Kafka Streams instances)
       
      For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).

      Parameters:
      predicate - a filter Predicate that is applied to each record
      named - a Named config used to name the processor in the topology
      materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
      Returns:
      a KTable that contains only those records that satisfy the given predicate
    • filterNot

      KTable<K,V> filterNot(Predicate<? super K,? super V> predicate)
      Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with default serializers, deserializers, and state store. All records that do satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.

      Note that filterNot for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is forwarded.

      Parameters:
      predicate - a filter Predicate that is applied to each record
      Returns:
      a KTable that contains only those records that do not satisfy the given predicate
    • filterNot

      KTable<K,V> filterNot(Predicate<? super K,? super V> predicate, Named named)
      Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with default serializers, deserializers, and state store. All records that do satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.

      Note that filterNot for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is forwarded.

      Parameters:
      predicate - a filter Predicate that is applied to each record
      named - a Named config used to name the processor in the topology
      Returns:
      a KTable that contains only those records that do not satisfy the given predicate
    • filterNot

      KTable<K,V> filterNot(Predicate<? super K,? super V> predicate, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. All records that do satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.

      Note that filterNot for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is forwarded.

      To query the local ReadOnlyKeyValueStore, obtain it via KafkaStreams#store(...):

      
       KafkaStreams streams = ... // filtering words
       ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(
           StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore()));
       K key = "some-word";
       ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is partitioned across all running Kafka Streams instances)
       
      For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).

      Parameters:
      predicate - a filter Predicate that is applied to each record
      materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
      Returns:
      a KTable that contains only those records that do not satisfy the given predicate
    • filterNot

      KTable<K,V> filterNot(Predicate<? super K,? super V> predicate, Named named, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. All records that do satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.

      Note that filterNot for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is forwarded.

      To query the local ReadOnlyKeyValueStore, obtain it via KafkaStreams#store(...):

      
       KafkaStreams streams = ... // filtering words
       ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(
           StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore()));
       K key = "some-word";
       ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is partitioned across all running Kafka Streams instances)
       
      For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).

      Parameters:
      predicate - a filter Predicate that is applied to each record
      named - a Named config used to name the processor in the topology
      materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
      Returns:
      a KTable that contains only those records that do not satisfy the given predicate
    • mapValues

      <VR> KTable<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper)
      Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. For each KTable update, the provided ValueMapper is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K,V'>. This is a stateless record-by-record operation.

      The example below counts the number of tokens in the value string.

      
       KTable<String, String> inputTable = builder.table("topic");
       KTable<String, Integer> outputTable = inputTable.mapValues(value -> value.split(" ").length);
       

      This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like a join) is applied to the result KTable.

      Note that mapValues for a changelog stream works differently than for a record stream, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
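      The tombstone pass-through described above can be sketched in plain Java. This is a model for illustration, not Kafka Streams code: MapValuesModel and mapUpdate are hypothetical names, java.util.function.Function stands in for ValueMapper, and null stands for a tombstone.

```java
import java.util.function.Function;

// Plain-Java model of mapValues on a changelog; not Kafka Streams API.
final class MapValuesModel {
    // Maps the value of one update. The key is untouched, so it is not modeled here.
    // A null value (tombstone) is forwarded as-is and the mapper is never called.
    static <V, VR> VR mapUpdate(Function<? super V, ? extends VR> mapper, V value) {
        return value == null ? null : mapper.apply(value);
    }

    public static void main(String[] args) {
        Function<String, Integer> countTokens = value -> value.split(" ").length;
        Integer mapped = mapUpdate(countTokens, "to be or");
        Integer tombstone = mapUpdate(countTokens, (String) null);
        System.out.println(mapped);    // prints 3
        System.out.println(tombstone); // prints null
    }
}
```

      Because the mapper is skipped for tombstones, it does not need its own null check for deleted records.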

      Type Parameters:
      VR - the value type of the result KTable
      Parameters:
      mapper - a ValueMapper that computes a new output value
      Returns:
      a KTable that contains records with unmodified keys and new values (possibly of different type)
    • mapValues

      <VR> KTable<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper, Named named)
      Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. For each KTable update, the provided ValueMapper is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K,V'>. This is a stateless record-by-record operation.

      The example below counts the number of tokens in the value string.

      
       KTable<String, String> inputTable = builder.table("topic");
       KTable<String, Integer> outputTable = inputTable.mapValues(value -> value.split(" ").length, Named.as("countTokenValue"));
       

      This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like a join) is applied to the result KTable.

      Note that mapValues for a changelog stream works differently than for a record stream, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.

      Type Parameters:
      VR - the value type of the result KTable
      Parameters:
      mapper - a ValueMapper that computes a new output value
      named - a Named config used to name the processor in the topology
      Returns:
      a KTable that contains records with unmodified keys and new values (possibly of different type)
    • mapValues

      <VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper)
      Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. For each KTable update, the provided ValueMapperWithKey is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K,V'>. This is a stateless record-by-record operation.

      The example below counts the number of tokens in the key and value strings.

      
       KTable<String, String> inputTable = builder.table("topic");
       KTable<String, Integer> outputTable =
        inputTable.mapValues((readOnlyKey, value) -> readOnlyKey.split(" ").length + value.split(" ").length);
       

      Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like a join) is applied to the result KTable.

      Note that mapValues for a changelog stream works differently than for a record stream, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.

      Type Parameters:
      VR - the value type of the result KTable
      Parameters:
      mapper - a ValueMapperWithKey that computes a new output value
      Returns:
      a KTable that contains records with unmodified keys and new values (possibly of different type)
    • mapValues

      <VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Named named)
      Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. For each KTable update, the provided ValueMapperWithKey is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K,V'>. This is a stateless record-by-record operation.

      The example below counts the number of tokens in the key and value strings.

      
       KTable<String, String> inputTable = builder.table("topic");
       KTable<String, Integer> outputTable =
        inputTable.mapValues((readOnlyKey, value) -> readOnlyKey.split(" ").length + value.split(" ").length, Named.as("countTokenValueAndKey"));
       

      Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like a join) is applied to the result KTable.

      Note that mapValues for a changelog stream works differently than for a record stream, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.

      Type Parameters:
      VR - the value type of the result KTable
      Parameters:
      mapper - a ValueMapperWithKey that computes a new output value
      named - a Named config used to name the processor in the topology
      Returns:
      a KTable that contains records with unmodified keys and new values (possibly of different type)
    • mapValues

      <VR> KTable<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. For each KTable update, the provided ValueMapper is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K,V'>. This is a stateless record-by-record operation.

      The example below counts the number of tokens in the value string.

      
       KTable<String, String> inputTable = builder.table("topic");
       KTable<String, Integer> outputTable = inputTable.mapValues(new ValueMapper<String, Integer>() {
           @Override
           public Integer apply(String value) {
               return value.split(" ").length;
           }
       }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("token-counts")); // "token-counts" is an example store name
       

      To query the local KeyValueStore representing outputTable above, obtain it via KafkaStreams#store(...). For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).

      This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like a join) is applied to the result KTable.

      Note that mapValues for a changelog stream works differently than for a record stream, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.

      Type Parameters:
      VR - the value type of the result KTable
      Parameters:
      mapper - a ValueMapper that computes a new output value
      materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
      Returns:
      a KTable that contains records with unmodified keys and new values (possibly of different type)
    • mapValues

      <VR> KTable<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. For each KTable update, the provided ValueMapper is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K,V'>. This is a stateless record-by-record operation.

      The example below counts the number of tokens in the value string.

      
       KTable<String, String> inputTable = builder.table("topic");
       KTable<String, Integer> outputTable = inputTable.mapValues(new ValueMapper<String, Integer>() {
           @Override
           public Integer apply(String value) {
               return value.split(" ").length;
           }
       }, Named.as("countTokenValue"), Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("token-counts")); // "token-counts" is an example store name
       

      To query the local KeyValueStore representing outputTable above, obtain it via KafkaStreams#store(...). For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).

      This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like a join) is applied to the result KTable.

      Note that mapValues for a changelog stream works differently than for a record stream, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.

      Type Parameters:
      VR - the value type of the result KTable
      Parameters:
      mapper - a ValueMapper that computes a new output value
      named - a Named config used to name the processor in the topology
      materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
      Returns:
      a KTable that contains records with unmodified keys and new values (possibly of different type)
    • mapValues

      <VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. For each KTable update, the provided ValueMapperWithKey is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K,V'>. This is a stateless record-by-record operation.

      The example below counts the number of tokens in the key and value strings.

      
       KTable<String, String> inputTable = builder.table("topic");
       KTable<String, Integer> outputTable = inputTable.mapValues(new ValueMapperWithKey<String, String, Integer>() {
           @Override
           public Integer apply(String readOnlyKey, String value) {
               return readOnlyKey.split(" ").length + value.split(" ").length;
           }
       }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("token-counts")); // "token-counts" is an example store name
       

      To query the local KeyValueStore representing outputTable above, obtain it via KafkaStreams.store(StoreQueryParameters). For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).

      Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like a join) is applied to the result KTable.

      Note that mapValues for a changelog stream works differently than for a record stream, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.

      Type Parameters:
      VR - the value type of the result KTable
      Parameters:
      mapper - a ValueMapperWithKey that computes a new output value
      materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
      Returns:
      a KTable that contains records with unmodified keys and new values (possibly of different type)
    • mapValues

      <VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. For each KTable update, the provided ValueMapperWithKey is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K,V'>. This is a stateless record-by-record operation.

      The example below counts the number of tokens in the key and value strings.

      
       KTable<String, String> inputTable = builder.table("topic");
       KTable<String, Integer> outputTable = inputTable.mapValues(new ValueMapperWithKey<String, String, Integer>() {
           @Override
           public Integer apply(String readOnlyKey, String value) {
               return readOnlyKey.split(" ").length + value.split(" ").length;
           }
       }, Named.as("countTokenValueAndKey"), Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("token-counts")); // "token-counts" is an example store name
       

      To query the local KeyValueStore representing outputTable above, obtain it via KafkaStreams#store(...). For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).

      Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key based operator (like a join) is applied to the result KTable.

      Note that mapValues for a changelog stream works differently than mapValues for a record stream, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
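
      The tombstone handling described above can be illustrated with a small plain-Java model. This is only a sketch: the class MapValuesModel and its apply method are hypothetical names, a LinkedHashMap stands in for the result table, and none of it is Kafka Streams API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of mapValues on a changelog; not Kafka Streams code.
class MapValuesModel {
    // Stand-in for the materialized result table.
    final Map<String, Integer> result = new LinkedHashMap<>();
    // Counts how often the mapper ran, to show that tombstones bypass it.
    int mapperCalls = 0;

    // Same logic as the example mapper: tokens in the key plus tokens in the value.
    final BiFunction<String, String, Integer> mapper =
        (readOnlyKey, value) -> readOnlyKey.split(" ").length + value.split(" ").length;

    // Applies one update record; a null value is a tombstone.
    void apply(String key, String value) {
        if (value == null) {
            result.remove(key); // delete semantics, the mapper is not evaluated
            return;
        }
        mapperCalls++;
        result.put(key, mapper.apply(key, value));
    }

    public static void main(String[] args) {
        MapValuesModel model = new MapValuesModel();
        model.apply("user one", "a b c"); // 2 + 3 tokens
        model.apply("user two", "x");     // 2 + 1 tokens
        model.apply("user one", null);    // tombstone removes the entry
        System.out.println(model.result + " mapperCalls=" + model.mapperCalls);
    }
}
```

      In this model the third update deletes the entry for "user one" without invoking the mapper, so the mapper runs only twice.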

      Type Parameters:
      VR - the value type of the result KTable
      Parameters:
      mapper - a ValueMapperWithKey that computes a new output value
      named - a Named config used to name the processor in the topology
      materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
      Returns:
      a KTable that contains records with unmodified keys and new values (possibly of different type)
    • toStream

      KStream<K,V> toStream()
      Convert this changelog stream to a KStream.

      Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of this changelog stream is no longer treated as an updated record (cf. KStream vs KTable).

      Returns:
      a KStream that contains the same records as this KTable
    • toStream

      KStream<K,V> toStream(Named named)
      Convert this changelog stream to a KStream.

      Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of this changelog stream is no longer treated as an updated record (cf. KStream vs KTable).

      Parameters:
      named - a Named config used to name the processor in the topology
      Returns:
      a KStream that contains the same records as this KTable
    • toStream

      <KR> KStream<KR,V> toStream(KeyValueMapper<? super K,? super V,? extends KR> mapper)
      Convert this changelog stream to a KStream using the given KeyValueMapper to select the new key.

      For example, you can compute the new key as the length of the value string.

      
       KTable<String, String> table = builder.table("topic");
       KStream<Integer, String> keyedStream = table.toStream(new KeyValueMapper<String, String, Integer>() {
           @Override
           public Integer apply(String key, String value) {
               return value.length();
           }
       });
       
      Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation or join) is applied to the result KStream.

      This operation is equivalent to calling table.toStream().selectKey(KeyValueMapper).

      Note that toStream() is a logical operation and only changes the "interpretation" of the stream, i.e., each record of this changelog stream is no longer treated as an updated record (cf. KStream vs KTable).
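
      To make the change of "interpretation" concrete, the following plain-Java sketch models the re-keying step. ToStreamModel and its toStream method are hypothetical names used for illustration only, and lists of map entries stand in for the changelog and the resulting record stream. Tombstones are not modeled.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of toStream(KeyValueMapper); not Kafka Streams code.
class ToStreamModel {
    // Re-keys every changelog record and emits it as an independent stream record.
    static List<Map.Entry<Integer, String>> toStream(
            List<Map.Entry<String, String>> changelog,
            BiFunction<String, String, Integer> keyMapper) {
        List<Map.Entry<Integer, String>> stream = new ArrayList<>();
        for (Map.Entry<String, String> update : changelog) {
            Integer newKey = keyMapper.apply(update.getKey(), update.getValue());
            stream.add(new SimpleImmutableEntry<>(newKey, update.getValue()));
        }
        return stream;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> changelog = new ArrayList<>();
        changelog.add(new SimpleImmutableEntry<>("k1", "ab"));
        changelog.add(new SimpleImmutableEntry<>("k2", "cd"));  // different table key, same value length
        changelog.add(new SimpleImmutableEntry<>("k1", "xyz")); // an update of k1 in the table
        // The new key is the length of the value string, as in the example above.
        System.out.println(toStream(changelog, (key, value) -> value.length()));
    }
}
```

      All three input records appear in the output. Records that end up with the same new key are separate stream records and do not overwrite each other.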

      Type Parameters:
      KR - the new key type of the result stream
      Parameters:
      mapper - a KeyValueMapper that computes a new key for each record
      Returns:
      a KStream that contains the same records as this KTable
    • toStream

      <KR> KStream<KR,V> toStream(KeyValueMapper<? super K,? super V,? extends KR> mapper, Named named)
      Convert this changelog stream to a KStream using the given KeyValueMapper to select the new key.

      For example, you can compute the new key as the length of the value string.

      
       KTable<String, String> table = builder.table("topic");
       KStream<Integer, String> keyedStream = table.toStream(new KeyValueMapper<String, String, Integer>() {
           @Override
           public Integer apply(String key, String value) {
               return value.length();
           }
       });
       
      Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation or join) is applied to the result KStream.

      This operation is equivalent to calling table.toStream().selectKey(KeyValueMapper).

      Note that toStream() is a logical operation and only changes the "interpretation" of the stream, i.e., each record of this changelog stream is no longer treated as an updated record (cf. KStream vs KTable).

      Type Parameters:
      KR - the new key type of the result stream
      Parameters:
      mapper - a KeyValueMapper that computes a new key for each record
      named - a Named config used to name the processor in the topology
      Returns:
      a KStream that contains the same records as this KTable
    • suppress

      KTable<K,V> suppress(Suppressed<? super K> suppressed)
      Suppress some updates from this changelog stream, determined by the supplied Suppressed configuration. This controls what updates downstream table and stream operations will receive.

      Note that suppress() cannot be applied to versioned KTables.
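
      Which updates are held back depends entirely on the Suppressed configuration. As a rough plain-Java illustration of one possible policy (buffer intermediate updates per key and emit only the latest value when the buffer is flushed), consider the sketch below. The class SuppressModel, its methods, and the explicit flush() trigger are hypothetical; the real operator decides when to emit based on the supplied configuration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of one suppression policy; not Kafka Streams code.
class SuppressModel {
    // Holds only the most recent value per key until the next flush.
    private final Map<String, Long> buffer = new LinkedHashMap<>();

    // Records an update; an earlier buffered value for the same key is replaced.
    void update(String key, Long value) {
        buffer.put(key, value);
    }

    // Emits one record per buffered key and clears the buffer.
    List<String> flush() {
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<String, Long> entry : buffer.entrySet()) {
            emitted.add(entry.getKey() + "=" + entry.getValue());
        }
        buffer.clear();
        return emitted;
    }

    public static void main(String[] args) {
        SuppressModel model = new SuppressModel();
        model.update("a", 1L);
        model.update("a", 2L); // replaces the buffered value for "a"
        model.update("b", 1L);
        System.out.println(model.flush()); // prints [a=2, b=1]
    }
}
```

      Downstream operators in this model see two records instead of three, because the intermediate update for key "a" never leaves the buffer.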

      Parameters:
      suppressed - Configuration object determining what, if any, updates to suppress
      Returns:
      A new KTable with the desired suppression characteristics.
    • transformValues

      <VR> KTable<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> transformerSupplier, String... stateStoreNames)
      Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with default serializers, deserializers, and state store. A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is similar to mapValues(ValueMapperWithKey), but more flexible, allowing access to additional state-stores, and access to the ProcessorContext. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed.

      If the downstream topology uses aggregation functions (e.g., KGroupedTable.reduce(Reducer, Reducer, Materialized), KGroupedTable.aggregate(Initializer, Aggregator, Aggregator, Materialized), etc.), care must be taken when dealing with state (either held in state stores or in transformer instances) to ensure correct aggregate results. In contrast, if the resulting KTable is materialized (cf. transformValues(ValueTransformerWithKeySupplier, Materialized, String...)), such concerns are handled for you.

      In order to assign a state, the state must be created and registered beforehand:

      
       // create store
       StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
               Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                       Serdes.String(),
                       Serdes.String());
       // register store
       builder.addStateStore(keyValueStoreBuilder);
      
       KTable outputTable = inputTable.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
       

      Within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.

      
       new ValueTransformerWithKeySupplier() {
           public ValueTransformerWithKey get() {
               return new ValueTransformerWithKey() {
                   private KeyValueStore<String, String> state;
      
                   public void init(ProcessorContext context) {
                       this.state = (KeyValueStore<String, String>) context.getStateStore("myValueTransformState");
                       context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate every 1000 ms, can access this.state
                   }
      
                   public NewValueType transform(K readOnlyKey, V value) {
                       // can access this.state and use read-only key
                       return new NewValueType(readOnlyKey); // or null
                   }
      
                   public void close() {
                       // can access this.state
                   }
               };
           }
       }
       

      Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Setting a new value preserves data co-location with respect to the key.

      Type Parameters:
      VR - the value type of the result table
      Parameters:
      transformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a ValueTransformerWithKey. At least one transformer instance will be created per streaming task. Transformers do not need to be thread-safe.
      stateStoreNames - the names of the state stores used by the processor
      Returns:
      a KTable that contains records with unmodified key and new values (possibly of different type)
    • transformValues

      <VR> KTable<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> transformerSupplier, Named named, String... stateStoreNames)
      Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with default serializers, deserializers, and state store. A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is similar to mapValues(ValueMapperWithKey), but more flexible, allowing access to additional state-stores, and access to the ProcessorContext. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed.

      If the downstream topology uses aggregation functions (e.g., KGroupedTable.reduce(Reducer, Reducer, Materialized), KGroupedTable.aggregate(Initializer, Aggregator, Aggregator, Materialized), etc.), care must be taken when dealing with state (either held in state stores or in transformer instances) to ensure correct aggregate results. In contrast, if the resulting KTable is materialized (cf. transformValues(ValueTransformerWithKeySupplier, Materialized, String...)), such concerns are handled for you.

      In order to assign a state, the state must be created and registered beforehand:

      
       // create store
       StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
               Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                       Serdes.String(),
                       Serdes.String());
       // register store
       builder.addStateStore(keyValueStoreBuilder);
      
       KTable outputTable = inputTable.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
       

      Within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.

      
       new ValueTransformerWithKeySupplier() {
           public ValueTransformerWithKey get() {
               return new ValueTransformerWithKey() {
                   private KeyValueStore<String, String> state;
      
                   public void init(ProcessorContext context) {
                       this.state = (KeyValueStore<String, String>) context.getStateStore("myValueTransformState");
                       context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate every 1000 ms, can access this.state
                   }
      
                   public NewValueType transform(K readOnlyKey, V value) {
                       // can access this.state and use read-only key
                       return new NewValueType(readOnlyKey); // or null
                   }
      
                   public void close() {
                       // can access this.state
                   }
               };
           }
       }
       

      Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Setting a new value preserves data co-location with respect to the key.

      Type Parameters:
      VR - the value type of the result table
      Parameters:
      transformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a ValueTransformerWithKey. At least one transformer instance will be created per streaming task. Transformers do not need to be thread-safe.
      named - a Named config used to name the processor in the topology
      stateStoreNames - the names of the state stores used by the processor
      Returns:
      a KTable that contains records with unmodified key and new values (possibly of different type)
    • transformValues

      <VR> KTable<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> transformerSupplier, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized, String... stateStoreNames)
      Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes a new value for it. This is similar to mapValues(ValueMapperWithKey), but more flexible, allowing stateful, rather than stateless, record-by-record operation, access to additional state-stores, and access to the ProcessorContext. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed. The resulting KTable is materialized into another state store (in addition to the provided state stores) as specified by the user via the Materialized parameter, and is queryable through its given name.

      In order to assign a state, the state must be created and registered beforehand:

      
       // create store
       StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
               Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                       Serdes.String(),
                       Serdes.String());
       // register store
       builder.addStateStore(keyValueStoreBuilder);
      
       KTable outputTable = inputTable.transformValues(
           new ValueTransformerWithKeySupplier() { ... },
           Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("outputTable")
                                       .withKeySerde(Serdes.String())
                                       .withValueSerde(Serdes.String()),
           "myValueTransformState");
       

      Within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.

      
       new ValueTransformerWithKeySupplier() {
           public ValueTransformerWithKey get() {
               return new ValueTransformerWithKey() {
                   private KeyValueStore<String, String> state;
      
                   public void init(ProcessorContext context) {
                       this.state = (KeyValueStore<String, String>) context.getStateStore("myValueTransformState");
                       context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate every 1000 ms, can access this.state
                   }
      
                   public NewValueType transform(K readOnlyKey, V value) {
                       // can access this.state and use read-only key
                       return new NewValueType(readOnlyKey); // or null
                   }
      
                   public void close() {
                       // can access this.state
                   }
               };
           }
       }
       

      Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Setting a new value preserves data co-location with respect to the key.

      Type Parameters:
      VR - the value type of the result table
      Parameters:
      transformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a ValueTransformerWithKey. At least one transformer instance will be created per streaming task. Transformers do not need to be thread-safe.
      materialized - an instance of Materialized used to describe how the state store of the resulting table should be materialized. Cannot be null
      stateStoreNames - the names of the state stores used by the processor
      Returns:
      a KTable that contains records with unmodified key and new values (possibly of different type)
    • transformValues

      <VR> KTable<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> transformerSupplier, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized, Named named, String... stateStoreNames)
      Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes a new value for it. This is similar to mapValues(ValueMapperWithKey), but more flexible, allowing stateful, rather than stateless, record-by-record operation, access to additional state-stores, and access to the ProcessorContext. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed. The resulting KTable is materialized into another state store (in addition to the provided state stores) as specified by the user via the Materialized parameter, and is queryable through its given name.

      In order to assign a state, the state must be created and registered beforehand:

      
       // create store
       StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
               Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                       Serdes.String(),
                       Serdes.String());
       // register store
       builder.addStateStore(keyValueStoreBuilder);
      
       KTable outputTable = inputTable.transformValues(
           new ValueTransformerWithKeySupplier() { ... },
           Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("outputTable")
                                       .withKeySerde(Serdes.String())
                                       .withValueSerde(Serdes.String()),
           "myValueTransformState");
       

      Within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.

      
       new ValueTransformerWithKeySupplier() {
           public ValueTransformerWithKey get() {
               return new ValueTransformerWithKey() {
                   private KeyValueStore<String, String> state;
      
                   public void init(ProcessorContext context) {
                       this.state = (KeyValueStore<String, String>) context.getStateStore("myValueTransformState");
                       context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate every 1000 ms, can access this.state
                   }
      
                   public NewValueType transform(K readOnlyKey, V value) {
                       // can access this.state and use read-only key
                       return new NewValueType(readOnlyKey); // or null
                   }
      
                   public void close() {
                       // can access this.state
                   }
               };
           }
       }
       

      Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Setting a new value preserves data co-location with respect to the key.

      Type Parameters:
      VR - the value type of the result table
      Parameters:
      transformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a ValueTransformerWithKey. At least one transformer instance will be created per streaming task. Transformers do not need to be thread-safe.
      materialized - an instance of Materialized used to describe how the state store of the resulting table should be materialized. Cannot be null
      named - a Named config used to name the processor in the topology
      stateStoreNames - the names of the state stores used by the processor
      Returns:
      a KTable that contains records with unmodified key and new values (possibly of different type)
    • groupBy

      <KR, VR> KGroupedTable<KR,VR> groupBy(KeyValueMapper<? super K,? super V,KeyValue<KR,VR>> selector)
      Re-groups the records of this KTable using the provided KeyValueMapper and default serializers and deserializers. Each KeyValue pair of this KTable is mapped to a new KeyValue pair by applying the provided KeyValueMapper. Re-grouping a KTable is required before an aggregation operator can be applied to the data (cf. KGroupedTable). The KeyValueMapper selects a new key and value (which should both have unmodified type). If the new record key is null, the record will not be included in the resulting KGroupedTable.

      Because a new key is selected, an internal repartitioning topic will be created in Kafka. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. You can retrieve all generated internal topic names via Topology.describe().

      All data of this KTable will be redistributed through the repartitioning topic by writing all update records to and rereading all updated records from it, such that the resulting KGroupedTable is partitioned on the new key.

      If the key or value type is changed, it is recommended to use groupBy(KeyValueMapper, Grouped) instead.
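
      The re-keying step, including the rule that records with a null new key are left out, can be sketched in plain Java as follows. GroupByModel, regroup, and the sample data are hypothetical and serve illustration only; repartitioning through a Kafka topic is not modeled.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of the re-keying step of groupBy; not Kafka Streams code.
class GroupByModel {
    // Maps every table row to a new key-value pair and skips pairs whose new key is null.
    static List<Map.Entry<String, String>> regroup(
            Map<String, String> table,
            BiFunction<String, String, Map.Entry<String, String>> selector) {
        List<Map.Entry<String, String>> regrouped = new ArrayList<>();
        for (Map.Entry<String, String> row : table.entrySet()) {
            Map.Entry<String, String> mapped = selector.apply(row.getKey(), row.getValue());
            if (mapped.getKey() == null) {
                continue; // records with a null new key are not included
            }
            regrouped.add(mapped);
        }
        return regrouped;
    }

    public static void main(String[] args) {
        Map<String, String> userToRegion = new LinkedHashMap<>();
        userToRegion.put("alice", "eu");
        userToRegion.put("bob", "us");
        userToRegion.put("carol", ""); // no region known
        // Hypothetical selector: the region becomes the key, the user id becomes the value.
        List<Map.Entry<String, String>> out = regroup(userToRegion,
            (user, region) -> new SimpleImmutableEntry<>(region.isEmpty() ? null : region, user));
        System.out.println(out);
    }
}
```

      Key and value both stay of type String here, in line with the recommendation above for the overload without Grouped.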

      Type Parameters:
      KR - the key type of the result KGroupedTable
      VR - the value type of the result KGroupedTable
      Parameters:
      selector - a KeyValueMapper that computes a new grouping key and value to be aggregated
      Returns:
      a KGroupedTable that contains the re-grouped records of the original KTable
    • groupBy

      <KR, VR> KGroupedTable<KR,VR> groupBy(KeyValueMapper<? super K,? super V,KeyValue<KR,VR>> selector, Grouped<KR,VR> grouped)
      Re-groups the records of this KTable using the provided KeyValueMapper and Serdes as specified by Grouped. Each KeyValue pair of this KTable is mapped to a new KeyValue pair by applying the provided KeyValueMapper. Re-grouping a KTable is required before an aggregation operator can be applied to the data (cf. KGroupedTable). The KeyValueMapper selects a new key and value (where both could be of the same type or of a new type). If the new record key is null, the record will not be included in the resulting KGroupedTable.

      Because a new key is selected, an internal repartitioning topic will be created in Kafka. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is either provided via Grouped.as(String) or an internally generated name.
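
      As a small illustration of the naming pattern, the following plain-Java sketch assembles a repartition topic name from its two variable parts. RepartitionTopicName and the sample values are hypothetical; Kafka Streams builds the actual name internally.

```java
// Plain-Java sketch of the naming pattern described above; not Kafka Streams code.
class RepartitionTopicName {
    // Builds "${applicationId}-<name>-repartition" from its two variable parts.
    static String of(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    public static void main(String[] args) {
        // "orders-app" and "by-region" are hypothetical values for illustration.
        System.out.println(of("orders-app", "by-region")); // prints orders-app-by-region-repartition
    }
}
```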

      You can retrieve all generated internal topic names via Topology.describe().

      All data of this KTable will be redistributed through the repartitioning topic by writing all update records to and rereading all updated records from it, such that the resulting KGroupedTable is partitioned on the new key.

      Type Parameters:
      KR - the key type of the result KGroupedTable
      VR - the value type of the result KGroupedTable
      Parameters:
      selector - a KeyValueMapper that computes a new grouping key and value to be aggregated
      grouped - the Grouped instance used to specify Serdes and the name for a repartition topic if repartitioning is required.
      Returns:
      a KGroupedTable that contains the re-grouped records of the original KTable
    • join

      <VO, VR> KTable<K,VR> join(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner)
      Join records of this KTable with another KTable's records using non-windowed inner equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.

      The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.

      For each KTable record that finds a corresponding record in the other KTable the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.

      Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).

      Input records with null key will be dropped and no join computation is performed.

      Example:

      thisKTable   thisState   otherKTable   otherState   result updated record
      <K1:A>       <K1:A>
                   <K1:A>      <K1:b>        <K1:b>       <K1:ValueJoiner(A,b)>
      <K1:C>       <K1:C>                    <K1:b>       <K1:ValueJoiner(C,b)>
                   <K1:C>      <K1:null>                  <K1:null>

      Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.

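      The update sequence in the example above can be reproduced with a small plain-Java model of a symmetric inner join. InnerJoinModel and its members are hypothetical names, HashMaps stand in for the internal state of both tables, and the joiner simply renders its arguments as text. This is a sketch of the described semantics, not the Kafka Streams implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of a symmetric KTable-KTable inner join; not Kafka Streams code.
class InnerJoinModel {
    final Map<String, String> thisState = new HashMap<>();
    final Map<String, String> otherState = new HashMap<>();
    final Map<String, String> result = new HashMap<>();
    // Result update records in the order they were produced.
    final List<String> emitted = new ArrayList<>();
    final BiFunction<String, String, String> joiner =
        (value, otherValue) -> "ValueJoiner(" + value + "," + otherValue + ")";

    void updateThis(String key, String value) { update(thisState, key, value); }

    void updateOther(String key, String value) { update(otherState, key, value); }

    private void update(Map<String, String> side, String key, String value) {
        if (key == null) {
            return; // records with a null key are dropped
        }
        // (1) Update the state of the side that received the record.
        if (value == null) {
            side.remove(key);
        } else {
            side.put(key, value);
        }
        // (2) Look up the current values on both sides.
        String left = thisState.get(key);
        String right = otherState.get(key);
        if (left != null && right != null) {
            String joined = joiner.apply(left, right);
            result.put(key, joined);
            emitted.add("<" + key + ":" + joined + ">");
        } else if (result.remove(key) != null) {
            // Forward a tombstone only if there was something to delete.
            emitted.add("<" + key + ":null>");
        }
    }

    public static void main(String[] args) {
        InnerJoinModel join = new InnerJoinModel();
        join.updateThis("K1", "A");   // no match yet, nothing emitted
        join.updateOther("K1", "b");
        join.updateThis("K1", "C");
        join.updateOther("K1", null); // tombstone deletes the join result
        System.out.println(join.emitted);
    }
}
```

      The emitted list contains the three result records from the example, in the same order: the two joined values followed by the tombstone.
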
      Type Parameters:
      VO - the value type of the other KTable
      VR - the value type of the result KTable
      Parameters:
      other - the other KTable to be joined with this KTable
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      Returns:
      a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key
    • join

      <VO, VR> KTable<K,VR> join(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Named named)
      Join records of this KTable with another KTable's records using non-windowed inner equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.

      The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.

      For each KTable record that finds a corresponding record in the other KTable the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.

      Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).

      Input records with null key will be dropped and no join computation is performed.

      Example:

      thisKTable   thisState   otherKTable   otherState   result updated record
      <K1:A>       <K1:A>
                   <K1:A>      <K1:b>        <K1:b>       <K1:ValueJoiner(A,b)>
      <K1:C>       <K1:C>                    <K1:b>       <K1:ValueJoiner(C,b)>
                   <K1:C>      <K1:null>                  <K1:null>

      Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.

      Type Parameters:
      VO - the value type of the other KTable
      VR - the value type of the result KTable
      Parameters:
      other - the other KTable to be joined with this KTable
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      named - a Named config used to name the processor in the topology
      Returns:
      a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key
    • join

      <VO, VR> KTable<K,VR> join(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Join records of this KTable with another KTable's records using non-windowed inner equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.

      The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.

      For each KTable record that finds a corresponding record in the other KTable the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.

      Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).

      Input records with null key will be dropped and no join computation is performed.

      Example:

      thisKTable | thisState | otherKTable | otherState | result updated record
      <K1:A>     | <K1:A>    |             |            |
                 | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
      <K1:C>     | <K1:C>    |             | <K1:b>     | <K1:ValueJoiner(C,b)>
                 | <K1:C>    | <K1:null>   |            | <K1:null>
      Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.
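
      The example table above can be traced with a small plain-Java model. This is only a sketch of the documented semantics using in-memory maps, not Kafka Streams code; the class InnerJoinSketch and its methods are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of the inner-join semantics described above (not the Kafka Streams implementation).
class InnerJoinSketch {
    private final Map<String, String> thisState = new HashMap<>();
    private final Map<String, String> otherState = new HashMap<>();
    private final Map<String, String> result = new HashMap<>();
    private final BiFunction<String, String, String> joiner;
    // Records forwarded to the result table, in order.
    final List<String> emitted = new ArrayList<>();

    InnerJoinSketch(BiFunction<String, String, String> joiner) {
        this.joiner = joiner;
    }

    void updateThis(String key, String value) { update(thisState, key, value); }

    void updateOther(String key, String value) { update(otherState, key, value); }

    private void update(Map<String, String> side, String key, String value) {
        if (key == null) {
            return;                      // records with a null key are dropped
        }
        if (value == null) {
            side.remove(key);            // tombstone: delete from this side's state
        } else {
            side.put(key, value);        // (1) update the state of one side
        }
        String left = thisState.get(key);
        String right = otherState.get(key);   // (2) look up the other side
        if (left != null && right != null) {
            String joined = joiner.apply(left, right);
            result.put(key, joined);
            emitted.add("<" + key + ":" + joined + ">");
        } else if (result.remove(key) != null) {
            // forward a tombstone only if there is something to delete
            emitted.add("<" + key + ":null>");
        }
    }

    public static void main(String[] args) {
        InnerJoinSketch join = new InnerJoinSketch((l, r) -> "ValueJoiner(" + l + "," + r + ")");
        join.updateThis("K1", "A");      // no match yet, nothing emitted
        join.updateOther("K1", "b");
        join.updateThis("K1", "C");
        join.updateOther("K1", null);    // tombstone on the other side
        // prints [<K1:ValueJoiner(A,b)>, <K1:ValueJoiner(C,b)>, <K1:null>]
        System.out.println(join.emitted);
    }
}
```

      The model recomputes the result for a key after every update on either side, which mirrors the symmetric behaviour described above.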
      Type Parameters:
      VO - the value type of the other KTable
      VR - the value type of the result KTable
      Parameters:
      other - the other KTable to be joined with this KTable
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      materialized - an instance of Materialized used to describe how the state store should be materialized. Cannot be null
      Returns:
      a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key
    • join

      <VO, VR> KTable<K,VR> join(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Join records of this KTable with another KTable's records using non-windowed inner equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.

      The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.

      For each KTable record that finds a corresponding record in the other KTable the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.

      Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).

      Input records with null key will be dropped and no join computation is performed.

      Example:

      thisKTable | thisState | otherKTable | otherState | result updated record
      <K1:A>     | <K1:A>    |             |            |
                 | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
      <K1:C>     | <K1:C>    |             | <K1:b>     | <K1:ValueJoiner(C,b)>
                 | <K1:C>    | <K1:null>   |            | <K1:null>
      Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.
      Type Parameters:
      VO - the value type of the other KTable
      VR - the value type of the result KTable
      Parameters:
      other - the other KTable to be joined with this KTable
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      named - a Named config used to name the processor in the topology
      materialized - an instance of Materialized used to describe how the state store should be materialized. Cannot be null
      Returns:
      a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key
    • leftJoin

      <VO, VR> KTable<K,VR> leftJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner)
      Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join, all records from left KTable will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.

      The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.

      For each KTable record that finds a corresponding record in the other KTable's state the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record of left KTable that does not find a corresponding record in the right KTable's state the provided ValueJoiner will be called with rightValue = null to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.

      Note that records with null values (so-called tombstone records) have delete semantics. For example, for left input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).

      Input records with null key will be dropped and no join computation is performed.

      Example:

      thisKTable | thisState | otherKTable | otherState | result updated record
      <K1:A>     | <K1:A>    |             |            | <K1:ValueJoiner(A,null)>
                 | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
      <K1:null>  |           |             | <K1:b>     | <K1:null>
                 |           | <K1:null>   |            |
      Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.
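
      Because a left join may call the joiner with rightValue = null, the joiner has to tolerate a missing right-hand value. The following is a minimal sketch of such a joiner, written as a java.util.function.BiFunction so that it runs without Kafka on the classpath; the name ENRICH, the "unknown" placeholder, and the sample values are assumptions made for illustration.

```java
import java.util.function.BiFunction;

// Hypothetical null-tolerant joiner for a left join; it mirrors the two-argument
// shape of a value joiner but is a plain BiFunction, not a Kafka Streams type.
class LeftJoinerSketch {
    static final BiFunction<String, String, String> ENRICH =
        (order, customer) -> order + "@" + (customer == null ? "unknown" : customer);

    public static void main(String[] args) {
        // The right-hand table has no record for the key yet.
        System.out.println(ENRICH.apply("order-1", null));      // prints order-1@unknown
        // The right-hand table has a matching record.
        System.out.println(ENRICH.apply("order-1", "alice"));   // prints order-1@alice
    }
}
```

      Handling the null case inside the joiner keeps the result well defined for left records that have no match.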
      Type Parameters:
      VO - the value type of the other KTable
      VR - the value type of the result KTable
      Parameters:
      other - the other KTable to be joined with this KTable
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      Returns:
      a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of left KTable
    • leftJoin

      <VO, VR> KTable<K,VR> leftJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Named named)
      Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join, all records from left KTable will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.

      The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.

      For each KTable record that finds a corresponding record in the other KTable's state the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record of left KTable that does not find a corresponding record in the right KTable's state the provided ValueJoiner will be called with rightValue = null to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.

      Note that records with null values (so-called tombstone records) have delete semantics. For example, for left input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).

      Input records with null key will be dropped and no join computation is performed.

      Example:

      thisKTable | thisState | otherKTable | otherState | result updated record
      <K1:A>     | <K1:A>    |             |            | <K1:ValueJoiner(A,null)>
                 | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
      <K1:null>  |           |             | <K1:b>     | <K1:null>
                 |           | <K1:null>   |            |
      Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.
      Type Parameters:
      VO - the value type of the other KTable
      VR - the value type of the result KTable
      Parameters:
      other - the other KTable to be joined with this KTable
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      named - a Named config used to name the processor in the topology
      Returns:
      a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of left KTable
    • leftJoin

      <VO, VR> KTable<K,VR> leftJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join, all records from left KTable will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.

      The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.

      For each KTable record that finds a corresponding record in the other KTable's state the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record of left KTable that does not find a corresponding record in the right KTable's state the provided ValueJoiner will be called with rightValue = null to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.

      Note that records with null values (so-called tombstone records) have delete semantics. For example, for left input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).

      Input records with null key will be dropped and no join computation is performed.

      Example:

      thisKTable | thisState | otherKTable | otherState | result updated record
      <K1:A>     | <K1:A>    |             |            | <K1:ValueJoiner(A,null)>
                 | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
      <K1:null>  |           |             | <K1:b>     | <K1:null>
                 |           | <K1:null>   |            |
      Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.
      Type Parameters:
      VO - the value type of the other KTable
      VR - the value type of the result KTable
      Parameters:
      other - the other KTable to be joined with this KTable
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      materialized - an instance of Materialized used to describe how the state store should be materialized. Cannot be null
      Returns:
      a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of left KTable
    • leftJoin

      <VO, VR> KTable<K,VR> leftJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join, all records from left KTable will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.

      The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.

      For each KTable record that finds a corresponding record in the other KTable's state the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record of left KTable that does not find a corresponding record in the right KTable's state the provided ValueJoiner will be called with rightValue = null to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.

      Note that records with null values (so-called tombstone records) have delete semantics. For example, for left input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).

      Input records with null key will be dropped and no join computation is performed.

      Example:

      thisKTable | thisState | otherKTable | otherState | result updated record
      <K1:A>     | <K1:A>    |             |            | <K1:ValueJoiner(A,null)>
                 | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
      <K1:null>  |           |             | <K1:b>     | <K1:null>
                 |           | <K1:null>   |            |
      Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.
      Type Parameters:
      VO - the value type of the other KTable
      VR - the value type of the result KTable
      Parameters:
      other - the other KTable to be joined with this KTable
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      named - a Named config used to name the processor in the topology
      materialized - an instance of Materialized used to describe how the state store should be materialized. Cannot be null
      Returns:
      a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of left KTable
    • outerJoin

      <VO, VR> KTable<K,VR> outerJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner)
      Join records of this KTable (left input) with another KTable's (right input) records using non-windowed outer equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join or left-join, all records from both input KTables will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.

      The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.

      For each KTable record that finds a corresponding record in the other KTable's state, the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record that does not find a corresponding record in the other KTable's state, the provided ValueJoiner will be called with null for the missing side's value to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.

      Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).

      Input records with null key will be dropped and no join computation is performed.

      Example:

      thisKTable | thisState | otherKTable | otherState | result updated record
      <K1:A>     | <K1:A>    |             |            | <K1:ValueJoiner(A,null)>
                 | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
      <K1:null>  |           |             | <K1:b>     | <K1:ValueJoiner(null,b)>
                 |           | <K1:null>   |            | <K1:null>
      Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.
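
      The result column of the example can be read as a function of the two current states for a key. The sketch below expresses that reading in plain Java; it is an illustration of the documented outer-join outcome, not the Kafka Streams implementation, and OuterJoinSketch.resultValue is a hypothetical helper.

```java
// Plain-Java reading of the outer-join example: the result for a key depends only
// on the current value held by each side (null means "no record for this key").
class OuterJoinSketch {
    // Returns the joined value, or null (a tombstone) when neither side holds a value.
    static String resultValue(String thisValue, String otherValue) {
        if (thisValue == null && otherValue == null) {
            return null;
        }
        // String concatenation renders a missing side as the text "null".
        return "ValueJoiner(" + thisValue + "," + otherValue + ")";
    }

    public static void main(String[] args) {
        System.out.println(resultValue("A", null));   // prints ValueJoiner(A,null)
        System.out.println(resultValue("A", "b"));    // prints ValueJoiner(A,b)
        System.out.println(resultValue(null, "b"));   // prints ValueJoiner(null,b)
        System.out.println(resultValue(null, null));  // prints null
    }
}
```

      The four calls in main correspond to the four rows of the example, in order.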
      Type Parameters:
      VO - the value type of the other KTable
      VR - the value type of the result KTable
      Parameters:
      other - the other KTable to be joined with this KTable
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      Returns:
      a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of both KTables
    • outerJoin

      <VO, VR> KTable<K,VR> outerJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Named named)
      Join records of this KTable (left input) with another KTable's (right input) records using non-windowed outer equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join or left-join, all records from both input KTables will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.

      The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.

      For each KTable record that finds a corresponding record in the other KTable's state, the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record that does not find a corresponding record in the other KTable's state, the provided ValueJoiner will be called with null for the missing side's value to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.

      Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).

      Input records with null key will be dropped and no join computation is performed.

      Example:

      thisKTable | thisState | otherKTable | otherState | result updated record
      <K1:A>     | <K1:A>    |             |            | <K1:ValueJoiner(A,null)>
                 | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
      <K1:null>  |           |             | <K1:b>     | <K1:ValueJoiner(null,b)>
                 |           | <K1:null>   |            | <K1:null>
      Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.
      Type Parameters:
      VO - the value type of the other KTable
      VR - the value type of the result KTable
      Parameters:
      other - the other KTable to be joined with this KTable
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      named - a Named config used to name the processor in the topology
      Returns:
      a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of both KTables
    • outerJoin

      <VO, VR> KTable<K,VR> outerJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Join records of this KTable (left input) with another KTable's (right input) records using non-windowed outer equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join or left-join, all records from both input KTables will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.

      The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.

      For each KTable record that finds a corresponding record in the other KTable's state, the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record that does not find a corresponding record in the other KTable's state, the provided ValueJoiner will be called with null for the missing side's value to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.

      Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).

      Input records with null key will be dropped and no join computation is performed.

      Example:

      thisKTable | thisState | otherKTable | otherState | result updated record
      <K1:A>     | <K1:A>    |             |            | <K1:ValueJoiner(A,null)>
                 | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
      <K1:null>  |           |             | <K1:b>     | <K1:ValueJoiner(null,b)>
                 |           | <K1:null>   |            | <K1:null>
      Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.
      Type Parameters:
      VO - the value type of the other KTable
      VR - the value type of the result KTable
      Parameters:
      other - the other KTable to be joined with this KTable
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      materialized - an instance of Materialized used to describe how the state store should be materialized. Cannot be null
      Returns:
      a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of both KTables
    • outerJoin

      <VO, VR> KTable<K,VR> outerJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Join records of this KTable (left input) with another KTable's (right input) records using non-windowed outer equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join or left-join, all records from both input KTables will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.

      The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.

      For each KTable record that finds a corresponding record in the other KTable's state, the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record that does not find a corresponding record in the other KTable's state, the provided ValueJoiner will be called with null for the missing side's value to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.

      Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).

      Input records with null key will be dropped and no join computation is performed.

      Example:

      thisKTable | thisState | otherKTable | otherState | result updated record
      <K1:A>     | <K1:A>    |             |            | <K1:ValueJoiner(A,null)>
                 | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
      <K1:null>  |           |             | <K1:b>     | <K1:ValueJoiner(null,b)>
                 |           | <K1:null>   |            | <K1:null>
      Both input streams (or to be more precise, their underlying source topics) need to have the same number of partitions.
      Type Parameters:
      VO - the value type of the other KTable
      VR - the value type of the result KTable
      Parameters:
      other - the other KTable to be joined with this KTable
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      named - a Named config used to name the processor in the topology
      materialized - an instance of Materialized used to describe how the state store should be materialized. Cannot be null
      Returns:
      a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of both KTables
    • join

      <VR, KO, VO> KTable<K,VR> join(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner)
      Join records of this KTable with another KTable using non-windowed inner join.

      This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.

      Type Parameters:
      VR - the value type of the result KTable
      KO - the key type of the other KTable
      VO - the value type of the other KTable
      Parameters:
      other - the other KTable to be joined with this KTable. Keyed by KO.
      foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      Returns:
      a KTable that contains the result of joining this table with other
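
      To make the roles of foreignKeyExtractor and joiner concrete, the following plain-Java sketch computes a snapshot of an inner foreign-key join over two in-memory maps. It only illustrates the matching rule described above and is not how Kafka Streams executes the join; the class ForeignKeyJoinSketch, the "customerId:item" value format, and the sample data are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Snapshot model of an inner foreign-key join over in-memory tables (not Kafka Streams code).
class ForeignKeyJoinSketch {
    static <K, V, KO, VO, VR> Map<K, VR> join(Map<K, V> thisTable,
                                              Map<KO, VO> other,
                                              Function<V, KO> foreignKeyExtractor,
                                              BiFunction<V, VO, VR> joiner) {
        Map<K, VR> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : thisTable.entrySet()) {
            KO foreignKey = foreignKeyExtractor.apply(entry.getValue());
            if (foreignKey == null) {
                continue;                 // a null foreign key is ignored as invalid
            }
            VO otherValue = other.get(foreignKey);
            if (otherValue == null) {
                continue;                 // inner join: no match, no result record
            }
            // The result keeps the primary key of this table.
            result.put(entry.getKey(), joiner.apply(entry.getValue(), otherValue));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> orders = new LinkedHashMap<>();
        orders.put("o1", "c1:book");
        orders.put("o2", "c2:pen");
        orders.put("o3", "c9:ink");       // no customer c9 in the other table
        orders.put("o4", "gift");         // no customer id, extractor returns null

        Map<String, String> customers = new LinkedHashMap<>();
        customers.put("c1", "alice");
        customers.put("c2", "bob");

        Map<String, String> joined = join(
            orders,
            customers,
            value -> value.contains(":") ? value.split(":")[0] : null,
            (order, customer) -> order + " for " + customer);

        // prints {o1=c1:book for alice, o2=c2:pen for bob}
        System.out.println(joined);
    }
}
```

      Note that the result is keyed by this table's key (the order id in the sketch), while the lookup in the other table uses the extracted foreign key.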
    • join

      @Deprecated <VR, KO, VO> KTable<K,VR> join(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Named named)
      Deprecated.
      Since 3.1; removal planned for 4.0. Use join(KTable, Function, ValueJoiner, TableJoined) instead.
      Join records of this KTable with another KTable using non-windowed inner join.

      This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.

      Type Parameters:
      VR - the value type of the result KTable
      KO - the key type of the other KTable
      VO - the value type of the other KTable
      Parameters:
      other - the other KTable to be joined with this KTable. Keyed by KO.
      foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      named - a Named config used to name the processor in the topology
      Returns:
      a KTable that contains the result of joining this table with other
    • join

      <VR, KO, VO> KTable<K,VR> join(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, TableJoined<K,KO> tableJoined)
      Join records of this KTable with another KTable using non-windowed inner join, using the TableJoined instance for optional configurations including partitioners when the tables being joined use non-default partitioning, and also the base name for components of the join.

      This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.

      Type Parameters:
      VR - the value type of the result KTable
      KO - the key type of the other KTable
      VO - the value type of the other KTable
      Parameters:
      other - the other KTable to be joined with this KTable. Keyed by KO.
      foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      tableJoined - a TableJoined used to configure partitioners and names of internal topics and stores
      Returns:
      a KTable that contains the result of joining this table with other
    • join

      <VR, KO, VO> KTable<K,VR> join(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Join records of this KTable with another KTable using non-windowed inner join.

      This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.

      Type Parameters:
      VR - the value type of the result KTable
      KO - the key type of the other KTable
      VO - the value type of the other KTable
      Parameters:
      other - the other KTable to be joined with this KTable. Keyed by KO.
      foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
      Returns:
      a KTable that contains the result of joining this table with other
    • join

      @Deprecated <VR, KO, VO> KTable<K,VR> join(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Deprecated.
      Since 3.1; removal planned for 4.0. Use join(KTable, Function, ValueJoiner, TableJoined, Materialized) instead.
      Join records of this KTable with another KTable using non-windowed inner join.

      This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.

      Type Parameters:
      VR - the value type of the result KTable
      KO - the key type of the other KTable
      VO - the value type of the other KTable
      Parameters:
      other - the other KTable to be joined with this KTable. Keyed by KO.
      foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      named - a Named config used to name the processor in the topology
      materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
      Returns:
      a KTable that contains the result of joining this table with other
    • join

      <VR, KO, VO> KTable<K,VR> join(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, TableJoined<K,KO> tableJoined, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Join records of this KTable with another KTable using non-windowed inner join, using the TableJoined instance for optional configurations including partitioners when the tables being joined use non-default partitioning, and also the base name for components of the join.

      This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.

      Type Parameters:
      VR - the value type of the result KTable
      KO - the key type of the other KTable
      VO - the value type of the other KTable
      Parameters:
      other - the other KTable to be joined with this KTable. Keyed by KO.
      foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      tableJoined - a TableJoined used to configure partitioners and names of internal topics and stores
      materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
      Returns:
      a KTable that contains the result of joining this table with other
    • leftJoin

      <VR, KO, VO> KTable<K,VR> leftJoin(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner)
      Join records of this KTable with another KTable using non-windowed left join.

      This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.

      Type Parameters:
      VR - the value type of the result KTable
      KO - the key type of the other KTable
      VO - the value type of the other KTable
      Parameters:
      other - the other KTable to be joined with this KTable. Keyed by KO.
      foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      Returns:
      a KTable that contains the result of joining this table with other
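
      How a left foreign-key join differs from the inner variant can be shown with a minimal plain-Java sketch. It does not use the Kafka Streams API: it works on in-memory snapshots instead of changelog updates, and the class LeftForeignKeyJoinSketch, the method leftJoin, and the sample data are hypothetical names introduced only for illustration. A java.util.function.BiFunction stands in for ValueJoiner, and null foreign keys are skipped, following the parameter description above.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical illustration only; not part of Kafka Streams.
class LeftForeignKeyJoinSketch {

    // Computes a left foreign-key join over snapshots of two tables.
    static <K, V, KO, VO, VR> Map<K, VR> leftJoin(
            Map<K, V> left,
            Map<KO, VO> right,
            Function<V, KO> foreignKeyExtractor,
            BiFunction<V, VO, VR> joiner) {
        Map<K, VR> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : left.entrySet()) {
            KO foreignKey = foreignKeyExtractor.apply(entry.getValue());
            if (foreignKey == null) {
                continue; // a null foreign key is ignored as invalid
            }
            // Left join: the joiner is still called when there is no match,
            // in which case the other value is null.
            VO otherValue = right.get(foreignKey);
            result.put(entry.getKey(), joiner.apply(entry.getValue(), otherValue));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> orders = new LinkedHashMap<>();
        orders.put("o1", "c1:book"); // value format assumed: customerId:item
        orders.put("o3", "c9:ink");  // customer c9 does not exist

        Map<String, String> customers = new HashMap<>();
        customers.put("c1", "Alice");

        Map<String, String> joined = leftJoin(
                orders,
                customers,
                value -> value.substring(0, value.indexOf(':')),
                (order, customer) ->
                        order + " for " + (customer == null ? "nobody" : customer));

        // prints {o1=c1:book for Alice, o3=c9:ink for nobody}
        System.out.println(joined);
    }
}
```

      Because the joiner may receive null for the other table's value, a joiner used with leftJoin should handle that case explicitly.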
    • leftJoin

      @Deprecated <VR, KO, VO> KTable<K,VR> leftJoin(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Named named)
      Deprecated.
      since 3.1, removal planned for 4.0. Use leftJoin(KTable, Function, ValueJoiner, TableJoined) instead.
      Join records of this KTable with another KTable using non-windowed left join.

      This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.

      Type Parameters:
      VR - the value type of the result KTable
      KO - the key type of the other KTable
      VO - the value type of the other KTable
      Parameters:
      other - the other KTable to be joined with this KTable. Keyed by KO.
      foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      named - a Named config used to name the processor in the topology
      Returns:
      a KTable that contains the result of joining this table with other
    • leftJoin

      <VR, KO, VO> KTable<K,VR> leftJoin(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, TableJoined<K,KO> tableJoined)
      Join records of this KTable with another KTable using non-windowed left join, using the TableJoined instance for optional configurations including partitioners when the tables being joined use non-default partitioning, and also the base name for components of the join.

      This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.

      Type Parameters:
      VR - the value type of the result KTable
      KO - the key type of the other KTable
      VO - the value type of the other KTable
      Parameters:
      other - the other KTable to be joined with this KTable. Keyed by KO.
      foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      tableJoined - a TableJoined used to configure partitioners and names of internal topics and stores
      Returns:
      a KTable that contains the result of joining this table with other
    • leftJoin

      <VR, KO, VO> KTable<K,VR> leftJoin(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Join records of this KTable with another KTable using non-windowed left join.

      This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.

      Type Parameters:
      VR - the value type of the result KTable
      KO - the key type of the other KTable
      VO - the value type of the other KTable
      Parameters:
      other - the other KTable to be joined with this KTable. Keyed by KO.
      foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
      Returns:
      a KTable that contains the result of joining this table with other
    • leftJoin

      @Deprecated <VR, KO, VO> KTable<K,VR> leftJoin(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Deprecated.
      since 3.1, removal planned for 4.0. Use leftJoin(KTable, Function, ValueJoiner, TableJoined, Materialized) instead.
      Join records of this KTable with another KTable using non-windowed left join.

      This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.

      Type Parameters:
      VR - the value type of the result KTable
      KO - the key type of the other KTable
      VO - the value type of the other KTable
      Parameters:
      other - the other KTable to be joined with this KTable. Keyed by KO.
      foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      named - a Named config used to name the processor in the topology
      materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
      Returns:
      a KTable that contains the result of joining this table with other
    • leftJoin

      <VR, KO, VO> KTable<K,VR> leftJoin(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, TableJoined<K,KO> tableJoined, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
      Join records of this KTable with another KTable using non-windowed left join, using the TableJoined instance for optional configurations including partitioners when the tables being joined use non-default partitioning, and also the base name for components of the join.

      This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.

      Type Parameters:
      VR - the value type of the result KTable
      KO - the key type of the other KTable
      VO - the value type of the other KTable
      Parameters:
      other - the other KTable to be joined with this KTable. Keyed by KO.
      foreignKeyExtractor - a Function that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.
      joiner - a ValueJoiner that computes the join result for a pair of matching records
      tableJoined - a TableJoined used to configure partitioners and names of internal topics and stores
      materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
      Returns:
      a KTable that contains the result of joining this table with other
    • queryableStoreName

      String queryableStoreName()
      Get the name of the local state store that can be used to query this KTable.
      Returns:
      the underlying state store name, or null if this KTable cannot be queried.