public class KafkaStreams extends Object implements AutoCloseable
The computational logic can be specified either by using the Topology to define a DAG topology of Processors or by using the StreamsBuilder, which provides the high-level DSL to define transformations.
One KafkaStreams instance can contain one or more threads, specified in the configs, for the processing work.
A KafkaStreams instance can coordinate with any other instances with the same application ID (whether in the same process, on other processes on this machine, or on remote machines) as a single (possibly distributed) stream processing application. These instances will divide up the work based on the assignment of the input topic partitions so that all partitions are being consumed. If instances are added or fail, all (remaining) instances will rebalance the partition assignment among themselves to balance processing load and ensure that all input topic partitions are processed.
Internally, a KafkaStreams instance contains a normal KafkaProducer and KafkaConsumer instance that is used for reading input and writing output.
A simple example might look like this:
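import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

// Configure the application ID, the bootstrap brokers, and default String serdes.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Map each input value to its length and write the result to the output topic.
StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("my-input-topic").mapValues(value -> String.valueOf(value.length())).to("my-output-topic");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();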
See Also: StreamsBuilder, Topology
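The thread count and the application ID mentioned in the class description are both supplied through configuration. As a rough sketch that depends only on the JDK, the helper below assembles the Properties using the literal key strings behind StreamsConfig.APPLICATION_ID_CONFIG, StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, and StreamsConfig.NUM_STREAM_THREADS_CONFIG (application.id, bootstrap.servers, and num.stream.threads). The class name StreamsProps and the method baseConfig are hypothetical and exist only for illustration.

```java
import java.util.Properties;

// Hypothetical helper for illustration; it is not part of the Kafka Streams API.
final class StreamsProps {
    private StreamsProps() {}

    // Builds a minimal configuration. Instances that share the same
    // application.id cooperate as one stream processing application.
    static Properties baseConfig(String applicationId, String bootstrapServers, int numThreads) {
        if (numThreads < 1) {
            throw new IllegalArgumentException("numThreads must be at least 1");
        }
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Number of processing threads inside this one KafkaStreams instance.
        props.setProperty("num.stream.threads", Integer.toString(numThreads));
        return props;
    }

    public static void main(String[] args) {
        Properties props = baseConfig("my-stream-processing-application", "localhost:9092", 2);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object could then be passed to the KafkaStreams(Topology, Properties) constructor together with a topology.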
Modifier and Type | Class and Description |
---|---|
static class | KafkaStreams.State: Kafka Streams states are the possible states that a Kafka Streams instance can be in. |
static interface | KafkaStreams.StateListener: Listen to KafkaStreams.State change events. |
Modifier and Type | Field and Description |
---|---|
protected KafkaStreams.State | state |
protected org.apache.kafka.streams.processor.internals.StreamThread[] | threads |
Constructor and Description |
---|
KafkaStreams(Topology topology, Properties props): Create a KafkaStreams instance. |
KafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier): Create a KafkaStreams instance. |
KafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier, org.apache.kafka.common.utils.Time time): Create a KafkaStreams instance. |
KafkaStreams(Topology topology, Properties props, org.apache.kafka.common.utils.Time time): Create a KafkaStreams instance. |
KafkaStreams(Topology topology, StreamsConfig config): Deprecated. Use KafkaStreams(Topology, Properties) instead. |
KafkaStreams(Topology topology, StreamsConfig config, KafkaClientSupplier clientSupplier): Deprecated. Use KafkaStreams(Topology, Properties, KafkaClientSupplier) instead. |
KafkaStreams(Topology topology, StreamsConfig config, org.apache.kafka.common.utils.Time time): Deprecated. Use KafkaStreams(Topology, Properties, Time) instead. |
Modifier and Type | Method and Description |
---|---|
Map<String,Map<Integer,LagInfo>> | allLocalStorePartitionLags(): Returns LagInfo for all store partitions (active or standby) local to this Streams instance. |
Collection<StreamsMetadata> | allMetadata(): Find all currently running KafkaStreams instances (potentially remotely) that use the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application) and return StreamsMetadata for each discovered instance. |
Collection<StreamsMetadata> | allMetadataForStore(String storeName): Find all currently running KafkaStreams instances (potentially remotely) that use the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application) and that contain a StateStore with the given storeName and return StreamsMetadata for each discovered instance. |
void | cleanUp(): Do a clean up of the local StateStore directory (StreamsConfig.STATE_DIR_CONFIG) by deleting all data with regard to the application ID. |
void | close(): Shut down this KafkaStreams instance by signaling all the threads to stop, and then wait for them to join. |
boolean | close(Duration timeout): Shut down this KafkaStreams by signaling all the threads to stop, and then wait up to the timeout for the threads to join. |
boolean | close(long timeout, TimeUnit timeUnit): Deprecated. Use close(Duration) instead; note that close(Duration) has different semantics and does not block on zero, e.g., `Duration.ofMillis(0)`. |
Set<ThreadMetadata> | localThreadsMetadata(): Returns runtime information about the local threads of this KafkaStreams instance. |
<K> StreamsMetadata | metadataForKey(String storeName, K key, Serializer<K> keySerializer): Deprecated. Since 2.5. Use queryMetadataForKey(String, Object, Serializer) instead. |
<K> StreamsMetadata | metadataForKey(String storeName, K key, StreamPartitioner<? super K,?> partitioner): Deprecated. Since 2.5. Use queryMetadataForKey(String, Object, StreamPartitioner) instead. |
Map<MetricName,? extends Metric> | metrics(): Get read-only handle on global metrics registry, including streams client's own metrics plus its embedded producer, consumer and admin clients' metrics. |
<K> KeyQueryMetadata | queryMetadataForKey(String storeName, K key, Serializer<K> keySerializer): Finds the metadata containing the active hosts and standby hosts where the key being queried would reside. |
<K> KeyQueryMetadata | queryMetadataForKey(String storeName, K key, StreamPartitioner<? super K,?> partitioner): Finds the metadata containing the active hosts and standby hosts where the key being queried would reside. |
void | setGlobalStateRestoreListener(StateRestoreListener globalStateRestoreListener): Set the listener which is triggered whenever a StateStore is being restored in order to resume processing. |
void | setStateListener(KafkaStreams.StateListener listener): An app can set a single KafkaStreams.StateListener so that the app is notified when state changes. |
void | setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh): Set the handler invoked when an internal thread abruptly terminates due to an uncaught exception. |
void | start(): Start the KafkaStreams instance by starting all its threads. |
KafkaStreams.State | state(): Return the current KafkaStreams.State of this KafkaStreams instance. |
<T> T | store(StoreQueryParameters<T> storeQueryParameters): Get a facade wrapping the local StateStore instances with the provided StoreQueryParameters. |
<T> T | store(String storeName, QueryableStoreType<T> queryableStoreType): Deprecated. Since 2.5 release; use store(StoreQueryParameters) instead. |
protected final org.apache.kafka.streams.processor.internals.StreamThread[] threads
protected volatile KafkaStreams.State state
public KafkaStreams(Topology topology, Properties props)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.
Parameters:
topology - the topology specifying the computational logic
props - properties for StreamsConfig
Throws:
StreamsException - if any fatal error occurs
public KafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.
Parameters:
topology - the topology specifying the computational logic
props - properties for StreamsConfig
clientSupplier - the Kafka clients supplier which provides underlying producer and consumer clients for the new KafkaStreams instance
Throws:
StreamsException - if any fatal error occurs
public KafkaStreams(Topology topology, Properties props, org.apache.kafka.common.utils.Time time)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.
Parameters:
topology - the topology specifying the computational logic
props - properties for StreamsConfig
time - Time implementation; cannot be null
Throws:
StreamsException - if any fatal error occurs
public KafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier, org.apache.kafka.common.utils.Time time)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.
Parameters:
topology - the topology specifying the computational logic
props - properties for StreamsConfig
clientSupplier - the Kafka clients supplier which provides underlying producer and consumer clients for the new KafkaStreams instance
time - Time implementation; cannot be null
Throws:
StreamsException - if any fatal error occurs
@Deprecated public KafkaStreams(Topology topology, StreamsConfig config)
Deprecated. Use KafkaStreams(Topology, Properties) instead.
@Deprecated public KafkaStreams(Topology topology, StreamsConfig config, KafkaClientSupplier clientSupplier)
Deprecated. Use KafkaStreams(Topology, Properties, KafkaClientSupplier) instead.
@Deprecated public KafkaStreams(Topology topology, StreamsConfig config, org.apache.kafka.common.utils.Time time)
Deprecated. Use KafkaStreams(Topology, Properties, Time) instead.
public KafkaStreams.State state()
Return the current KafkaStreams.State of this KafkaStreams instance.
public void setStateListener(KafkaStreams.StateListener listener)
An app can set a single KafkaStreams.StateListener so that the app is notified when state changes.
Parameters:
listener - a new state listener
Throws:
IllegalStateException - if this KafkaStreams instance is not in state CREATED.
public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh)
Set the handler invoked when an internal thread abruptly terminates due to an uncaught exception.
Parameters:
eh - the uncaught exception handler for all internal threads; null deletes the current handler
Throws:
IllegalStateException - if this KafkaStreams instance is not in state CREATED.
public void setGlobalStateRestoreListener(StateRestoreListener globalStateRestoreListener)
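Set the listener which is triggered whenever a StateStore is being restored in order to resume processing.
Parameters:
globalStateRestoreListener - the listener triggered when StateStore is being restored
Throws:
IllegalStateException - if this KafkaStreams instance is not in state CREATED.
public Map<MetricName,? extends Metric> metrics()
Get read-only handle on global metrics registry, including streams client's own metrics plus its embedded producer, consumer and admin clients' metrics.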
public void start() throws IllegalStateException, StreamsException
Start the KafkaStreams instance by starting all its threads. This function is expected to be called only once during the life cycle of the client.
Because threads are started in the background, this method does not block. However, if you have global stores in your topology, this method blocks until all global stores are restored. Because processing runs on background threads, any fatal exception that happens during processing is by default only logged. If you want to be notified about dying threads, you can register an uncaught exception handler before starting the KafkaStreams instance.
Note, for brokers with version 0.9.x or lower, the broker version cannot be checked. There will be no error and the client will hang and retry to verify the broker version until it times out.
Throws:
IllegalStateException - if process was already started
StreamsException - if the Kafka brokers have version 0.10.0.x or if exactly-once is enabled for pre 0.11.0.x brokers
public void close()
Shut down this KafkaStreams instance by signaling all the threads to stop, and then wait for them to join. This will block until all threads have stopped.
Specified by:
close in interface AutoCloseable
@Deprecated public boolean close(long timeout, TimeUnit timeUnit)
Deprecated. Use close(Duration) instead; note that close(Duration) has different semantics and does not block on zero, e.g., `Duration.ofMillis(0)`.
Shut down this KafkaStreams by signaling all the threads to stop, and then wait up to the timeout for the threads to join. A timeout of 0 means to wait forever.
Parameters:
timeout - how long to wait for the threads to shut down. Can't be negative. A timeout of 0 waits forever; it does not return immediately, unlike a zero timeout passed to close(Duration).
timeUnit - unit of time used for timeout
Returns:
true if all threads were successfully stopped; false if the timeout was reached before all threads stopped. Note that this method must not be called in the onChange callback of KafkaStreams.StateListener.
public boolean close(Duration timeout) throws IllegalArgumentException
Shut down this KafkaStreams by signaling all the threads to stop, and then wait up to the timeout for the threads to join. A timeout of Duration.ZERO (or any other zero duration) makes the close operation asynchronous. Negative-duration timeouts are rejected.
Parameters:
timeout - how long to wait for the threads to shut down
Returns:
true if all threads were successfully stopped; false if the timeout was reached before all threads stopped. Note that this method must not be called in the KafkaStreams.StateListener.onChange(KafkaStreams.State, KafkaStreams.State) callback of KafkaStreams.StateListener.
Throws:
IllegalArgumentException - if timeout can't be represented as long milliseconds
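The timeout rules for the two close overloads differ in how zero is treated, which is easy to get wrong when migrating. The sketch below, which uses only the JDK, models the documented close(Duration) checks (negative durations rejected, durations that do not fit into long milliseconds rejected, zero meaning no waiting) and shows one possible way to translate legacy close(long, TimeUnit) arguments. The class CloseTimeouts and its methods are hypothetical helpers, not Kafka APIs, and mapping a legacy 0 to Duration.ofMillis(Long.MAX_VALUE) is an assumption about how to express "wait forever".

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; it is not part of the Kafka Streams API.
final class CloseTimeouts {
    private CloseTimeouts() {}

    // Models the documented close(Duration) rules: negative durations are rejected,
    // and so are durations that do not fit into a long number of milliseconds.
    static long toValidatedMillis(Duration timeout) {
        if (timeout == null || timeout.isNegative()) {
            throw new IllegalArgumentException("timeout must be non-null and non-negative");
        }
        try {
            return timeout.toMillis(); // throws ArithmeticException on overflow
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException("timeout can't be represented as long milliseconds", e);
        }
    }

    // A zero timeout means the close call would not wait for the threads to join.
    static boolean isAsynchronous(Duration timeout) {
        return toValidatedMillis(timeout) == 0L;
    }

    // Translates legacy close(long, TimeUnit) arguments into a Duration, assuming
    // the legacy rule that a timeout of 0 means "wait forever".
    static Duration fromLegacy(long timeout, TimeUnit unit) {
        if (timeout < 0) {
            throw new IllegalArgumentException("timeout can't be negative");
        }
        return timeout == 0
                ? Duration.ofMillis(Long.MAX_VALUE)
                : Duration.ofMillis(unit.toMillis(timeout));
    }
}
```

A caller migrating old code might then write something like streams.close(CloseTimeouts.fromLegacy(30, TimeUnit.SECONDS)), keeping the old blocking behavior for a zero argument explicit.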
public void cleanUp()
Do a clean up of the local StateStore directory (StreamsConfig.STATE_DIR_CONFIG) by deleting all data with regard to the application ID.
May only be called either before this KafkaStreams instance is started or after the instance is closed.
Calling this method triggers a restore of local StateStores on the next application start.
Throws:
IllegalStateException - if this KafkaStreams instance is currently running
StreamsException - if cleanup failed
public Collection<StreamsMetadata> allMetadata()
Find all currently running KafkaStreams instances (potentially remotely) that use the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application) and return StreamsMetadata for each discovered instance.
Note: this is a point in time view and it may change due to partition reassignment.
Returns:
StreamsMetadata for each KafkaStreams instance of this application
public Collection<StreamsMetadata> allMetadataForStore(String storeName)
Find all currently running KafkaStreams instances (potentially remotely) that use the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application) and that contain a StateStore with the given storeName and return StreamsMetadata for each discovered instance.
Note: this is a point in time view and it may change due to partition reassignment.
Parameters:
storeName - the storeName to find metadata for
Returns:
StreamsMetadata for each KafkaStreams instance with the provided storeName of this application
@Deprecated public <K> StreamsMetadata metadataForKey(String storeName, K key, Serializer<K> keySerializer)
Deprecated. Since 2.5. Use queryMetadataForKey(String, Object, Serializer) instead.
Find the currently running KafkaStreams instance (potentially remotely) that uses the same application ID as this instance (i.e., belongs to the same Kafka Streams application), that contains a StateStore with the given storeName, and whose StateStore contains the given key, and return StreamsMetadata for it.
This will use the default Kafka Streams partitioner to locate the partition. If a custom partitioner has been configured via StreamsConfig or KStream.repartition(Repartitioned), or if the original KTable's input topic is partitioned differently, please use metadataForKey(String, Object, StreamPartitioner).
Note: the key may not exist in the StateStore; this method provides a way of finding which host it would exist on.
Type Parameters:
K - key type
Parameters:
storeName - the storeName to find metadata for
key - the key to find metadata for
keySerializer - serializer for the key
Returns:
StreamsMetadata for the KafkaStreams instance with the provided storeName and key of this application, or StreamsMetadata.NOT_AVAILABLE if Kafka Streams is (re-)initializing, or null if no matching metadata could be found.
@Deprecated public <K> StreamsMetadata metadataForKey(String storeName, K key, StreamPartitioner<? super K,?> partitioner)
Deprecated. Since 2.5. Use queryMetadataForKey(String, Object, StreamPartitioner) instead.
Find the currently running KafkaStreams instance (potentially remotely) that uses the same application ID as this instance (i.e., belongs to the same Kafka Streams application), that contains a StateStore with the given storeName, and whose StateStore contains the given key, and return StreamsMetadata for it.
Note: the key may not exist in the StateStore; this method provides a way of finding which host it would exist on.
Type Parameters:
K - key type
Parameters:
storeName - the storeName to find metadata for
key - the key to find metadata for
partitioner - the partitioner to be used to locate the host for the key
Returns:
StreamsMetadata for the KafkaStreams instance with the provided storeName and key of this application, or StreamsMetadata.NOT_AVAILABLE if Kafka Streams is (re-)initializing, or null if no matching metadata could be found.
public <K> KeyQueryMetadata queryMetadataForKey(String storeName, K key, Serializer<K> keySerializer)
Finds the metadata containing the active hosts and standby hosts where the key being queried would reside.
Type Parameters:
K - key type
Parameters:
storeName - the storeName to find metadata for
key - the key to find metadata for
keySerializer - serializer for the key
Returns:
KeyQueryMetadata containing all metadata about hosting the given key for the given store, or null if no matching metadata could be found.
public <K> KeyQueryMetadata queryMetadataForKey(String storeName, K key, StreamPartitioner<? super K,?> partitioner)
Finds the metadata containing the active hosts and standby hosts where the key being queried would reside.
Type Parameters:
K - key type
Parameters:
storeName - the storeName to find metadata for
key - the key to find metadata for
partitioner - the partitioner to be used to locate the host for the key
Returns:
KeyQueryMetadata containing all metadata about hosting the given key for the given store, using the supplied partitioner, or null if no matching metadata could be found.
@Deprecated public <T> T store(String storeName, QueryableStoreType<T> queryableStoreType)
Deprecated. Since 2.5 release; use store(StoreQueryParameters) instead.
public <T> T store(StoreQueryParameters<T> storeQueryParameters)
Get a facade wrapping the local StateStore instances with the provided StoreQueryParameters. The returned object can be used to query the StateStore instances.
Parameters:
storeQueryParameters - the parameters used to fetch a queryable store
Returns:
a facade wrapping the local StateStore instances
Throws:
InvalidStateStoreException - if the specified store name does not exist in the topology or if the Streams instance isn't in a queryable state. If the store's type does not match the QueryableStoreType, the Streams instance is not in a queryable state with respect to the parameters, or the store is not available locally, then an InvalidStateStoreException is thrown upon store access.
public Set<ThreadMetadata> localThreadsMetadata()
Returns runtime information about the local threads of this KafkaStreams instance.
Returns:
the set of ThreadMetadata.
public Map<String,Map<Integer,LagInfo>> allLocalStorePartitionLags()
Returns LagInfo for all store partitions (active or standby) local to this Streams instance. Note that the values returned are just estimates and meant to be used for making soft decisions on whether the data in the store partition is fresh enough for querying.
Note: Each invocation of this method issues a call to the Kafka brokers. Thus it's advisable to limit the frequency of invocation to once every few seconds.
Returns:
map of store names to another map of partition to LagInfos
Throws:
StreamsException - if the admin client request throws exception
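To illustrate the kind of soft decision the lag estimates are meant for, here is a minimal JDK-only sketch. It assumes the per-partition offset lag has already been extracted into plain Long values (in real code these would come from the LagInfo objects in the returned map), and it picks the partitions of one store whose lag is within a caller-chosen threshold. The class LagCheck, the method freshPartitions, and the threshold parameter are all hypothetical.

```java
import java.util.Collections;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical helper for illustration; it is not part of the Kafka Streams API.
final class LagCheck {
    private LagCheck() {}

    // lagsByStore: store name -> (partition -> estimated offset lag).
    // Plain Long values stand in for LagInfo so the sketch needs no Kafka classes.
    static SortedSet<Integer> freshPartitions(Map<String, Map<Integer, Long>> lagsByStore,
                                              String storeName,
                                              long maxAcceptableLag) {
        SortedSet<Integer> fresh = new TreeSet<>();
        Map<Integer, Long> byPartition =
                lagsByStore.getOrDefault(storeName, Collections.emptyMap());
        for (Map.Entry<Integer, Long> entry : byPartition.entrySet()) {
            // The lag is only an estimate, so treat this as a soft filter.
            if (entry.getValue() <= maxAcceptableLag) {
                fresh.add(entry.getKey());
            }
        }
        return fresh;
    }
}
```

Because each call to allLocalStorePartitionLags() contacts the brokers, an application would probably refresh the lag map on a timer every few seconds and run a check like this against the cached copy rather than per query.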