Class KafkaStreams
- java.lang.Object
-
- org.apache.kafka.streams.KafkaStreams
-
- All Implemented Interfaces:
AutoCloseable
public class KafkaStreams extends Object implements AutoCloseable
A Kafka client that allows for performing continuous computation on input coming from one or more input topics and sends output to zero, one, or more output topics.
The computational logic can be specified either by using the Topology to define a DAG topology of Processors or by using the StreamsBuilder which provides the high-level DSL to define transformations.
One KafkaStreams instance can contain one or more threads specified in the configs for the processing work.
A KafkaStreams instance can co-ordinate with any other instances with the same application ID (whether in the same process, on other processes on this machine, or on remote machines) as a single (possibly distributed) stream processing application. These instances will divide up the work based on the assignment of the input topic partitions so that all partitions are being consumed. If instances are added or fail, all (remaining) instances will rebalance the partition assignment among themselves to balance processing load and ensure that all input topic partitions are processed.
Internally a KafkaStreams instance contains a normal KafkaProducer and KafkaConsumer instance that is used for reading input and writing output.
A simple example might look like this:
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    StreamsBuilder builder = new StreamsBuilder();
    builder.<String, String>stream("my-input-topic").mapValues(value -> String.valueOf(value.length())).to("my-output-topic");
    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();
- See Also:
StreamsBuilder, Topology
-
-
Nested Class Summary
Nested Classes (Modifier and Type, Class, Description)
static class KafkaStreams.State
    Kafka Streams states are the possible states that a Kafka Streams instance can be in.
static interface KafkaStreams.StateListener
    Listen to KafkaStreams.State change events.
-
Field Summary
Fields (Modifier and Type, Field, Description)
protected KafkaStreams.State state
protected List<org.apache.kafka.streams.processor.internals.StreamThread> threads
-
Constructor Summary
Constructors (Constructor, Description)
KafkaStreams(Topology topology, Properties props)
    Create a KafkaStreams instance.
KafkaStreams(Topology topology, Properties props, org.apache.kafka.common.utils.Time time)
    Create a KafkaStreams instance.
KafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier)
    Create a KafkaStreams instance.
KafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier, org.apache.kafka.common.utils.Time time)
    Create a KafkaStreams instance.
KafkaStreams(Topology topology, StreamsConfig config)
    Deprecated. Use KafkaStreams(Topology, Properties) instead.
KafkaStreams(Topology topology, StreamsConfig config, org.apache.kafka.common.utils.Time time)
    Deprecated. Use KafkaStreams(Topology, Properties, Time) instead.
KafkaStreams(Topology topology, StreamsConfig config, KafkaClientSupplier clientSupplier)
    Deprecated.
-
Method Summary
Methods (Modifier and Type, Method, Description)
Optional<String> addStreamThread()
    Adds and starts a stream thread in addition to the stream threads that are already running in this Kafka Streams client.
Map<String,Map<Integer,LagInfo>> allLocalStorePartitionLags()
    Returns LagInfo for all store partitions (active or standby) local to this Streams instance.
Collection<StreamsMetadata> allMetadata()
    Find all currently running KafkaStreams instances (potentially remotely) that use the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application) and return StreamsMetadata for each discovered instance.
Collection<StreamsMetadata> allMetadataForStore(String storeName)
    Find all currently running KafkaStreams instances (potentially remotely) that use the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application) and that contain a StateStore with the given storeName and return StreamsMetadata for each discovered instance.
void cleanUp()
    Do a clean up of the local StateStore directory (StreamsConfig.STATE_DIR_CONFIG) by deleting all data with regard to the application ID.
void close()
    Shut down this KafkaStreams instance by signaling all the threads to stop, and then wait for them to join.
boolean close(long timeout, TimeUnit timeUnit)
    Deprecated. Use close(Duration) instead; note that close(Duration) has different semantics and does not block on zero, e.g., Duration.ofMillis(0).
boolean close(Duration timeout)
    Shut down this KafkaStreams by signaling all the threads to stop, and then wait up to the timeout for the threads to join.
Set<ThreadMetadata> localThreadsMetadata()
    Returns runtime information about the local threads of this KafkaStreams instance.
<K> StreamsMetadata metadataForKey(String storeName, K key, Serializer<K> keySerializer)
    Deprecated. Since 2.5.
<K> StreamsMetadata metadataForKey(String storeName, K key, StreamPartitioner<? super K,?> partitioner)
    Deprecated. Since 2.5.
Map<MetricName,? extends Metric> metrics()
    Get read-only handle on global metrics registry, including streams client's own metrics plus its embedded producer, consumer and admin clients' metrics.
<K> KeyQueryMetadata queryMetadataForKey(String storeName, K key, Serializer<K> keySerializer)
    Finds the metadata containing the active hosts and standby hosts where the key being queried would reside.
<K> KeyQueryMetadata queryMetadataForKey(String storeName, K key, StreamPartitioner<? super K,?> partitioner)
    Finds the metadata containing the active hosts and standby hosts where the key being queried would reside.
Optional<String> removeStreamThread()
    Removes one stream thread out of the running stream threads from this Kafka Streams client.
Optional<String> removeStreamThread(Duration timeout)
    Removes one stream thread out of the running stream threads from this Kafka Streams client.
void setGlobalStateRestoreListener(StateRestoreListener globalStateRestoreListener)
    Set the listener which is triggered whenever a StateStore is being restored in order to resume processing.
void setStateListener(KafkaStreams.StateListener listener)
    An app can set a single KafkaStreams.StateListener so that the app is notified when state changes.
void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
    Deprecated. Since 2.8.0.
void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler)
    Set the handler invoked when an internal stream thread throws an unexpected exception.
void start()
    Start the KafkaStreams instance by starting all its threads.
KafkaStreams.State state()
    Return the current KafkaStreams.State of this KafkaStreams instance.
<T> T store(String storeName, QueryableStoreType<T> queryableStoreType)
    Deprecated. Since 2.5 release; use store(StoreQueryParameters) instead.
<T> T store(StoreQueryParameters<T> storeQueryParameters)
    Get a facade wrapping the local StateStore instances with the provided StoreQueryParameters.
-
-
-
Field Detail
-
threads
protected final List<org.apache.kafka.streams.processor.internals.StreamThread> threads
-
state
protected volatile KafkaStreams.State state
-
-
Constructor Detail
-
KafkaStreams
public KafkaStreams(Topology topology, Properties props)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.
- Parameters:
topology - the topology specifying the computational logic
props - properties for StreamsConfig
- Throws:
StreamsException - if any fatal error occurs
-
KafkaStreams
public KafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.
- Parameters:
topology - the topology specifying the computational logic
props - properties for StreamsConfig
clientSupplier - the Kafka clients supplier which provides underlying producer and consumer clients for the new KafkaStreams instance
- Throws:
StreamsException - if any fatal error occurs
-
KafkaStreams
public KafkaStreams(Topology topology, Properties props, org.apache.kafka.common.utils.Time time)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.
- Parameters:
topology - the topology specifying the computational logic
props - properties for StreamsConfig
time - Time implementation; cannot be null
- Throws:
StreamsException - if any fatal error occurs
-
KafkaStreams
public KafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier, org.apache.kafka.common.utils.Time time)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.
- Parameters:
topology - the topology specifying the computational logic
props - properties for StreamsConfig
clientSupplier - the Kafka clients supplier which provides underlying producer and consumer clients for the new KafkaStreams instance
time - Time implementation; cannot be null
- Throws:
StreamsException - if any fatal error occurs
-
KafkaStreams
@Deprecated public KafkaStreams(Topology topology, StreamsConfig config)
Deprecated. Use KafkaStreams(Topology, Properties) instead.
-
KafkaStreams
@Deprecated public KafkaStreams(Topology topology, StreamsConfig config, KafkaClientSupplier clientSupplier)
Deprecated.
-
KafkaStreams
@Deprecated public KafkaStreams(Topology topology, StreamsConfig config, org.apache.kafka.common.utils.Time time)
Deprecated. Use KafkaStreams(Topology, Properties, Time) instead.
-
-
Method Detail
-
state
public KafkaStreams.State state()
Return the current KafkaStreams.State of this KafkaStreams instance.
- Returns:
the current state of this Kafka Streams instance
-
setStateListener
public void setStateListener(KafkaStreams.StateListener listener)
An app can set a single KafkaStreams.StateListener so that the app is notified when state changes.
- Parameters:
listener - a new state listener
- Throws:
IllegalStateException - if this KafkaStreams instance is not in state CREATED.
-
setUncaughtExceptionHandler
@Deprecated public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
Deprecated. Since 2.8.0. Use setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler) instead.
Set the handler invoked when an internal stream thread abruptly terminates due to an uncaught exception.
- Parameters:
uncaughtExceptionHandler - the uncaught exception handler for all internal threads; null deletes the current handler
- Throws:
IllegalStateException - if this KafkaStreams instance is not in state CREATED.
-
setUncaughtExceptionHandler
public void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler)
Set the handler invoked when an internal stream thread throws an unexpected exception. These might be exceptions indicating rare bugs in Kafka Streams, or they might be exceptions thrown by your code, for example a NullPointerException thrown from your processor logic. The handler will execute on the thread that produced the exception. To get the thread that threw the exception, use Thread.currentThread().
Note: this handler must be thread-safe, since it will be shared among all threads and invoked from any thread that encounters such an exception.
- Parameters:
streamsUncaughtExceptionHandler - the uncaught exception handler of type StreamsUncaughtExceptionHandler for all internal threads
- Throws:
IllegalStateException - if this KafkaStreams instance is not in state CREATED.
NullPointerException - if streamsUncaughtExceptionHandler is null.
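The thread-safety requirement above can be illustrated with a JDK-only sketch. The class ThreadSafeHandlerSketch and its handle method are hypothetical stand-ins; they do not implement or reproduce the signature of StreamsUncaughtExceptionHandler. The sketch only shows one way to keep shared handler state safe when any thread may invoke it, using Thread.currentThread() to identify the failing thread.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a shared exception handler; not a Kafka Streams type.
final class ThreadSafeHandlerSketch {
    // Concurrent map plus atomic counters, so concurrent calls never lose updates.
    private final ConcurrentHashMap<String, AtomicInteger> failuresByThread = new ConcurrentHashMap<>();

    // Runs on the thread that hit the exception, so currentThread() names the failing thread.
    void handle(Throwable exception) {
        String threadName = Thread.currentThread().getName();
        failuresByThread.computeIfAbsent(threadName, name -> new AtomicInteger()).incrementAndGet();
    }

    int failuresFor(String threadName) {
        AtomicInteger counter = failuresByThread.get(threadName);
        return counter == null ? 0 : counter.get();
    }

    int totalFailures() {
        int total = 0;
        for (AtomicInteger counter : failuresByThread.values()) {
            total += counter.get();
        }
        return total;
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadSafeHandlerSketch handler = new ThreadSafeHandlerSketch();
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            // Each worker simulates a thread that repeatedly reports an exception.
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    handler.handle(new IllegalStateException("simulated failure"));
                }
            }, "worker-" + i);
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        System.out.println("total failures recorded: " + handler.totalFailures());
    }
}
```

A handler that kept its counts in a plain HashMap or an int field could lose updates under the same load, which is the hazard the note warns about.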
-
setGlobalStateRestoreListener
public void setGlobalStateRestoreListener(StateRestoreListener globalStateRestoreListener)
Set the listener which is triggered whenever a StateStore is being restored in order to resume processing.
- Parameters:
globalStateRestoreListener - the listener triggered when a StateStore is being restored.
- Throws:
IllegalStateException - if this KafkaStreams instance is not in state CREATED.
-
metrics
public Map<MetricName,? extends Metric> metrics()
Get read-only handle on global metrics registry, including streams client's own metrics plus its embedded producer, consumer and admin clients' metrics.
- Returns:
Map of all metrics.
-
addStreamThread
public Optional<String> addStreamThread()
Adds and starts a stream thread in addition to the stream threads that are already running in this Kafka Streams client.
Since the number of stream threads increases, the sizes of the caches in the new stream thread and the existing stream threads are adapted so that the sum of the cache sizes over all stream threads does not exceed the total cache size specified in configuration StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG.
Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
- Returns:
name of the added stream thread or empty if a new stream thread could not be added
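The cache resizing described above can be pictured with a small arithmetic sketch. It assumes the total cache is split equally across stream threads using integer division; that policy and the helper name cacheBytesPerThread are assumptions made for illustration, not something this page specifies. Under that assumption the per-thread sizes can never add up to more than the configured total.

```java
// Hypothetical model of resizing per-thread caches when the thread count changes.
final class CacheSplitSketch {
    // Assumption: equal split with integer division, so the sum never exceeds the total.
    static long cacheBytesPerThread(long totalCacheBytes, int threadCount) {
        if (totalCacheBytes < 0) {
            throw new IllegalArgumentException("totalCacheBytes must not be negative");
        }
        if (threadCount <= 0) {
            throw new IllegalArgumentException("threadCount must be positive");
        }
        return totalCacheBytes / threadCount;
    }

    public static void main(String[] args) {
        long total = 10L * 1024 * 1024; // example value for the total cache size, in bytes
        for (int threads = 1; threads <= 4; threads++) {
            long perThread = cacheBytesPerThread(total, threads);
            System.out.printf("threads=%d perThread=%d sum=%d%n", threads, perThread, perThread * threads);
        }
    }
}
```

With three threads the sum falls slightly short of the total because of the rounding, which is consistent with "does not exceed".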
-
removeStreamThread
public Optional<String> removeStreamThread()
Removes one stream thread out of the running stream threads from this Kafka Streams client.
The removed stream thread is gracefully shut down. This method does not specify which stream thread is shut down.
Since the number of stream threads decreases, the sizes of the caches in the remaining stream threads are adapted so that the sum of the cache sizes over all stream threads equals the total cache size specified in configuration StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG.
- Returns:
name of the removed stream thread or empty if a stream thread could not be removed because no stream threads are alive
-
removeStreamThread
public Optional<String> removeStreamThread(Duration timeout)
Removes one stream thread out of the running stream threads from this Kafka Streams client.
The removed stream thread is gracefully shut down. This method does not specify which stream thread is shut down.
Since the number of stream threads decreases, the sizes of the caches in the remaining stream threads are adapted so that the sum of the cache sizes over all stream threads equals the total cache size specified in configuration StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG.
- Parameters:
timeout - the length of time to wait for the thread to shut down
- Returns:
name of the removed stream thread or empty if a stream thread could not be removed because no stream threads are alive
- Throws:
TimeoutException - if the thread does not stop in time
-
start
public void start() throws IllegalStateException, StreamsException
Start the KafkaStreams instance by starting all its threads. This function is expected to be called only once during the life cycle of the client.
Because threads are started in the background, this method does not block. However, if you have global stores in your topology, this method blocks until all global stores are restored. Because processing runs on background threads, any fatal exception that happens during processing is by default only logged. If you want to be notified about dying threads, you can register an uncaught exception handler before starting the KafkaStreams instance.
Note: for brokers with version 0.9.x or lower, the broker version cannot be checked. There will be no error; the client will hang and keep retrying to verify the broker version until it times out.
- Throws:
IllegalStateException - if process was already started
StreamsException - if the Kafka brokers have version 0.10.0.x or if exactly-once is enabled for pre 0.11.0.x brokers
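The life-cycle rules on this page (start only once, listeners and handlers only while in state CREATED) follow a common guard pattern. The sketch below is a simplified JDK-only model of that pattern; the class LifecycleGuardSketch, its three-value State enum, and the Runnable listener are hypothetical stand-ins and deliberately smaller than the real KafkaStreams.State and KafkaStreams.StateListener.

```java
// Hypothetical, simplified model of the "configure only in CREATED, start only once" rule.
final class LifecycleGuardSketch {
    // Simplified state set for illustration; not the library's full state enum.
    enum State { CREATED, RUNNING, NOT_RUNNING }

    private State state = State.CREATED;
    private Runnable stateListener; // stand-in for a state listener

    synchronized void setStateListener(Runnable listener) {
        if (state != State.CREATED) {
            throw new IllegalStateException("Can only set the listener in state CREATED; current state is " + state);
        }
        this.stateListener = listener;
    }

    synchronized void start() {
        if (state != State.CREATED) {
            throw new IllegalStateException("The client was already started or closed; current state is " + state);
        }
        state = State.RUNNING;
        if (stateListener != null) {
            stateListener.run(); // notify the listener about the state change
        }
    }

    synchronized void close() {
        state = State.NOT_RUNNING;
    }

    synchronized State state() {
        return state;
    }

    public static void main(String[] args) {
        LifecycleGuardSketch client = new LifecycleGuardSketch();
        client.setStateListener(() -> System.out.println("state changed"));
        client.start();
        try {
            client.start(); // a second start is rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        client.close();
    }
}
```

Registering everything before the call to start keeps the configuration phase and the running phase cleanly separated, which is why late registration fails fast instead of being silently ignored.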
-
close
public void close()
Shut down this KafkaStreams instance by signaling all the threads to stop, and then wait for them to join. This will block until all threads have stopped.
- Specified by:
close in interface AutoCloseable
-
close
@Deprecated public boolean close(long timeout, TimeUnit timeUnit)
Deprecated. Use close(Duration) instead; note that close(Duration) has different semantics and does not block on zero, e.g., Duration.ofMillis(0).
Shut down this KafkaStreams by signaling all the threads to stop, and then wait up to the timeout for the threads to join. A timeout of 0 means to wait forever.
- Parameters:
timeout - how long to wait for the threads to shut down; can't be negative. As stated above, a timeout of 0 waits indefinitely rather than returning immediately.
timeUnit - unit of time used for timeout
- Returns:
true if all threads were successfully stopped; false if the timeout was reached before all threads stopped
Note that this method must not be called in the onChange callback of KafkaStreams.StateListener.
-
close
public boolean close(Duration timeout) throws IllegalArgumentException
Shut down this KafkaStreams by signaling all the threads to stop, and then wait up to the timeout for the threads to join. A timeout of Duration.ZERO (or any other zero duration) makes the close operation asynchronous. Negative-duration timeouts are rejected.
- Parameters:
timeout - how long to wait for the threads to shut down
- Returns:
true if all threads were successfully stopped; false if the timeout was reached before all threads stopped
Note that this method must not be called in the KafkaStreams.StateListener.onChange(KafkaStreams.State, KafkaStreams.State) callback of KafkaStreams.StateListener.
- Throws:
IllegalArgumentException - if timeout can't be represented as long milliseconds
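The timeout rules for close(Duration) can be modelled with the JDK alone. The helper names toValidatedMillis and isAsynchronous below are hypothetical and are not part of KafkaStreams; the sketch only shows one plausible way a Duration can be checked against the three documented cases (not representable as long milliseconds, negative, zero). Treating the negative case as an IllegalArgumentException is an assumption, since the page only says such timeouts are rejected.

```java
import java.time.Duration;

// Hypothetical model of the timeout checks described for close(Duration).
final class CloseTimeoutSketch {
    // Converts the timeout to milliseconds, rejecting values the text says are invalid.
    static long toValidatedMillis(Duration timeout) {
        if (timeout == null) {
            throw new IllegalArgumentException("timeout must not be null");
        }
        long millis;
        try {
            millis = timeout.toMillis(); // throws ArithmeticException on overflow
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException("timeout can't be represented as long milliseconds", e);
        }
        if (millis < 0) {
            // Assumption: rejection is modelled as IllegalArgumentException.
            throw new IllegalArgumentException("timeout must not be negative");
        }
        return millis;
    }

    // A zero timeout means the caller does not wait for the threads to join.
    static boolean isAsynchronous(Duration timeout) {
        return toValidatedMillis(timeout) == 0L;
    }

    public static void main(String[] args) {
        System.out.println(toValidatedMillis(Duration.ofSeconds(5)));
        System.out.println(isAsynchronous(Duration.ZERO));
        try {
            toValidatedMillis(Duration.ofSeconds(Long.MAX_VALUE));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```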
-
cleanUp
public void cleanUp()
Do a clean up of the local StateStore directory (StreamsConfig.STATE_DIR_CONFIG) by deleting all data with regard to the application ID.
May only be called either before this KafkaStreams instance is started or after the instance is closed.
Calling this method triggers a restore of local StateStores on the next application start.
- Throws:
IllegalStateException - if this KafkaStreams instance is currently running
StreamsException - if cleanup failed
-
allMetadata
public Collection<StreamsMetadata> allMetadata()
Find all currently running KafkaStreams instances (potentially remotely) that use the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application) and return StreamsMetadata for each discovered instance.
Note: this is a point in time view and it may change due to partition reassignment.
- Returns:
StreamsMetadata for each KafkaStreams instance of this application
-
allMetadataForStore
public Collection<StreamsMetadata> allMetadataForStore(String storeName)
Find all currently running KafkaStreams instances (potentially remotely) that
- use the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application)
- and that contain a StateStore with the given storeName
and return StreamsMetadata for each discovered instance.
Note: this is a point in time view and it may change due to partition reassignment.
- Parameters:
storeName - the storeName to find metadata for
- Returns:
StreamsMetadata for each KafkaStreams instance with the provided storeName of this application
-
metadataForKey
@Deprecated public <K> StreamsMetadata metadataForKey(String storeName, K key, Serializer<K> keySerializer)
Deprecated. Since 2.5. Use queryMetadataForKey(String, Object, Serializer) instead.
Find the currently running KafkaStreams instance (potentially remotely) that
- uses the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application)
- and that contains a StateStore with the given storeName
- and the StateStore contains the given key
and return StreamsMetadata for it.
This will use the default Kafka Streams partitioner to locate the partition. If a custom partitioner has been configured via StreamsConfig or KStream.repartition(Repartitioned), or if the original KTable's input topic is partitioned differently, please use metadataForKey(String, Object, StreamPartitioner).
Note:
- this is a point in time view and it may change due to partition reassignment
- the key may not exist in the StateStore; this method provides a way of finding which host it would exist on
- if this is for a window store the serializer should be the serializer for the record key, not the window serializer
- Type Parameters:
K - key type
- Parameters:
storeName - the storeName to find metadata for
key - the key to find metadata for
keySerializer - serializer for the key
- Returns:
StreamsMetadata for the KafkaStreams instance with the provided storeName and key of this application or StreamsMetadata.NOT_AVAILABLE if Kafka Streams is (re-)initializing, or null if no matching metadata could be found.
-
metadataForKey
@Deprecated public <K> StreamsMetadata metadataForKey(String storeName, K key, StreamPartitioner<? super K,?> partitioner)
Deprecated. Since 2.5. Use queryMetadataForKey(String, Object, StreamPartitioner) instead.
Find the currently running KafkaStreams instance (potentially remotely) that
- uses the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application)
- and that contains a StateStore with the given storeName
- and the StateStore contains the given key
and return StreamsMetadata for it.
Note:
- this is a point in time view and it may change due to partition reassignment
- the key may not exist in the StateStore; this method provides a way of finding which host it would exist on
- Type Parameters:
K - key type
- Parameters:
storeName - the storeName to find metadata for
key - the key to find metadata for
partitioner - the partitioner to be used to locate the host for the key
- Returns:
StreamsMetadata for the KafkaStreams instance with the provided storeName and key of this application or StreamsMetadata.NOT_AVAILABLE if Kafka Streams is (re-)initializing, or null if no matching metadata could be found.
-
queryMetadataForKey
public <K> KeyQueryMetadata queryMetadataForKey(String storeName, K key, Serializer<K> keySerializer)
Finds the metadata containing the active hosts and standby hosts where the key being queried would reside.
- Type Parameters:
K - key type
- Parameters:
storeName - the storeName to find metadata for
key - the key to find metadata for
keySerializer - serializer for the key
- Returns:
KeyQueryMetadata containing all metadata about hosting the given key for the given store, or null if no matching metadata could be found.
-
queryMetadataForKey
public <K> KeyQueryMetadata queryMetadataForKey(String storeName, K key, StreamPartitioner<? super K,?> partitioner)
Finds the metadata containing the active hosts and standby hosts where the key being queried would reside.
- Type Parameters:
K - key type
- Parameters:
storeName - the storeName to find metadata for
key - the key to find metadata for
partitioner - the partitioner to be used to locate the host for the key
- Returns:
KeyQueryMetadata containing all metadata about hosting the given key for the given store, using the supplied partitioner, or null if no matching metadata could be found.
-
store
@Deprecated public <T> T store(String storeName, QueryableStoreType<T> queryableStoreType)
Deprecated. Since 2.5 release; use store(StoreQueryParameters) instead.
-
store
public <T> T store(StoreQueryParameters<T> storeQueryParameters)
Get a facade wrapping the local StateStore instances with the provided StoreQueryParameters. The returned object can be used to query the StateStore instances.
- Parameters:
storeQueryParameters - the parameters used to fetch a queryable store
- Returns:
A facade wrapping the local StateStore instances
- Throws:
InvalidStateStoreException - If the specified store name does not exist in the topology or if the Streams instance isn't in a queryable state. If the store's type does not match the QueryableStoreType, the Streams instance is not in a queryable state with respect to the parameters, or if the store is not available locally, then an InvalidStateStoreException is thrown upon store access.
-
localThreadsMetadata
public Set<ThreadMetadata> localThreadsMetadata()
Returns runtime information about the local threads of this KafkaStreams instance.
- Returns:
the set of ThreadMetadata.
-
allLocalStorePartitionLags
public Map<String,Map<Integer,LagInfo>> allLocalStorePartitionLags()
Returns LagInfo for all store partitions (active or standby) local to this Streams instance. Note that the values returned are just estimates and meant to be used for making soft decisions on whether the data in the store partition is fresh enough for querying.
Note: each invocation of this method issues a call to the Kafka brokers. Thus it's advisable to limit the frequency of invocation to once every few seconds.
- Returns:
map of store names to another map of partition to LagInfos
- Throws:
StreamsException - if the admin client request throws an exception
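The "soft decision" on freshness mentioned above could look like the following JDK-only sketch. It uses a plain Long as a hypothetical stand-in for the estimated lag carried by each LagInfo, and the helper freshPartitions and the threshold value are assumptions for illustration. The nested map mirrors the documented return shape (store name to partition to lag).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical freshness filter over a store -> partition -> estimated lag map.
final class LagFreshnessSketch {
    // Returns, per store, the partitions whose estimated lag is within the threshold.
    static Map<String, List<Integer>> freshPartitions(Map<String, Map<Integer, Long>> lagsByStore,
                                                      long maxAcceptableLag) {
        Map<String, List<Integer>> result = new TreeMap<>();
        for (Map.Entry<String, Map<Integer, Long>> store : lagsByStore.entrySet()) {
            List<Integer> fresh = new ArrayList<>();
            // Sort partitions so the output order is deterministic.
            for (Map.Entry<Integer, Long> partition : new TreeMap<>(store.getValue()).entrySet()) {
                if (partition.getValue() <= maxAcceptableLag) {
                    fresh.add(partition.getKey());
                }
            }
            result.put(store.getKey(), fresh);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Long> partitionLags = new HashMap<>();
        partitionLags.put(0, 0L);
        partitionLags.put(1, 250L);
        partitionLags.put(2, 10L);
        Map<String, Map<Integer, Long>> lagsByStore = new HashMap<>();
        lagsByStore.put("example-store", partitionLags); // store name is made up
        System.out.println(freshPartitions(lagsByStore, 100L));
    }
}
```

Because each real lookup contacts the brokers, a caller would typically compute such a map once every few seconds and reuse it between queries rather than recomputing it per request.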
-
-