Class KafkaStreams

java.lang.Object
org.apache.kafka.streams.KafkaStreams
All Implemented Interfaces:
AutoCloseable

public class KafkaStreams extends Object implements AutoCloseable
A Kafka client that allows for performing continuous computation on input coming from one or more input topics and sends output to zero, one, or more output topics.

The computational logic can be specified either by using the Topology to define a DAG topology of Processors or by using the StreamsBuilder which provides the high-level DSL to define transformations.

One KafkaStreams instance can contain one or more threads specified in the configs for the processing work.

A KafkaStreams instance can co-ordinate with any other instances with the same application ID (whether in the same process, on other processes on this machine, or on remote machines) as a single (possibly distributed) stream processing application. These instances will divide up the work based on the assignment of the input topic partitions so that all partitions are being consumed. If instances are added or fail, all (remaining) instances will rebalance the partition assignment among themselves to balance processing load and ensure that all input topic partitions are processed.

Internally a KafkaStreams instance contains a normal KafkaProducer and KafkaConsumer instance that is used for reading input and writing output.

A simple example might look like this:


 Properties props = new Properties();
 props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

 StreamsBuilder builder = new StreamsBuilder();
 builder.<String, String>stream("my-input-topic").mapValues(value -> String.valueOf(value.length())).to("my-output-topic");

 KafkaStreams streams = new KafkaStreams(builder.build(), props);
 streams.start();
 
See Also:
  • Field Details

    • clientId

      protected final String clientId
    • applicationConfigs

      protected final StreamsConfig applicationConfigs
    • threads

      protected final List<org.apache.kafka.streams.processor.internals.StreamThread> threads
    • stateDirectory

      protected final org.apache.kafka.streams.processor.internals.StateDirectory stateDirectory
    • streamsMetadataState

      protected final org.apache.kafka.streams.processor.internals.StreamsMetadataState streamsMetadataState
    • adminClient

      protected final Admin adminClient
    • topologyMetadata

      protected final org.apache.kafka.streams.processor.internals.TopologyMetadata topologyMetadata
    • state

      protected volatile KafkaStreams.State state
  • Constructor Details

    • KafkaStreams

      public KafkaStreams(Topology topology, Properties props)
      Create a KafkaStreams instance.

      Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.

      Parameters:
      topology - the topology specifying the computational logic
      props - properties for StreamsConfig
      Throws:
      StreamsException - if any fatal error occurs
    • KafkaStreams

      public KafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier)
      Create a KafkaStreams instance.

      Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.

      Parameters:
      topology - the topology specifying the computational logic
      props - properties for StreamsConfig
      clientSupplier - the Kafka clients supplier which provides underlying producer and consumer clients for the new KafkaStreams instance
      Throws:
      StreamsException - if any fatal error occurs
    • KafkaStreams

      public KafkaStreams(Topology topology, Properties props, org.apache.kafka.common.utils.Time time)
      Create a KafkaStreams instance.

      Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.

      Parameters:
      topology - the topology specifying the computational logic
      props - properties for StreamsConfig
      time - Time implementation; cannot be null
      Throws:
      StreamsException - if any fatal error occurs
    • KafkaStreams

      public KafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier, org.apache.kafka.common.utils.Time time)
      Create a KafkaStreams instance.

      Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.

      Parameters:
      topology - the topology specifying the computational logic
      props - properties for StreamsConfig
      clientSupplier - the Kafka clients supplier which provides underlying producer and consumer clients for the new KafkaStreams instance
      time - Time implementation; cannot be null
      Throws:
      StreamsException - if any fatal error occurs
    • KafkaStreams

      public KafkaStreams(Topology topology, StreamsConfig applicationConfigs)
      Create a KafkaStreams instance.

      Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.

      Parameters:
      topology - the topology specifying the computational logic
      applicationConfigs - configs for Kafka Streams
      Throws:
      StreamsException - if any fatal error occurs
    • KafkaStreams

      public KafkaStreams(Topology topology, StreamsConfig applicationConfigs, KafkaClientSupplier clientSupplier)
      Create a KafkaStreams instance.

      Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.

      Parameters:
      topology - the topology specifying the computational logic
      applicationConfigs - configs for Kafka Streams
      clientSupplier - the Kafka clients supplier which provides underlying producer and consumer clients for the new KafkaStreams instance
      Throws:
      StreamsException - if any fatal error occurs
    • KafkaStreams

      public KafkaStreams(Topology topology, StreamsConfig applicationConfigs, org.apache.kafka.common.utils.Time time)
      Create a KafkaStreams instance.

      Note: even if you never call start() on a KafkaStreams instance, you still must close() it to avoid resource leaks.

      Parameters:
      topology - the topology specifying the computational logic
      applicationConfigs - configs for Kafka Streams
      time - Time implementation; cannot be null
      Throws:
      StreamsException - if any fatal error occurs
    • KafkaStreams

      protected KafkaStreams(org.apache.kafka.streams.processor.internals.TopologyMetadata topologyMetadata, StreamsConfig applicationConfigs, KafkaClientSupplier clientSupplier) throws StreamsException
      Throws:
      StreamsException
  • Method Details

    • state

      public KafkaStreams.State state()
      Return the current KafkaStreams.State of this KafkaStreams instance.
      Returns:
      the current state of this Kafka Streams instance
    • isRunningOrRebalancing

      protected boolean isRunningOrRebalancing()
    • hasStartedOrFinishedShuttingDown

      protected boolean hasStartedOrFinishedShuttingDown()
    • validateIsRunningOrRebalancing

      protected void validateIsRunningOrRebalancing()
    • setStateListener

      public void setStateListener(KafkaStreams.StateListener listener)
      An app can set a single KafkaStreams.StateListener so that the app is notified when state changes.
      Parameters:
      listener - a new state listener
      Throws:
      IllegalStateException - if this KafkaStreams instance has already been started.
    • setUncaughtExceptionHandler

      @Deprecated public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
      Set the handler invoked when an internal stream thread abruptly terminates due to an uncaught exception.
      Parameters:
      uncaughtExceptionHandler - the uncaught exception handler for all internal threads; null deletes the current handler
      Throws:
      IllegalStateException - if this KafkaStreams instance has already been started.
    • setUncaughtExceptionHandler

      public void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler userStreamsUncaughtExceptionHandler)
      Set the handler invoked when an internal stream thread throws an unexpected exception. These might be exceptions indicating rare bugs in Kafka Streams, or they might be exceptions thrown by your code, for example a NullPointerException thrown from your processor logic. The handler will execute on the thread that produced the exception. In order to get the thread that threw the exception, use Thread.currentThread().

      Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any thread that encounters such an exception.

      Parameters:
      userStreamsUncaughtExceptionHandler - the uncaught exception handler of type StreamsUncaughtExceptionHandler for all internal threads
      Throws:
      IllegalStateException - if this KafkaStreams instance has already been started.
      NullPointerException - if userStreamsUncaughtExceptionHandler is null.
    • setGlobalStateRestoreListener

      public void setGlobalStateRestoreListener(StateRestoreListener globalStateRestoreListener)
      Set the listener which is triggered whenever a StateStore is being restored in order to resume processing.
      Parameters:
      globalStateRestoreListener - The listener triggered when StateStore is being restored.
      Throws:
      IllegalStateException - if this KafkaStreams instance has already been started.
    • metrics

      public Map<MetricName,? extends Metric> metrics()
      Get read-only handle on global metrics registry, including streams client's own metrics plus its embedded producer, consumer and admin clients' metrics.
      Returns:
      Map of all metrics.
    • addStreamThread

      public Optional<String> addStreamThread()
      Adds and starts a stream thread in addition to the stream threads that are already running in this Kafka Streams client.

      Since the number of stream threads increases, the sizes of the caches in the new stream thread and the existing stream threads are adapted so that the sum of the cache sizes over all stream threads does not exceed the total cache size specified in configuration StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG.

      Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.

      Returns:
      name of the added stream thread or empty if a new stream thread could not be added
    • removeStreamThread

      public Optional<String> removeStreamThread()
      Removes one stream thread out of the running stream threads from this Kafka Streams client.

      The removed stream thread is gracefully shut down. This method does not specify which stream thread is shut down.

      Since the number of stream threads decreases, the sizes of the caches in the remaining stream threads are adapted so that the sum of the cache sizes over all stream threads equals the total cache size specified in configuration StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG.

      Returns:
      name of the removed stream thread or empty if a stream thread could not be removed because no stream threads are alive
    • removeStreamThread

      public Optional<String> removeStreamThread(Duration timeout)
      Removes one stream thread out of the running stream threads from this Kafka Streams client.

      The removed stream thread is gracefully shut down. This method does not specify which stream thread is shut down.

      Since the number of stream threads decreases, the sizes of the caches in the remaining stream threads are adapted so that the sum of the cache sizes over all stream threads equals the total cache size specified in configuration StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG.

      Parameters:
      timeout - The length of time to wait for the thread to shutdown
      Returns:
      name of the removed stream thread or empty if a stream thread could not be removed because no stream threads are alive
      Throws:
      TimeoutException - if the thread does not stop in time
    • start

      public void start() throws IllegalStateException, StreamsException
      Start the KafkaStreams instance by starting all its threads. This function is expected to be called only once during the life cycle of the client.

      Because threads are started in the background, this method does not block. However, if you have global stores in your topology, this method blocks until all global stores are restored. As a consequence, any fatal exception that happens during processing is by default only logged. If you want to be notified about dying threads, you can register an uncaught exception handler before starting the KafkaStreams instance.

      Note, for brokers with version 0.9.x or lower, the broker version cannot be checked. There will be no error and the client will hang and retry to verify the broker version until it times out.

      Throws:
      IllegalStateException - if process was already started
      StreamsException - if the Kafka brokers have version 0.10.0.x or if exactly-once is enabled for pre 0.11.0.x brokers
    • close

      public void close()
      Shutdown this KafkaStreams instance by signaling all the threads to stop, and then wait for them to join. This will block until all threads have stopped.
      Specified by:
      close in interface AutoCloseable
    • close

      public boolean close(Duration timeout) throws IllegalArgumentException
      Shutdown this KafkaStreams by signaling all the threads to stop, and then wait up to the timeout for the threads to join. A timeout of Duration.ZERO (or any other zero duration) makes the close operation asynchronous. Negative-duration timeouts are rejected.
      Parameters:
      timeout - how long to wait for the threads to shutdown
      Returns:
      true if all threads were successfully stopped—false if the timeout was reached before all threads stopped Note that this method must not be called in the KafkaStreams.StateListener.onChange(KafkaStreams.State, KafkaStreams.State) callback of KafkaStreams.StateListener.
      Throws:
      IllegalArgumentException - if timeout can't be represented as long milliseconds
    • close

      public boolean close(KafkaStreams.CloseOptions options) throws IllegalArgumentException
      Shutdown this KafkaStreams by signaling all the threads to stop, and then wait up to the timeout for the threads to join.
      Parameters:
      options - contains timeout to specify how long to wait for the threads to shutdown, and a flag leaveGroup to trigger consumer leave call
      Returns:
      true if all threads were successfully stopped—false if the timeout was reached before all threads stopped Note that this method must not be called in the KafkaStreams.StateListener.onChange(KafkaStreams.State, KafkaStreams.State) callback of KafkaStreams.StateListener.
      Throws:
      IllegalArgumentException - if timeout can't be represented as long milliseconds
    • cleanUp

      public void cleanUp()
      Do a clean up of the local StateStore directory (StreamsConfig.STATE_DIR_CONFIG) by deleting all data with regard to the application ID.

      May only be called either before this KafkaStreams instance is started or after the instance is closed.

      Calling this method triggers a restore of local StateStores on the next application start.

      Throws:
      IllegalStateException - if this KafkaStreams instance has been started and hasn't fully shut down
      StreamsException - if cleanup failed
    • allMetadata

      @Deprecated public Collection<StreamsMetadata> allMetadata()
      Deprecated.
      Find all currently running KafkaStreams instances (potentially remotely) that use the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application) and return StreamsMetadata for each discovered instance.

      Note: this is a point in time view and it may change due to partition reassignment.

      Returns:
      StreamsMetadata for each KafkaStreams instances of this application
    • metadataForAllStreamsClients

      public Collection<StreamsMetadata> metadataForAllStreamsClients()
      Find all currently running KafkaStreams instances (potentially remotely) that use the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application) and return StreamsMetadata for each discovered instance.

      Note: this is a point in time view and it may change due to partition reassignment.

      Returns:
      StreamsMetadata for each KafkaStreams instances of this application
    • allMetadataForStore

      @Deprecated public Collection<StreamsMetadata> allMetadataForStore(String storeName)
      Deprecated.
      Find all currently running KafkaStreams instances (potentially remotely) that
      • use the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application)
      • and that contain a StateStore with the given storeName
      and return StreamsMetadata for each discovered instance.

      Note: this is a point in time view and it may change due to partition reassignment.

      Parameters:
      storeName - the storeName to find metadata for
      Returns:
      StreamsMetadata for each KafkaStreams instances with the provide storeName of this application
    • streamsMetadataForStore

      public Collection<StreamsMetadata> streamsMetadataForStore(String storeName)
      Find all currently running KafkaStreams instances (potentially remotely) that
      • use the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application)
      • and that contain a StateStore with the given storeName
      and return StreamsMetadata for each discovered instance.

      Note: this is a point in time view and it may change due to partition reassignment.

      Parameters:
      storeName - the storeName to find metadata for
      Returns:
      StreamsMetadata for each KafkaStreams instances with the provide storeName of this application
    • queryMetadataForKey

      public <K> KeyQueryMetadata queryMetadataForKey(String storeName, K key, Serializer<K> keySerializer)
      Finds the metadata containing the active hosts and standby hosts where the key being queried would reside.
      Type Parameters:
      K - key type Returns KeyQueryMetadata containing all metadata about hosting the given key for the given store, or null if no matching metadata could be found.
      Parameters:
      storeName - the storeName to find metadata for
      key - the key to find metadata for
      keySerializer - serializer for the key
    • queryMetadataForKey

      public <K> KeyQueryMetadata queryMetadataForKey(String storeName, K key, StreamPartitioner<? super K,?> partitioner)
      Finds the metadata containing the active hosts and standby hosts where the key being queried would reside.
      Type Parameters:
      K - key type Returns KeyQueryMetadata containing all metadata about hosting the given key for the given store, using the the supplied partitioner, or null if no matching metadata could be found.
      Parameters:
      storeName - the storeName to find metadata for
      key - the key to find metadata for
      partitioner - the partitioner to be use to locate the host for the key
    • store

      public <T> T store(StoreQueryParameters<T> storeQueryParameters)
      Get a facade wrapping the local StateStore instances with the provided StoreQueryParameters. The returned object can be used to query the StateStore instances.
      Parameters:
      storeQueryParameters - the parameters used to fetch a queryable store
      Returns:
      A facade wrapping the local StateStore instances
      Throws:
      StreamsNotStartedException - If Streams has not yet been started. Just call start() and then retry this call.
      UnknownStateStoreException - If the specified store name does not exist in the topology.
      InvalidStateStorePartitionException - If the specified partition does not exist.
      InvalidStateStoreException - If the Streams instance isn't in a queryable state. If the store's type does not match the QueryableStoreType, the Streams instance is not in a queryable state with respect to the parameters, or if the store is not available locally, then an InvalidStateStoreException is thrown upon store access.
    • pause

      public void pause()
      This method pauses processing for the KafkaStreams instance. Paused topologies will only skip over a) processing, b) punctuation, and c) standby tasks. Notably, paused topologies will still poll Kafka consumers, and commit offsets. This method sets transient state that is not maintained or managed among instances. Note that pause() can be called before start() in order to start a KafkaStreams instance in a manner where the processing is paused as described, but the consumers are started up.
    • isPaused

      public boolean isPaused()
      Returns:
      true when the KafkaStreams instance has its processing paused.
    • resume

      public void resume()
      This method resumes processing for the KafkaStreams instance.
    • processStreamThread

      protected int processStreamThread(Consumer<org.apache.kafka.streams.processor.internals.StreamThread> consumer)
      handle each stream thread in a snapshot of threads. noted: iteration over SynchronizedList is not thread safe so it must be manually synchronized. However, we may require other locks when looping threads and it could cause deadlock. Hence, we create a copy to avoid holding threads lock when looping threads.
      Parameters:
      consumer - handler
    • localThreadsMetadata

      @Deprecated public Set<ThreadMetadata> localThreadsMetadata()
      Deprecated.
      Returns runtime information about the local threads of this KafkaStreams instance.
      Returns:
      the set of ThreadMetadata.
    • metadataForLocalThreads

      public Set<ThreadMetadata> metadataForLocalThreads()
      Returns runtime information about the local threads of this KafkaStreams instance.
      Returns:
      the set of ThreadMetadata.
    • allLocalStorePartitionLags

      public Map<String,Map<Integer,LagInfo>> allLocalStorePartitionLags()
      Returns LagInfo, for all store partitions (active or standby) local to this Streams instance. Note that the values returned are just estimates and meant to be used for making soft decisions on whether the data in the store partition is fresh enough for querying. Note: Each invocation of this method issues a call to the Kafka brokers. Thus its advisable to limit the frequency of invocation to once every few seconds.
      Returns:
      map of store names to another map of partition to LagInfos
      Throws:
      StreamsException - if the admin client request throws exception
    • allLocalStorePartitionLags

      protected Map<String,Map<Integer,LagInfo>> allLocalStorePartitionLags(List<org.apache.kafka.streams.processor.internals.Task> tasksToCollectLagFor)
    • query

      @Evolving public <R> StateQueryResult<R> query(StateQueryRequest<R> request)
      Run an interactive query against a state store.

      This method allows callers outside of the Streams runtime to access the internal state of stateful processors. See https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html for more information.

      NOTICE: This functionality is InterfaceStability.Evolving and subject to change in minor versions. Once it is stabilized, this notice and the evolving annotation will be removed.

      Type Parameters:
      R - The result type specified by the query.
      Throws:
      StreamsNotStartedException - If Streams has not yet been started. Just call start() and then retry this call.
      StreamsStoppedException - If Streams is in a terminal state like PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, or ERROR. The caller should discover a new instance to query.
      UnknownStateStoreException - If the specified store name does not exist in the topology.