public class StreamsConfig extends AbstractConfig
KafkaStreams instance.
Can also be used to configure the Kafka Streams internal KafkaConsumer, KafkaProducer and Admin.
To avoid consumer/producer/admin property conflicts, you should prefix those properties using
consumerPrefix(String), producerPrefix(String) and adminClientPrefix(String), respectively.
Example:
// potentially wrong: sets "metadata.max.age.ms" to 1 minute for producer AND consumer
Properties streamsProperties = new Properties();
streamsProperties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 60000);
// or
streamsProperties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 60000);
// suggested:
Properties streamsProperties = new Properties();
// sets "metadata.max.age.ms" to 1 minute for consumer only
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), 60000);
// sets "metadata.max.age.ms" to 1 minute for producer only
streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.METADATA_MAX_AGE_CONFIG), 60000);
StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
This instance can also be used to pass in custom configurations to different modules (e.g. passing a special config in your customized serde class).
The consumer/producer/admin prefix can also be used to distinguish these custom config values passed to different clients with the same config name.
* Example:
Properties streamsProperties = new Properties();
// sets "my.custom.config" to "foo" for consumer only
streamsProperties.put(StreamsConfig.consumerPrefix("my.custom.config"), "foo");
// sets "my.custom.config" to "bar" for producer only
streamsProperties.put(StreamsConfig.producerPrefix("my.custom.config"), "bar");
// sets "my.custom.config2" to "boom" for all clients universally
streamsProperties.put("my.custom.config2", "boom");
// as a result, inside producer's serde class configure(..) function,
// users can now read both key-value pairs "my.custom.config" -> "foo"
// and "my.custom.config2" -> "boom" from the config map
StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
When increasing ProducerConfig.MAX_BLOCK_MS_CONFIG to be more resilient to non-available brokers you should also
increase ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG using the following guidance:
max.poll.interval.ms > max.block.ms
Kafka Streams requires at least the following properties to be set:
By default, Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
"enable.auto.commit" (false) - Streams client will always disable/turn off auto committing"processing.guarantee" is set to "exactly_once", Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
"isolation.level" (read_committed) - Consumers will always read committed data only"enable.idempotence" (true) - Producer will always have idempotency enabled"max.in.flight.requests.per.connection" (5) - Producer will always have one in-flight request per connection| Modifier and Type | Class and Description |
|---|---|
static class |
StreamsConfig.InternalConfig |
| Modifier and Type | Field and Description |
|---|---|
static String |
ADMIN_CLIENT_PREFIX
Prefix used to isolate
admin configs from other client configs. |
static String |
APPLICATION_ID_CONFIG
application.id |
static String |
APPLICATION_SERVER_CONFIG
user.endpoint |
static String |
AT_LEAST_ONCE
Config value for parameter
"processing.guarantee" for at-least-once processing guarantees. |
static String |
BOOTSTRAP_SERVERS_CONFIG
bootstrap.servers |
static String |
BUFFERED_RECORDS_PER_PARTITION_CONFIG
buffered.records.per.partition |
static String |
CACHE_MAX_BYTES_BUFFERING_CONFIG
cache.max.bytes.buffering |
static String |
CLIENT_ID_CONFIG
client.id |
static String |
COMMIT_INTERVAL_MS_CONFIG
commit.interval.ms |
static String |
CONNECTIONS_MAX_IDLE_MS_CONFIG
connections.max.idle.ms |
static String |
CONSUMER_PREFIX
Prefix used to isolate
consumer configs from other client configs. |
static String |
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
default.deserialization.exception.handler |
static String |
DEFAULT_KEY_SERDE_CLASS_CONFIG
default key.serde |
static String |
DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG
default.production.exception.handler |
static String |
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
default.timestamp.extractor |
static String |
DEFAULT_VALUE_SERDE_CLASS_CONFIG
default value.serde |
static String |
DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
default.windowed.key.serde.inner |
static String |
DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
default.windowed.value.serde.inner |
static int |
DUMMY_THREAD_INDEX |
static String |
EXACTLY_ONCE
Config value for parameter
"processing.guarantee" for exactly-once processing guarantees. |
static String |
GLOBAL_CONSUMER_PREFIX
Prefix used to override
consumer configs for the global consumer client from
the general consumer client configs. |
static String |
MAIN_CONSUMER_PREFIX
Prefix used to override
consumer configs for the main consumer client from
the general consumer client configs. |
static String |
MAX_TASK_IDLE_MS_CONFIG
max.task.idle.ms |
static String |
METADATA_MAX_AGE_CONFIG
metadata.max.age.ms |
static String |
METRIC_REPORTER_CLASSES_CONFIG
metric.reporters |
static String |
METRICS_NUM_SAMPLES_CONFIG
metrics.num.samples |
static String |
METRICS_RECORDING_LEVEL_CONFIG
metrics.record.level |
static String |
METRICS_SAMPLE_WINDOW_MS_CONFIG
metrics.sample.window.ms |
static String |
NO_OPTIMIZATION
Config value for parameter (@link #TOPOLOGY_OPTIMIZATION "topology.optimization" for disabling topology optimization
|
static String |
NUM_STANDBY_REPLICAS_CONFIG
num.standby.replicas |
static String |
NUM_STREAM_THREADS_CONFIG
num.stream.threads |
static String |
OPTIMIZE
Config value for parameter (@link #TOPOLOGY_OPTIMIZATION "topology.optimization" for enabling topology optimization
|
static String |
PARTITION_GROUPER_CLASS_CONFIG
Deprecated.
|
static String |
POLL_MS_CONFIG
poll.ms |
static String |
PROCESSING_GUARANTEE_CONFIG
processing.guarantee |
static String |
PRODUCER_PREFIX
Prefix used to isolate
producer configs from other client configs. |
static String |
RECEIVE_BUFFER_CONFIG
receive.buffer.bytes |
static String |
RECONNECT_BACKOFF_MAX_MS_CONFIG
reconnect.backoff.max |
static String |
RECONNECT_BACKOFF_MS_CONFIG
reconnect.backoff.ms |
static String |
REPLICATION_FACTOR_CONFIG
replication.factor |
static String |
REQUEST_TIMEOUT_MS_CONFIG
request.timeout.ms |
static String |
RESTORE_CONSUMER_PREFIX
Prefix used to override
consumer configs for the restore consumer client from
the general consumer client configs. |
static String |
RETRIES_CONFIG
retries |
static String |
RETRY_BACKOFF_MS_CONFIG
retry.backoff.ms |
static String |
ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
rocksdb.config.setter |
static String |
SECURITY_PROTOCOL_CONFIG
security.protocol |
static String |
SEND_BUFFER_CONFIG
send.buffer.bytes |
static String |
STATE_CLEANUP_DELAY_MS_CONFIG
state.cleanup.delay |
static String |
STATE_DIR_CONFIG
state.dir |
static String |
TOPIC_PREFIX
Prefix used to provide default topic configs to be applied when creating internal topics.
|
static String |
TOPOLOGY_OPTIMIZATION
topology.optimization |
static String |
UPGRADE_FROM_0100
Config value for parameter
"upgrade.from" for upgrading an application from version 0.10.0.x. |
static String |
UPGRADE_FROM_0101
Config value for parameter
"upgrade.from" for upgrading an application from version 0.10.1.x. |
static String |
UPGRADE_FROM_0102
Config value for parameter
"upgrade.from" for upgrading an application from version 0.10.2.x. |
static String |
UPGRADE_FROM_0110
Config value for parameter
"upgrade.from" for upgrading an application from version 0.11.0.x. |
static String |
UPGRADE_FROM_10
Config value for parameter
"upgrade.from" for upgrading an application from version 1.0.x. |
static String |
UPGRADE_FROM_11
Config value for parameter
"upgrade.from" for upgrading an application from version 1.1.x. |
static String |
UPGRADE_FROM_20
Config value for parameter
"upgrade.from" for upgrading an application from version 2.0.x. |
static String |
UPGRADE_FROM_21
Config value for parameter
"upgrade.from" for upgrading an application from version 2.1.x. |
static String |
UPGRADE_FROM_22
Config value for parameter
"upgrade.from" for upgrading an application from version 2.2.x. |
static String |
UPGRADE_FROM_23
Config value for parameter
"upgrade.from" for upgrading an application from version 2.3.x. |
static String |
UPGRADE_FROM_CONFIG
upgrade.from |
static String |
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG
windowstore.changelog.additional.retention.ms |
CONFIG_PROVIDERS_CONFIG| Modifier | Constructor and Description |
|---|---|
|
StreamsConfig(Map<?,?> props)
Create a new
StreamsConfig using the given properties. |
protected |
StreamsConfig(Map<?,?> props,
boolean doLog) |
equals, get, getBoolean, getClass, getConfiguredInstance, getConfiguredInstances, getConfiguredInstances, getConfiguredInstances, getDouble, getInt, getList, getLong, getPassword, getShort, getString, hashCode, ignore, logUnused, originals, originalsStrings, originalsWithPrefix, originalsWithPrefix, typeOf, unused, values, valuesWithPrefixAllOrNothing, valuesWithPrefixOverridepublic static final int DUMMY_THREAD_INDEX
public static final String TOPIC_PREFIX
TopicConfig.
It is recommended to use topicPrefix(String).public static final String CONSUMER_PREFIX
consumer configs from other client configs.
It is recommended to use consumerPrefix(String) to add this prefix to consumer
properties.public static final String MAIN_CONSUMER_PREFIX
consumer configs for the main consumer client from
the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
1. main.consumer.[config-name]
2. consumer.[config-name]
3. [config-name]public static final String RESTORE_CONSUMER_PREFIX
consumer configs for the restore consumer client from
the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
1. restore.consumer.[config-name]
2. consumer.[config-name]
3. [config-name]public static final String GLOBAL_CONSUMER_PREFIX
consumer configs for the global consumer client from
the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
1. global.consumer.[config-name]
2. consumer.[config-name]
3. [config-name]public static final String PRODUCER_PREFIX
producer configs from other client configs.
It is recommended to use producerPrefix(String) to add this prefix to producer
properties.public static final String ADMIN_CLIENT_PREFIX
admin configs from other client configs.
It is recommended to use adminClientPrefix(String) to add this prefix to producer
properties.public static final String NO_OPTIMIZATION
public static final String OPTIMIZE
public static final String UPGRADE_FROM_0100
"upgrade.from" for upgrading an application from version 0.10.0.x.public static final String UPGRADE_FROM_0101
"upgrade.from" for upgrading an application from version 0.10.1.x.public static final String UPGRADE_FROM_0102
"upgrade.from" for upgrading an application from version 0.10.2.x.public static final String UPGRADE_FROM_0110
"upgrade.from" for upgrading an application from version 0.11.0.x.public static final String UPGRADE_FROM_10
"upgrade.from" for upgrading an application from version 1.0.x.public static final String UPGRADE_FROM_11
"upgrade.from" for upgrading an application from version 1.1.x.public static final String UPGRADE_FROM_20
"upgrade.from" for upgrading an application from version 2.0.x.public static final String UPGRADE_FROM_21
"upgrade.from" for upgrading an application from version 2.1.x.public static final String UPGRADE_FROM_22
"upgrade.from" for upgrading an application from version 2.2.x.public static final String UPGRADE_FROM_23
"upgrade.from" for upgrading an application from version 2.3.x.public static final String AT_LEAST_ONCE
"processing.guarantee" for at-least-once processing guarantees.public static final String EXACTLY_ONCE
"processing.guarantee" for exactly-once processing guarantees.public static final String APPLICATION_ID_CONFIG
application.idpublic static final String APPLICATION_SERVER_CONFIG
user.endpointpublic static final String BOOTSTRAP_SERVERS_CONFIG
bootstrap.serverspublic static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG
buffered.records.per.partitionpublic static final String CACHE_MAX_BYTES_BUFFERING_CONFIG
cache.max.bytes.bufferingpublic static final String CLIENT_ID_CONFIG
client.idpublic static final String COMMIT_INTERVAL_MS_CONFIG
commit.interval.mspublic static final String MAX_TASK_IDLE_MS_CONFIG
max.task.idle.mspublic static final String CONNECTIONS_MAX_IDLE_MS_CONFIG
connections.max.idle.mspublic static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
default.deserialization.exception.handlerpublic static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG
default.production.exception.handlerpublic static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
default.windowed.key.serde.innerpublic static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
default.windowed.value.serde.innerpublic static final String DEFAULT_KEY_SERDE_CLASS_CONFIG
default key.serdepublic static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG
default value.serdepublic static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
default.timestamp.extractorpublic static final String METADATA_MAX_AGE_CONFIG
metadata.max.age.mspublic static final String METRICS_NUM_SAMPLES_CONFIG
metrics.num.samplespublic static final String METRICS_RECORDING_LEVEL_CONFIG
metrics.record.levelpublic static final String METRIC_REPORTER_CLASSES_CONFIG
metric.reporterspublic static final String METRICS_SAMPLE_WINDOW_MS_CONFIG
metrics.sample.window.mspublic static final String NUM_STANDBY_REPLICAS_CONFIG
num.standby.replicaspublic static final String NUM_STREAM_THREADS_CONFIG
num.stream.threadspublic static final String POLL_MS_CONFIG
poll.mspublic static final String PROCESSING_GUARANTEE_CONFIG
processing.guaranteepublic static final String RECEIVE_BUFFER_CONFIG
receive.buffer.bytespublic static final String RECONNECT_BACKOFF_MS_CONFIG
reconnect.backoff.mspublic static final String RECONNECT_BACKOFF_MAX_MS_CONFIG
reconnect.backoff.maxpublic static final String REPLICATION_FACTOR_CONFIG
replication.factorpublic static final String REQUEST_TIMEOUT_MS_CONFIG
request.timeout.mspublic static final String RETRIES_CONFIG
retriespublic static final String RETRY_BACKOFF_MS_CONFIG
retry.backoff.mspublic static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
rocksdb.config.setterpublic static final String SECURITY_PROTOCOL_CONFIG
security.protocolpublic static final String SEND_BUFFER_CONFIG
send.buffer.bytespublic static final String STATE_CLEANUP_DELAY_MS_CONFIG
state.cleanup.delaypublic static final String STATE_DIR_CONFIG
state.dirpublic static final String TOPOLOGY_OPTIMIZATION
topology.optimizationpublic static final String UPGRADE_FROM_CONFIG
upgrade.frompublic static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG
windowstore.changelog.additional.retention.ms@Deprecated public static final String PARTITION_GROUPER_CLASS_CONFIG
partition.grouperpublic StreamsConfig(Map<?,?> props)
StreamsConfig using the given properties.props - properties that specify Kafka Streams and internal consumer/producer configurationprotected StreamsConfig(Map<?,?> props, boolean doLog)
public static String consumerPrefix(String consumerProp)
CONSUMER_PREFIX. This is used to isolate consumer configs
from other client configs.consumerProp - the consumer property to be maskedCONSUMER_PREFIX + consumerProppublic static String mainConsumerPrefix(String consumerProp)
MAIN_CONSUMER_PREFIX. This is used to isolate main consumer configs
from other client configs.consumerProp - the consumer property to be maskedMAIN_CONSUMER_PREFIX + consumerProppublic static String restoreConsumerPrefix(String consumerProp)
RESTORE_CONSUMER_PREFIX. This is used to isolate restore consumer configs
from other client configs.consumerProp - the consumer property to be maskedRESTORE_CONSUMER_PREFIX + consumerProppublic static String globalConsumerPrefix(String consumerProp)
GLOBAL_CONSUMER_PREFIX. This is used to isolate global consumer configs
from other client configs.consumerProp - the consumer property to be maskedGLOBAL_CONSUMER_PREFIX + consumerProppublic static String producerPrefix(String producerProp)
PRODUCER_PREFIX. This is used to isolate producer configs
from other client configs.producerProp - the producer property to be maskedproducerProppublic static String adminClientPrefix(String adminClientProp)
ADMIN_CLIENT_PREFIX. This is used to isolate admin configs
from other client configs.adminClientProp - the admin client property to be maskedadminClientProppublic static String topicPrefix(String topicProp)
TOPIC_PREFIX
used to provide default topic configs to be applied when creating internal topics.topicProp - the topic property to be maskedtopicProppublic static ConfigDef configDef()
protected Map<String,Object> postProcessParsedConfig(Map<String,Object> parsedValues)
AbstractConfigpostProcessParsedConfig in class AbstractConfigparsedValues - unmodifiable map of current configuration@Deprecated public Map<String,Object> getConsumerConfigs(String groupId, String clientId)
getMainConsumerConfigs(String, String, int)consumer.
Properties using the prefix CONSUMER_PREFIX will be used in favor over their non-prefixed versions
except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG where we always use the non-prefixed
version as we only support reading/writing from/to the same Kafka Cluster.groupId - consumer groupIdclientId - clientIdpublic Map<String,Object> getMainConsumerConfigs(String groupId, String clientId, int threadIdx)
main consumer.
Properties using the prefix MAIN_CONSUMER_PREFIX will be used in favor over
the properties prefixed with CONSUMER_PREFIX and the non-prefixed versions
(read the override precedence ordering in MAIN_CONSUMER_PREFIX
except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG where we always use the non-prefixed
version as we only support reading/writing from/to the same Kafka Cluster.
If not specified by MAIN_CONSUMER_PREFIX, main consumer will share the general consumer configs
prefixed by CONSUMER_PREFIX.groupId - consumer groupIdclientId - clientIdthreadIdx - stream thread indexpublic Map<String,Object> getRestoreConsumerConfigs(String clientId)
restore-consumer.
Properties using the prefix RESTORE_CONSUMER_PREFIX will be used in favor over
the properties prefixed with CONSUMER_PREFIX and the non-prefixed versions
(read the override precedence ordering in RESTORE_CONSUMER_PREFIX
except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG where we always use the non-prefixed
version as we only support reading/writing from/to the same Kafka Cluster.
If not specified by RESTORE_CONSUMER_PREFIX, restore consumer will share the general consumer configs
prefixed by CONSUMER_PREFIX.clientId - clientIdpublic Map<String,Object> getGlobalConsumerConfigs(String clientId)
global consumer.
Properties using the prefix GLOBAL_CONSUMER_PREFIX will be used in favor over
the properties prefixed with CONSUMER_PREFIX and the non-prefixed versions
(read the override precedence ordering in GLOBAL_CONSUMER_PREFIX
except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG where we always use the non-prefixed
version as we only support reading/writing from/to the same Kafka Cluster.
If not specified by GLOBAL_CONSUMER_PREFIX, global consumer will share the general consumer configs
prefixed by CONSUMER_PREFIX.clientId - clientIdpublic Map<String,Object> getProducerConfigs(String clientId)
producer.
Properties using the prefix PRODUCER_PREFIX will be used in favor over their non-prefixed versions
except in the case of ProducerConfig.BOOTSTRAP_SERVERS_CONFIG where we always use the non-prefixed
version as we only support reading/writing from/to the same Kafka Cluster.clientId - clientIdpublic Map<String,Object> getAdminConfigs(String clientId)
admin client.clientId - clientIdpublic Serde defaultKeySerde()
configured instance of key Serde
class.public Serde defaultValueSerde()
configured instance of value
Serde class.public TimestampExtractor defaultTimestampExtractor()
public DeserializationExceptionHandler defaultDeserializationExceptionHandler()
public ProductionExceptionHandler defaultProductionExceptionHandler()
public static void main(String[] args)