public class StreamsConfig extends AbstractConfig
Configuration for a KafkaStreams instance.
 Can also be used to configure the Kafka Streams internal KafkaConsumer, KafkaProducer and Admin.
 To avoid consumer/producer/admin property conflicts, you should prefix those properties using
 consumerPrefix(String), producerPrefix(String) and adminClientPrefix(String), respectively.
 Example:
 // potentially wrong: sets "metadata.max.age.ms" to 1 minute for producer AND consumer
 Properties streamsProperties = new Properties();
 streamsProperties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 60000);
 // or
 streamsProperties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 60000);
 // suggested:
 Properties streamsProperties = new Properties();
 // sets "metadata.max.age.ms" to 1 minute for consumer only
 streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), 60000);
 // sets "metadata.max.age.ms" to 1 minute for producer only
 streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.METADATA_MAX_AGE_CONFIG), 60000);
 StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
 
 Properties streamsProperties = new Properties();
 // sets "my.custom.config" to "foo" for consumer only
 streamsProperties.put(StreamsConfig.consumerPrefix("my.custom.config"), "foo");
 // sets "my.custom.config" to "bar" for producer only
 streamsProperties.put(StreamsConfig.producerPrefix("my.custom.config"), "bar");
 // sets "my.custom.config2" to "boom" for all clients universally
 streamsProperties.put("my.custom.config2", "boom");
 // as a result, inside producer's serde class configure(..) function,
 // users can now read both key-value pairs "my.custom.config" -> "bar"
 // and "my.custom.config2" -> "boom" from the config map
 StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
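
The effect of the prefixes in the examples above can be pictured with a small, self-contained sketch. It does not use Kafka's classes: `viewFor` is a hypothetical helper, and the literal strings "consumer." and "producer." stand in for what StreamsConfig.consumerPrefix(String) and StreamsConfig.producerPrefix(String) prepend. It only illustrates the overlay rule (unprefixed entries reach every client, prefixed entries reach one client and win over the unprefixed value) and is not a description of how Kafka Streams implements it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

class PrefixOverlaySketch {
    // Assumed prefix strings; in real code use StreamsConfig.consumerPrefix(..) / producerPrefix(..).
    static final String CONSUMER_PREFIX = "consumer.";
    static final String PRODUCER_PREFIX = "producer.";

    /** Hypothetical helper: the key-value view that the client owning clientPrefix would see. */
    static Map<String, Object> viewFor(Map<String, Object> props, String clientPrefix) {
        Map<String, Object> view = new TreeMap<>(); // sorted only to make printing stable
        // First pass: entries without a client prefix are visible to every client.
        for (Map.Entry<String, Object> e : props.entrySet()) {
            String key = e.getKey();
            if (!key.startsWith(CONSUMER_PREFIX) && !key.startsWith(PRODUCER_PREFIX)) {
                view.put(key, e.getValue());
            }
        }
        // Second pass: entries carrying this client's prefix override the unprefixed value.
        for (Map.Entry<String, Object> e : props.entrySet()) {
            String key = e.getKey();
            if (key.startsWith(clientPrefix)) {
                view.put(key.substring(clientPrefix.length()), e.getValue());
            }
        }
        return view;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("consumer.my.custom.config", "foo");
        props.put("producer.my.custom.config", "bar");
        props.put("my.custom.config2", "boom");
        props.put("metadata.max.age.ms", 300000);
        props.put("consumer.metadata.max.age.ms", 60000);

        // prints {metadata.max.age.ms=60000, my.custom.config=foo, my.custom.config2=boom}
        System.out.println(viewFor(props, CONSUMER_PREFIX));
        // prints {metadata.max.age.ms=300000, my.custom.config=bar, my.custom.config2=boom}
        System.out.println(viewFor(props, PRODUCER_PREFIX));
    }
}
```

In this sketch the consumer view carries the 1-minute metadata age while the producer keeps the unprefixed value, which is the isolation the prefix methods are meant to provide.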
 When increasing ProducerConfig.MAX_BLOCK_MS_CONFIG to be more resilient to non-available brokers, you should also
 increase ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG using the following guidance:
      max.poll.interval.ms > max.block.ms

 Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
 - "group.id" (<application.id>) - Streams client will always use the application ID as the consumer group ID
 - "enable.auto.commit" (false) - Streams client will always disable/turn off auto committing
 - "partition.assignment.strategy" (StreamsPartitionAssignor) - Streams client will always use its own partition assignor

 If "processing.guarantee" is set to "exactly_once", Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
 - "isolation.level" (read_committed) - Consumers will always read committed data only
 - "enable.idempotence" (true) - Producer will always have idempotency enabled

| Modifier and Type | Class and Description | 
|---|---|
| static class  | StreamsConfig.InternalConfig | 
| Modifier and Type | Field | Description |
|---|---|---|
| static String | ACCEPTABLE_RECOVERY_LAG_CONFIG | acceptable.recovery.lag |
| static String | ADMIN_CLIENT_PREFIX | Prefix used to isolate admin configs from other client configs. |
| static String | APPLICATION_ID_CONFIG | application.id |
| static String | APPLICATION_SERVER_CONFIG | application.server |
| static String | AT_LEAST_ONCE | Config value for parameter "processing.guarantee" for at-least-once processing guarantees. |
| static String | BOOTSTRAP_SERVERS_CONFIG | bootstrap.servers |
| static String | BUFFERED_RECORDS_PER_PARTITION_CONFIG | buffered.records.per.partition |
| static String | BUILT_IN_METRICS_VERSION_CONFIG | built.in.metrics.version |
| static String | CACHE_MAX_BYTES_BUFFERING_CONFIG | cache.max.bytes.buffering |
| static String | CLIENT_ID_CONFIG | client.id |
| static String | COMMIT_INTERVAL_MS_CONFIG | commit.interval.ms |
| static String | CONNECTIONS_MAX_IDLE_MS_CONFIG | connections.max.idle.ms |
| static String | CONSUMER_PREFIX | Prefix used to isolate consumer configs from other client configs. |
| static String | DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG | default.deserialization.exception.handler |
| static String | DEFAULT_KEY_SERDE_CLASS_CONFIG | default.key.serde |
| static String | DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG | default.production.exception.handler |
| static String | DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG | default.timestamp.extractor |
| static String | DEFAULT_VALUE_SERDE_CLASS_CONFIG | default.value.serde |
| static String | DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS | default.windowed.key.serde.inner |
| static String | DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS | default.windowed.value.serde.inner |
| static int | DUMMY_THREAD_INDEX |  |
| static String | EXACTLY_ONCE | Config value for parameter "processing.guarantee" for exactly-once processing guarantees. |
| static String | EXACTLY_ONCE_BETA | Config value for parameter "processing.guarantee" for exactly-once processing guarantees. |
| static String | GLOBAL_CONSUMER_PREFIX | Prefix used to override consumer configs for the global consumer client from the general consumer client configs. |
| static String | MAIN_CONSUMER_PREFIX | Prefix used to override consumer configs for the main consumer client from the general consumer client configs. |
| static String | MAX_TASK_IDLE_MS_CONFIG | max.task.idle.ms |
| static String | MAX_WARMUP_REPLICAS_CONFIG | max.warmup.replicas |
| static String | METADATA_MAX_AGE_CONFIG | metadata.max.age.ms |
| static String | METRIC_REPORTER_CLASSES_CONFIG | metric.reporters |
| static String | METRICS_0100_TO_24 | Config value for parameter "built.in.metrics.version" for built-in metrics from version 0.10.0 to 2.4. |
| static String | METRICS_LATEST | Config value for parameter "built.in.metrics.version" for the latest built-in metrics version. |
| static String | METRICS_NUM_SAMPLES_CONFIG | metrics.num.samples |
| static String | METRICS_RECORDING_LEVEL_CONFIG | metrics.recording.level |
| static String | METRICS_SAMPLE_WINDOW_MS_CONFIG | metrics.sample.window.ms |
| static String | NO_OPTIMIZATION | Config value for parameter "topology.optimization" for disabling topology optimization. |
| static String | NUM_STANDBY_REPLICAS_CONFIG | num.standby.replicas |
| static String | NUM_STREAM_THREADS_CONFIG | num.stream.threads |
| static String | OPTIMIZE | Config value for parameter "topology.optimization" for enabling topology optimization. |
| static String | PARTITION_GROUPER_CLASS_CONFIG | Deprecated. |
| static String | POLL_MS_CONFIG | poll.ms |
| static String | PROBING_REBALANCE_INTERVAL_MS_CONFIG | probing.rebalance.interval.ms |
| static String | PROCESSING_GUARANTEE_CONFIG | processing.guarantee |
| static String | PRODUCER_PREFIX | Prefix used to isolate producer configs from other client configs. |
| static String | RECEIVE_BUFFER_CONFIG | receive.buffer.bytes |
| static String | RECONNECT_BACKOFF_MAX_MS_CONFIG | reconnect.backoff.max.ms |
| static String | RECONNECT_BACKOFF_MS_CONFIG | reconnect.backoff.ms |
| static String | REPLICATION_FACTOR_CONFIG | replication.factor |
| static String | REQUEST_TIMEOUT_MS_CONFIG | request.timeout.ms |
| static String | RESTORE_CONSUMER_PREFIX | Prefix used to override consumer configs for the restore consumer client from the general consumer client configs. |
| static String | RETRIES_CONFIG | retries |
| static String | RETRY_BACKOFF_MS_CONFIG | retry.backoff.ms |
| static String | ROCKSDB_CONFIG_SETTER_CLASS_CONFIG | rocksdb.config.setter |
| static String | SECURITY_PROTOCOL_CONFIG | security.protocol |
| static String | SEND_BUFFER_CONFIG | send.buffer.bytes |
| static String | STATE_CLEANUP_DELAY_MS_CONFIG | state.cleanup.delay.ms |
| static String | STATE_DIR_CONFIG | state.dir |
| static String | TOPIC_PREFIX | Prefix used to provide default topic configs to be applied when creating internal topics. |
| static String | TOPOLOGY_OPTIMIZATION | topology.optimization |
| static String | UPGRADE_FROM_0100 | Config value for parameter "upgrade.from" for upgrading an application from version 0.10.0.x. |
| static String | UPGRADE_FROM_0101 | Config value for parameter "upgrade.from" for upgrading an application from version 0.10.1.x. |
| static String | UPGRADE_FROM_0102 | Config value for parameter "upgrade.from" for upgrading an application from version 0.10.2.x. |
| static String | UPGRADE_FROM_0110 | Config value for parameter "upgrade.from" for upgrading an application from version 0.11.0.x. |
| static String | UPGRADE_FROM_10 | Config value for parameter "upgrade.from" for upgrading an application from version 1.0.x. |
| static String | UPGRADE_FROM_11 | Config value for parameter "upgrade.from" for upgrading an application from version 1.1.x. |
| static String | UPGRADE_FROM_20 | Config value for parameter "upgrade.from" for upgrading an application from version 2.0.x. |
| static String | UPGRADE_FROM_21 | Config value for parameter "upgrade.from" for upgrading an application from version 2.1.x. |
| static String | UPGRADE_FROM_22 | Config value for parameter "upgrade.from" for upgrading an application from version 2.2.x. |
| static String | UPGRADE_FROM_23 | Config value for parameter "upgrade.from" for upgrading an application from version 2.3.x. |
| static String | UPGRADE_FROM_CONFIG | upgrade.from |
| static String | WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG | windowstore.changelog.additional.retention.ms |

Fields inherited from class AbstractConfig: CONFIG_PROVIDERS_CONFIG

| Modifier | Constructor | Description |
|---|---|---|
|   | StreamsConfig(Map<?,?> props) | Create a new StreamsConfig using the given properties. |
| protected  | StreamsConfig(Map<?,?> props, boolean doLog) |  |

Methods inherited from class AbstractConfig: documentationOf, equals, get, getBoolean, getClass, getConfiguredInstance, getConfiguredInstances, getConfiguredInstances, getConfiguredInstances, getDouble, getInt, getList, getLong, getPassword, getShort, getString, hashCode, ignore, logUnused, originals, originalsStrings, originalsWithPrefix, originalsWithPrefix, typeOf, unused, values, valuesWithPrefixAllOrNothing, valuesWithPrefixOverride

public static final int DUMMY_THREAD_INDEX

public static final String TOPIC_PREFIX
Prefix used to provide default topic configs to be applied when creating internal topics.
 These should be valid properties from TopicConfig.
 It is recommended to use topicPrefix(String).

public static final String CONSUMER_PREFIX
Prefix used to isolate consumer configs from other client configs.
 It is recommended to use consumerPrefix(String) to add this prefix to consumer
 properties.

public static final String MAIN_CONSUMER_PREFIX
Prefix used to override consumer configs for the main consumer client from
 the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
 1. main.consumer.[config-name]
 2. consumer.[config-name]
 3. [config-name]

public static final String RESTORE_CONSUMER_PREFIX
Prefix used to override consumer configs for the restore consumer client from
 the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
 1. restore.consumer.[config-name]
 2. consumer.[config-name]
 3. [config-name]

public static final String GLOBAL_CONSUMER_PREFIX
Prefix used to override consumer configs for the global consumer client from
 the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
 1. global.consumer.[config-name]
 2. consumer.[config-name]
 3. [config-name]

public static final String PRODUCER_PREFIX
Prefix used to isolate producer configs from other client configs.
 It is recommended to use producerPrefix(String) to add this prefix to producer
 properties.

public static final String ADMIN_CLIENT_PREFIX
Prefix used to isolate admin configs from other client configs.
 It is recommended to use adminClientPrefix(String) to add this prefix to admin
 client properties.

public static final String NO_OPTIMIZATION
Config value for parameter "topology.optimization" for disabling topology optimization.

public static final String OPTIMIZE
Config value for parameter "topology.optimization" for enabling topology optimization.

public static final String UPGRADE_FROM_0100
Config value for parameter "upgrade.from" for upgrading an application from version 0.10.0.x.

public static final String UPGRADE_FROM_0101
Config value for parameter "upgrade.from" for upgrading an application from version 0.10.1.x.

public static final String UPGRADE_FROM_0102
Config value for parameter "upgrade.from" for upgrading an application from version 0.10.2.x.

public static final String UPGRADE_FROM_0110
Config value for parameter "upgrade.from" for upgrading an application from version 0.11.0.x.

public static final String UPGRADE_FROM_10
Config value for parameter "upgrade.from" for upgrading an application from version 1.0.x.

public static final String UPGRADE_FROM_11
Config value for parameter "upgrade.from" for upgrading an application from version 1.1.x.

public static final String UPGRADE_FROM_20
Config value for parameter "upgrade.from" for upgrading an application from version 2.0.x.

public static final String UPGRADE_FROM_21
Config value for parameter "upgrade.from" for upgrading an application from version 2.1.x.

public static final String UPGRADE_FROM_22
Config value for parameter "upgrade.from" for upgrading an application from version 2.2.x.

public static final String UPGRADE_FROM_23
Config value for parameter "upgrade.from" for upgrading an application from version 2.3.x.

public static final String AT_LEAST_ONCE
Config value for parameter "processing.guarantee" for at-least-once processing guarantees.

public static final String EXACTLY_ONCE
Config value for parameter "processing.guarantee" for exactly-once processing guarantees.

 Enabling exactly-once processing semantics requires broker version 0.11.0 or higher.
 If you enable this feature Kafka Streams will use more resources (like broker connections)
 compared to the AT_LEAST_ONCE case.
See Also: EXACTLY_ONCE_BETA,
Constant Field Values

public static final String EXACTLY_ONCE_BETA
Config value for parameter "processing.guarantee" for exactly-once processing guarantees.

 Enabling exactly-once (beta) requires broker version 2.5 or higher.
 If you enable this feature Kafka Streams will use fewer resources (like broker connections)
 compared to the EXACTLY_ONCE case.
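
As a configuration fragment, selecting one of these guarantees could look like the sketch below. It uses plain string keys so that it runs without the Kafka libraries. `withGuarantee` is a hypothetical helper, and the value strings "at_least_once" and "exactly_once_beta" are assumed to be the strings behind AT_LEAST_ONCE and EXACTLY_ONCE_BETA ("exactly_once" is the value quoted earlier on this page). In application code, prefer the StreamsConfig constants over literals.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

class ProcessingGuaranteeSketch {
    // Key behind PROCESSING_GUARANTEE_CONFIG.
    static final String PROCESSING_GUARANTEE = "processing.guarantee";
    // Assumed value strings for AT_LEAST_ONCE, EXACTLY_ONCE and EXACTLY_ONCE_BETA.
    static final List<String> ALLOWED =
            Arrays.asList("at_least_once", "exactly_once", "exactly_once_beta");

    /** Hypothetical helper: builds properties with a validated processing guarantee. */
    static Properties withGuarantee(String guarantee) {
        if (!ALLOWED.contains(guarantee)) {
            throw new IllegalArgumentException("Unknown processing guarantee: " + guarantee);
        }
        Properties props = new Properties();
        props.put(PROCESSING_GUARANTEE, guarantee);
        return props;
    }

    public static void main(String[] args) {
        // Per the text above, the beta mode requires brokers on version 2.5 or higher.
        Properties props = withGuarantee("exactly_once_beta");
        System.out.println(props.getProperty(PROCESSING_GUARANTEE)); // prints exactly_once_beta
    }
}
```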
public static final String METRICS_0100_TO_24
Config value for parameter "built.in.metrics.version" for built-in metrics from version 0.10.0 to 2.4.

public static final String METRICS_LATEST
Config value for parameter "built.in.metrics.version" for the latest built-in metrics version.

public static final String ACCEPTABLE_RECOVERY_LAG_CONFIG
acceptable.recovery.lag

public static final String APPLICATION_ID_CONFIG
application.id

public static final String APPLICATION_SERVER_CONFIG
application.server

public static final String BOOTSTRAP_SERVERS_CONFIG
bootstrap.servers

public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG
buffered.records.per.partition

public static final String BUILT_IN_METRICS_VERSION_CONFIG
built.in.metrics.version

public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG
cache.max.bytes.buffering

public static final String CLIENT_ID_CONFIG
client.id

public static final String COMMIT_INTERVAL_MS_CONFIG
commit.interval.ms

public static final String MAX_TASK_IDLE_MS_CONFIG
max.task.idle.ms

public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG
connections.max.idle.ms

public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
default.deserialization.exception.handler

public static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG
default.production.exception.handler

public static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
default.windowed.key.serde.inner

public static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
default.windowed.value.serde.inner

public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG
default.key.serde

public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG
default.value.serde

public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
default.timestamp.extractor

public static final String MAX_WARMUP_REPLICAS_CONFIG
max.warmup.replicas

public static final String METADATA_MAX_AGE_CONFIG
metadata.max.age.ms

public static final String METRICS_NUM_SAMPLES_CONFIG
metrics.num.samples

public static final String METRICS_RECORDING_LEVEL_CONFIG
metrics.recording.level

public static final String METRIC_REPORTER_CLASSES_CONFIG
metric.reporters

public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG
metrics.sample.window.ms

public static final String NUM_STANDBY_REPLICAS_CONFIG
num.standby.replicas

public static final String NUM_STREAM_THREADS_CONFIG
num.stream.threads

public static final String POLL_MS_CONFIG
poll.ms

public static final String PROBING_REBALANCE_INTERVAL_MS_CONFIG
probing.rebalance.interval.ms

public static final String PROCESSING_GUARANTEE_CONFIG
processing.guarantee

public static final String RECEIVE_BUFFER_CONFIG
receive.buffer.bytes

public static final String RECONNECT_BACKOFF_MS_CONFIG
reconnect.backoff.ms

public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG
reconnect.backoff.max.ms

public static final String REPLICATION_FACTOR_CONFIG
replication.factor

public static final String REQUEST_TIMEOUT_MS_CONFIG
request.timeout.ms

public static final String RETRIES_CONFIG
retries

public static final String RETRY_BACKOFF_MS_CONFIG
retry.backoff.ms

public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
rocksdb.config.setter

public static final String SECURITY_PROTOCOL_CONFIG
security.protocol

public static final String SEND_BUFFER_CONFIG
send.buffer.bytes

public static final String STATE_CLEANUP_DELAY_MS_CONFIG
state.cleanup.delay.ms

public static final String STATE_DIR_CONFIG
state.dir

public static final String TOPOLOGY_OPTIMIZATION
topology.optimization

public static final String UPGRADE_FROM_CONFIG
upgrade.from

public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG
windowstore.changelog.additional.retention.ms

@Deprecated public static final String PARTITION_GROUPER_CLASS_CONFIG
partition.grouper

public StreamsConfig(Map<?,?> props)
Create a new StreamsConfig using the given properties.
Parameters:
 props - properties that specify Kafka Streams and internal consumer/producer configuration

protected StreamsConfig(Map<?,?> props, boolean doLog)

public static String consumerPrefix(String consumerProp)
Prefix a property with CONSUMER_PREFIX. This is used to isolate consumer configs
 from other client configs.
Parameters:
 consumerProp - the consumer property to be masked
Returns:
 CONSUMER_PREFIX + consumerProp

public static String mainConsumerPrefix(String consumerProp)
Prefix a property with MAIN_CONSUMER_PREFIX. This is used to isolate main consumer configs
 from other client configs.
Parameters:
 consumerProp - the consumer property to be masked
Returns:
 MAIN_CONSUMER_PREFIX + consumerProp

public static String restoreConsumerPrefix(String consumerProp)
Prefix a property with RESTORE_CONSUMER_PREFIX. This is used to isolate restore consumer configs
 from other client configs.
Parameters:
 consumerProp - the consumer property to be masked
Returns:
 RESTORE_CONSUMER_PREFIX + consumerProp

public static String globalConsumerPrefix(String consumerProp)
Prefix a property with GLOBAL_CONSUMER_PREFIX. This is used to isolate global consumer configs
 from other client configs.
Parameters:
 consumerProp - the consumer property to be masked
Returns:
 GLOBAL_CONSUMER_PREFIX + consumerProp

public static String producerPrefix(String producerProp)
Prefix a property with PRODUCER_PREFIX. This is used to isolate producer configs
 from other client configs.
Parameters:
 producerProp - the producer property to be masked
Returns:
 PRODUCER_PREFIX + producerProp

public static String adminClientPrefix(String adminClientProp)
Prefix a property with ADMIN_CLIENT_PREFIX. This is used to isolate admin configs
 from other client configs.
Parameters:
 adminClientProp - the admin client property to be masked
Returns:
 ADMIN_CLIENT_PREFIX + adminClientProp

public static String topicPrefix(String topicProp)
Prefix a property with TOPIC_PREFIX,
 used to provide default topic configs to be applied when creating internal topics.
Parameters:
 topicProp - the topic property to be masked
Returns:
 TOPIC_PREFIX + topicProp

public static ConfigDef configDef()

protected Map<String,Object> postProcessParsedConfig(Map<String,Object> parsedValues)
Overrides:
 postProcessParsedConfig in class AbstractConfig
Parameters:
 parsedValues - unmodifiable map of current configuration

@Deprecated public Map<String,Object> getConsumerConfigs(String groupId, String clientId)
Deprecated. Use getMainConsumerConfigs(String, String, int) instead.
Get the configs for the consumer.
 Properties using the prefix CONSUMER_PREFIX take precedence over their non-prefixed versions,
 except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, where the non-prefixed
 version is always used because only reading/writing from/to the same Kafka cluster is supported.
Parameters:
 groupId - consumer groupId
 clientId - clientId

public Map<String,Object> getMainConsumerConfigs(String groupId, String clientId, int threadIdx)
Get the configs for the main consumer.
 Properties using the prefix MAIN_CONSUMER_PREFIX take precedence over
 the properties prefixed with CONSUMER_PREFIX and the non-prefixed versions
 (see the override precedence ordering in MAIN_CONSUMER_PREFIX),
 except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, where the non-prefixed
 version is always used because only reading/writing from/to the same Kafka cluster is supported.
 If not specified by MAIN_CONSUMER_PREFIX, the main consumer shares the general consumer configs
 prefixed by CONSUMER_PREFIX.
Parameters:
 groupId - consumer groupId
 clientId - clientId
 threadIdx - stream thread index

public Map<String,Object> getRestoreConsumerConfigs(String clientId)
Get the configs for the restore consumer.
 Properties using the prefix RESTORE_CONSUMER_PREFIX take precedence over
 the properties prefixed with CONSUMER_PREFIX and the non-prefixed versions
 (see the override precedence ordering in RESTORE_CONSUMER_PREFIX),
 except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, where the non-prefixed
 version is always used because only reading/writing from/to the same Kafka cluster is supported.
 If not specified by RESTORE_CONSUMER_PREFIX, the restore consumer shares the general consumer configs
 prefixed by CONSUMER_PREFIX.
Parameters:
 clientId - clientId

public Map<String,Object> getGlobalConsumerConfigs(String clientId)
Get the configs for the global consumer.
 Properties using the prefix GLOBAL_CONSUMER_PREFIX take precedence over
 the properties prefixed with CONSUMER_PREFIX and the non-prefixed versions
 (see the override precedence ordering in GLOBAL_CONSUMER_PREFIX),
 except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, where the non-prefixed
 version is always used because only reading/writing from/to the same Kafka cluster is supported.
 If not specified by GLOBAL_CONSUMER_PREFIX, the global consumer shares the general consumer configs
 prefixed by CONSUMER_PREFIX.
Parameters:
 clientId - clientId

public Map<String,Object> getProducerConfigs(String clientId)
Get the configs for the producer.
 Properties using the prefix PRODUCER_PREFIX take precedence over their non-prefixed versions,
 except in the case of ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, where the non-prefixed
 version is always used because only reading/writing from/to the same Kafka cluster is supported.
Parameters:
 clientId - clientId

public Map<String,Object> getAdminConfigs(String clientId)
Get the configs for the admin client.
Parameters:
 clientId - clientId

public Serde defaultKeySerde()
Return a configured instance of the key Serde class.

public Serde defaultValueSerde()
Return a configured instance of the value Serde class.

public TimestampExtractor defaultTimestampExtractor()
public DeserializationExceptionHandler defaultDeserializationExceptionHandler()
public ProductionExceptionHandler defaultProductionExceptionHandler()
public static void main(String[] args)