@InterfaceStability.Unstable public class KafkaStreams extends Object
 The computational logic can be specified either by using the TopologyBuilder class to define a DAG topology of
 Processors, or by using the KStreamBuilder class, which provides the high-level KStream DSL to define the transformation.
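 For example, a low-level topology built with TopologyBuilder might look roughly like the sketch below; LengthProcessor is a made-up processor, and the default key/value serdes from the configuration are assumed:

    // Hypothetical processor that forwards the length of each value downstream.
    public class LengthProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(String key, String value) {
            context().forward(key, String.valueOf(value.length()));
        }
    }

    TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("Source", "my-input-topic")
           .addProcessor("Process", LengthProcessor::new, "Source")
           .addSink("Sink", "my-output-topic", "Process");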
 
 The KafkaStreams class manages the lifecycle of a Kafka Streams instance. One stream instance can contain one or
 more threads, as specified in the configs, for the processing work.
 
 A KafkaStreams instance can co-ordinate with any other instances with the same application ID (whether in this same process, in other processes
 on this machine, or on remote machines) as a single (possibly distributed) stream processing client. These instances will divide up the work
 based on the assignment of the input topic partitions so that all partitions are being
 consumed. If instances are added or fail, all instances will rebalance the partition assignment among themselves
 to balance the processing load.
 
 Internally the KafkaStreams instance contains a normal KafkaProducer
 and KafkaConsumer instance that are used for reading input and writing output.
 
A simple example might look like this:
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    StreamsConfig config = new StreamsConfig(props);
    KStreamBuilder builder = new KStreamBuilder();
    builder.<String, String>stream("my-input-topic")
           .mapValues(value -> String.valueOf(value.length()))   // map each value to the string form of its length
           .to("my-output-topic");
    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();
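 Since cleanUp() may only be called while the instance is not running, and close() signals all threads to stop, a common companion to the example above is a JVM shutdown hook; a minimal sketch:

    // Optionally wipe local state before starting (allowed only before start() or after close()).
    streams.cleanUp();
    streams.start();

    // Close the instance cleanly when the JVM shuts down.
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));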
| Constructor and Description |
|---|
| KafkaStreams(TopologyBuilder builder, Properties props): Construct the stream instance. |
| KafkaStreams(TopologyBuilder builder, StreamsConfig config): Construct the stream instance. |
| KafkaStreams(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier): Construct the stream instance. |
| Modifier and Type | Method and Description |
|---|---|
| Collection<StreamsMetadata> | allMetadata(): Find all of the instances of StreamsMetadata in the KafkaStreams application that this instance belongs to. Note: this is a point in time view and it may change due to partition reassignment. |
| Collection<StreamsMetadata> | allMetadataForStore(String storeName): Find instances of StreamsMetadata that contain the given storeName. Note: this is a point in time view and it may change due to partition reassignment. |
| void | cleanUp(): Cleans up the local state store directory (state.dir) by deleting all data with regard to the application-id. |
| void | close(): Shut down this stream instance by signaling all the threads to stop, and then wait for them to join. |
| <K> StreamsMetadata | metadataForKey(String storeName, K key, Serializer<K> keySerializer): Find the StreamsMetadata instance that contains the given storeName and whose hosted store instance contains the given key. |
| <K> StreamsMetadata | metadataForKey(String storeName, K key, StreamPartitioner<K,?> partitioner): Find the StreamsMetadata instance that contains the given storeName and whose hosted store instance contains the given key. Note: the key may not exist in the StateStore; this method provides a way of finding which host it would exist on. |
| void | setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh): Sets the handler invoked when a stream thread abruptly terminates due to an uncaught exception. |
| void | start(): Start the stream instance by starting all its threads. |
| <T> T | store(String storeName, QueryableStoreType<T> queryableStoreType): Get a facade wrapping the StateStore instances with the provided storeName and accepted by QueryableStoreType.accepts(StateStore). |
| String | toString(): Produces a string representation containing useful information about Kafka Streams, such as thread IDs, task IDs and a representation of the topology. |
public KafkaStreams(TopologyBuilder builder, Properties props)
Construct the stream instance.
Parameters:
builder - the processor topology builder specifying the computational logic
props - properties for the StreamsConfig

public KafkaStreams(TopologyBuilder builder, StreamsConfig config)
Construct the stream instance.
Parameters:
builder - the processor topology builder specifying the computational logic
config - the stream configs

public KafkaStreams(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier)
Construct the stream instance.
Parameters:
builder - the processor topology builder specifying the computational logic
config - the stream configs
clientSupplier - the Kafka clients supplier which provides underlying producer and consumer clients for this KafkaStreams instance
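The clientSupplier constructor lets callers control how the underlying clients are built. A minimal sketch of such a supplier is shown below; MyClientSupplier is a hypothetical class, and the three methods assumed here (getProducer, getConsumer, getRestoreConsumer) should be checked against the KafkaClientSupplier interface of the release in use:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.streams.KafkaClientSupplier;

    // Hypothetical supplier that simply hands back plain byte-array clients;
    // a real one might wrap them with interceptors or extra metrics.
    public class MyClientSupplier implements KafkaClientSupplier {
        @Override
        public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
            return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
        }

        @Override
        public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
            return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        }

        @Override
        public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) {
            return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        }
    }

    KafkaStreams streams = new KafkaStreams(builder, config, new MyClientSupplier());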
public void start()
Start the stream instance by starting all its threads.
Throws:
IllegalStateException - if process was already started

public void close()
Shut down this stream instance by signaling all the threads to stop, and then wait for them to join.
Throws:
IllegalStateException - if process has not started yet

public String toString()
Produces a string representation containing useful information about Kafka Streams, such as thread IDs, task IDs and a representation of the topology.
Overrides:
toString in class Object
public void cleanUp()
Cleans up the local state store directory (state.dir) by deleting all data with regard to the application-id.
May only be called either before the instance is started or after the instance is closed.
Throws:
IllegalStateException - if the instance is currently running

public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh)
Sets the handler invoked when a stream thread abruptly terminates due to an uncaught exception.
Parameters:
eh - the object to use as this thread's uncaught exception handler. If null then this thread has no explicit handler.
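Because Thread.UncaughtExceptionHandler has a single method, the handler can be registered with a lambda; the reaction below is only a placeholder:

    streams.setUncaughtExceptionHandler((thread, throwable) -> {
        // Placeholder reaction: log the failure; an application might instead
        // trigger an alert or arrange for the instance to be closed.
        System.err.println("Stream thread " + thread.getName() + " terminated: " + throwable);
    });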
public Collection<StreamsMetadata> allMetadata()
Find all of the instances of StreamsMetadata in the KafkaStreams application that this instance belongs to.
Note: this is a point in time view and it may change due to partition reassignment.
Returns:
collection containing all instances of StreamsMetadata in the KafkaStreams application that this instance belongs to
public Collection<StreamsMetadata> allMetadataForStore(String storeName)
Find instances of StreamsMetadata that contain the given storeName.
Note: this is a point in time view and it may change due to partition reassignment.
Parameters:
storeName - the storeName to find metadata for
Returns:
collection containing all instances of StreamsMetadata that have the provided storeName
public <K> StreamsMetadata metadataForKey(String storeName, K key, Serializer<K> keySerializer)
Find the StreamsMetadata instance that contains the given storeName and whose hosted store instance contains the given key. This will use the org.apache.kafka.streams.processor.internals.DefaultStreamPartitioner to locate the partition. If a custom partitioner has been used, please use metadataForKey(String, Object, StreamPartitioner).
Note: the key may not exist in the StateStore; this method provides a way of finding which host it would exist on.
Note: this is a point in time view and it may change due to partition reassignment.
Type Parameters:
K - key type
Parameters:
storeName - name of the store
key - key to use for the partition
keySerializer - serializer for the key
Returns:
the StreamsMetadata for the storeName and key, or StreamsMetadata.NOT_AVAILABLE if streams is (re-)initializing
public <K> StreamsMetadata metadataForKey(String storeName, K key, StreamPartitioner<K,?> partitioner)
Find the StreamsMetadata instance that contains the given storeName and whose hosted store instance contains the given key.
Note: the key may not exist in the StateStore; this method provides a way of finding which host it would exist on.
Note: this is a point in time view and it may change due to partition reassignment.
Type Parameters:
K - key type
Parameters:
storeName - name of the store
key - key to use for the partition
partitioner - partitioner for the store
Returns:
the StreamsMetadata for the storeName and key, or StreamsMetadata.NOT_AVAILABLE if streams is (re-)initializing
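As a rough sketch of using this metadata for interactive queries, the host that would own a given key can be located as follows; the store name "my-store" and the key are made up, and string keys with the default partitioner are assumed:

    // Locate the instance hosting the partition of "my-store" that "some-key" maps to.
    StreamsMetadata metadata = streams.metadataForKey("my-store", "some-key", Serdes.String().serializer());
    if (metadata != null && !StreamsMetadata.NOT_AVAILABLE.equals(metadata)) {
        System.out.println("Key is hosted on " + metadata.host() + ":" + metadata.port());
    }

    // Or enumerate every instance that hosts some partition of the store.
    for (StreamsMetadata md : streams.allMetadataForStore("my-store")) {
        System.out.println(md.hostInfo());
    }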
public <T> T store(String storeName, QueryableStoreType<T> queryableStoreType)
Get a facade wrapping the StateStore instances with the provided storeName and accepted by QueryableStoreType.accepts(StateStore). The returned object can be used to query the StateStore instances.
Type Parameters:
T - return type
Parameters:
storeName - name of the store to find
queryableStoreType - accept only stores that are accepted by QueryableStoreType.accepts(StateStore)
Returns:
a facade wrapping the StateStore instances
Throws:
InvalidStateStoreException - if the streams are (re-)initializing or a store with storeName and queryableStoreType doesn't exist
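A short sketch of querying a key-value store through this facade; the store name "my-store" is hypothetical and is assumed to have been materialized by the topology with String keys and Long values:

    // QueryableStoreTypes and ReadOnlyKeyValueStore come from org.apache.kafka.streams.state.
    ReadOnlyKeyValueStore<String, Long> view =
        streams.store("my-store", QueryableStoreTypes.<String, Long>keyValueStore());
    Long count = view.get("some-key");   // null if the key is not present in the local instances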