Class KafkaStreams
- All Implemented Interfaces:
- AutoCloseable
 The computational logic can be specified either by using the Topology to define a DAG topology of
 Processors or by using the StreamsBuilder which provides the high-level DSL to define
 transformations.
 
 One KafkaStreams instance can contain one or more threads specified in the configs for the processing work.
 
 A KafkaStreams instance can co-ordinate with any other instances with the same
 application ID (whether in the same process, on other processes on this
 machine, or on remote machines) as a single (possibly distributed) stream processing application.
 These instances will divide up the work based on the assignment of the input topic partitions so that all partitions
 are being consumed.
 If instances are added or fail, all (remaining) instances will rebalance the partition assignment among themselves
 to balance processing load and ensure that all input topic partitions are processed.
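 The division of work described above can be pictured with a toy model. This sketch is not Kafka's assignor: it swaps in a plain round-robin split, and the class name, the assign helper, the instance names and the partition count are all hypothetical. It only shows that every partition has exactly one owner, both before and after an instance disappears.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: a round-robin split, not the assignment logic Kafka Streams uses. */
final class RoundRobinAssignmentSketch {
    private RoundRobinAssignmentSketch() {}

    /** Spreads partitions 0..partitionCount-1 over the given instances, one at a time in turn. */
    static Map<String, List<Integer>> assign(List<String> instances, int partitionCount) {
        if (instances.isEmpty()) {
            throw new IllegalArgumentException("at least one instance is required");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String instance : instances) {
            assignment.put(instance, new ArrayList<Integer>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = instances.get(partition % instances.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Three instances share six partitions.
        System.out.println(assign(Arrays.asList("a", "b", "c"), 6));
        // Instance "c" is gone: the survivors take over all six partitions.
        System.out.println(assign(Arrays.asList("a", "b"), 6));
    }
}
```

 Re-running assign with the reduced instance list stands in for a rebalance: no partition is left without an owner.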
 
 Internally, a KafkaStreams instance contains a normal KafkaProducer and a normal KafkaConsumer instance,
 which are used for writing output and reading input, respectively.
 
A simple example might look like this:
 Properties props = new Properties();
 props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 StreamsBuilder builder = new StreamsBuilder();
 builder.<String, String>stream("my-input-topic").mapValues(value -> String.valueOf(value.length())).to("my-output-topic");
 KafkaStreams streams = new KafkaStreams(builder.build(), props);
 streams.start();

Nested Class Summary
- static enum KafkaStreams.State - Kafka Streams states are the possible states that a Kafka Streams instance can be in.
- static interface KafkaStreams.StateListener - Listen to KafkaStreams.State change events.

Field Summary
- protected final Admin adminClient
- protected final StreamsConfig applicationConfigs
- protected final String clientId
- protected KafkaStreams.State state
- protected final org.apache.kafka.streams.processor.internals.StateDirectory stateDirectory
- protected final org.apache.kafka.streams.processor.internals.StreamsMetadataState streamsMetadataState
- protected final List<org.apache.kafka.streams.processor.internals.StreamThread> threads
- protected final org.apache.kafka.streams.processor.internals.TopologyMetadata topologyMetadata

Constructor Summary
- protected KafkaStreams(org.apache.kafka.streams.processor.internals.TopologyMetadata topologyMetadata, StreamsConfig applicationConfigs, KafkaClientSupplier clientSupplier)
- KafkaStreams(Topology topology, Properties props) - Create a KafkaStreams instance.
- KafkaStreams(Topology topology, Properties props, org.apache.kafka.common.utils.Time time) - Create a KafkaStreams instance.
- KafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier) - Create a KafkaStreams instance.
- KafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier, org.apache.kafka.common.utils.Time time) - Create a KafkaStreams instance.
- KafkaStreams(Topology topology, StreamsConfig applicationConfigs) - Create a KafkaStreams instance.
- KafkaStreams(Topology topology, StreamsConfig applicationConfigs, org.apache.kafka.common.utils.Time time) - Create a KafkaStreams instance.
- KafkaStreams(Topology topology, StreamsConfig applicationConfigs, KafkaClientSupplier clientSupplier) - Create a KafkaStreams instance.

Method Summary
- addStreamThread() - Adds and starts a stream thread in addition to the stream threads that are already running in this Kafka Streams client.
- allLocalStorePartitionLags() - Returns LagInfo, for all store partitions (active or standby) local to this Streams instance.
- allLocalStorePartitionLags(List<org.apache.kafka.streams.processor.internals.Task> tasksToCollectLagFor)
- allMetadata() - Deprecated. Since 3.0.0; use metadataForAllStreamsClients() instead.
- allMetadataForStore(String storeName) - Deprecated. Since 3.0.0; use streamsMetadataForStore(java.lang.String) instead.
- void cleanUp() - Do a clean up of the local StateStore directory (StreamsConfig.STATE_DIR_CONFIG) by deleting all data with regard to the application ID.
- void close() - Shut down this KafkaStreams instance by signaling all the threads to stop, and then wait for them to join.
- boolean close(Duration timeout) - Shut down this KafkaStreams by signaling all the threads to stop, and then wait up to the timeout for the threads to join.
- protected boolean hasStartedOrFinishedShuttingDown()
- protected boolean isRunningOrRebalancing()
- localThreadsMetadata() - Deprecated. Since 3.0; use metadataForLocalThreads() instead.
- metadataForAllStreamsClients() - Find all currently running KafkaStreams instances (potentially remotely) that use the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application) and return StreamsMetadata for each discovered instance.
- metadataForLocalThreads() - Returns runtime information about the local threads of this KafkaStreams instance.
- Map<MetricName, ? extends Metric> metrics() - Get read-only handle on global metrics registry, including streams client's own metrics plus its embedded producer, consumer and admin clients' metrics.
- protected int processStreamThread(Consumer<org.apache.kafka.streams.processor.internals.StreamThread> consumer) - Handle each stream thread in a snapshot of threads.
- <R> StateQueryResult<R> query(StateQueryRequest<R> request) - Run an interactive query against a state store.
- <K> KeyQueryMetadata queryMetadataForKey(String storeName, K key, Serializer<K> keySerializer) - Finds the metadata containing the active hosts and standby hosts where the key being queried would reside.
- <K> KeyQueryMetadata queryMetadataForKey(String storeName, K key, StreamPartitioner<? super K, ?> partitioner) - Finds the metadata containing the active hosts and standby hosts where the key being queried would reside.
- removeStreamThread() - Removes one stream thread out of the running stream threads from this Kafka Streams client.
- removeStreamThread(Duration timeout) - Removes one stream thread out of the running stream threads from this Kafka Streams client.
- void setGlobalStateRestoreListener(StateRestoreListener globalStateRestoreListener) - Set the listener which is triggered whenever a StateStore is being restored in order to resume processing.
- void setStateListener(KafkaStreams.StateListener listener) - An app can set a single KafkaStreams.StateListener so that the app is notified when state changes.
- void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler) - Deprecated. Since 2.8.0.
- void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler userStreamsUncaughtExceptionHandler) - Set the handler invoked when an internal stream thread throws an unexpected exception.
- void start() - Start the KafkaStreams instance by starting all its threads.
- state() - Return the current KafkaStreams.State of this KafkaStreams instance.
- <T> T store(StoreQueryParameters<T> storeQueryParameters) - Get a facade wrapping the local StateStore instances with the provided StoreQueryParameters.
- streamsMetadataForStore(String storeName) - Find all currently running KafkaStreams instances (potentially remotely) that use the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application) and that contain a StateStore with the given storeName and return StreamsMetadata for each discovered instance.
- protected void validateIsRunningOrRebalancing()

Field Details
- clientId
- applicationConfigs
- threads
- stateDirectory
  protected final org.apache.kafka.streams.processor.internals.StateDirectory stateDirectory
- streamsMetadataState
  protected final org.apache.kafka.streams.processor.internals.StreamsMetadataState streamsMetadataState
- adminClient
- topologyMetadata
  protected final org.apache.kafka.streams.processor.internals.TopologyMetadata topologyMetadata
- state
 
Constructor Details

KafkaStreams
public KafkaStreams(Topology topology, Properties props)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.
- Parameters:
- topology - the topology specifying the computational logic
- props - properties for StreamsConfig
- Throws:
- StreamsException - if any fatal error occurs
 
KafkaStreams
public KafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.
- Parameters:
- topology - the topology specifying the computational logic
- props - properties for StreamsConfig
- clientSupplier - the Kafka clients supplier which provides underlying producer and consumer clients for the new KafkaStreams instance
- Throws:
- StreamsException - if any fatal error occurs
 
KafkaStreams
public KafkaStreams(Topology topology, Properties props, org.apache.kafka.common.utils.Time time)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.
- Parameters:
- topology - the topology specifying the computational logic
- props - properties for StreamsConfig
- time - Time implementation; cannot be null
- Throws:
- StreamsException - if any fatal error occurs
 
KafkaStreams
public KafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier, org.apache.kafka.common.utils.Time time)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.
- Parameters:
- topology - the topology specifying the computational logic
- props - properties for StreamsConfig
- clientSupplier - the Kafka clients supplier which provides underlying producer and consumer clients for the new KafkaStreams instance
- time - Time implementation; cannot be null
- Throws:
- StreamsException - if any fatal error occurs
 
KafkaStreams
public KafkaStreams(Topology topology, StreamsConfig applicationConfigs)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.
- Parameters:
- topology - the topology specifying the computational logic
- applicationConfigs - configs for Kafka Streams
- Throws:
- StreamsException - if any fatal error occurs
 
KafkaStreams
public KafkaStreams(Topology topology, StreamsConfig applicationConfigs, KafkaClientSupplier clientSupplier)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.
- Parameters:
- topology - the topology specifying the computational logic
- applicationConfigs - configs for Kafka Streams
- clientSupplier - the Kafka clients supplier which provides underlying producer and consumer clients for the new KafkaStreams instance
- Throws:
- StreamsException - if any fatal error occurs
 
KafkaStreams
public KafkaStreams(Topology topology, StreamsConfig applicationConfigs, org.apache.kafka.common.utils.Time time)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.
- Parameters:
- topology - the topology specifying the computational logic
- applicationConfigs - configs for Kafka Streams
- time - Time implementation; cannot be null
- Throws:
- StreamsException - if any fatal error occurs
 
KafkaStreams
protected KafkaStreams(org.apache.kafka.streams.processor.internals.TopologyMetadata topologyMetadata, StreamsConfig applicationConfigs, KafkaClientSupplier clientSupplier) throws StreamsException
- Throws:
- StreamsException
 
 
Method Details

state
Return the current KafkaStreams.State of this KafkaStreams instance.
- Returns:
- the current state of this Kafka Streams instance
 
isRunningOrRebalancing
protected boolean isRunningOrRebalancing()

hasStartedOrFinishedShuttingDown
protected boolean hasStartedOrFinishedShuttingDown()

validateIsRunningOrRebalancing
protected void validateIsRunningOrRebalancing()

setStateListener
An app can set a single KafkaStreams.StateListener so that the app is notified when state changes.
- Parameters:
- listener - a new state listener
- Throws:
- IllegalStateException - if this KafkaStreams instance has already been started.
 
setUncaughtExceptionHandler
@Deprecated public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
Deprecated. Since 2.8.0. Use setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler) instead.
Set the handler invoked when an internal stream thread abruptly terminates due to an uncaught exception.
- Parameters:
- uncaughtExceptionHandler - the uncaught exception handler for all internal threads; null deletes the current handler
- Throws:
- IllegalStateException - if this KafkaStreams instance has already been started.
 
setUncaughtExceptionHandler
public void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler userStreamsUncaughtExceptionHandler)
Set the handler invoked when an internal stream thread throws an unexpected exception. These might be exceptions indicating rare bugs in Kafka Streams, or they might be exceptions thrown by your code, for example a NullPointerException thrown from your processor logic. The handler will execute on the thread that produced the exception. In order to get the thread that threw the exception, use Thread.currentThread().
Note: this handler must be thread-safe, since it will be shared among all threads and invoked from any thread that encounters such an exception.
- Parameters:
- userStreamsUncaughtExceptionHandler - the uncaught exception handler of type StreamsUncaughtExceptionHandler for all internal threads
- Throws:
- IllegalStateException - if this KafkaStreams instance has already been started.
- NullPointerException - if userStreamsUncaughtExceptionHandler is null.
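
Because one handler object is shared by every stream thread, any state it keeps has to be safe under concurrent calls. The sketch below illustrates that with JDK types only. SharedHandlerSketch and its Response enum are hypothetical stand-ins (the enum is modeled on the response a real StreamsUncaughtExceptionHandler returns), and the "replace a few times, then shut down" policy is an assumption made for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a handler shared by all stream threads. */
final class SharedHandlerSketch {
    /** Local stand-in for the response type a real handler would return. */
    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT }

    private final int maxReplacements;
    // AtomicInteger keeps the counter correct when several threads fail at once.
    private final AtomicInteger failures = new AtomicInteger();

    SharedHandlerSketch(int maxReplacements) {
        this.maxReplacements = maxReplacements;
    }

    /** Runs on the failing thread, so Thread.currentThread() identifies it. */
    Response handle(Throwable exception) {
        int seen = failures.incrementAndGet();
        System.err.println(Thread.currentThread().getName() + " failed (" + seen + "): " + exception);
        return seen <= maxReplacements ? Response.REPLACE_THREAD : Response.SHUTDOWN_CLIENT;
    }

    int failureCount() {
        return failures.get();
    }
}
```

With the real API, the same logic would live in the handler passed to setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler) before start() is called.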
 
setGlobalStateRestoreListener
Set the listener which is triggered whenever a StateStore is being restored in order to resume processing.
- Parameters:
- globalStateRestoreListener - the listener triggered when a StateStore is being restored.
- Throws:
- IllegalStateException - if this KafkaStreams instance has already been started.
 
metrics
Get read-only handle on global metrics registry, including streams client's own metrics plus its embedded producer, consumer and admin clients' metrics.
- Returns:
- Map of all metrics.
 
addStreamThread
Adds and starts a stream thread in addition to the stream threads that are already running in this Kafka Streams client.
Since the number of stream threads increases, the sizes of the caches in the new stream thread and the existing stream threads are adapted so that the sum of the cache sizes over all stream threads does not exceed the total cache size specified in configuration StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG.
Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
- Returns:
- name of the added stream thread or empty if a new stream thread could not be added
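
The cache adjustment is simple arithmetic once a splitting rule is fixed. The sketch below assumes an even split of the total cache across stream threads. That rule is an assumption made for illustration (the client may, for example, also reserve a share for other threads), and CacheSplitSketch and cacheBytesPerThread are hypothetical names.

```java
/** Arithmetic sketch only; assumes the total cache is split evenly across threads. */
final class CacheSplitSketch {
    private CacheSplitSketch() {}

    /** Integer division guarantees perThread * threadCount never exceeds the total. */
    static long cacheBytesPerThread(long totalCacheBytes, int threadCount) {
        if (totalCacheBytes < 0 || threadCount <= 0) {
            throw new IllegalArgumentException("need a non-negative total and at least one thread");
        }
        return totalCacheBytes / threadCount;
    }

    public static void main(String[] args) {
        long total = 10L * 1024 * 1024; // 10 MiB configured in total
        System.out.println(cacheBytesPerThread(total, 3)); // three threads running
        System.out.println(cacheBytesPerThread(total, 4)); // after adding a fourth thread
    }
}
```

Adding a thread shrinks every thread's share, which is why the existing threads are resized along with the new one.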
 
removeStreamThread
Removes one stream thread out of the running stream threads from this Kafka Streams client.
The removed stream thread is gracefully shut down. This method does not specify which stream thread is shut down.
Since the number of stream threads decreases, the sizes of the caches in the remaining stream threads are adapted so that the sum of the cache sizes over all stream threads equals the total cache size specified in configuration StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG.
- Returns:
- name of the removed stream thread or empty if a stream thread could not be removed because no stream threads are alive
 
removeStreamThread
Removes one stream thread out of the running stream threads from this Kafka Streams client.
The removed stream thread is gracefully shut down. This method does not specify which stream thread is shut down.
Since the number of stream threads decreases, the sizes of the caches in the remaining stream threads are adapted so that the sum of the cache sizes over all stream threads equals the total cache size specified in configuration StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG.
- Parameters:
- timeout - the length of time to wait for the thread to shut down
- Returns:
- name of the removed stream thread or empty if a stream thread could not be removed because no stream threads are alive
- Throws:
- TimeoutException - if the thread does not stop in time
 
start
Start the KafkaStreams instance by starting all its threads. This function is expected to be called only once during the life cycle of the client.
Because threads are started in the background, this method does not block, with one exception: if you have global stores in your topology, it blocks until all global stores are restored. Because processing runs on background threads, any fatal exception that happens during processing is by default only logged. If you want to be notified about dying threads, you can register an uncaught exception handler before starting the KafkaStreams instance.
Note: for brokers with version 0.9.x or lower, the broker version cannot be checked. There will be no error; the client will hang and keep retrying to verify the broker version until it times out.
- Throws:
- IllegalStateException - if process was already started
- StreamsException - if the Kafka brokers have version 0.10.0.x or if exactly-once is enabled for pre 0.11.0.x brokers
 
close
public void close()
Shut down this KafkaStreams instance by signaling all the threads to stop, and then wait for them to join. This will block until all threads have stopped.
- Specified by:
- close in interface AutoCloseable
 
close
Shut down this KafkaStreams by signaling all the threads to stop, and then wait up to the timeout for the threads to join. A timeout of Duration.ZERO (or any other zero duration) makes the close operation asynchronous. Negative-duration timeouts are rejected.
Note that this method must not be called in the KafkaStreams.StateListener.onChange(KafkaStreams.State, KafkaStreams.State) callback of KafkaStreams.StateListener.
- Parameters:
- timeout - how long to wait for the threads to shut down
- Returns:
- true if all threads were successfully stopped; false if the timeout was reached before all threads stopped
- Throws:
- IllegalArgumentException - if timeout can't be represented as long milliseconds
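
The timeout contract (signal, wait up to the timeout, report success as a boolean) can be modeled with plain JDK threads. This is a sketch of the documented behavior, not the KafkaStreams implementation. TimedShutdownSketch, shutdown, startWorker and the polling worker are hypothetical.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** JDK-only model of "signal all threads to stop, then wait up to a timeout". */
final class TimedShutdownSketch {
    private TimedShutdownSketch() {}

    /** Returns true if every worker stopped within the timeout, false otherwise. */
    static boolean shutdown(List<Thread> workers, AtomicBoolean stopSignal, Duration timeout) {
        if (timeout.isNegative()) {
            throw new IllegalArgumentException("timeout must not be negative: " + timeout);
        }
        long remainingMs;
        try {
            remainingMs = timeout.toMillis();
        } catch (ArithmeticException overflow) {
            throw new IllegalArgumentException("timeout does not fit in long milliseconds", overflow);
        }
        stopSignal.set(true); // workers poll this flag
        for (Thread worker : workers) {
            long startedWaiting = System.nanoTime();
            try {
                if (remainingMs > 0) {
                    worker.join(remainingMs); // join(0) would wait forever, so it is skipped
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            if (worker.isAlive()) {
                return false; // the time budget ran out before this worker stopped
            }
            remainingMs -= (System.nanoTime() - startedWaiting) / 1_000_000L;
        }
        return true;
    }

    /** Starts a daemon worker that exits shortly after the stop signal is set. */
    static Thread startWorker(AtomicBoolean stopSignal) {
        Thread worker = new Thread(() -> {
            while (!stopSignal.get()) {
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        worker.setDaemon(true);
        worker.start();
        return worker;
    }
}
```

In this model a zero timeout only sets the signal and checks which workers are still alive, which mirrors the "asynchronous" close described above.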
 
cleanUp
public void cleanUp()
Do a clean up of the local StateStore directory (StreamsConfig.STATE_DIR_CONFIG) by deleting all data with regard to the application ID.
May only be called either before this KafkaStreams instance is started or after the instance is closed.
Calling this method triggers a restore of local StateStores on the next application start.
- Throws:
- IllegalStateException - if this KafkaStreams instance has been started and hasn't fully shut down
- StreamsException - if cleanup failed
 
allMetadata
Deprecated. Since 3.0.0; use metadataForAllStreamsClients() instead.
Find all currently running KafkaStreams instances (potentially remotely) that use the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application) and return StreamsMetadata for each discovered instance.
Note: this is a point-in-time view and it may change due to partition reassignment.
- Returns:
- StreamsMetadata for each KafkaStreams instance of this application
 
metadataForAllStreamsClients
Find all currently running KafkaStreams instances (potentially remotely) that use the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application) and return StreamsMetadata for each discovered instance.
Note: this is a point-in-time view and it may change due to partition reassignment.
- Returns:
- StreamsMetadata for each KafkaStreams instance of this application
 
allMetadataForStore
Deprecated. Since 3.0.0; use streamsMetadataForStore(java.lang.String) instead.
Find all currently running KafkaStreams instances (potentially remotely) that
- use the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application)
- and that contain a StateStore with the given storeName
and return StreamsMetadata for each discovered instance.
Note: this is a point-in-time view and it may change due to partition reassignment.
- Parameters:
- storeName - the storeName to find metadata for
- Returns:
- StreamsMetadata for each KafkaStreams instance of this application that hosts the provided storeName
 
streamsMetadataForStore
Find all currently running KafkaStreams instances (potentially remotely) that
- use the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application)
- and that contain a StateStore with the given storeName
and return StreamsMetadata for each discovered instance.
Note: this is a point-in-time view and it may change due to partition reassignment.
- Parameters:
- storeName - the storeName to find metadata for
- Returns:
- StreamsMetadata for each KafkaStreams instance of this application that hosts the provided storeName
 
queryMetadataForKey
public <K> KeyQueryMetadata queryMetadataForKey(String storeName, K key, Serializer<K> keySerializer)
Finds the metadata containing the active hosts and standby hosts where the key being queried would reside.
- Type Parameters:
- K - key type
- Parameters:
- storeName - the storeName to find metadata for
- key - the key to find metadata for
- keySerializer - serializer for the key
- Returns:
- KeyQueryMetadata containing all metadata about hosting the given key for the given store, or null if no matching metadata could be found.
 
queryMetadataForKey
public <K> KeyQueryMetadata queryMetadataForKey(String storeName, K key, StreamPartitioner<? super K, ?> partitioner)
Finds the metadata containing the active hosts and standby hosts where the key being queried would reside.
- Type Parameters:
- K - key type
- Parameters:
- storeName - the storeName to find metadata for
- key - the key to find metadata for
- partitioner - the partitioner to be used to locate the host for the key
- Returns:
- KeyQueryMetadata containing all metadata about hosting the given key for the given store, using the supplied partitioner, or null if no matching metadata could be found.
 
store
Get a facade wrapping the local StateStore instances with the provided StoreQueryParameters. The returned object can be used to query the StateStore instances.
- Parameters:
- storeQueryParameters - the parameters used to fetch a queryable store
- Returns:
- A facade wrapping the local StateStore instances
- Throws:
- StreamsNotStartedException - If Streams has not yet been started. Just call start() and then retry this call.
- UnknownStateStoreException - If the specified store name does not exist in the topology.
- InvalidStateStorePartitionException - If the specified partition does not exist.
- InvalidStateStoreException - If the Streams instance isn't in a queryable state. If the store's type does not match the QueryableStoreType, the Streams instance is not in a queryable state with respect to the parameters, or if the store is not available locally, then an InvalidStateStoreException is thrown upon store access.
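
Some of the failures listed above are transient (for example, the instance is not yet in a queryable state), so callers often wrap the lookup in a bounded retry. The helper below is a generic JDK-only sketch of that pattern. RetrySketch and retry are hypothetical names, and which exception type counts as transient is left to the caller as an assumption.

```java
import java.util.function.Supplier;

/** Generic bounded retry; nothing here is specific to Kafka Streams. */
final class RetrySketch {
    private RetrySketch() {}

    /**
     * Calls the action until it succeeds, rethrowing immediately on any exception
     * that is not of the retryable type, and giving up after maxAttempts tries.
     */
    static <T> T retry(Supplier<T> action,
                       Class<? extends RuntimeException> retryable,
                       int maxAttempts,
                       long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                if (!retryable.isInstance(e)) {
                    throw e; // not transient: fail fast
                }
                last = e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    throw last;
                }
            }
        }
        throw last; // every attempt failed with a retryable exception
    }
}
```

In an application, the action would typically be a lambda that calls store(storeQueryParameters) on a started instance. Errors that cannot resolve on their own, such as an unknown store name, should not be passed as the retryable type.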
 
processStreamThread
protected int processStreamThread(Consumer<org.apache.kafka.streams.processor.internals.StreamThread> consumer)
Handle each stream thread in a snapshot of threads. Note: iteration over a SynchronizedList is not thread safe, so it must be manually synchronized. However, other locks may be required while looping over the threads, and holding the list lock at the same time could cause a deadlock. Hence, a copy is created to avoid holding the threads lock while looping.
- Parameters:
- consumer - handler
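
The snapshot technique described here is a general Java idiom and can be shown without any Kafka types. In the sketch below, SnapshotIterationSketch, add and forEachSnapshot are hypothetical names, strings stand in for stream threads, and returning the snapshot size is an assumption about what the int result could mean.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/** Iterates over a copy of a synchronized list so the list lock is not held during callbacks. */
final class SnapshotIterationSketch {
    private final List<String> threads = Collections.synchronizedList(new ArrayList<String>());

    void add(String name) {
        threads.add(name);
    }

    /** Copies the list under its lock, then runs the handler on the copy without the lock. */
    int forEachSnapshot(Consumer<String> handler) {
        List<String> copy;
        synchronized (threads) { // lock held only for the duration of the copy
            copy = new ArrayList<String>(threads);
        }
        for (String thread : copy) {
            handler.accept(thread); // may take other locks or modify the original list
        }
        return copy.size();
    }
}
```

Because the handler runs on the copy, it can add or remove entries in the original list without triggering a ConcurrentModificationException and without extending the time the list lock is held.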
 
localThreadsMetadata
Deprecated. Since 3.0; use metadataForLocalThreads() instead.
Returns runtime information about the local threads of this KafkaStreams instance.
- Returns:
- the set of ThreadMetadata.
 
metadataForLocalThreads
Returns runtime information about the local threads of this KafkaStreams instance.
- Returns:
- the set of ThreadMetadata.
 
allLocalStorePartitionLags
Returns LagInfo for all store partitions (active or standby) local to this Streams instance. Note that the values returned are just estimates and are meant to be used for making soft decisions on whether the data in the store partition is fresh enough for querying.
Note: each invocation of this method issues a call to the Kafka brokers. Thus it is advisable to limit the frequency of invocation to once every few seconds.
- Returns:
- map of store names to another map of partition to LagInfos
- Throws:
- StreamsException - if the admin client request throws an exception
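
One way to follow the advice about call frequency is to cache the last result for a minimum interval. The wrapper below is a JDK-only sketch of that idea. ThrottledSupplier is a hypothetical helper, and the injected clock exists only to make the behavior easy to test.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Returns a cached value until at least minIntervalNanos have passed since the last fetch. */
final class ThrottledSupplier<T> implements Supplier<T> {
    private final Supplier<T> delegate;
    private final long minIntervalNanos;
    private final LongSupplier nanoClock;

    private T cached;
    private long lastFetchNanos;
    private boolean hasValue;

    ThrottledSupplier(Supplier<T> delegate, long minIntervalNanos, LongSupplier nanoClock) {
        this.delegate = delegate;
        this.minIntervalNanos = minIntervalNanos;
        this.nanoClock = nanoClock;
    }

    @Override
    public synchronized T get() {
        long now = nanoClock.getAsLong();
        // Fetch on the first call, and afterwards only once the interval has elapsed.
        if (!hasValue || now - lastFetchNanos >= minIntervalNanos) {
            cached = delegate.get();
            lastFetchNanos = now;
            hasValue = true;
        }
        return cached;
    }
}
```

In an application, the delegate could be a method reference to allLocalStorePartitionLags() on the running instance, with System::nanoTime as the clock and an interval of a few seconds expressed in nanoseconds.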
 
allLocalStorePartitionLags
allLocalStorePartitionLags(List<org.apache.kafka.streams.processor.internals.Task> tasksToCollectLagFor)

query
<R> StateQueryResult<R> query(StateQueryRequest<R> request)
Run an interactive query against a state store.
This method allows callers outside of the Streams runtime to access the internal state of stateful processors. See https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html for more information.
NOTICE: This functionality is InterfaceStability.Evolving and subject to change in minor versions. Once it is stabilized, this notice and the evolving annotation will be removed.
- Type Parameters:
- R - The result type specified by the query.
- Throws:
- StreamsNotStartedException - If Streams has not yet been started. Just call start() and then retry this call.
- StreamsStoppedException - If Streams is in a terminal state like PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, or ERROR. The caller should discover a new instance to query.
- UnknownStateStoreException - If the specified store name does not exist in the topology.
 
 