Kafka Streams Configs
Below is the Kafka Streams client library configuration.
application.id
An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.
Type: string Default: Valid Values: Importance: high bootstrap.servers
A list of host/port pairs used to establish the initial connection to the Kafka cluster. Clients use this list to bootstrap and discover the full set of Kafka brokers. While the order of servers in the list does not matter, we recommend including more than one server to ensure resilience if any servers are down. This list does not need to contain the entire set of brokers, as Kafka clients automatically manage and update connections to the cluster efficiently. This list must be in the form
host1:port1,host2:port2,....Type: list Default: Valid Values: Importance: high ensure.explicit.internal.resource.naming
Whether to enforce explicit naming for all internal resources of the topology, including internal topics (e.g., changelog and repartition topics) and their associated state stores. When enabled, the application will refuse to start if any internal resource has an auto-generated name.
Type: boolean Default: false Valid Values: Importance: high num.standby.replicas
The number of standby replicas for each task.
Type: int Default: 0 Valid Values: Importance: high state.dir
Directory location for state store. This path must be unique for each streams instance sharing the same underlying filesystem. Note that if not configured, then the default location will be different in each environment as it is computed using System.getProperty("java.io.tmpdir")
Type: string Default: ${java.io.tmpdir} Valid Values: Importance: high acceptable.recovery.lag
The maximum acceptable lag (number of offsets to catch up) for a client to be considered caught-up enough to receive an active task assignment. Upon assignment, it will still restore the rest of the changelog before processing. To avoid a pause in processing during rebalances, this config should correspond to a recovery time of well under a minute for a given workload. Must be at least 0.
Type: long Default: 10000 Valid Values: [0,...] Importance: medium cache.max.bytes.buffering
Maximum number of memory bytes to be used for buffering across all threads
Type: long Default: 10485760 Valid Values: [0,...] Importance: medium client.id
An ID prefix string used for the client IDs of internal (main, restore, and global) consumers , producers, and admin clients with pattern
<client.id>-[Global]StreamThread[-<threadSequenceNumber>]-<consumer|producer|restore-consumer|global-consumer>.Type: string Default: <application.id>-<random-UUID>Valid Values: Importance: medium default.deserialization.exception.handler
Exception handling class that implements the
org.apache.kafka.streams.errors.DeserializationExceptionHandlerinterface.Type: class Default: org.apache.kafka.streams.errors.LogAndFailExceptionHandler Valid Values: Importance: medium default.key.serde
Default serializer / deserializer class for key that implements the
org.apache.kafka.common.serialization.Serdeinterface.Type: class Default: null Valid Values: Importance: medium default.list.key.serde.inner
Default inner class of list serde for key that implements the
org.apache.kafka.common.serialization.Serdeinterface. This configuration will be read if and only ifdefault.key.serdeconfiguration is set toorg.apache.kafka.common.serialization.Serdes.ListSerdeType: class Default: null Valid Values: Importance: medium default.list.key.serde.type
Default class for key that implements the
java.util.Listinterface. This configuration will be read if and only ifdefault.key.serdeconfiguration is set toorg.apache.kafka.common.serialization.Serdes.ListSerdeNote when list serde class is used, one needs to set the inner serde class that implements theorg.apache.kafka.common.serialization.Serdeinterface via 'default.list.key.serde.inner'Type: class Default: null Valid Values: Importance: medium default.list.value.serde.inner
Default inner class of list serde for value that implements the
org.apache.kafka.common.serialization.Serdeinterface. This configuration will be read if and only ifdefault.value.serdeconfiguration is set toorg.apache.kafka.common.serialization.Serdes.ListSerdeType: class Default: null Valid Values: Importance: medium default.list.value.serde.type
Default class for value that implements the
java.util.Listinterface. This configuration will be read if and only ifdefault.value.serdeconfiguration is set toorg.apache.kafka.common.serialization.Serdes.ListSerdeNote when list serde class is used, one needs to set the inner serde class that implements theorg.apache.kafka.common.serialization.Serdeinterface via 'default.list.value.serde.inner'Type: class Default: null Valid Values: Importance: medium default.production.exception.handler
Exception handling class that implements the
org.apache.kafka.streams.errors.ProductionExceptionHandlerinterface.Type: class Default: org.apache.kafka.streams.errors.DefaultProductionExceptionHandler Valid Values: Importance: medium default.timestamp.extractor
Default timestamp extractor class that implements the
org.apache.kafka.streams.processor.TimestampExtractorinterface.Type: class Default: org.apache.kafka.streams.processor.FailOnInvalidTimestamp Valid Values: Importance: medium default.value.serde
Default serializer / deserializer class for value that implements the
org.apache.kafka.common.serialization.Serdeinterface.Type: class Default: null Valid Values: Importance: medium deserialization.exception.handler
Exception handling class that implements the
org.apache.kafka.streams.errors.DeserializationExceptionHandlerinterface.Type: class Default: org.apache.kafka.streams.errors.LogAndFailExceptionHandler Valid Values: Importance: medium group.protocol
The group protocol streams should use. We currently support "classic" or "streams". If "streams" is specified, then the streams rebalance protocol will be used. Otherwise, the classic group protocol will be used.
Type: string Default: classic Valid Values: (case insensitive) [STREAMS, CLASSIC] Importance: medium max.task.idle.ms
This config controls whether joins and merges may produce out-of-order results. The config value is the maximum amount of time in milliseconds a stream task will stay idle when it is fully caught up on some (but not all) input partitions to wait for producers to send additional records and avoid potential out-of-order record processing across multiple input streams. The default (zero) does not wait for producers to send more records, but it does wait to fetch data that is already present on the brokers. This default means that for records that are already present on the brokers, Streams will process them in timestamp order. Set to -1 to disable idling entirely and process any locally available data, even though doing so may produce out-of-order processing.
Type: long Default: 0 Valid Values: Importance: medium max.warmup.replicas
The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once for the purpose of keeping the task available on one instance while it is warming up on another instance it has been reassigned to. Used to throttle how much extra broker traffic and cluster state can be used for high availability. Must be at least 1.Note that one warmup replica corresponds to one Stream Task. Furthermore, note that each warmup replica can only be promoted to an active task during a rebalance (normally during a so-called probing rebalance, which occur at a frequency specified by the `probing.rebalance.interval.ms` config). This means that the maximum rate at which active tasks can be migrated from one Kafka Streams Instance to another instance can be determined by (`max.warmup.replicas` / `probing.rebalance.interval.ms`).
Type: int Default: 2 Valid Values: [1,...] Importance: medium num.stream.threads
The number of threads to execute stream processing.
Type: int Default: 1 Valid Values: Importance: medium processing.exception.handler
Exception handling class that implements the
org.apache.kafka.streams.errors.ProcessingExceptionHandlerinterface.Type: class Default: org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler Valid Values: Importance: medium processing.guarantee
The processing guarantee that should be used. Possible values are
at_least_once(default) andexactly_once_v2(requires brokers version 2.5 or higher). Note that exactly-once processing requires a cluster of at least three brokers by default what is the recommended setting for production; for development you can change this, by adjusting broker settingtransaction.state.log.replication.factorandtransaction.state.log.min.isr.Type: string Default: at_least_once Valid Values: [at_least_once, exactly_once_v2] Importance: medium production.exception.handler
Exception handling class that implements the
org.apache.kafka.streams.errors.ProductionExceptionHandlerinterface.Type: class Default: org.apache.kafka.streams.errors.DefaultProductionExceptionHandler Valid Values: Importance: medium replication.factor
The replication factor for change log topics and repartition topics created by the stream processing application. The default of
-1(meaning: use broker default replication factor) requires broker version 2.4 or newerType: int Default: -1 Valid Values: Importance: medium security.protocol
Protocol used to communicate with brokers.
Type: string Default: PLAINTEXT Valid Values: (case insensitive) [SASL_SSL, PLAINTEXT, SSL, SASL_PLAINTEXT] Importance: medium statestore.cache.max.bytes
Maximum number of memory bytes to be used for statestore cache across all threads
Type: long Default: 10485760 (10 mebibytes) Valid Values: [0,...] Importance: medium task.assignor.class
A task assignor class or class name implementing the
org.apache.kafka.streams.processor.assignment.TaskAssignorinterface. Defaults to theHighAvailabilityTaskAssignorclass.Type: string Default: null Valid Values: Importance: medium task.timeout.ms
The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of 0ms, a task would raise an error for the first internal error. For any timeout larger than 0ms, a task will retry at least once before an error is raised.
Type: long Default: 300000 (5 minutes) Valid Values: [0,...] Importance: medium topology.optimization
A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: "+NO_OPTIMIZATION+", "+OPTIMIZE+", or a comma separated list of specific optimizations: ("+REUSE_KTABLE_SOURCE_TOPICS+", "+MERGE_REPARTITION_TOPICS+" + "SINGLE_STORE_SELF_JOIN+")."NO_OPTIMIZATION" by default.
Type: string Default: none Valid Values: [all, none, reuse.ktable.source.topics, merge.repartition.topics, single.store.self.join] Importance: medium application.server
A host:port pair pointing to a user-defined endpoint that can be used for state store discovery and interactive queries on this KafkaStreams instance.
Type: string Default: "" Valid Values: Importance: low buffered.records.per.partition
Maximum number of records to buffer per partition.
Type: int Default: 1000 Valid Values: Importance: low built.in.metrics.version
Version of the built-in metrics to use.
Type: string Default: latest Valid Values: [latest] Importance: low commit.interval.ms
The frequency in milliseconds with which to commit processing progress. For at-least-once processing, committing means to save the position (ie, offsets) of the processor. For exactly-once processing, it means to commit the transaction which includes to save the position and to make the committed data in the output topic visible to consumers with isolation level read_committed. (Note, if
processing.guaranteeis set toexactly_once_v2, the default value is100, otherwise the default value is30000.Type: long Default: 30000 (30 seconds) Valid Values: [0,...] Importance: low connections.max.idle.ms
Close idle connections after the number of milliseconds specified by this config.
Type: long Default: 540000 (9 minutes) Valid Values: Importance: low default.client.supplier
Client supplier class that implements the
org.apache.kafka.streams.KafkaClientSupplierinterface.Type: class Default: org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier Valid Values: Importance: low default.dsl.store
The default state store type used by DSL operators.
Type: string Default: rocksDB Valid Values: [rocksDB, in_memory] Importance: low dsl.store.suppliers.class
Defines which store implementations to plug in to DSL operators. Must implement the
org.apache.kafka.streams.state.DslStoreSuppliersinterface.Type: class Default: org.apache.kafka.streams.state.BuiltInDslStoreSuppliers$RocksDBDslStoreSuppliers Valid Values: Importance: low enable.metrics.push
Whether to enable pushing of internal client metrics for (main, restore, and global) consumers, producers, and admin clients. The cluster must have a client metrics subscription which corresponds to a client.
Type: boolean Default: true Valid Values: Importance: low log.summary.interval.ms
The output interval in milliseconds for logging summary information.
If greater or equal to 0, the summary log will be output according to the set time interval;
If less than 0, summary output is disabled.Type: long Default: 120000 (2 minutes) Valid Values: Importance: low metadata.max.age.ms
The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.
Type: long Default: 300000 (5 minutes) Valid Values: [0,...] Importance: low metadata.recovery.rebootstrap.trigger.ms
If a client configured to rebootstrap using
metadata.recovery.strategy=rebootstrapis unable to obtain metadata from any of the brokers in the last known metadata for this interval, client repeats the bootstrap process usingbootstrap.serversconfiguration.Type: long Default: 300000 (5 minutes) Valid Values: [0,...] Importance: low metadata.recovery.strategy
Controls how the client recovers when none of the brokers known to it is available. If set to
none, the client fails. If set torebootstrap, the client repeats the bootstrap process usingbootstrap.servers. Rebootstrapping is useful when a client communicates with brokers so infrequently that the set of brokers may change entirely before the client refreshes metadata. Metadata recovery is triggered when all last-known brokers appear unavailable simultaneously. Brokers appear unavailable when disconnected and no current retry attempt is in-progress. Consider increasingreconnect.backoff.msandreconnect.backoff.max.msand decreasingsocket.connection.setup.timeout.msandsocket.connection.setup.timeout.max.msfor the client. Rebootstrap is also triggered if connection cannot be established to any of the brokers formetadata.recovery.rebootstrap.trigger.msmilliseconds or if server requests rebootstrap.Type: string Default: rebootstrap Valid Values: (case insensitive) [REBOOTSTRAP, NONE] Importance: low metric.reporters
A list of classes to use as metrics reporters. Implementing the
org.apache.kafka.common.metrics.MetricsReporterinterface allows plugging in classes that will be notified of new metric creation. When custom reporters are set andorg.apache.kafka.common.metrics.JmxReporteris needed, it has to be explicitly added to the list.Type: list Default: org.apache.kafka.common.metrics.JmxReporter Valid Values: Importance: low metrics.num.samples
The number of samples maintained to compute metrics.
Type: int Default: 2 Valid Values: [1,...] Importance: low metrics.recording.level
The highest recording level for metrics. It has three levels for recording metrics - info, debug, and trace.
INFO level records only essential metrics necessary for monitoring system performance and health. It collects vital data without gathering too much detail, making it suitable for production environments where minimal overhead is desired.
DEBUG level records most metrics, providing more detailed information about the system's operation. It's useful for development and testing environments where you need deeper insights to debug and fine-tune the application.
TRACE level records all possible metrics, capturing every detail about the system's performance and operation. It's best for controlled environments where in-depth analysis is required, though it can introduce significant overhead.Type: string Default: INFO Valid Values: [INFO, DEBUG, TRACE] Importance: low metrics.sample.window.ms
The window of time a metrics sample is computed over.
Type: long Default: 30000 (30 seconds) Valid Values: [0,...] Importance: low poll.ms
The amount of time in milliseconds to block waiting for input.
Type: long Default: 100 Valid Values: Importance: low probing.rebalance.interval.ms
The maximum time in milliseconds to wait before triggering a rebalance to probe for warmup replicas that have finished warming up and are ready to become active. Probing rebalances will continue to be triggered until the assignment is balanced. Must be at least 1 minute.
Type: long Default: 600000 (10 minutes) Valid Values: [60000,...] Importance: low processor.wrapper.class
A processor wrapper class or class name that implements the
org.apache.kafka.streams.state.ProcessorWrapperinterface. Must be passed in to the StreamsBuilder or Topology constructor in order to take effectType: class Default: org.apache.kafka.streams.processor.internals.NoOpProcessorWrapper Valid Values: Importance: low rack.aware.assignment.non_overlap_cost
Cost associated with moving tasks from existing assignment. This config and
rack.aware.assignment.traffic_costcontrols whether the optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger valueorg.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignorwill optimize to maintain the existing assignment. The default value is null which means it will use default non_overlap cost values in different assignors.Type: int Default: null Valid Values: Importance: low rack.aware.assignment.strategy
The strategy we use for rack aware assignment. Rack aware assignment will take
client.rackandracksofTopicPartitioninto account when assigning tasks to minimize cross rack traffic. Valid settings are :none(default), which will disable rack aware assignment;min_traffic, which will compute minimum cross rack traffic assignment;balance_subtopology, which will compute minimum cross rack traffic and try to balance the tasks of same subtopologies across different clientsType: string Default: none Valid Values: [none, min_traffic, balance_subtopology] Importance: low rack.aware.assignment.tags
List of client tag keys used to distribute standby replicas across Kafka Streams instances. When configured, Kafka Streams will make a best-effort to distribute the standby tasks over each client tag dimension.
Type: list Default: "" Valid Values: List containing maximum of 5 elements Importance: low rack.aware.assignment.traffic_cost
Cost associated with cross rack traffic. This config and
rack.aware.assignment.non_overlap_costcontrols whether the optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger valueorg.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignorwill optimize for minimizing cross rack traffic. The default value is null which means it will use default traffic cost values in different assignors.Type: int Default: null Valid Values: Importance: low receive.buffer.bytes
The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.
Type: int Default: 32768 (32 kibibytes) Valid Values: [-1,...] Importance: low reconnect.backoff.max.ms
The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.
Type: long Default: 1000 (1 second) Valid Values: [0,...] Importance: low reconnect.backoff.ms
The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker. This value is the initial backoff value and will increase exponentially for each consecutive connection failure, up to the
reconnect.backoff.max.msvalue.Type: long Default: 50 Valid Values: [0,...] Importance: low repartition.purge.interval.ms
The frequency in milliseconds with which to delete fully consumed records from repartition topics. Purging will occur after at least this value since the last purge, but may be delayed until later. (Note, unlike
commit.interval.ms, the default for this value remains unchanged whenprocessing.guaranteeis set toexactly_once_v2).Type: long Default: 30000 (30 seconds) Valid Values: [0,...] Importance: low request.timeout.ms
The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.
Type: int Default: 40000 (40 seconds) Valid Values: [0,...] Importance: low retry.backoff.ms
The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. This value is the initial backoff value and will increase exponentially for each failed request, up to the
retry.backoff.max.msvalue.Type: long Default: 100 Valid Values: [0,...] Importance: low rocksdb.config.setter
A Rocks DB config setter class or class name that implements the
org.apache.kafka.streams.state.RocksDBConfigSetterinterfaceType: class Default: null Valid Values: Importance: low send.buffer.bytes
The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.
Type: int Default: 131072 (128 kibibytes) Valid Values: [-1,...] Importance: low state.cleanup.delay.ms
The amount of time in milliseconds to wait before deleting state when a partition has migrated. Only state directories that have not been modified for at least
state.cleanup.delay.mswill be removedType: long Default: 600000 (10 minutes) Valid Values: Importance: low upgrade.from
Allows live upgrading (and downgrading in some cases -- see upgrade guide) in a backward compatible way. Default is `null`. Please refer to the Kafka Streams upgrade guide for instructions on how and when to use this config. Note that when upgrading from 3.5 to a newer version it is never required to specify this config, while upgrading live directly to 4.0+ from 2.3 or below is no longer supported even with this config. Accepted values are "2.4", "2.5", "2.6", "2.7", "2.8", "3.0", "3.1", "3.2", "3.3", "3.4", "3.5", "3.6", "3.7", "3.8", "3.9", "(for upgrading from the corresponding old version).
Type: string Default: null Valid Values: [null, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.7, 2.8, 3.0, 3.1, 3.2, 3.3, 3.4, 3.5, 3.6, 3.7, 3.8, 3.9, 4.0] Importance: low window.size.ms
Sets window size for the deserializer in order to calculate window end times.
Type: long Default: null Valid Values: Importance: low windowed.inner.class.serde
Default serializer / deserializer for the inner class of a windowed record. Must implement the
org.apache.kafka.common.serialization.Serdeinterface. Note that setting this config in KafkaStreams application would result in an error as it is meant to be used only from Plain consumer client.Type: string Default: null Valid Values: Importance: low windowstore.changelog.additional.retention.ms
Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day
Type: long Default: 86400000 (1 day) Valid Values: Importance: low