K
- Type of primary keys
V
- Type of value changes
@InterfaceStability.Unstable
public interface KTable<K,V>
KTable is an abstraction of a changelog stream from a primary-keyed table. Each record in this stream is an update on the primary-keyed table with the record key as the primary key.
A KTable is either defined from one or more Kafka topics that are consumed message by message, or is the result of a KTable transformation. An aggregation of a KStream also yields a KTable.
A KTable can be transformed record by record, joined with another KTable or KStream, or re-partitioned and aggregated into a new KTable.
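The changelog semantics described above can be illustrated with a small plain-Java model. This is only a sketch: it does not use the Kafka Streams library, the class and method names are hypothetical, and treating a null value as a delete is an assumption of the sketch rather than something stated here.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of changelog semantics; it does not use the Kafka Streams library.
class ChangelogSketch {

    // Replays changelog records in order; each record upserts the row for its key.
    // Treating a null value as a delete is an assumption of this sketch.
    static Map<String, Long> materialize(List<Map.Entry<String, Long>> changelog) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (Map.Entry<String, Long> update : changelog) {
            if (update.getValue() == null) {
                table.remove(update.getKey());
            } else {
                table.put(update.getKey(), update.getValue());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> changelog = Arrays.asList(
                new SimpleEntry<>("alice", 1L),
                new SimpleEntry<>("bob", 5L),
                new SimpleEntry<>("alice", 3L)); // later update replaces alice's earlier value
        System.out.println(materialize(changelog)); // prints {alice=3, bob=5}
    }
}
```

The table holds only the latest value per key, which is the difference between reading the records as a table and reading them as an ordinary record stream.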
See Also: KStream
Modifier and Type | Method and Description |
---|---|
KTable<K,V> | filter(Predicate<K,V> predicate): Create a new instance of KTable that consists of all elements of this stream which satisfy a predicate. |
KTable<K,V> | filterNot(Predicate<K,V> predicate): Create a new instance of KTable that consists of all elements of this stream which do not satisfy a predicate. |
void | foreach(ForeachAction<K,V> action): Perform an action on each element of KTable. |
String | getStoreName(): Get the name of the local state store used for materializing this KTable. |
<K1,V1> KGroupedTable<K1,V1> | groupBy(KeyValueMapper<K,V,KeyValue<K1,V1>> selector): Group the records of this KTable using the provided KeyValueMapper and default serializers and deserializers. |
<K1,V1> KGroupedTable<K1,V1> | groupBy(KeyValueMapper<K,V,KeyValue<K1,V1>> selector, Serde<K1> keySerde, Serde<V1> valueSerde): Group the records of this KTable using the provided KeyValueMapper. |
<V1,R> KTable<K,R> | join(KTable<K,V1> other, ValueJoiner<V,V1,R> joiner): Combine values of this stream with another KTable stream's elements of the same key using Inner Join. |
<V1,R> KTable<K,R> | leftJoin(KTable<K,V1> other, ValueJoiner<V,V1,R> joiner): Combine values of this stream with another KTable stream's elements of the same key using Left Join. |
<V1> KTable<K,V1> | mapValues(ValueMapper<V,V1> mapper): Create a new instance of KTable by transforming the value of each element in this stream into a new value in the new stream. |
<V1,R> KTable<K,R> | outerJoin(KTable<K,V1> other, ValueJoiner<V,V1,R> joiner): Combine values of this stream with another KTable stream's elements of the same key using Outer Join. |
void | print(): Print the elements of this stream to System.out. |
void | print(Serde<K> keySerde, Serde<V> valSerde): Print the elements of this stream to System.out. |
void | print(Serde<K> keySerde, Serde<V> valSerde, String streamName): Print the elements of this stream to System.out. |
void | print(String streamName): Print the elements of this stream to System.out. |
KTable<K,V> | through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K,V> partitioner, String topic, String storeName): Materialize this stream to a topic and create a new instance of KTable from the topic, using a customizable StreamPartitioner to determine the distribution of records to partitions. |
KTable<K,V> | through(Serde<K> keySerde, Serde<V> valSerde, String topic, String storeName): Materialize this stream to a topic and create a new instance of KTable from the topic. |
KTable<K,V> | through(StreamPartitioner<K,V> partitioner, String topic, String storeName): Materialize this stream to a topic and create a new instance of KTable from the topic, using default serializers and deserializers and a customizable StreamPartitioner to determine the distribution of records to partitions. |
KTable<K,V> | through(String topic, String storeName): Materialize this stream to a topic and create a new instance of KTable from the topic, using default serializers and deserializers and the producer's DefaultPartitioner. |
void | to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K,V> partitioner, String topic): Materialize this stream to a topic using a customizable StreamPartitioner to determine the distribution of records to partitions. |
void | to(Serde<K> keySerde, Serde<V> valSerde, String topic): Materialize this stream to a topic. |
void | to(StreamPartitioner<K,V> partitioner, String topic): Materialize this stream to a topic using default serializers specified in the config and a customizable StreamPartitioner to determine the distribution of records to partitions. |
void | to(String topic): Materialize this stream to a topic using default serializers specified in the config and the producer's DefaultPartitioner. |
KStream<K,V> | toStream(): Convert this stream to a new instance of KStream. |
<K1> KStream<K1,V> | toStream(KeyValueMapper<K,V,K1> mapper): Convert this stream to a new instance of KStream using the given KeyValueMapper to select the new key. |
void | writeAsText(String filePath): Write the elements of this stream to a file at the given path using default serializers and deserializers. |
void | writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde): Write the elements of this stream to a file at the given path. |
void | writeAsText(String filePath, String streamName): Write the elements of this stream to a file at the given path. |
void | writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde) |
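
The join, leftJoin, and outerJoin entries in the table above differ in which keys appear in the result. The sketch below models that difference on two in-memory maps. It assumes the usual relational meaning of inner, left, and outer joins and passes null to the joiner for a missing side. It does not use the Kafka Streams library, all names in it are hypothetical, and it makes no claim about how Kafka Streams handles nulls or intermediate updates.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.BiFunction;

// Plain-Java model of table-table join results; it does not use the Kafka Streams library.
class TableJoinSketch {

    enum JoinType { INNER, LEFT, OUTER }

    // Joins two tables on their shared key. A missing side is passed to the joiner as null.
    static <K extends Comparable<K>, V, V1, R> Map<K, R> join(
            Map<K, V> thisTable, Map<K, V1> other, BiFunction<V, V1, R> joiner, JoinType type) {
        Set<K> keys = new TreeSet<>(thisTable.keySet());
        if (type == JoinType.OUTER) {
            keys.addAll(other.keySet()); // an outer join also keeps keys found only in the other table
        }
        Map<K, R> result = new TreeMap<>();
        for (K key : keys) {
            if (type == JoinType.INNER && !other.containsKey(key)) {
                continue; // an inner join keeps only keys present in both tables
            }
            result.put(key, joiner.apply(thisTable.get(key), other.get(key)));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> clicks = new TreeMap<>();
        clicks.put("alice", 3);
        clicks.put("bob", 7);
        Map<String, String> regions = new TreeMap<>();
        regions.put("bob", "eu");
        regions.put("carol", "us");
        BiFunction<Integer, String, String> joiner = (count, region) -> count + "@" + region;

        System.out.println(join(clicks, regions, joiner, JoinType.INNER)); // {bob=7@eu}
        System.out.println(join(clicks, regions, joiner, JoinType.LEFT));  // {alice=3@null, bob=7@eu}
        System.out.println(join(clicks, regions, joiner, JoinType.OUTER)); // {alice=3@null, bob=7@eu, carol=null@us}
    }
}
```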

KTable<K,V> filter(Predicate<K,V> predicate)
Create a new instance of KTable that consists of all elements of this stream which satisfy a predicate.

KTable<K,V> filterNot(Predicate<K,V> predicate)
Create a new instance of KTable that consists of all elements of this stream which do not satisfy a predicate.

<V1> KTable<K,V1> mapValues(ValueMapper<V,V1> mapper)
Create a new instance of KTable by transforming the value of each element in this stream into a new value in the new stream.
V1
- the value type of the new stream
mapper
- the instance of ValueMapper
Returns: a KTable that contains records with unmodified keys and new values of a different type

void print()
Print the elements of this stream to System.out. This function will use the generated name of the parent processor node to label the key/value pairs printed out to the console.
Implementors will need to override toString for keys and values that are not of type String, Integer, etc. to get meaningful information.

void print(String streamName)
Print the elements of this stream to System.out. This function will use the given name to label the key/value pairs printed out to the console.
streamName
- the name used to label the key/value pairs printed out to the console
Implementors will need to override toString for keys and values that are not of type String, Integer, etc. to get meaningful information.

void print(Serde<K> keySerde, Serde<V> valSerde)
Print the elements of this stream to System.out.
keySerde
- key serde used to send key-value pairs; if not specified, the default serde defined in the configs will be used
valSerde
- value serde used to send key-value pairs; if not specified, the default serde defined in the configs will be used
Implementors will need to override toString for keys and values that are not of type String, Integer, etc. to get meaningful information.

void print(Serde<K> keySerde, Serde<V> valSerde, String streamName)
Print the elements of this stream to System.out.
keySerde
- key serde used to send key-value pairs; if not specified, the default serde defined in the configs will be used
valSerde
- value serde used to send key-value pairs; if not specified, the default serde defined in the configs will be used
streamName
- the name used to label the key/value pairs printed out to the console
Implementors will need to override toString for keys and values that are not of type String, Integer, etc. to get meaningful information.

void writeAsText(String filePath)
Write the elements of this stream to a file at the given path using default serializers and deserializers.

void writeAsText(String filePath, String streamName)
Write the elements of this stream to a file at the given path.

void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde)
Write the elements of this stream to a file at the given path.
filePath
- name of file to write to
keySerde
- key serde used to send key-value pairs; if not specified, the default serde defined in the configs will be used
valSerde
- value serde used to send key-value pairs; if not specified, the default serde defined in the configs will be used
Implementors will need to override toString for keys and values that are not of type String, Integer, etc. to get meaningful information.

void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde)
filePath
- name of file to write to
streamName
- the name used to label the key/value pairs written to the file
keySerde
- key serde used to send key-value pairs; if not specified, the default serde defined in the configs will be used
valSerde
- value serde used to send key-value pairs; if not specified, the default serde defined in the configs will be used
Implementors will need to override toString for keys and values that are not of type String, Integer, etc. to get meaningful information.

KTable<K,V> through(String topic, String storeName)
Materialize this stream to a topic and create a new instance of KTable from the topic, using default serializers and deserializers and the producer's DefaultPartitioner. This is equivalent to calling to(String) and KStreamBuilder.table(String, String).
The resulting KTable will be materialized in a local state store with the given store name. A changelog topic named "${applicationId}-${storeName}-changelog" will also be created automatically in Kafka for failure recovery, where "applicationId" is specified by the user in StreamsConfig.

KTable<K,V> through(StreamPartitioner<K,V> partitioner, String topic, String storeName)
Materialize this stream to a topic and create a new instance of KTable from the topic, using default serializers and deserializers and a customizable StreamPartitioner to determine the distribution of records to partitions. This is equivalent to calling to(String) and KStreamBuilder.table(String, String).
The resulting KTable will be materialized in a local state store with the given store name. A changelog topic named "${applicationId}-${storeName}-changelog" will also be created automatically in Kafka for failure recovery, where "applicationId" is specified by the user in StreamsConfig.
partitioner
- the function used to determine how records are distributed among partitions of the topic; if not specified, the producer's DefaultPartitioner will be used
topic
- the topic name
storeName
- the state store name used for this KTable
Returns: a KTable that contains the exact same records as this KTable
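
The partitioner parameter is described above as a function that decides how records are distributed among the partitions of the topic. The following sketch shows one such function in plain Java. RecordPartitioner is a hypothetical stand-in defined only for this example, not the Kafka StreamPartitioner interface, and hashing the key is just one possible strategy.

```java
// Plain-Java model of a partitioning function; it does not use the Kafka Streams library.
class PartitionerSketch {

    // Hypothetical stand-in for a partitioner: maps a record to a partition index.
    interface RecordPartitioner<K, V> {
        int partition(K key, V value, int numPartitions);
    }

    // Chooses the partition from the key's hash, so every update for a key goes to the same partition.
    static <K, V> RecordPartitioner<K, V> byKeyHash() {
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes.
        return (key, value, numPartitions) -> Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        RecordPartitioner<String, Long> partitioner = byKeyHash();
        int first = partitioner.partition("alice", 1L, 4);
        int second = partitioner.partition("alice", 2L, 4);
        System.out.println(first == second); // prints true: same key, same partition
    }
}
```

Keeping all updates for a key in one partition matters for a table, because the latest value per key can then be determined from a single partition's record order.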

KTable<K,V> through(Serde<K> keySerde, Serde<V> valSerde, String topic, String storeName)
Materialize this stream to a topic and create a new instance of KTable from the topic. If keySerde provides an org.apache.kafka.streams.kstream.internals.WindowedSerializer for the key, org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner is used; otherwise the producer's DefaultPartitioner is used. This is equivalent to calling to(Serde, Serde, String) and KStreamBuilder.table(Serde, Serde, String, String).
The resulting KTable will be materialized in a local state store with the given store name. A changelog topic named "${applicationId}-${storeName}-changelog" will also be created automatically in Kafka for failure recovery, where "applicationId" is specified by the user in StreamsConfig.
keySerde
- key serde used to send key-value pairs; if not specified, the default key serde defined in the configuration will be used
valSerde
- value serde used to send key-value pairs; if not specified, the default value serde defined in the configuration will be used
topic
- the topic name
storeName
- the state store name used for this KTable
Returns: a KTable that contains the exact same records as this KTable

KTable<K,V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K,V> partitioner, String topic, String storeName)
Materialize this stream to a topic and create a new instance of KTable from the topic, using a customizable StreamPartitioner to determine the distribution of records to partitions. This is equivalent to calling to(Serde, Serde, StreamPartitioner, String) and KStreamBuilder.table(Serde, Serde, String, String).
The resulting KTable will be materialized in a local state store with the given store name. A changelog topic named "${applicationId}-${storeName}-changelog" will also be created automatically in Kafka for failure recovery, where "applicationId" is specified by the user in StreamsConfig.
keySerde
- key serde used to send key-value pairs; if not specified, the default key serde defined in the configuration will be used
valSerde
- value serde used to send key-value pairs; if not specified, the default value serde defined in the configuration will be used
partitioner
- the function used to determine how records are distributed among partitions of the topic; if not specified and keySerde provides an org.apache.kafka.streams.kstream.internals.WindowedSerializer for the key, org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner will be used; otherwise DefaultPartitioner will be used
topic
- the topic name
storeName
- the state store name used for this KTable
Returns: a KTable that contains the exact same records as this KTable
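
Each through variant above mentions a changelog topic named "${applicationId}-${storeName}-changelog". The small helper below spells out that naming pattern. It is a hypothetical illustration in plain Java, not a Kafka Streams API, and the application and store names used in it are made up.

```java
// Plain-Java helper that spells out the naming pattern; it does not use the Kafka Streams library.
class ChangelogTopicNameSketch {

    // Builds "<applicationId>-<storeName>-changelog" from the two user-supplied names.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // "orders-app" and "orders-by-id" are hypothetical names chosen for this example.
        System.out.println(changelogTopic("orders-app", "orders-by-id"));
        // prints orders-app-orders-by-id-changelog
    }
}
```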

void to(String topic)
Materialize this stream to a topic using default serializers specified in the config and the producer's DefaultPartitioner.
topic
- the topic name

void to(StreamPartitioner<K,V> partitioner, String topic)
Materialize this stream to a topic using default serializers specified in the config and a customizable StreamPartitioner to determine the distribution of records to partitions.
partitioner
- the function used to determine how records are distributed among partitions of the topic; if not specified, the producer's DefaultPartitioner will be used
topic
- the topic name

void to(Serde<K> keySerde, Serde<V> valSerde, String topic)
Materialize this stream to a topic. If keySerde provides an org.apache.kafka.streams.kstream.internals.WindowedSerializer for the key, org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner is used; otherwise the producer's DefaultPartitioner is used.
keySerde
- key serde used to send key-value pairs; if not specified, the default serde defined in the configs will be used
valSerde
- value serde used to send key-value pairs; if not specified, the default serde defined in the configs will be used
topic
- the topic name

void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K,V> partitioner, String topic)
Materialize this stream to a topic using a customizable StreamPartitioner to determine the distribution of records to partitions.
keySerde
- key serde used to send key-value pairs; if not specified, the default serde defined in the configs will be used
valSerde
- value serde used to send key-value pairs; if not specified, the default serde defined in the configs will be used
partitioner
- the function used to determine how records are distributed among partitions of the topic; if not specified and keySerde provides an org.apache.kafka.streams.kstream.internals.WindowedSerializer for the key, org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner will be used; otherwise DefaultPartitioner will be used
topic
- the topic name

<K1> KStream<K1,V> toStream(KeyValueMapper<K,V,K1> mapper)
Convert this stream to a new instance of KStream using the given KeyValueMapper to select the new key.
K1
- the new key type
mapper
- the instance of KeyValueMapper
Returns: a KStream that contains the transformed records from this KTable; the records are no longer treated as updates on a primary-keyed table, but rather as normal key-value pairs in a record stream

<V1,R> KTable<K,R> join(KTable<K,V1> other, ValueJoiner<V,V1,R> joiner)
Combine values of this stream with another KTable stream's elements of the same key using Inner Join.
V1
- the value type of the other stream
R
- the value type of the new stream
other
- the instance of KTable joined with this stream
joiner
- the instance of ValueJoiner
Returns: a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key

<V1,R> KTable<K,R> outerJoin(KTable<K,V1> other, ValueJoiner<V,V1,R> joiner)
Combine values of this stream with another KTable stream's elements of the same key using Outer Join.
V1
- the value type of the other stream
R
- the value type of the new stream
other
- the instance of KTable joined with this stream
joiner
- the instance of ValueJoiner
Returns: a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key

<V1,R> KTable<K,R> leftJoin(KTable<K,V1> other, ValueJoiner<V,V1,R> joiner)
Combine values of this stream with another KTable stream's elements of the same key using Left Join.
V1
- the value type of the other stream
R
- the value type of the new stream
other
- the instance of KTable joined with this stream
joiner
- the instance of ValueJoiner
Returns: a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key

<K1,V1> KGroupedTable<K1,V1> groupBy(KeyValueMapper<K,V,KeyValue<K1,V1>> selector, Serde<K1> keySerde, Serde<V1> valueSerde)
Group the records of this KTable using the provided KeyValueMapper.
K1
- the key type of the KGroupedTable
V1
- the value type of the KGroupedTable
selector
- select the grouping key and value to be aggregated
keySerde
- key serdes for materializing this stream; if not specified, the default serdes defined in the configs will be used
valueSerde
- value serdes for materializing this stream; if not specified, the default serdes defined in the configs will be used
Returns: a KGroupedTable that contains the re-partitioned records of this KTable

<K1,V1> KGroupedTable<K1,V1> groupBy(KeyValueMapper<K,V,KeyValue<K1,V1>> selector)
Group the records of this KTable using the provided KeyValueMapper and default serializers and deserializers.
K1
- the key type of the KGroupedTable
V1
- the value type of the KGroupedTable
selector
- select the grouping key and value to be aggregated
Returns: a KGroupedTable that contains the re-partitioned records of this KTable
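
The two groupBy variants above use a selector to pick a new grouping key and value for every record. As a rough illustration, the sketch below applies a selector to each row of an in-memory table and collects the selected values under their new key. It is a plain-Java model with hypothetical names. It does not use the Kafka Streams library and does not model re-partitioning across topics or the later aggregation step.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;

// Plain-Java model of grouping table rows by a selected key; it does not use the Kafka Streams library.
class GroupBySketch {

    // Applies the selector to every row and collects the selected values under the selected key.
    static <K, V, K1 extends Comparable<K1>, V1> Map<K1, List<V1>> groupBy(
            Map<K, V> table, BiFunction<K, V, Map.Entry<K1, V1>> selector) {
        Map<K1, List<V1>> groups = new TreeMap<>();
        for (Map.Entry<K, V> row : table.entrySet()) {
            Map.Entry<K1, V1> selected = selector.apply(row.getKey(), row.getValue());
            groups.computeIfAbsent(selected.getKey(), key -> new ArrayList<>())
                  .add(selected.getValue());
        }
        return groups;
    }

    public static void main(String[] args) {
        Map<String, String> userRegion = new TreeMap<>();
        userRegion.put("alice", "eu");
        userRegion.put("bob", "us");
        userRegion.put("carol", "eu");
        // Re-key each row by region and emit 1 per user, ready to be counted per region.
        BiFunction<String, String, Map.Entry<String, Long>> selector =
                (user, region) -> new SimpleEntry<>(region, 1L);
        System.out.println(groupBy(userRegion, selector)); // prints {eu=[1, 1], us=[1]}
    }
}
```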

void foreach(ForeachAction<K,V> action)
Perform an action on each element of KTable. Note that this is a terminal operation that returns void.
action
- an action to perform on each element