@Evolving public class KafkaStreams extends java.lang.Object
The computational logic can be specified either by using the Topology to define a DAG topology of
Processors, or by using the StreamsBuilder, which provides the high-level DSL to define
transformations.
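As a minimal sketch of the first option, assuming Kafka Streams 1.0 and hypothetical node names ("Source", "Sink"), the following builds a pass-through Topology with the low-level Processor API: one source node reads my-input-topic and one sink node writes every record it receives to my-output-topic. describe() returns a TopologyDescription that can be inspected before the topology is passed to a KafkaStreams constructor.

```java
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;

public class PassThroughTopology {
    // Node names are hypothetical and only need to be unique within the topology
    static Topology build() {
        Topology topology = new Topology();
        // Source node: consumes records from the input topic
        topology.addSource("Source", "my-input-topic");
        // Sink node: writes every record forwarded by "Source" to the output topic
        topology.addSink("Sink", "my-output-topic", "Source");
        return topology;
    }

    public static void main(String[] args) {
        TopologyDescription description = build().describe();
        // Prints the sub-topologies together with their source and sink nodes
        System.out.println(description);
    }
}
```

The same topology could be written with StreamsBuilder as builder.stream("my-input-topic").to("my-output-topic"); the Processor API becomes useful when custom processors or state stores are wired in by hand.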
One KafkaStreams instance can run one or more processing threads; the number of threads is specified in the configs (StreamsConfig.NUM_STREAM_THREADS_CONFIG).
A KafkaStreams instance can coordinate with any other instances with the same
application ID (whether in the same process, on other processes on this
machine, or on remote machines) as a single (possibly distributed) stream processing application.
These instances divide up the work based on the assignment of the input topic partitions so that all partitions
are consumed.
If instances are added or fail, all (remaining) instances will rebalance the partition assignment among themselves
to balance processing load and ensure that all input topic partitions are processed.
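A minimal sketch of how this scaling is expressed in configuration, assuming Kafka Streams 1.0: every instance that should join the same application uses the same application ID, and each instance chooses its own thread count through StreamsConfig.NUM_STREAM_THREADS_CONFIG. The application ID, broker address, and thread count are placeholder values, and instanceConfig() is a hypothetical helper.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

public class ScaledStreamsConfig {
    // Hypothetical helper: builds the configuration for one instance of the application.
    // All instances sharing applicationId split the input topic partitions among themselves.
    static Properties instanceConfig(String applicationId, String bootstrapServers, int threads) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Number of stream threads inside this single KafkaStreams instance
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; start the same program on several machines to scale out
        Properties props = instanceConfig("my-stream-processing-application", "localhost:9092", 2);
        StreamsConfig config = new StreamsConfig(props);
        System.out.println("threads per instance: "
            + config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG));
    }
}
```

Running two copies of this program with the same application ID gives one application with four stream threads in total; if one copy stops, the remaining instance takes over its partitions after a rebalance.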
Internally, a KafkaStreams instance contains normal KafkaProducer and KafkaConsumer instances
that are used for reading input and writing output.
A simple example might look like this:
```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsConfig config = new StreamsConfig(props);

StreamsBuilder builder = new StreamsBuilder();
// Map each value to its length, converted to a String
builder.<String, String>stream("my-input-topic").mapValues(value -> String.valueOf(value.length())).to("my-output-topic");

KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
```
See Also:
StreamsBuilder, Topology

| Modifier and Type | Class | Description |
|---|---|---|
| static class | KafkaStreams.State | Kafka Streams states are the possible states that a Kafka Streams instance can be in. |
| static interface | KafkaStreams.StateListener | Listen to KafkaStreams.State change events. |
| Constructor | Description |
|---|---|
| KafkaStreams(TopologyBuilder builder, java.util.Properties props) | Deprecated. Use KafkaStreams(Topology, Properties) instead. |
| KafkaStreams(TopologyBuilder builder, StreamsConfig config) | Deprecated. Use KafkaStreams(Topology, StreamsConfig) instead. |
| KafkaStreams(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier) | Deprecated. Use KafkaStreams(Topology, StreamsConfig, KafkaClientSupplier) instead. |
| KafkaStreams(Topology topology, java.util.Properties props) | Create a KafkaStreams instance. |
| KafkaStreams(Topology topology, StreamsConfig config) | Create a KafkaStreams instance. |
| KafkaStreams(Topology topology, StreamsConfig config, org.apache.kafka.common.utils.Time time) | Create a KafkaStreams instance. |
| KafkaStreams(Topology topology, StreamsConfig config, KafkaClientSupplier clientSupplier) | Create a KafkaStreams instance. |
| Modifier and Type | Method | Description |
|---|---|---|
| java.util.Collection<StreamsMetadata> | allMetadata() | Find all currently running KafkaStreams instances (potentially remotely) that use the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application) and return StreamsMetadata for each discovered instance. |
| java.util.Collection<StreamsMetadata> | allMetadataForStore(java.lang.String storeName) | Find all currently running KafkaStreams instances (potentially remotely) that use the same application ID as this instance (i.e., all instances that belong to the same Kafka Streams application) and that contain a StateStore with the given storeName, and return StreamsMetadata for each discovered instance. |
| void | cleanUp() | Clean up the local StateStore directory (StreamsConfig.STATE_DIR_CONFIG) by deleting all data with regard to the application ID. |
| void | close() | Shut down this KafkaStreams instance by signaling all the threads to stop, and then wait for them to join. |
| boolean | close(long timeout, java.util.concurrent.TimeUnit timeUnit) | Shut down this KafkaStreams instance by signaling all the threads to stop, and then wait up to the timeout for the threads to join. |
| java.util.Set<ThreadMetadata> | localThreadsMetadata() | Returns runtime information about the local threads of this KafkaStreams instance. |
| <K> StreamsMetadata | metadataForKey(java.lang.String storeName, K key, Serializer<K> keySerializer) | Find the currently running KafkaStreams instance (potentially remotely) that uses the same application ID as this instance (i.e., that belongs to the same Kafka Streams application), that contains a StateStore with the given storeName, and whose StateStore contains the given key, and return StreamsMetadata for it. |
| <K> StreamsMetadata | metadataForKey(java.lang.String storeName, K key, StreamPartitioner<? super K,?> partitioner) | Find the currently running KafkaStreams instance (potentially remotely) that uses the same application ID as this instance (i.e., that belongs to the same Kafka Streams application), that contains a StateStore with the given storeName, and whose StateStore contains the given key, and return StreamsMetadata for it. |
| java.util.Map<MetricName,? extends Metric> | metrics() | Get a read-only handle on the global metrics registry. |
| void | setGlobalStateRestoreListener(StateRestoreListener globalStateRestoreListener) | Set the listener which is triggered whenever a StateStore is being restored in order to resume processing. |
| void | setStateListener(KafkaStreams.StateListener listener) | An app can set a single KafkaStreams.StateListener so that the app is notified when state changes. |
| void | setUncaughtExceptionHandler(java.lang.Thread.UncaughtExceptionHandler eh) | Set the handler invoked when an internal thread abruptly terminates due to an uncaught exception. |
| void | start() | Start the KafkaStreams instance by starting all its threads. |
| KafkaStreams.State | state() | Return the current KafkaStreams.State of this KafkaStreams instance. |
| <T> T | store(java.lang.String storeName, QueryableStoreType<T> queryableStoreType) | Get a facade wrapping the local StateStore instances with the provided storeName if the store's type is accepted by the provided queryableStoreType. |
| java.lang.String | toString() | Deprecated. Use localThreadsMetadata() to retrieve runtime information. |
| java.lang.String | toString(java.lang.String indent) | Deprecated. Use localThreadsMetadata() to retrieve runtime information. |

@Deprecated public KafkaStreams(TopologyBuilder builder, java.util.Properties props)
Deprecated. Use KafkaStreams(Topology, Properties) instead.

@Deprecated public KafkaStreams(TopologyBuilder builder, StreamsConfig config)
Deprecated. Use KafkaStreams(Topology, StreamsConfig) instead.

@Deprecated public KafkaStreams(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier)
Deprecated. Use KafkaStreams(Topology, StreamsConfig, KafkaClientSupplier) instead.

public KafkaStreams(Topology topology, java.util.Properties props)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance,
you must still close() it to avoid resource leaks.
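A sketch of honoring this note, assuming Kafka Streams 1.0, placeholder topic and application names, and a hypothetical "--dry-run" flag standing in for any application logic that decides not to start an instance after constructing it: the started path closes the instance from a JVM shutdown hook, and the never-started path still calls close() explicitly.

```java
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class CloseWithoutStart {
    static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "close-without-start-demo"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");        // placeholder
        return props;
    }

    static Topology topology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("my-input-topic").to("my-output-topic");
        return builder.build();
    }

    // Hypothetical flag: "--dry-run" means build everything but never start processing
    static boolean shouldStart(String[] args) {
        return args.length == 0 || !"--dry-run".equals(args[0]);
    }

    public static void main(String[] args) {
        KafkaStreams streams = new KafkaStreams(topology(), config());
        if (shouldStart(args)) {
            // Normal path: close the instance when the JVM shuts down
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
            streams.start();
        } else {
            // Never started, but the constructor already created internal clients,
            // so the instance must still be closed
            System.out.println("dry run, state = " + streams.state());
            streams.close();
        }
    }
}
```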
Parameters:
topology - the topology specifying the computational logic
props - properties for StreamsConfig
Throws:
StreamsException - if any fatal error occurs

public KafkaStreams(Topology topology, StreamsConfig config)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance,
you must still close() it to avoid resource leaks.
Parameters:
topology - the topology specifying the computational logic
config - the Kafka Streams configuration
Throws:
StreamsException - if any fatal error occurs

public KafkaStreams(Topology topology, StreamsConfig config, KafkaClientSupplier clientSupplier)
Create a KafkaStreams instance.
Note: even if you never call start() on a KafkaStreams instance,
you must still close() it to avoid resource leaks.
Parameters:
topology - the topology specifying the computational logic
config - the Kafka Streams configuration
clientSupplier - the Kafka clients supplier which provides underlying producer and consumer clients for the new KafkaStreams instance
Throws:
StreamsException - if any fatal error occurs

public KafkaStreams(Topology topology, StreamsConfig config, org.apache.kafka.common.utils.Time time)
Create a KafkaStreams instance.
Parameters:
topology - the topology specifying the computational logic
config - the Kafka Streams configuration
time - Time implementation; cannot be null
Throws:
StreamsException - if any fatal error occurs

public KafkaStreams.State state()
Return the current KafkaStreams.State of this KafkaStreams instance.

public void setStateListener(KafkaStreams.StateListener listener)
An app can set a single KafkaStreams.StateListener so that the app is notified when state changes.
Parameters:
listener - a new state listener
Throws:
java.lang.IllegalStateException - if this KafkaStreams instance is not in state CREATED.

public void setUncaughtExceptionHandler(java.lang.Thread.UncaughtExceptionHandler eh)
Set the handler invoked when an internal thread abruptly terminates due to an uncaught exception.
Parameters:
eh - the uncaught exception handler for all internal threads; null deletes the current handler
Throws:
java.lang.IllegalStateException - if this KafkaStreams instance is not in state CREATED.

public void setGlobalStateRestoreListener(StateRestoreListener globalStateRestoreListener)
Set the listener which is triggered whenever a StateStore is being restored in order to resume processing.
Parameters:
globalStateRestoreListener - the listener triggered when a StateStore is being restored
Throws:
java.lang.IllegalStateException - if this KafkaStreams instance is not in state CREATED.

public java.util.Map<MetricName,? extends Metric> metrics()
Get a read-only handle on the global metrics registry.
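A sketch of registering the three callbacks described above, assuming Kafka Streams 1.0. All three setters throw IllegalStateException unless the instance is still in state CREATED, so they are called before start(). The class and method names, the logging (a stand-in for real monitoring), and the topic and application names are hypothetical. The last step reads metric names through metrics().

```java
import java.util.Properties;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateRestoreListener;

public class ListenerSetup {
    // Hypothetical listener that only logs the start and end of each restoration
    static final class LoggingRestoreListener implements StateRestoreListener {
        @Override
        public void onRestoreStart(TopicPartition partition, String storeName,
                                   long startingOffset, long endingOffset) {
            System.out.printf("restoring %s (%s) offsets %d..%d%n",
                storeName, partition, startingOffset, endingOffset);
        }

        @Override
        public void onBatchRestored(TopicPartition partition, String storeName,
                                    long batchEndOffset, long numRestored) {
            // Per-batch progress is ignored in this sketch
        }

        @Override
        public void onRestoreEnd(TopicPartition partition, String storeName, long totalRestored) {
            System.out.printf("restored %d records into %s%n", totalRestored, storeName);
        }
    }

    // Must be called while streams.state() is still CREATED
    static void registerCallbacks(KafkaStreams streams) {
        streams.setStateListener((newState, oldState) ->
            System.out.println("state change: " + oldState + " -> " + newState));
        streams.setUncaughtExceptionHandler((thread, error) ->
            System.err.println("stream thread " + thread.getName() + " died: " + error));
        streams.setGlobalStateRestoreListener(new LoggingRestoreListener());
    }

    static KafkaStreams create() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "listener-demo");     // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("my-input-topic").to("my-output-topic");
        return new KafkaStreams(builder.build(), props);
    }

    public static void main(String[] args) {
        KafkaStreams streams = create();
        registerCallbacks(streams);
        // metrics() is a read-only view of the metrics registry
        streams.metrics().keySet().forEach(name ->
            System.out.println(name.group() + " / " + name.name()));
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```

Because start() does not block and, by default, fatal processing errors are only logged, the uncaught exception handler is the place where an application can react to a dying stream thread.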

public void start()
           throws java.lang.IllegalStateException,
                  StreamsException
Start the KafkaStreams instance by starting all its threads.
This function is expected to be called only once during the life cycle of the client.
Because threads are started in the background, this method does not block.
As a consequence, any fatal exception that happens during processing is by default only logged.
If you want to be notified about dying threads, you can
register an uncaught exception handler
before starting the KafkaStreams instance.
Note: for brokers with version 0.9.x or lower, the broker version cannot be checked.
There will be no error, and the client will hang and retry to verify the broker version until it
times out.
Throws:
java.lang.IllegalStateException - if process was already started
StreamsException - if the Kafka brokers have version 0.10.0.x or if exactly-once is enabled for pre 0.11.0.x brokers

public void close()
Shut down this KafkaStreams instance by signaling all the threads to stop, and then wait for them to join.
This will block until all threads have stopped.

public boolean close(long timeout, java.util.concurrent.TimeUnit timeUnit)
Shut down this KafkaStreams instance by signaling all the threads to stop, and then wait up to the timeout for the
threads to join.
A timeout of 0 means to wait forever.
Parameters:
timeout - how long to wait for the threads to shut down
timeUnit - unit of time used for timeout
Returns:
true if all threads were successfully stopped, false if the timeout was reached
before all threads stopped
Note that this method must not be called in the onChange callback of KafkaStreams.StateListener.

@Deprecated public java.lang.String toString()
Deprecated. Use localThreadsMetadata() to retrieve runtime information.
Produce a string representation containing useful information about this KafkaStreams instance, such as
thread IDs, task IDs, and a representation of the topology DAG including StateStores (cf.
Topology and StreamsBuilder).
Overrides:
toString in class java.lang.Object

@Deprecated public java.lang.String toString(java.lang.String indent)
Deprecated. Use localThreadsMetadata() to retrieve runtime information.
Produce a string representation containing useful information about this KafkaStreams instance, such as
thread IDs, task IDs, and a representation of the topology DAG including StateStores (cf.
Topology and StreamsBuilder).
Parameters:
indent - the top-level indent for each line

public void cleanUp()
Clean up the local StateStore directory (StreamsConfig.STATE_DIR_CONFIG) by deleting all
data with regard to the application ID.
May only be called either before this KafkaStreams instance is started or after the
instance is closed.
Calling this method triggers a restore of local StateStores on the next application start.
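A sketch of the two call sites this description allows, assuming Kafka Streams 1.0: cleanUp() before start() (for example, to force a full restore of local state) or after the instance has been closed. canCleanUp() is a hypothetical helper that encodes "before started or after closed" as the states CREATED and NOT_RUNNING; close(long, TimeUnit) is used so that cleanUp() only runs if shutdown finished within the timeout.

```java
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KafkaStreams;

public class LocalStateReset {
    // Hypothetical helper: "before started" is CREATED, "after closed" is NOT_RUNNING
    static boolean canCleanUp(KafkaStreams.State state) {
        return state == KafkaStreams.State.CREATED || state == KafkaStreams.State.NOT_RUNNING;
    }

    // Wipe local state before the first start(); the next start then restores the local stores
    static void startFresh(KafkaStreams streams) {
        if (canCleanUp(streams.state())) {
            streams.cleanUp();
        }
        streams.start();
    }

    // Stop the instance and wipe its local state only if all threads stopped in time
    static boolean stopAndWipe(KafkaStreams streams, long timeoutSeconds) {
        boolean stopped = streams.close(timeoutSeconds, TimeUnit.SECONDS);
        if (stopped && canCleanUp(streams.state())) {
            streams.cleanUp();
        }
        return stopped;
    }

    public static void main(String[] args) {
        // Show which states permit cleanUp() under this helper's rule
        for (KafkaStreams.State state : KafkaStreams.State.values()) {
            System.out.println(state + " -> cleanUp allowed: " + canCleanUp(state));
        }
    }
}
```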
Throws:
java.lang.IllegalStateException - if this KafkaStreams instance is currently running
StreamsException - if cleanup failed

public java.util.Collection<StreamsMetadata> allMetadata()
Find all currently running KafkaStreams instances (potentially remotely) that use the same
application ID as this instance (i.e., all instances that belong to
the same Kafka Streams application) and return StreamsMetadata for each discovered instance.
Note: this is a point in time view and it may change due to partition reassignment.
Returns:
StreamsMetadata for each KafkaStreams instance of this application

public java.util.Collection<StreamsMetadata> allMetadataForStore(java.lang.String storeName)
Find all currently running KafkaStreams instances (potentially remotely) that
use the same application ID as this instance (i.e., all
instances that belong to the same Kafka Streams application) and that contain a StateStore with the given storeName,
and return StreamsMetadata for each discovered instance.
Note: this is a point in time view and it may change due to partition reassignment.
Parameters:
storeName - the storeName to find metadata for
Returns:
StreamsMetadata for each KafkaStreams instance with the provided storeName of this application

public <K> StreamsMetadata metadataForKey(java.lang.String storeName, K key, Serializer<K> keySerializer)
Find the currently running KafkaStreams instance (potentially remotely) that
uses the same application ID as this instance (i.e., that belongs to the same Kafka Streams application),
that contains a StateStore with the given storeName, and whose StateStore contains the given key,
and return StreamsMetadata for it.
This will use the default Kafka Streams partitioner to locate the partition.
If a custom partitioner has been
configured via StreamsConfig or
KStream.through(String, Produced), or if the original KTable's input
topic is partitioned differently, please use
metadataForKey(String, Object, StreamPartitioner).
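A sketch of routing an interactive query to the right instance, assuming Kafka Streams 1.0, String keys, and that every instance sets StreamsConfig.APPLICATION_SERVER_CONFIG to its own host:port so that its metadata carries a reachable address. The helper names and the key-length partitioner are hypothetical; a custom partitioner only gives correct answers if it matches the one that actually partitioned the store's input topic.

```java
import java.util.Collection;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.StreamsMetadata;

public class KeyRouting {
    // Returns "host:port", or null if no metadata is available (e.g. while (re-)initializing)
    static String address(StreamsMetadata metadata) {
        if (metadata == null || metadata == StreamsMetadata.NOT_AVAILABLE) {
            return null;
        }
        return metadata.host() + ":" + metadata.port();
    }

    // Default partitioning: the key is located with the default Kafka Streams partitioner
    static String locate(KafkaStreams streams, String storeName, String key) {
        return address(streams.metadataForKey(storeName, key, Serdes.String().serializer()));
    }

    // Hypothetical custom scheme; it must match how the input topic was really partitioned
    static final StreamPartitioner<String, Object> BY_KEY_LENGTH =
        (key, value, numPartitions) -> key.length() % numPartitions;

    static String locateCustom(KafkaStreams streams, String storeName, String key) {
        return address(streams.metadataForKey(storeName, key, BY_KEY_LENGTH));
    }

    // Lists every instance that currently hosts part of the store
    static void printStoreHosts(KafkaStreams streams, String storeName) {
        Collection<StreamsMetadata> all = streams.allMetadataForStore(storeName);
        for (StreamsMetadata m : all) {
            System.out.println(address(m) + " hosts " + m.topicPartitions());
        }
    }
}
```

When address() returns null, a caller would typically back off and retry after the rebalance completes instead of sending the query to an arbitrary instance.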
Note: this is a point in time view and it may change due to partition reassignment. The key may not exist in the
StateStore; this method provides a way of finding which host it would exist on.
Type Parameters:
K - key type
Parameters:
storeName - the storeName to find metadata for
key - the key to find metadata for
keySerializer - serializer for the key
Returns:
StreamsMetadata for the KafkaStreams instance with the provided storeName and key of this application, or StreamsMetadata.NOT_AVAILABLE if Kafka Streams is (re-)initializing

public <K> StreamsMetadata metadataForKey(java.lang.String storeName, K key, StreamPartitioner<? super K,?> partitioner)
Find the currently running KafkaStreams instance (potentially remotely) that
uses the same application ID as this instance (i.e., that belongs to the same Kafka Streams application),
that contains a StateStore with the given storeName, and whose StateStore contains the given key,
and return StreamsMetadata for it.
Note: this is a point in time view and it may change due to partition reassignment. The key may not exist in the
StateStore; this method provides a way of finding which host it would exist on.
Type Parameters:
K - key type
Parameters:
storeName - the storeName to find metadata for
key - the key to find metadata for
partitioner - the partitioner to be used to locate the host for the key
Returns:
StreamsMetadata for the KafkaStreams instance with the provided storeName and key of this application, or StreamsMetadata.NOT_AVAILABLE if Kafka Streams is (re-)initializing

public <T> T store(java.lang.String storeName, QueryableStoreType<T> queryableStoreType)
Get a facade wrapping the local StateStore instances with the provided storeName if the store's
type is accepted by the provided queryableStoreType.
The returned object can be used to query the StateStore instances.
Type Parameters:
T - return type
Parameters:
storeName - name of the store to find
queryableStoreType - accept only stores that are accepted by QueryableStoreType.accepts(StateStore)
Returns:
a facade wrapping the local StateStore instances
Throws:
InvalidStateStoreException - if Kafka Streams is (re-)initializing or a store with storeName and
queryableStoreType doesn't exist

public java.util.Set<ThreadMetadata> localThreadsMetadata()
Returns runtime information about the local threads of this KafkaStreams instance.
Returns:
ThreadMetadata.
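A sketch of querying a local key-value store and inspecting the local threads, assuming Kafka Streams 1.0 and a store holding String keys and Long values (for example, the result of a count()). Because store() throws InvalidStateStoreException while the instance is (re-)initializing, the lookup goes through retryUntilAvailable(), a hypothetical bounded-retry helper. printThreads() uses localThreadsMetadata() in place of the deprecated toString().

```java
import java.util.function.Supplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class LocalQueries {
    // Hypothetical helper: retries a store lookup a bounded number of times
    static <T> T retryUntilAvailable(Supplier<T> lookup, int maxAttempts, long backoffMs) {
        for (int attempt = 1; ; attempt++) {
            try {
                return lookup.get();
            } catch (InvalidStateStoreException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up and surface the last failure
                }
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }

    // Reads one key from the local instances of the store; null if not stored locally
    static Long localCount(KafkaStreams streams, String storeName, String key) {
        ReadOnlyKeyValueStore<String, Long> store = retryUntilAvailable(
            () -> streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()),
            10, 100);
        return store.get(key);
    }

    // One line per local stream thread: name, state, and task counts
    static void printThreads(KafkaStreams streams) {
        for (ThreadMetadata thread : streams.localThreadsMetadata()) {
            System.out.println(thread.threadName() + " state=" + thread.threadState()
                + " active=" + thread.activeTasks().size()
                + " standby=" + thread.standbyTasks().size());
        }
    }
}
```

A null result from localCount() only means the key is not in this instance's local stores; metadataForKey() can then tell which instance would hold it.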