public class KafkaStreams extends Object implements AutoCloseable
The computational logic can be specified either by using the Topology to define a DAG topology of
Processors or by using the StreamsBuilder which provides the high-level DSL to define
transformations.
One KafkaStreams instance can contain one or more threads for the processing work; the number of threads is set in the configs (StreamsConfig.NUM_STREAM_THREADS_CONFIG).
A KafkaStreams instance can coordinate with any other instances with the same
application ID (whether in the same process, on other processes on this
machine, or on remote machines) as a single (possibly distributed) stream processing application.
These instances will divide up the work based on the assignment of the input topic partitions so that all partitions
are consumed.
If instances are added or fail, all (remaining) instances will rebalance the partition assignment among themselves
to balance processing load and ensure that all input topic partitions are processed.
Internally a KafkaStreams instance contains normal KafkaProducer and KafkaConsumer instances
that are used for reading input and writing output.
A simple example might look like this:
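```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("my-input-topic").mapValues(value -> String.valueOf(value.length())).to("my-output-topic");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
// close() the instance (stopping all its threads) when the JVM shuts down
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();
```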
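You can check the same topology without a running broker. The sketch below makes two assumptions. First, it needs the separate kafka-streams-test-utils artifact, which provides TopologyTestDriver, TestInputTopic and TestOutputTopic. Second, the class ValueLengthTopologyCheck and its methods are hypothetical names used only for illustration.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;

// Hypothetical test helper for the example topology (requires kafka-streams-test-utils).
final class ValueLengthTopologyCheck {

    // Same topology as the example: every value is replaced by its length as a String.
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("my-input-topic")
               .mapValues(value -> String.valueOf(value.length()))
               .to("my-output-topic");
        return builder.build();
    }

    // Pipes one record through the topology in-process and returns the value written to the output topic.
    static String lengthOf(String value) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the test driver
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "my-input-topic", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                    "my-output-topic", new StringDeserializer(), new StringDeserializer());
            in.pipeInput("some-key", value);
            return out.readValue();
        }
    }
}
```

TopologyTestDriver runs the topology synchronously in the calling thread. The check therefore does not call start() or close() on a KafkaStreams instance.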
See Also: StreamsBuilder, Topology

| Modifier and Type | Class and Description |
|---|---|
| `static class` | `KafkaStreams.CloseOptions`: Class that handles options passed in case of KafkaStreams instance scale down. |
| `static class` | `KafkaStreams.State`: Kafka Streams states are the possible states that a Kafka Streams instance can be in. |
| `static interface` | `KafkaStreams.StateListener`: Listen to KafkaStreams.State change events. |
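Here is one way to use KafkaStreams.StateListener. The hypothetical RunningLatchListener below records the most recent state and lets a caller wait until the instance first reaches RUNNING. It is a sketch for illustration and is not part of the library.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KafkaStreams;

// Hypothetical listener: remembers the latest state and releases waiters once RUNNING is first reached.
final class RunningLatchListener implements KafkaStreams.StateListener {
    private final CountDownLatch running = new CountDownLatch(1);
    private volatile KafkaStreams.State lastState = KafkaStreams.State.CREATED;

    @Override
    public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
        // Invoked on every state transition; keep it short and never call close() from here.
        lastState = newState;
        if (newState == KafkaStreams.State.RUNNING) {
            running.countDown();
        }
    }

    // Blocks until RUNNING has been observed or the timeout elapses; true if RUNNING was seen.
    boolean awaitRunning(long timeout, TimeUnit unit) throws InterruptedException {
        return running.await(timeout, unit);
    }

    // Non-blocking variant: true once RUNNING has been observed at least once.
    boolean hasReachedRunning() {
        return running.getCount() == 0;
    }

    KafkaStreams.State lastState() {
        return lastState;
    }
}
```

Register the listener with setStateListener(listener) before calling start(), because setStateListener throws IllegalStateException once the instance has started. A caller can then use awaitRunning(30, TimeUnit.SECONDS) as a simple readiness check.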
| Modifier and Type | Field and Description |
|---|---|
| `protected Admin` | `adminClient` |
| `protected StreamsConfig` | `applicationConfigs` |
| `protected String` | `clientId` |
| `protected KafkaStreams.State` | `state` |
| `protected org.apache.kafka.streams.processor.internals.StateDirectory` | `stateDirectory` |
| `protected org.apache.kafka.streams.processor.internals.StreamsMetadataState` | `streamsMetadataState` |
| `protected List<org.apache.kafka.streams.processor.internals.StreamThread>` | `threads` |
| `protected org.apache.kafka.streams.processor.internals.TopologyMetadata` | `topologyMetadata` |
| Modifier | Constructor and Description |
|---|---|
| `protected` | `KafkaStreams(org.apache.kafka.streams.processor.internals.TopologyMetadata topologyMetadata, StreamsConfig applicationConfigs, KafkaClientSupplier clientSupplier)` |
| | `KafkaStreams(Topology topology, Properties props)`: Create a KafkaStreams instance. |
| | `KafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier)`: Create a KafkaStreams instance. |
| | `KafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier, org.apache.kafka.common.utils.Time time)`: Create a KafkaStreams instance. |
| | `KafkaStreams(Topology topology, Properties props, org.apache.kafka.common.utils.Time time)`: Create a KafkaStreams instance. |
| | `KafkaStreams(Topology topology, StreamsConfig applicationConfigs)`: Create a KafkaStreams instance. |
| | `KafkaStreams(Topology topology, StreamsConfig applicationConfigs, KafkaClientSupplier clientSupplier)`: Create a KafkaStreams instance. |
| | `KafkaStreams(Topology topology, StreamsConfig applicationConfigs, org.apache.kafka.common.utils.Time time)`: Create a KafkaStreams instance. |
| Modifier and Type | Method and Description |
|---|---|
| `Optional<String>` | `addStreamThread()`: Adds and starts a stream thread in addition to the stream threads that are already running in this Kafka Streams client. |
| `Map<String,Map<Integer,LagInfo>>` | `allLocalStorePartitionLags()`: Returns LagInfo for all store partitions (active or standby) local to this Streams instance. |
| `protected Map<String,Map<Integer,LagInfo>>` | `allLocalStorePartitionLags(List<org.apache.kafka.streams.processor.internals.Task> tasksToCollectLagFor)` |
| `Collection<StreamsMetadata>` | `allMetadata()`: Deprecated. Since 3.0.0; use metadataForAllStreamsClients() instead. |
| `Collection<StreamsMetadata>` | `allMetadataForStore(String storeName)`: Deprecated. Since 3.0.0; use streamsMetadataForStore(java.lang.String) instead. |
| `void` | `cleanUp()`: Do a clean up of the local StateStore directory (StreamsConfig.STATE_DIR_CONFIG) by deleting all data with regard to the application ID. |
| `ClientInstanceIds` | `clientInstanceIds(Duration timeout)`: Returns the internal clients' assigned client instance ids. |
| `void` | `close()`: Shut down this KafkaStreams instance by signaling all the threads to stop, and then wait for them to join. |
| `boolean` | `close(Duration timeout)`: Shut down this KafkaStreams instance by signaling all the threads to stop, and then wait up to the timeout for the threads to join. |
| `boolean` | `close(KafkaStreams.CloseOptions options)`: Shut down this KafkaStreams instance by signaling all the threads to stop, and then wait up to the timeout for the threads to join. |
| `protected boolean` | `hasStartedOrFinishedShuttingDown()` |
| `boolean` | `isPaused()` |
| `protected boolean` | `isRunningOrRebalancing()` |
| `Set<ThreadMetadata>` | `localThreadsMetadata()`: Deprecated. Since 3.0; use metadataForLocalThreads() instead. |
| `Collection<StreamsMetadata>` | `metadataForAllStreamsClients()`: Find all currently running KafkaStreams instances (potentially remotely) that use the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application) and return StreamsMetadata for each discovered instance. |
| `Set<ThreadMetadata>` | `metadataForLocalThreads()`: Returns runtime information about the local threads of this KafkaStreams instance. |
| `Map<MetricName,? extends Metric>` | `metrics()`: Get a read-only handle on the global metrics registry, including the Streams client's own metrics plus its embedded producer, consumer, and admin clients' metrics. |
| `void` | `pause()`: This method pauses processing for the KafkaStreams instance. |
| `protected int` | `processStreamThread(Consumer<org.apache.kafka.streams.processor.internals.StreamThread> consumer)`: Handle each stream thread in a snapshot of threads. |
| `<R> StateQueryResult<R>` | `query(StateQueryRequest<R> request)`: Run an interactive query against a state store. |
| `<K> KeyQueryMetadata` | `queryMetadataForKey(String storeName, K key, Serializer<K> keySerializer)`: Finds the metadata containing the active hosts and standby hosts where the key being queried would reside. |
| `<K> KeyQueryMetadata` | `queryMetadataForKey(String storeName, K key, StreamPartitioner<? super K,?> partitioner)`: Finds the metadata containing the active hosts and standby hosts where the key being queried would reside. |
| `Optional<String>` | `removeStreamThread()`: Removes one stream thread out of the running stream threads from this Kafka Streams client. |
| `Optional<String>` | `removeStreamThread(Duration timeout)`: Removes one stream thread out of the running stream threads from this Kafka Streams client. |
| `void` | `resume()`: This method resumes processing for the KafkaStreams instance. |
| `void` | `setGlobalStateRestoreListener(StateRestoreListener globalStateRestoreListener)`: Set the listener which is triggered whenever a StateStore is being restored in order to resume processing. |
| `void` | `setStandbyUpdateListener(StandbyUpdateListener standbyListener)`: Set the listener which is triggered whenever a standby task is updated. |
| `void` | `setStateListener(KafkaStreams.StateListener listener)`: An app can set a single KafkaStreams.StateListener so that the app is notified when state changes. |
| `void` | `setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler userStreamsUncaughtExceptionHandler)`: Set the handler invoked when an internal stream thread throws an unexpected exception. |
| `void` | `setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler)`: Deprecated. Since 2.8.0; use setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler) instead. |
| `void` | `start()`: Start the KafkaStreams instance by starting all its threads. |
| `KafkaStreams.State` | `state()`: Return the current KafkaStreams.State of this KafkaStreams instance. |
| `<T> T` | `store(StoreQueryParameters<T> storeQueryParameters)`: Get a facade wrapping the local StateStore instances with the provided StoreQueryParameters. |
| `Collection<StreamsMetadata>` | `streamsMetadataForStore(String storeName)`: Find all currently running KafkaStreams instances (potentially remotely) that use the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application) and that contain a StateStore with the given storeName, and return StreamsMetadata for each discovered instance. |
| `protected void` | `validateIsRunningOrRebalancing()` |
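The three close variants above differ only in how long they wait and whether the consumers leave the group. The hypothetical GracefulShutdown helper below is a minimal sketch around close(KafkaStreams.CloseOptions). It assumes the builder methods timeout(Duration) and leaveGroup(boolean) that KafkaStreams.CloseOptions offers in the 3.x line this page documents.

```java
import java.time.Duration;
import org.apache.kafka.streams.KafkaStreams;

// Hypothetical helper around close(KafkaStreams.CloseOptions).
final class GracefulShutdown {

    // Wait at most `timeout` for the stream threads to join; when leaveGroup is true,
    // also trigger a consumer leave call so that partitions can be reassigned sooner.
    static KafkaStreams.CloseOptions options(Duration timeout, boolean leaveGroup) {
        if (timeout.isNegative()) {
            // close(...) rejects negative timeouts with IllegalArgumentException; fail early here.
            throw new IllegalArgumentException("timeout must not be negative: " + timeout);
        }
        return new KafkaStreams.CloseOptions().timeout(timeout).leaveGroup(leaveGroup);
    }

    // true if all threads stopped within the timeout, false if the timeout was reached first.
    // Must not be called from a KafkaStreams.StateListener#onChange callback.
    static boolean shutdown(KafkaStreams streams, Duration timeout, boolean leaveGroup) {
        return streams.close(options(timeout, leaveGroup));
    }
}
```

A timeout of Duration.ZERO makes the close asynchronous. In that case the returned boolean says nothing about whether the threads have actually stopped.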
protected final String clientId
protected final StreamsConfig applicationConfigs
protected final List<org.apache.kafka.streams.processor.internals.StreamThread> threads
protected final org.apache.kafka.streams.processor.internals.StateDirectory stateDirectory
protected final org.apache.kafka.streams.processor.internals.StreamsMetadataState streamsMetadataState
protected final Admin adminClient
protected final org.apache.kafka.streams.processor.internals.TopologyMetadata topologyMetadata
protected volatile KafkaStreams.State state
public KafkaStreams(Topology topology, Properties props)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance,
you still must close() it to avoid resource leaks.
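KafkaStreams implements AutoCloseable, so try-with-resources is one way to satisfy this note for an instance that is created but never started. In the sketch below, InspectWithoutStarting and its methods are hypothetical, and the configuration values are placeholders.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

// Hypothetical helper: creates an instance only to inspect it and relies on
// try-with-resources to call close() even though start() is never invoked.
final class InspectWithoutStarting {

    static Properties props(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }

    static KafkaStreams.State initialState(Topology topology, Properties props) {
        try (KafkaStreams streams = new KafkaStreams(topology, props)) {
            // Without start() the instance stays in CREATED; close() still runs on exit from this block.
            return streams.state();
        }
    }
}
```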
Parameters:
topology - the topology specifying the computational logic
props - properties for StreamsConfig
Throws:
StreamsException - if any fatal error occurs

public KafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance,
you still must close() it to avoid resource leaks.
Parameters:
topology - the topology specifying the computational logic
props - properties for StreamsConfig
clientSupplier - the Kafka clients supplier which provides underlying producer and consumer clients
for the new KafkaStreams instance
Throws:
StreamsException - if any fatal error occurs

public KafkaStreams(Topology topology, Properties props, org.apache.kafka.common.utils.Time time)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance,
you still must close() it to avoid resource leaks.
Parameters:
topology - the topology specifying the computational logic
props - properties for StreamsConfig
time - Time implementation; cannot be null
Throws:
StreamsException - if any fatal error occurs

public KafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier, org.apache.kafka.common.utils.Time time)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance,
you still must close() it to avoid resource leaks.
Parameters:
topology - the topology specifying the computational logic
props - properties for StreamsConfig
clientSupplier - the Kafka clients supplier which provides underlying producer and consumer clients
for the new KafkaStreams instance
time - Time implementation; cannot be null
Throws:
StreamsException - if any fatal error occurs

public KafkaStreams(Topology topology, StreamsConfig applicationConfigs)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance,
you still must close() it to avoid resource leaks.
Parameters:
topology - the topology specifying the computational logic
applicationConfigs - configs for Kafka Streams
Throws:
StreamsException - if any fatal error occurs

public KafkaStreams(Topology topology, StreamsConfig applicationConfigs, KafkaClientSupplier clientSupplier)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance,
you still must close() it to avoid resource leaks.
Parameters:
topology - the topology specifying the computational logic
applicationConfigs - configs for Kafka Streams
clientSupplier - the Kafka clients supplier which provides underlying producer and consumer clients
for the new KafkaStreams instance
Throws:
StreamsException - if any fatal error occurs

public KafkaStreams(Topology topology, StreamsConfig applicationConfigs, org.apache.kafka.common.utils.Time time)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance,
you still must close() it to avoid resource leaks.
Parameters:
topology - the topology specifying the computational logic
applicationConfigs - configs for Kafka Streams
time - Time implementation; cannot be null
Throws:
StreamsException - if any fatal error occurs

protected KafkaStreams(org.apache.kafka.streams.processor.internals.TopologyMetadata topologyMetadata,
StreamsConfig applicationConfigs,
KafkaClientSupplier clientSupplier)
throws StreamsException
Throws:
StreamsException

public KafkaStreams.State state()
Return the current KafkaStreams.State of this KafkaStreams instance.

protected boolean isRunningOrRebalancing()
protected boolean hasStartedOrFinishedShuttingDown()
protected void validateIsRunningOrRebalancing()
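The public state() accessor can back a simple health check. The sketch below treats RUNNING and REBALANCING as healthy and every other state as unhealthy, including CREATED before start(). This policy is an illustrative choice, and StreamsHealth is a hypothetical name.

```java
import org.apache.kafka.streams.KafkaStreams;

// Hypothetical health policy built on the public state() accessor.
final class StreamsHealth {

    // RUNNING and REBALANCING are both normal for a live instance; every other state
    // (CREATED, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR) is reported as unhealthy.
    static boolean isHealthy(KafkaStreams.State state) {
        return state == KafkaStreams.State.RUNNING || state == KafkaStreams.State.REBALANCING;
    }

    // Convenience overload that reads the instance's current state.
    static boolean isHealthy(KafkaStreams streams) {
        return isHealthy(streams.state());
    }
}
```

A liveness endpoint might call StreamsHealth.isHealthy(streams). Counting REBALANCING as healthy keeps routine partition reassignments from being reported as failures.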
public void setStateListener(KafkaStreams.StateListener listener)
An app can set a single KafkaStreams.StateListener so that the app is notified when state changes.
Parameters:
listener - a new state listener
Throws:
IllegalStateException - if this KafkaStreams instance has already been started.

@Deprecated public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
Deprecated. Since 2.8.0. Use setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler) instead.
Set the handler invoked when an internal stream thread abruptly terminates due to an uncaught exception.
Parameters:
uncaughtExceptionHandler - the uncaught exception handler for all internal threads; null deletes the current handler
Throws:
IllegalStateException - if this KafkaStreams instance has already been started.

public void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler userStreamsUncaughtExceptionHandler)
Set the handler invoked when an internal stream thread throws an unexpected exception.
These might be exceptions indicating rare bugs in Kafka Streams, or they
might be exceptions thrown by your code, for example a NullPointerException thrown from your processor logic.
The handler will execute on the thread that produced the exception.
In order to get the thread that threw the exception, use Thread.currentThread().
Note: this handler must be thread-safe, since it is shared among all threads and invoked from any thread that encounters such an exception.
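Here is a minimal handler sketch. Its policy is hypothetical and chosen for illustration: replace the failed stream thread for ordinary exceptions, and shut down this client when an Error appears anywhere in the cause chain. The handler keeps no mutable state, so all stream threads can safely share a single instance, as the note above requires.

```java
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

// Hypothetical policy: replace the failed thread for ordinary exceptions, but shut down this
// client if an Error (for example OutOfMemoryError) appears anywhere in the cause chain.
final class ReplaceThreadHandler implements StreamsUncaughtExceptionHandler {

    private static final int MAX_CAUSE_DEPTH = 32; // guards against pathological cyclic cause chains

    @Override
    public StreamThreadExceptionResponse handle(Throwable exception) {
        // Runs on the stream thread that failed; Thread.currentThread() identifies it.
        // Stateless, so one instance can be shared by all stream threads.
        Throwable t = exception;
        for (int depth = 0; t != null && depth < MAX_CAUSE_DEPTH; depth++) {
            if (t instanceof Error) {
                return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
            }
            t = t.getCause();
        }
        return StreamThreadExceptionResponse.REPLACE_THREAD;
    }
}
```

Register it with streams.setUncaughtExceptionHandler(new ReplaceThreadHandler()) before start(). SHUTDOWN_APPLICATION is the remaining response, for cases where every instance of the application should stop.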
Parameters:
userStreamsUncaughtExceptionHandler - the uncaught exception handler of type StreamsUncaughtExceptionHandler for all internal threads
Throws:
IllegalStateException - if this KafkaStreams instance has already been started.
NullPointerException - if userStreamsUncaughtExceptionHandler is null.

public void setGlobalStateRestoreListener(StateRestoreListener globalStateRestoreListener)
Set the listener which is triggered whenever a StateStore is being restored in order to resume
processing.
Parameters:
globalStateRestoreListener - The listener triggered when StateStore is being restored.
Throws:
IllegalStateException - if this KafkaStreams instance has already been started.

public void setStandbyUpdateListener(StandbyUpdateListener standbyListener)
Set the listener which is triggered whenever a standby task is updated.
Parameters:
standbyListener - The listener triggered when a standby task is updated.
Throws:
IllegalStateException - if this KafkaStreams instance has already been started.

public Map<MetricName,? extends Metric> metrics()
Get a read-only handle on the global metrics registry, including the Streams client's own metrics plus its embedded producer, consumer, and admin clients' metrics.
public Optional<String> addStreamThread()
Adds and starts a stream thread in addition to the stream threads that are already running in this Kafka Streams client.
Since the number of stream threads increases, the sizes of the caches in the new stream thread
and the existing stream threads are adapted so that the sum of the cache sizes over all stream
threads does not exceed the total cache size specified in configuration
StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG.
Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
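The arithmetic sketch below makes the cache adjustment concrete. It assumes the total cache is split evenly across stream threads using integer division. The library's internal calculation may also account for other threads (for example a global stream thread), so treat the numbers as illustrative. CacheSplit is a hypothetical name.

```java
// Illustrative arithmetic only: assumes an even integer split of the total cache across stream threads.
final class CacheSplit {

    // Per-thread cache size when totalCacheBytes is divided among streamThreads threads.
    static long perThreadCacheBytes(long totalCacheBytes, int streamThreads) {
        if (streamThreads <= 0) {
            throw new IllegalArgumentException("streamThreads must be positive");
        }
        // Integer division rounds down, so the per-thread sizes never sum to more than the total.
        return totalCacheBytes / streamThreads;
    }
}
```

Take the default StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG of 10485760 bytes (10 MiB). Under this assumption, going from two threads to three shrinks each cache from 5242880 to 3495253 bytes. The three caches together use 10485759 bytes, which stays within the configured total.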
public Optional<String> removeStreamThread()
Removes one stream thread out of the running stream threads from this Kafka Streams client.
The removed stream thread is gracefully shut down. This method does not specify which stream thread is shut down.
Since the number of stream threads decreases, the sizes of the caches in the remaining stream
threads are adapted so that the sum of the cache sizes over all stream threads equals the total
cache size specified in configuration StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG.
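addStreamThread() and removeStreamThread() each change the thread count by one, so moving to a target count takes a loop. The hypothetical ThreadScaler below sketches that loop. It takes the operations as functions so that the logic can run without a cluster. With a real instance you might pass streams::addStreamThread, () -> streams.removeStreamThread(Duration.ofSeconds(30)), and () -> streams.metadataForLocalThreads().size(); the last assumes the local thread metadata reflects the current number of stream threads.

```java
import java.util.Optional;
import java.util.function.IntSupplier;
import java.util.function.Supplier;

// Hypothetical helper: moves the stream-thread count toward `target`, one add/remove call at a time.
final class ThreadScaler {

    // Returns how many threads were actually added or removed.
    static int scaleTo(int target,
                       IntSupplier currentThreads,
                       Supplier<Optional<String>> addThread,
                       Supplier<Optional<String>> removeThread) {
        int delta = target - currentThreads.getAsInt();
        int changed = 0;
        // Each successful call adds or removes exactly one thread, so at most |delta| calls are needed.
        for (int i = 0; i < Math.abs(delta); i++) {
            Optional<String> threadName = delta > 0 ? addThread.get() : removeThread.get();
            if (!threadName.isPresent()) {
                break; // nothing changed, e.g. a thread could not be added in the client's current state
            }
            changed++;
        }
        return changed;
    }
}
```

The helper stops at the first empty Optional instead of retrying. For example, a client that is not in RUNNING or REBALANCING cannot add threads, and retrying immediately would not help.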
public Optional<String> removeStreamThread(Duration timeout)
Removes one stream thread out of the running stream threads from this Kafka Streams client.
The removed stream thread is gracefully shut down. This method does not specify which stream thread is shut down.
Since the number of stream threads decreases, the sizes of the caches in the remaining stream
threads are adapted so that the sum of the cache sizes over all stream threads equals the total
cache size specified in configuration StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG.
Parameters:
timeout - The length of time to wait for the thread to shut down
Throws:
TimeoutException - if the thread does not stop in time

public void start()
throws IllegalStateException,
StreamsException
Start the KafkaStreams instance by starting all its threads.
This function is expected to be called only once during the life cycle of the client.
Because threads are started in the background, this method does not block.
However, if you have global stores in your topology, this method blocks until all global stores are restored.
As a consequence, any fatal exception that happens during processing is by default only logged.
If you want to be notified about dying threads, you can
register an uncaught exception handler
before starting the KafkaStreams instance.
Note, for brokers with version 0.9.x or lower, the broker version cannot be checked.
There will be no error and the client will hang and retry to verify the broker version until it
times out.
Throws:
IllegalStateException - if process was already started
StreamsException - if the Kafka brokers have version 0.10.0.x or
if exactly-once is enabled for pre 0.11.0.x brokers

public void close()
Shut down this KafkaStreams instance by signaling all the threads to stop, and then wait for them to join.
This will block until all threads have stopped.
Specified by:
close in interface AutoCloseable

public boolean close(Duration timeout) throws IllegalArgumentException
Shut down this KafkaStreams instance by signaling all the threads to stop, and then wait up to the timeout for the
threads to join.
A timeout of Duration.ZERO (or any other zero duration) makes the close operation asynchronous.
Negative-duration timeouts are rejected.
Parameters:
timeout - how long to wait for the threads to shut down
Returns:
true if all threads were successfully stopped, false if the timeout was reached
before all threads stopped
Note that this method must not be called in the KafkaStreams.StateListener.onChange(KafkaStreams.State, KafkaStreams.State) callback of KafkaStreams.StateListener.
Throws:
IllegalArgumentException - if timeout can't be represented as long milliseconds

public boolean close(KafkaStreams.CloseOptions options) throws IllegalArgumentException
Shut down this KafkaStreams instance by signaling all the threads to stop, and then wait up to the timeout for the
threads to join.
Parameters:
options - contains timeout to specify how long to wait for the threads to shut down, and a flag leaveGroup to
trigger a consumer leave call
Returns:
true if all threads were successfully stopped, false if the timeout was reached
before all threads stopped
Note that this method must not be called in the KafkaStreams.StateListener.onChange(KafkaStreams.State, KafkaStreams.State) callback of KafkaStreams.StateListener.
Throws:
IllegalArgumentException - if timeout can't be represented as long milliseconds

public void cleanUp()
Do a clean up of the local StateStore directory (StreamsConfig.STATE_DIR_CONFIG) by deleting all
data with regard to the application ID.
May only be called either before this KafkaStreams instance is started or after the
instance is closed.
Calling this method triggers a restore of local StateStores on the next application start.
Throws:
IllegalStateException - if this KafkaStreams instance has been started and hasn't fully shut down
StreamsException - if cleanup failed

@Deprecated public Collection<StreamsMetadata> allMetadata()
Deprecated. Since 3.0.0; use metadataForAllStreamsClients() instead.
Find all currently running KafkaStreams instances (potentially remotely) that use the same
application ID as this instance (i.e., all instances that belong to
the same Kafka Streams application) and return StreamsMetadata for each discovered instance.
Note: this is a point in time view and it may change due to partition reassignment.
Returns:
StreamsMetadata for each KafkaStreams instance of this application

public Collection<StreamsMetadata> metadataForAllStreamsClients()
Find all currently running KafkaStreams instances (potentially remotely) that use the same
application ID as this instance (i.e., all instances that belong to
the same Kafka Streams application) and return StreamsMetadata for each discovered instance.
Note: this is a point in time view and it may change due to partition reassignment.
Returns:
StreamsMetadata for each KafkaStreams instance of this application

@Deprecated public Collection<StreamsMetadata> allMetadataForStore(String storeName)
Deprecated. Since 3.0.0; use streamsMetadataForStore(java.lang.String) instead.
Find all currently running KafkaStreams instances (potentially remotely) that use the same
application ID as this instance (i.e., all
instances that belong to the same Kafka Streams application) and that contain a StateStore with the given storeName,
and return StreamsMetadata for each discovered instance.
Note: this is a point in time view and it may change due to partition reassignment.
Parameters:
storeName - the storeName to find metadata for
Returns:
StreamsMetadata for each KafkaStreams instance with the provided storeName of this application

public Collection<StreamsMetadata> streamsMetadataForStore(String storeName)
Find all currently running KafkaStreams instances (potentially remotely) that use the same
application ID as this instance (i.e., all
instances that belong to the same Kafka Streams application) and that contain a StateStore with the given storeName,
and return StreamsMetadata for each discovered instance.
Note: this is a point in time view and it may change due to partition reassignment.
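Parameters:
storeName - the storeName to find metadata for
Returns:
StreamsMetadata for each KafkaStreams instance with the provided storeName of this application

public <K> KeyQueryMetadata queryMetadataForKey(String storeName, K key, Serializer<K> keySerializer)
Finds the metadata containing the active hosts and standby hosts where the key being queried would reside.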
Type Parameters:
K - key type
Parameters:
storeName - the storeName to find metadata for
key - the key to find metadata for
keySerializer - serializer for the key
Returns:
KeyQueryMetadata containing all metadata about hosting the given key for the given store,
or null if no matching metadata could be found.

public <K> KeyQueryMetadata queryMetadataForKey(String storeName, K key, StreamPartitioner<? super K,?> partitioner)
Finds the metadata containing the active hosts and standby hosts where the key being queried would reside.
Type Parameters:
K - key type
Parameters:
storeName - the storeName to find metadata for
key - the key to find metadata for
partitioner - the partitioner to be used to locate the host for the key
Returns:
KeyQueryMetadata containing all metadata about hosting the given key for the given store, using the
supplied partitioner, or null if no matching metadata could be found.

public <T> T store(StoreQueryParameters<T> storeQueryParameters)
Get a facade wrapping the local StateStore instances with the provided StoreQueryParameters.
The returned object can be used to query the StateStore instances.
Parameters:
storeQueryParameters - the parameters used to fetch a queryable store
Returns:
A facade wrapping the local StateStore instances
Throws:
StreamsNotStartedException - If Streams has not yet been started. Just call start() and then retry this call.
UnknownStateStoreException - If the specified store name does not exist in the topology.
InvalidStateStorePartitionException - If the specified partition does not exist.
InvalidStateStoreException - If the Streams instance isn't in a queryable state.
If the store's type does not match the QueryableStoreType,
the Streams instance is not in a queryable state with respect
to the parameters, or if the store is not available locally, then
an InvalidStateStoreException is thrown upon store access.

public void pause()
This method pauses processing for the KafkaStreams instance.
Paused topologies will only skip over a) processing, b) punctuation, and c) standby tasks. Notably, paused topologies will still poll Kafka consumers and commit offsets. This method sets transient state that is not maintained or managed among instances. Note that pause() can be called before start() in order to start a KafkaStreams instance in a manner where the processing is paused as described, but the consumers are started up.
public boolean isPaused()
public void resume()
This method resumes processing for the KafkaStreams instance.
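pause() and resume() are separate calls. A try/finally block is one way to guarantee that processing resumes even if the work done while paused fails. The hypothetical PausedSection helper below takes the behavior described in its comments from the pause() description above. It also leaves an instance paused if it was already paused before the call.

```java
import org.apache.kafka.streams.KafkaStreams;

// Hypothetical helper: runs `action` with processing paused, then restores the previous pause state.
final class PausedSection {

    static void whilePaused(KafkaStreams streams, Runnable action) {
        boolean wasPaused = streams.isPaused();
        // Per pause(): processing, punctuation and standby tasks are skipped, but consumers keep
        // polling and committing offsets. The pause applies to this instance only.
        streams.pause();
        try {
            action.run();
        } finally {
            if (!wasPaused) {
                streams.resume(); // resume even if the action throws
            }
        }
    }
}
```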
protected int processStreamThread(Consumer<org.apache.kafka.streams.processor.internals.StreamThread> consumer)
Handle each stream thread in a snapshot of threads.
Parameters:
consumer - handler

public ClientInstanceIds clientInstanceIds(Duration timeout)
Returns the internal clients' assigned client instance ids.
Note, if StreamsConfig.PROCESSING_GUARANTEE_CONFIG is set to StreamsConfig.EXACTLY_ONCE,
the producer client instance ids are not returned yet. This gap will be closed in the next release.
Throws:
IllegalArgumentException - If timeout is negative.
IllegalStateException - If KafkaStreams is not running.
TimeoutException - Indicates that a request timed out.
StreamsException - For any other error that might occur.

@Deprecated public Set<ThreadMetadata> localThreadsMetadata()
Deprecated. Since 3.0; use metadataForLocalThreads() instead.
Returns runtime information about the local threads of this KafkaStreams instance.
Returns:
the set of ThreadMetadata.

public Set<ThreadMetadata> metadataForLocalThreads()
Returns runtime information about the local threads of this KafkaStreams instance.
Returns:
the set of ThreadMetadata.

public Map<String,Map<Integer,LagInfo>> allLocalStorePartitionLags()
Returns LagInfo for all store partitions (active or standby) local to this Streams instance. Note that the
values returned are just estimates and meant to be used for making soft decisions on whether the data in the store
partition is fresh enough for querying.
Note: Each invocation of this method issues a call to the Kafka brokers. Thus, it's advisable to limit the frequency of invocation to once every few seconds.
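The lag map lends itself to a soft freshness decision. The hypothetical LagCheck helper below reduces the result of allLocalStorePartitionLags() to the largest offset lag across all local store partitions. It uses LagInfo.offsetLag(). Because the values are estimates, the threshold comparison is a heuristic and not a consistency guarantee.

```java
import java.util.Map;
import org.apache.kafka.streams.LagInfo;

// Hypothetical helper that turns allLocalStorePartitionLags() output into a soft freshness decision.
final class LagCheck {

    // Largest offset lag across all local store partitions (active or standby); 0 if there are none.
    static long maxOffsetLag(Map<String, Map<Integer, LagInfo>> lagsByStore) {
        long max = 0L;
        for (Map<Integer, LagInfo> byPartition : lagsByStore.values()) {
            for (LagInfo lag : byPartition.values()) {
                max = Math.max(max, lag.offsetLag());
            }
        }
        return max;
    }

    // The lags are estimates, so this is a heuristic check rather than a guarantee.
    static boolean freshEnough(Map<String, Map<Integer, LagInfo>> lagsByStore, long maxAcceptableLag) {
        return maxOffsetLag(lagsByStore) <= maxAcceptableLag;
    }
}
```

Each call to allLocalStorePartitionLags() contacts the brokers. A caller would therefore cache the map and refresh it every few seconds rather than fetching it per query.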
Returns:
map of store names to another map of partition to LagInfos
Throws:
StreamsException - if the admin client request throws exception

protected Map<String,Map<Integer,LagInfo>> allLocalStorePartitionLags(List<org.apache.kafka.streams.processor.internals.Task> tasksToCollectLagFor)
@InterfaceStability.Evolving public <R> StateQueryResult<R> query(StateQueryRequest<R> request)
Run an interactive query against a state store.
This method allows callers outside of the Streams runtime to access the internal state of stateful processors. See IQ docs for more information.
NOTICE: This functionality is InterfaceStability.Evolving and subject to change in minor versions.
Once it is stabilized, this notice and the evolving annotation will be removed.
Type Parameters:
R - The result type specified by the query.
Throws:
StreamsNotStartedException - If Streams has not yet been started. Just call start() and then retry this call.
StreamsStoppedException - If Streams is in a terminal state like PENDING_SHUTDOWN,
NOT_RUNNING, PENDING_ERROR, or ERROR. The caller should
discover a new instance to query.
UnknownStateStoreException - If the specified store name does not exist in the
topology.
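Here is a minimal sketch of an interactive query through query(). It assumes a key-value store with String keys and Long values, such as the result of a count(). KeyLookup and the store name in the test are hypothetical, and this API is marked Evolving, so the classes used here may change between versions.

```java
import java.util.Map;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;

// Hypothetical helper for a key-value store with String keys and Long values.
final class KeyLookup {

    // Builds a request for one key; no partitions are specified, so all local partitions are queried.
    static StateQueryRequest<Long> request(String storeName, String key) {
        KeyQuery<String, Long> query = KeyQuery.withKey(key);
        return StateQueryRequest.inStore(storeName).withQuery(query);
    }

    // Returns the value from the first partition that answered successfully with a non-null result,
    // or null if no local partition holds the key. query() may throw StreamsNotStartedException,
    // StreamsStoppedException or UnknownStateStoreException as documented above.
    static Long lookup(KafkaStreams streams, String storeName, String key) {
        StateQueryResult<Long> result = streams.query(request(storeName, key));
        for (Map.Entry<Integer, QueryResult<Long>> entry : result.getPartitionResults().entrySet()) {
            QueryResult<Long> partitionResult = entry.getValue();
            if (partitionResult.isSuccess() && partitionResult.getResult() != null) {
                return partitionResult.getResult();
            }
        }
        return null;
    }
}
```

A null result only means that this instance does not host the key. To find the instance that does, you can use queryMetadataForKey.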