Type Parameters:
K - Type of keys
V - Type of values

public interface KGroupedTable<K,V>
KGroupedTable is an abstraction of a re-grouped changelog stream from a primary-keyed table,
usually on a different grouping key than the original primary key.

It is an intermediate representation after a re-grouping of a KTable before an aggregation is applied to the
new partitions resulting in a new KTable.

A KGroupedTable must be obtained from a KTable via groupBy(...).
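The re-grouping idea can be illustrated without any Kafka dependency. The following is a minimal plain-Java sketch, not Kafka Streams code: the class RegroupCountModel and its methods are hypothetical names introduced only for this illustration. It models a table keyed by a primary key whose rows are re-grouped on a different key and counted, and it assumes that replacing a row first removes the old row from its previous group and then adds the new row to its new group.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of "re-group a table, then count per group".
class RegroupCountModel {
    // Primary-keyed table: primary key -> grouping key of the current row.
    private final Map<String, String> table = new HashMap<>();
    // Aggregation result: grouping key -> number of rows currently in that group.
    private final Map<String, Long> counts = new HashMap<>();

    // Insert or replace the row for primaryKey.
    void upsert(String primaryKey, String groupKey) {
        String oldGroupKey = table.put(primaryKey, groupKey);
        if (oldGroupKey != null) {
            // Step 1: the replaced row leaves its old group.
            counts.merge(oldGroupKey, -1L, Long::sum);
        }
        // Step 2: the new row joins its new group.
        counts.merge(groupKey, 1L, Long::sum);
    }

    Map<String, Long> counts() {
        return counts;
    }
}
```

For example, after upsert("alice", "eu"), upsert("bob", "eu") and upsert("alice", "us"), the model reports one row in each of the groups "eu" and "us".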
See Also: KTable
Modifier and Type | Method and Description |
---|---|
<VR> KTable<K,VR> | aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> adder, Aggregator<? super K,? super V,VR> subtractor) |
<VR> KTable<K,VR> | aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> adder, Aggregator<? super K,? super V,VR> subtractor, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized) |
<VR> KTable<K,VR> | aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> adder, Aggregator<? super K,? super V,VR> subtractor, Named named) |
<VR> KTable<K,VR> | aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> adder, Aggregator<? super K,? super V,VR> subtractor, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized) |
KTable<K,Long> | count() |
KTable<K,Long> | count(Materialized<K,Long,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized) |
KTable<K,Long> | count(Named named) |
KTable<K,Long> | count(Named named, Materialized<K,Long,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized) |
KTable<K,V> | reduce(Reducer<V> adder, Reducer<V> subtractor) |
KTable<K,V> | reduce(Reducer<V> adder, Reducer<V> subtractor, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized) |
KTable<K,V> | reduce(Reducer<V> adder, Reducer<V> subtractor, Named named, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized) |

KTable<K,Long> count(Materialized<K,Long,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)

Count number of records of the original KTable that got mapped to the same key into a new instance of KTable.
Records with null key are ignored.
The result is written into a local KeyValueStore (which is basically an ever-updating materialized view)
that can be queried using the provided queryableStoreName (the store name defined in Materialized).
Furthermore, updates to the store are sent downstream into a KTable changelog stream.

Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
the same key.
The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
parallel running Kafka Streams instances, and the configuration parameters for cache size and commit interval.
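
The deduplication effect described above can be sketched with a small plain-Java model. This is an illustration under simplifying assumptions, not the actual Kafka Streams cache: the class DedupCacheModel and its methods are hypothetical, and the model flushes only on an explicit commit, whereas the real cache is also bounded by the configured cache size.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a write cache that forwards only the latest value per key.
class DedupCacheModel {
    // Pending updates since the last commit; a later update overwrites an earlier one.
    private final Map<String, Long> dirty = new LinkedHashMap<>();
    // Records that were actually forwarded downstream, formatted as "key=value".
    private final List<String> downstream = new ArrayList<>();

    // Buffer an update instead of forwarding it immediately.
    void update(String key, long value) {
        dirty.put(key, value);
    }

    // On commit, forward one record per dirty key and clear the buffer.
    void commit() {
        dirty.forEach((key, value) -> downstream.add(key + "=" + value));
        dirty.clear();
    }

    List<String> downstream() {
        return downstream;
    }
}
```

In this model, calling update("a", 1), update("a", 2), update("b", 1) and then commit() forwards two records, "a=2" and "b=1"; the intermediate value for "a" never reaches downstream. A shorter commit interval would flush more often and therefore propagate more intermediate updates.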

To query the local ReadOnlyKeyValueStore, it must be obtained via KafkaStreams#store(...):
KafkaStreams streams = ... // counting words
StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore = streams.store(storeQueryParams);
K key = "some-word";
ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients()
to query the value of the key on a parallel running instance of your Kafka Streams application.

For failure and recovery, the store will be backed by an internal changelog topic that will be created in Kafka.
Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot
contain characters other than ASCII alphanumerics, '.', '_' and '-'.
The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "storeName" is the
provided store name defined in Materialized, and "-changelog" is a fixed suffix.

You can retrieve all generated internal topic names via Topology.describe().
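
The naming rule can be checked with a short plain-Java helper. This is a sketch only: the class ChangelogTopicNames and its methods are hypothetical and are not part of Kafka Streams. It validates just the character set stated above (Kafka may enforce further topic-name constraints) and builds the name from the stated pattern.

```java
import java.util.regex.Pattern;

// Hypothetical helper that mirrors the documented store-name and changelog-name rules.
class ChangelogTopicNames {
    // ASCII alphanumerics, '.', '_' and '-' only, at least one character.
    private static final Pattern LEGAL_CHARS = Pattern.compile("[a-zA-Z0-9._-]+");

    static boolean isValidStoreName(String storeName) {
        return storeName != null && LEGAL_CHARS.matcher(storeName).matches();
    }

    // Builds "${applicationId}-${storeName}-changelog".
    static String changelogTopic(String applicationId, String storeName) {
        if (!isValidStoreName(storeName)) {
            throw new IllegalArgumentException("Illegal store name: " + storeName);
        }
        return applicationId + "-" + storeName + "-changelog";
    }
}
```

With an application id of "word-app" and a store name of "counts-store", the helper yields "word-app-counts-store-changelog"; a store name such as "counts store" is rejected because it contains a space.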

Parameters:
materialized - the instance of Materialized used to materialize the state store. Cannot be null.
Returns:
a KTable that contains "update" records with unmodified keys and Long values that
represent the latest (rolling) count (i.e., number of records) for each key

KTable<K,Long> count(Named named, Materialized<K,Long,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)

Count number of records of the original KTable that got mapped to the same key into a new instance of KTable.
Records with null key are ignored.
The result is written into a local KeyValueStore (which is basically an ever-updating materialized view)
that can be queried using the provided queryableStoreName (the store name defined in Materialized).
Furthermore, updates to the store are sent downstream into a KTable changelog stream.

Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
the same key.
The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
parallel running Kafka Streams instances, and the configuration parameters for cache size and commit interval.

To query the local ReadOnlyKeyValueStore, it must be obtained via KafkaStreams#store(...):
KafkaStreams streams = ... // counting words
StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore = streams.store(storeQueryParams);
K key = "some-word";
ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients()
to query the value of the key on a parallel running instance of your Kafka Streams application.

For failure and recovery, the store will be backed by an internal changelog topic that will be created in Kafka.
Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot
contain characters other than ASCII alphanumerics, '.', '_' and '-'.
The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "storeName" is the
provided store name defined in Materialized, and "-changelog" is a fixed suffix.

You can retrieve all generated internal topic names via Topology.describe().

Parameters:
named - the Named config used to name the processor in the topology
materialized - the instance of Materialized used to materialize the state store. Cannot be null.
Returns:
a KTable that contains "update" records with unmodified keys and Long values that
represent the latest (rolling) count (i.e., number of records) for each key

KTable<K,Long> count()

Count number of records of the original KTable that got mapped to the same key into a new instance of KTable.
Records with null key are ignored.
The result is written into a local KeyValueStore (which is basically an ever-updating materialized view).
Furthermore, updates to the store are sent downstream into a KTable changelog stream.

Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
the same key.
The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
parallel running Kafka Streams instances, and the configuration parameters for cache size and commit interval.

For failure and recovery, the store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "internalStoreName" is an internal name,
and "-changelog" is a fixed suffix.
Note that the internal store name may not be queryable through Interactive Queries.

You can retrieve all generated internal topic names via Topology.describe().

KTable<K,Long> count(Named named)

Count number of records of the original KTable that got mapped to the same key into a new instance of KTable.
Records with null key are ignored.
The result is written into a local KeyValueStore (which is basically an ever-updating materialized view).
Furthermore, updates to the store are sent downstream into a KTable changelog stream.

Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
the same key.
The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
parallel running Kafka Streams instances, and the configuration parameters for cache size and commit interval.

For failure and recovery, the store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "internalStoreName" is an internal name,
and "-changelog" is a fixed suffix.
Note that the internal store name may not be queryable through Interactive Queries.

You can retrieve all generated internal topic names via Topology.describe().

KTable<K,V> reduce(Reducer<V> adder, Reducer<V> subtractor, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)

Combine the value of records of the original KTable that got mapped to the same key into a new instance of KTable.
Records with null key are ignored.
Combining implies that the type of the aggregate result is the same as the type of the input value
(cf. aggregate(Initializer, Aggregator, Aggregator, Materialized)).
The result is written into a local KeyValueStore (which is basically an ever-updating materialized view)
that can be queried using the provided queryableStoreName (the store name defined in Materialized).
Furthermore, updates to the store are sent downstream into a KTable changelog stream.

Each update to the original KTable results in a two-step update of the result KTable.
The specified adder is applied for each update record and computes a new aggregate using the
current aggregate (first argument) and the record's value (second argument) by adding the new record to the
aggregate.
The specified subtractor is applied for each "replaced" record of the original KTable
and computes a new aggregate using the current aggregate (first argument) and the record's value (second
argument) by "removing" the "replaced" record from the aggregate.
If there is no current aggregate, the Reducer is not applied and the new aggregate will be the record's
value as-is.
Thus, reduce(Reducer, Reducer, Materialized) can be used to compute aggregate functions like sum.
For sum, the adder and subtractor would work as follows:
public class SumAdder implements Reducer<Integer> {
    @Override
    public Integer apply(Integer currentAgg, Integer newValue) {
        return currentAgg + newValue;
    }
}

public class SumSubtractor implements Reducer<Integer> {
    @Override
    public Integer apply(Integer currentAgg, Integer oldValue) {
        return currentAgg - oldValue;
    }
}
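
To see how the adder and subtractor interact over a sequence of table updates, the following plain-Java sketch models the two-step update described above. It is not Kafka Streams code: the class ReduceModel and its methods are hypothetical, and BinaryOperator stands in for Reducer so the example runs without the Kafka dependency.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical model of a sum computed with an adder and a subtractor.
class ReduceModel {
    // First argument is the current aggregate, second argument is the record's value.
    private final BinaryOperator<Integer> adder = (currentAgg, newValue) -> currentAgg + newValue;
    private final BinaryOperator<Integer> subtractor = (currentAgg, oldValue) -> currentAgg - oldValue;
    private final Map<String, Integer> aggregates = new HashMap<>();

    // A new record arrives for the group: with no current aggregate the value is stored as-is,
    // otherwise the adder combines the current aggregate with the new value.
    void add(String groupKey, Integer newValue) {
        aggregates.merge(groupKey, newValue, adder);
    }

    // A record of the original table was replaced: remove its old value from the group.
    void subtract(String groupKey, Integer oldValue) {
        aggregates.computeIfPresent(groupKey, (key, currentAgg) -> subtractor.apply(currentAgg, oldValue));
    }

    Integer get(String groupKey) {
        return aggregates.get(groupKey);
    }
}
```

Replacing a table row whose old value was 5 with a new value of 7 corresponds to subtract(group, 5) followed by add(group, 7). Starting from add("g", 5) and add("g", 3), that replacement moves the sum for "g" from 8 to 10.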

Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
the same key.
The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
parallel running Kafka Streams instances, and the configuration parameters for cache size and commit interval.

To query the local ReadOnlyKeyValueStore, it must be obtained via KafkaStreams#store(...):
KafkaStreams streams = ... // counting words
StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(storeQueryParams);
K key = "some-word";
ValueAndTimestamp<V> reduceForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients()
to query the value of the key on a parallel running instance of your Kafka Streams application.

For failure and recovery, the store will be backed by an internal changelog topic that will be created in Kafka.
Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot
contain characters other than ASCII alphanumerics, '.', '_' and '-'.
The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "storeName" is the
provided store name defined in Materialized, and "-changelog" is a fixed suffix.

You can retrieve all generated internal topic names via Topology.describe().

Parameters:
adder - a Reducer that adds a new value to the aggregate result
subtractor - a Reducer that removes an old value from the aggregate result
materialized - the instance of Materialized used to materialize the state store. Cannot be null.
Returns:
a KTable that contains "update" records with unmodified keys, and values that represent the
latest (rolling) aggregate for each key

KTable<K,V> reduce(Reducer<V> adder, Reducer<V> subtractor, Named named, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)

Combine the value of records of the original KTable that got mapped to the same key into a new instance of KTable.
Records with null key are ignored.
Combining implies that the type of the aggregate result is the same as the type of the input value
(cf. aggregate(Initializer, Aggregator, Aggregator, Materialized)).
The result is written into a local KeyValueStore (which is basically an ever-updating materialized view)
that can be queried using the provided queryableStoreName (the store name defined in Materialized).
Furthermore, updates to the store are sent downstream into a KTable changelog stream.

Each update to the original KTable results in a two-step update of the result KTable.
The specified adder is applied for each update record and computes a new aggregate using the
current aggregate (first argument) and the record's value (second argument) by adding the new record to the
aggregate.
The specified subtractor is applied for each "replaced" record of the original KTable
and computes a new aggregate using the current aggregate (first argument) and the record's value (second
argument) by "removing" the "replaced" record from the aggregate.
If there is no current aggregate, the Reducer is not applied and the new aggregate will be the record's
value as-is.
Thus, reduce(Reducer, Reducer, Named, Materialized) can be used to compute aggregate functions like sum.
For sum, the adder and subtractor would work as follows:
public class SumAdder implements Reducer<Integer> {
    @Override
    public Integer apply(Integer currentAgg, Integer newValue) {
        return currentAgg + newValue;
    }
}

public class SumSubtractor implements Reducer<Integer> {
    @Override
    public Integer apply(Integer currentAgg, Integer oldValue) {
        return currentAgg - oldValue;
    }
}

Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
the same key.
The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
parallel running Kafka Streams instances, and the configuration parameters for cache size and commit interval.

To query the local ReadOnlyKeyValueStore, it must be obtained via KafkaStreams#store(...):
KafkaStreams streams = ... // counting words
StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(storeQueryParams);
K key = "some-word";
ValueAndTimestamp<V> reduceForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients()
to query the value of the key on a parallel running instance of your Kafka Streams application.

For failure and recovery, the store will be backed by an internal changelog topic that will be created in Kafka.
Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot
contain characters other than ASCII alphanumerics, '.', '_' and '-'.
The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "storeName" is the
provided store name defined in Materialized, and "-changelog" is a fixed suffix.

You can retrieve all generated internal topic names via Topology.describe().

Parameters:
adder - a Reducer that adds a new value to the aggregate result
subtractor - a Reducer that removes an old value from the aggregate result
named - a Named config used to name the processor in the topology
materialized - the instance of Materialized used to materialize the state store. Cannot be null.
Returns:
a KTable that contains "update" records with unmodified keys, and values that represent the
latest (rolling) aggregate for each key

KTable<K,V> reduce(Reducer<V> adder, Reducer<V> subtractor)

Combine the value of records of the original KTable that got mapped to the same key into a new instance of KTable.
Records with null key are ignored.
Combining implies that the type of the aggregate result is the same as the type of the input value
(cf. aggregate(Initializer, Aggregator, Aggregator)).
The result is written into a local KeyValueStore (which is basically an ever-updating materialized view).
Furthermore, updates to the store are sent downstream into a KTable changelog stream.

Each update to the original KTable results in a two-step update of the result KTable.
The specified adder is applied for each update record and computes a new aggregate using the
current aggregate and the record's value by adding the new record to the aggregate.
The specified subtractor is applied for each "replaced" record of the original KTable
and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
record from the aggregate.
If there is no current aggregate, the Reducer is not applied and the new aggregate will be the record's
value as-is.
Thus, reduce(Reducer, Reducer) can be used to compute aggregate functions like sum.
For sum, the adder and subtractor would work as follows:
public class SumAdder implements Reducer<Integer> {
    @Override
    public Integer apply(Integer currentAgg, Integer newValue) {
        return currentAgg + newValue;
    }
}

public class SumSubtractor implements Reducer<Integer> {
    @Override
    public Integer apply(Integer currentAgg, Integer oldValue) {
        return currentAgg - oldValue;
    }
}

Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
the same key.
The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
parallel running Kafka Streams instances, and the configuration parameters for cache size and commit interval.

For failure and recovery, the store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "internalStoreName" is an internal name,
and "-changelog" is a fixed suffix.
Note that the internal store name may not be queryable through Interactive Queries.

You can retrieve all generated internal topic names via Topology.describe().

<VR> KTable<K,VR> aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> adder, Aggregator<? super K,? super V,VR> subtractor, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)

Aggregate the value of records of the original KTable that got mapped to the same key into a new instance of KTable.
Records with null key are ignored.
Aggregating is a generalization of combining via reduce(...) as it,
for example, allows the result to have a different type than the input values.
The result is written into a local KeyValueStore (which is basically an ever-updating materialized view)
that can be queried using the provided queryableStoreName (the store name defined in Materialized).
Furthermore, updates to the store are sent downstream into a KTable changelog stream.

The specified Initializer is applied once directly before the first input record is processed to
provide an initial intermediate aggregation result that is used to process the first record.
Each update to the original KTable results in a two-step update of the result KTable.
The specified adder is applied for each update record and computes a new aggregate using the
current aggregate (or for the very first record using the intermediate aggregation result provided via the
Initializer) and the record's value by adding the new record to the aggregate.
The specified subtractor is applied for each "replaced" record of the original KTable
and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
record from the aggregate.
Thus, aggregate(Initializer, Aggregator, Aggregator, Materialized) can be used to compute aggregate functions
like sum.
For sum, the initializer, adder, and subtractor would work as follows:
// in this example, LongSerde.class must be set as value serde in Materialized#withValueSerde
public class SumInitializer implements Initializer<Long> {
    @Override
    public Long apply() {
        return 0L;
    }
}

public class SumAdder implements Aggregator<String, Integer, Long> {
    @Override
    public Long apply(String key, Integer newValue, Long aggregate) {
        return aggregate + newValue;
    }
}

public class SumSubtractor implements Aggregator<String, Integer, Long> {
    @Override
    public Long apply(String key, Integer oldValue, Long aggregate) {
        return aggregate - oldValue;
    }
}
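
The interplay of initializer, adder, and subtractor, including the change of value type from Integer to Long, can be traced with a plain-Java model. This is a sketch under stated assumptions, not Kafka Streams code: the class AggregateModel and its nested interface Agg are hypothetical stand-ins for Aggregator, and the model assumes the initializer is consulted whenever a key has no aggregate yet.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical model of aggregate(initializer, adder, subtractor) computing a Long sum of Integers.
class AggregateModel {
    // Stand-in for an aggregator: (key, value, current aggregate) -> new aggregate.
    interface Agg<K, V, VR> {
        VR apply(K key, V value, VR aggregate);
    }

    private final Supplier<Long> initializer = () -> 0L;
    private final Agg<String, Integer, Long> adder = (key, newValue, aggregate) -> aggregate + newValue;
    private final Agg<String, Integer, Long> subtractor = (key, oldValue, aggregate) -> aggregate - oldValue;
    private final Map<String, Long> aggregates = new HashMap<>();

    // A new record arrives: start from the initializer's value if the key has no aggregate yet.
    void add(String key, Integer newValue) {
        Long current = aggregates.containsKey(key) ? aggregates.get(key) : initializer.get();
        aggregates.put(key, adder.apply(key, newValue, current));
    }

    // A record was replaced in the original table: remove its old value from the aggregate.
    void subtract(String key, Integer oldValue) {
        Long current = aggregates.get(key);
        if (current != null) {
            aggregates.put(key, subtractor.apply(key, oldValue, current));
        }
    }

    Long get(String key) {
        return aggregates.get(key);
    }
}
```

Unlike the reduce case, the first record for a key is not stored as-is; it is combined with the initializer's 0L, which is what allows the aggregate type (Long) to differ from the input value type (Integer).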

Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
the same key.
The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
parallel running Kafka Streams instances, and the configuration parameters for cache size and commit interval.

To query the local ReadOnlyKeyValueStore, it must be obtained via KafkaStreams#store(...):
KafkaStreams streams = ... // counting words
StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore = streams.store(storeQueryParams);
K key = "some-word";
ValueAndTimestamp<VR> aggregateForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients()
to query the value of the key on a parallel running instance of your Kafka Streams application.

For failure and recovery, the store will be backed by an internal changelog topic that will be created in Kafka.
Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot
contain characters other than ASCII alphanumerics, '.', '_' and '-'.
The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "storeName" is the
provided store name defined in Materialized, and "-changelog" is a fixed suffix.

You can retrieve all generated internal topic names via Topology.describe().

Type Parameters:
VR - the value type of the aggregated KTable
Parameters:
initializer - an Initializer that provides an initial aggregate result value
adder - an Aggregator that adds a new record to the aggregate result
subtractor - an Aggregator that removes an old record from the aggregate result
materialized - the instance of Materialized used to materialize the state store. Cannot be null.
Returns:
a KTable that contains "update" records with unmodified keys, and values that represent the
latest (rolling) aggregate for each key

<VR> KTable<K,VR> aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> adder, Aggregator<? super K,? super V,VR> subtractor, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)

Aggregate the value of records of the original KTable that got mapped to the same key into a new instance of KTable.
Records with null key are ignored.
Aggregating is a generalization of combining via reduce(...) as it,
for example, allows the result to have a different type than the input values.
The result is written into a local KeyValueStore (which is basically an ever-updating materialized view)
that can be queried using the provided queryableStoreName (the store name defined in Materialized).
Furthermore, updates to the store are sent downstream into a KTable changelog stream.

The specified Initializer is applied once directly before the first input record is processed to
provide an initial intermediate aggregation result that is used to process the first record.
Each update to the original KTable results in a two-step update of the result KTable.
The specified adder is applied for each update record and computes a new aggregate using the
current aggregate (or for the very first record using the intermediate aggregation result provided via the
Initializer) and the record's value by adding the new record to the aggregate.
The specified subtractor is applied for each "replaced" record of the original KTable
and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
record from the aggregate.
Thus, aggregate(Initializer, Aggregator, Aggregator, Materialized) can be used to compute aggregate functions
like sum.
For sum, the initializer, adder, and subtractor would work as follows:
// in this example, LongSerde.class must be set as value serde in Materialized#withValueSerde
public class SumInitializer implements Initializer<Long> {
    @Override
    public Long apply() {
        return 0L;
    }
}

public class SumAdder implements Aggregator<String, Integer, Long> {
    @Override
    public Long apply(String key, Integer newValue, Long aggregate) {
        return aggregate + newValue;
    }
}

public class SumSubtractor implements Aggregator<String, Integer, Long> {
    @Override
    public Long apply(String key, Integer oldValue, Long aggregate) {
        return aggregate - oldValue;
    }
}

Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
the same key.
The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
parallel running Kafka Streams instances, and the configuration parameters for cache size and commit interval.

To query the local ReadOnlyKeyValueStore, it must be obtained via KafkaStreams#store(...):
KafkaStreams streams = ... // counting words
StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore = streams.store(storeQueryParams);
K key = "some-word";
ValueAndTimestamp<VR> aggregateForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients()
to query the value of the key on a parallel running instance of your Kafka Streams application.

For failure and recovery, the store will be backed by an internal changelog topic that will be created in Kafka.
Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot
contain characters other than ASCII alphanumerics, '.', '_' and '-'.
The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "storeName" is the
provided store name defined in Materialized, and "-changelog" is a fixed suffix.

You can retrieve all generated internal topic names via Topology.describe().

Type Parameters:
VR - the value type of the aggregated KTable
Parameters:
initializer - an Initializer that provides an initial aggregate result value
adder - an Aggregator that adds a new record to the aggregate result
subtractor - an Aggregator that removes an old record from the aggregate result
named - a Named config used to name the processor in the topology
materialized - the instance of Materialized used to materialize the state store. Cannot be null.
Returns:
a KTable that contains "update" records with unmodified keys, and values that represent the
latest (rolling) aggregate for each key

<VR> KTable<K,VR> aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> adder, Aggregator<? super K,? super V,VR> subtractor)

Aggregate the value of records of the original KTable that got mapped to the same key into a new instance of KTable
using default serializers and deserializers.
Records with null key are ignored.
Aggregating is a generalization of combining via reduce(...) as it,
for example, allows the result to have a different type than the input values.
If the result value type does not match the default value serde, you should use
aggregate(Initializer, Aggregator, Aggregator, Materialized).
The result is written into a local KeyValueStore (which is basically an ever-updating materialized view).
Furthermore, updates to the store are sent downstream into a KTable changelog stream.

The specified Initializer is applied once directly before the first input record is processed to
provide an initial intermediate aggregation result that is used to process the first record.
Each update to the original KTable results in a two-step update of the result KTable.
The specified adder is applied for each update record and computes a new aggregate using the
current aggregate (or for the very first record using the intermediate aggregation result provided via the
Initializer) and the record's value by adding the new record to the aggregate.
The specified subtractor is applied for each "replaced" record of the original KTable
and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
record from the aggregate.
Thus, aggregate(Initializer, Aggregator, Aggregator) can be used to compute aggregate functions
like sum.
For sum, the initializer, adder, and subtractor would work as follows:
// in this example, LongSerde.class must be set as default value serde in StreamsConfig
public class SumInitializer implements Initializer<Long> {
    @Override
    public Long apply() {
        return 0L;
    }
}

public class SumAdder implements Aggregator<String, Integer, Long> {
    @Override
    public Long apply(String key, Integer newValue, Long aggregate) {
        return aggregate + newValue;
    }
}

public class SumSubtractor implements Aggregator<String, Integer, Long> {
    @Override
    public Long apply(String key, Integer oldValue, Long aggregate) {
        return aggregate - oldValue;
    }
}
Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
the same key.
The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
parallel running Kafka Streams instances, and the configuration
parameters for
cache size
, and
commit interval
.
For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
user-specified in StreamsConfig
via parameter
APPLICATION_ID_CONFIG
, "internalStoreName" is an internal name
and "-changelog" is a fixed suffix.
Note that the internal store name may not be queryable through Interactive Queries.
You can retrieve all generated internal topic names via Topology.describe().

VR - the value type of the aggregated KTable
initializer - an Initializer that provides an initial aggregate result value
adder - an Aggregator that adds a new record to the aggregate result
subtractor - an Aggregator that removes an old record from the aggregate result
Returns: a KTable that contains "update" records with unmodified keys, and values that represent the
latest (rolling) aggregate for each key

<VR> KTable<K,VR> aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> adder, Aggregator<? super K,? super V,VR> subtractor, Named named)
Aggregate the value of records of the original KTable that got mapped to the same key into a new instance
of KTable using default serializers and deserializers.
Records with null key are ignored.
Aggregating is a generalization of combining via reduce(...) as it, for example, allows the result to have
a different type than the input values.
If the result value type does not match the default value serde, you should use
aggregate(Initializer, Aggregator, Aggregator, Materialized).
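The difference between reducing and aggregating can be illustrated without Kafka Streams. In the sketch below, reduceAll combines values into a result of the same type, while aggregateAll folds them into a separately typed aggregate. The nested Aggregator interface only mirrors the shape of the Kafka Streams interface, and ReduceVersusAggregateSketch, reduceAll, and aggregateAll are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

final class ReduceVersusAggregateSketch {

    /** Local stand-in shaped like Kafka Streams' Aggregator (assumption: not the real interface). */
    interface Aggregator<K, V, VA> {
        VA apply(K key, V value, VA aggregate);
    }

    /** Reduce-style combining: the result has the same type as the input values. */
    static Integer reduceAll(List<Integer> values, BinaryOperator<Integer> adder) {
        Integer result = null;
        for (Integer value : values) {
            result = (result == null) ? value : adder.apply(result, value);
        }
        return result;
    }

    /** Aggregate-style combining: the result type VA may differ from the input value type. */
    static <VA> VA aggregateAll(String key, List<Integer> values, VA initial,
                                Aggregator<String, Integer, VA> adder) {
        VA aggregate = initial;
        for (Integer value : values) {
            aggregate = adder.apply(key, value, aggregate);
        }
        return aggregate;
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(Integer.MAX_VALUE, 1);

        // Integer sum overflows because the result must stay an Integer.
        System.out.println(reduceAll(values, Integer::sum));

        // Aggregating into a Long avoids the overflow.
        Aggregator<String, Integer, Long> sumAsLong = (key, value, aggregate) -> aggregate + value;
        System.out.println(aggregateAll("eu", values, 0L, sumAsLong));
    }
}
```

Summing Integer values into a Long is one case where the wider result type matters; the same pattern applies to building collections or composite statistics from simple input values.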
The result is written into a local KeyValueStore (which is basically an ever-updating materialized view).
Furthermore, updates to the store are sent downstream into a KTable changelog stream.
The specified Initializer is applied once directly before the first input record is processed to
provide an initial intermediate aggregation result that is used to process the first record.
Each update to the original KTable results in a two-step update of the result KTable.
The specified adder is applied for each update record and computes a new aggregate using the
current aggregate (or, for the very first record, the intermediate aggregation result provided via the
Initializer) and the record's value by adding the new record to the aggregate.
The specified subtractor is applied for each "replaced" record of the original KTable
and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
record from the aggregate.
Thus, aggregate(Initializer, Aggregator, Aggregator, Named) can be used to compute aggregate functions like sum.
For sum, the initializer, adder, and subtractor would work as follows:
// in this example, LongSerde.class must be set as default value serde in StreamsConfig
public class SumInitializer implements Initializer<Long> {
    public Long apply() {
        return 0L;
    }
}

public class SumAdder implements Aggregator<String, Integer, Long> {
    public Long apply(String key, Integer newValue, Long aggregate) {
        return aggregate + newValue;
    }
}

public class SumSubtractor implements Aggregator<String, Integer, Long> {
    public Long apply(String key, Integer oldValue, Long aggregate) {
        return aggregate - oldValue;
    }
}
Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
the same key.
The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
parallel running Kafka Streams instances, and the configuration parameters for cache size and commit interval.
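The deduplication effect described above can be pictured with a toy write-back buffer: consecutive updates to the same key overwrite each other, and only the latest value per key is emitted on flush. This is a simplified model, not the actual Kafka Streams cache, which is bounded by size and flushed on commit or when full. DedupCacheSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DedupCacheSketch {

    // Pending (not yet forwarded) aggregates; a later update for a key replaces the earlier one.
    private final Map<String, Long> dirty = new LinkedHashMap<>();
    // Records that were actually forwarded downstream, in "key=value" form.
    private final List<String> downstream = new ArrayList<>();

    /** Buffers an updated aggregate instead of forwarding it immediately. */
    void update(String key, Long aggregate) {
        dirty.put(key, aggregate);
    }

    /** Forwards the latest aggregate per key and empties the buffer (think: commit). */
    void flush() {
        for (Map.Entry<String, Long> entry : dirty.entrySet()) {
            downstream.add(entry.getKey() + "=" + entry.getValue());
        }
        dirty.clear();
    }

    List<String> downstream() {
        return downstream;
    }

    /** Four updates arrive before one flush; only two records are forwarded. */
    static List<String> runScenario() {
        DedupCacheSketch cache = new DedupCacheSketch();
        cache.update("eu", 5L);
        cache.update("eu", 12L);
        cache.update("us", 3L);
        cache.update("eu", 15L);
        cache.flush();
        return cache.downstream();
    }

    public static void main(String[] args) {
        System.out.println(runScenario()); // prints [eu=15, us=3]
    }
}
```

In this model, flushing after every update would forward all four records, which corresponds to the behavior expected when caching is disabled.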
For failure and recovery, the store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "internalStoreName" is an internal name,
and "-changelog" is a fixed suffix.
Note that the internal store name may not be queryable through Interactive Queries.
You can retrieve all generated internal topic names via Topology.describe().

VR - the value type of the aggregated KTable
initializer - an Initializer that provides an initial aggregate result value
adder - an Aggregator that adds a new record to the aggregate result
subtractor - an Aggregator that removes an old record from the aggregate result
named - a Named config used to name the processor in the topology
Returns: a KTable that contains "update" records with unmodified keys, and values that represent the
latest (rolling) aggregate for each key