Package org.apache.kafka.streams
Class StreamsConfig
java.lang.Object
org.apache.kafka.common.config.AbstractConfig
org.apache.kafka.streams.StreamsConfig
Configuration for a
KafkaStreams
instance.
Can also be used to configure the Kafka Streams internal KafkaConsumer
, KafkaProducer
and Admin
.
To avoid consumer/producer/admin property conflicts, you should prefix those properties using
consumerPrefix(String)
, producerPrefix(String)
and adminClientPrefix(String)
, respectively.
Example:
// potentially wrong: sets "metadata.max.age.ms" to 1 minute for producer AND consumer
Properties streamsProperties = new Properties();
streamsProperties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 60000);
// or
streamsProperties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 60000);
// suggested:
Properties streamsProperties = new Properties();
// sets "metadata.max.age.ms" to 1 minute for consumer only
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), 60000);
// sets "metadata.max.age.ms" to 1 minute for producer only
streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.METADATA_MAX_AGE_CONFIG), 60000);
StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
This instance can also be used to pass in custom configurations to different modules (e.g. passing a special config in your customized serde class).
The consumer/producer/admin prefix can also be used to distinguish these custom config values passed to different clients with the same config name.
* Example:
Properties streamsProperties = new Properties();
// sets "my.custom.config" to "foo" for consumer only
streamsProperties.put(StreamsConfig.consumerPrefix("my.custom.config"), "foo");
// sets "my.custom.config" to "bar" for producer only
streamsProperties.put(StreamsConfig.producerPrefix("my.custom.config"), "bar");
// sets "my.custom.config2" to "boom" for all clients universally
streamsProperties.put("my.custom.config2", "boom");
// as a result, inside producer's serde class configure(..) function,
// users can now read both key-value pairs "my.custom.config" -> "foo"
// and "my.custom.config2" -> "boom" from the config map
StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
When increasing ProducerConfig.MAX_BLOCK_MS_CONFIG
to be more resilient to non-available brokers you should also
increase ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG
using the following guidance:
max.poll.interval.ms > max.block.msKafka Streams requires at least the following properties to be set: By default, Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
"group.id"
(<application.id>) - Streams client will always use the application ID a consumer group ID"enable.auto.commit"
(false) - Streams client will always disable/turn off auto committing"partition.assignment.strategy"
(StreamsPartitionAssignor
) - Streams client will always use its own partition assignor
"processing.guarantee"
is set to "exactly_once_v2"
,
Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
"isolation.level"
(read_committed) - Consumers will always read committed data only"enable.idempotence"
(true) - Producer will always have idempotency enabled
- See Also:
-
Nested Class Summary
Nested Classes -
Field Summary
FieldsModifier and TypeFieldDescriptionstatic final String
acceptable.recovery.lag
static final String
Prefix used to isolateadmin
configs from other client configs.static final String
application.id
static final String
application.server
static final String
Config value for parameter"processing.guarantee"
for at-least-once processing guarantees.static final String
bootstrap.servers
static final String
buffered.records.per.partition
static final String
Deprecated.static final String
built.in.metrics.version
static final String
Deprecated.Since 3.4.static final String
Deprecated.static final String
client.id
static final String
Prefix used to add arbitrary tags to a Kafka Stream's instance as key-value pairs.static final String
commit.interval.ms
static final String
connections.max.idle.ms
static final String
Prefix used to isolateconsumer
configs from other client configs.static final String
default.client.supplier
static final String
Deprecated.static final String
Deprecated.Since 4.0.static final String
Deprecated.static final String
Deprecated.static final String
Deprecated.Since 3.7.static final String
Deprecated.static final String
default key.serde
static final String
Deprecated.Since 4.0.static final String
default.timestamp.extractor
static final String
Deprecated.static final String
default.value.serde
static final String
deserialization.exception.handler
static final String
dsl.store.suppliers.class
static final int
Deprecated.static final String
enable.metrics.push
static final String
Deprecated.static final String
Config value for parameter"processing.guarantee"
for exactly-once processing guarantees.static final String
Prefix used to overrideconsumer
configs for the global consumer client from the general consumer client configs.static final String
Deprecated.static final String
log.summary.interval.ms
static final String
Prefix used to overrideconsumer
configs for the main consumer client from the general consumer client configs.static final int
static final int
static final int
static final String
max.task.idle.ms
static final long
static final String
Deprecated.static final String
max.warmup.replicas
static final String
Config value for parameter"topology.optimization"
for enabling the specific optimization that merges duplicated repartition topics.static final String
metadata.max.age.ms
static final String
metric.reporters
static final String
Config value for parameter"built.in.metrics.version"
for the latest built-in metrics version.static final String
metrics.num.samples
static final String
metrics.record.level
static final String
metrics.sample.window.ms
static final String
Config value for parameter"topology.optimization"
for disabling topology optimizationstatic final String
num.standby.replicas
static final String
num.stream.threads
static final String
Config value for parameter"topology.optimization"
for enabling topology optimizationstatic final String
poll.ms
static final String
probing.rebalance.interval.ms
static final String
processing.exception.handler
static final String
Deprecated.static final String
processing.guarantee
static final String
processor.wrapper.class
static final String
Prefix used to isolateproducer
configs from other client configs.static final String
production.exception.handler
static final String
rack.aware.assignment.non_overlap_cost
static final String
Deprecated.static final String
static final String
rack.aware.assignment.strategy
static final String
Deprecated.static final String
static final String
static final String
rack.aware.assignment.tags
static final String
rack.aware.assignment.traffic_cost
static final String
Deprecated.static final String
receive.buffer.bytes
static final String
reconnect.backoff.max
static final String
reconnect.backoff.ms
static final String
repartition.purge.interval.ms
static final String
replication.factor
static final String
request.timeout.ms
static final String
Prefix used to overrideconsumer
configs for the restore consumer client from the general consumer client configs.static final String
retry.backoff.ms
static final String
Config value for parameter"topology.optimization"
for enabling the specific optimization that reuses source topic as changelog topic for KTables.static final String
Deprecated.static final String
rocksdb.config.setter
static final String
security.protocol
static final String
send.buffer.bytes
static final String
Config value for parameter"topology.optimization"
for enabling the optimization that optimizes inner stream-stream joins into self-joins when both arguments are the same stream.static final String
state.cleanup.delay
static final String
state.dir
static final String
statestore.cache.max.bytes
static final String
Deprecated.static final String
task.assignor.class
static final String
task.timeout.ms
static final String
Deprecated.static final String
Prefix used to provide default topic configs to be applied when creating internal topics.static final String
topology.optimization
static final String
Config value for parameter"upgrade.from"
for upgrading an application from version0.10.0.x
.static final String
Config value for parameter"upgrade.from"
for upgrading an application from version0.10.1.x
.static final String
Config value for parameter"upgrade.from"
for upgrading an application from version0.10.2.x
.static final String
Config value for parameter"upgrade.from"
for upgrading an application from version0.11.0.x
.static final String
Config value for parameter"upgrade.from"
for upgrading an application from version1.0.x
.static final String
Config value for parameter"upgrade.from"
for upgrading an application from version1.1.x
.static final String
Config value for parameter"upgrade.from"
for upgrading an application from version2.0.x
.static final String
Config value for parameter"upgrade.from"
for upgrading an application from version2.1.x
.static final String
Config value for parameter"upgrade.from"
for upgrading an application from version2.2.x
.static final String
Config value for parameter"upgrade.from"
for upgrading an application from version2.3.x
.static final String
Config value for parameter"upgrade.from"
for upgrading an application from version2.4.x
.static final String
Config value for parameter"upgrade.from"
for upgrading an application from version2.5.x
.static final String
Config value for parameter"upgrade.from"
for upgrading an application from version2.6.x
.static final String
Config value for parameter"upgrade.from"
for upgrading an application from version2.7.x
.static final String
Config value for parameter"upgrade.from"
for upgrading an application from version2.8.x
.static final String
Config value for parameter"upgrade.from"
for upgrading an application from version3.0.x
.static final String
Config value for parameter"upgrade.from"
for upgrading an application from version3.1.x
.static final String
Config value for parameter"upgrade.from"
for upgrading an application from version3.2.x
.static final String
Config value for parameter"upgrade.from"
for upgrading an application from version3.3.x
.static final String
Config value for parameter"upgrade.from"
for upgrading an application from version3.4.x
.static final String
Config value for parameter"upgrade.from"
for upgrading an application from version3.5.x
.static final String
Config value for parameter"upgrade.from"
for upgrading an application from version3.6.x
.static final String
Config value for parameter"upgrade.from"
for upgrading an application from version3.7.x
.static final String
Config value for parameter"upgrade.from"
for upgrading an application from version3.8.x
.static final String
Config value for parameter"upgrade.from"
for upgrading an application from version3.9.x
.static final String
upgrade.from
static final String
window.size.ms
static final String
windowstore.changelog.additional.retention.ms
static final String
windowed.inner.class.serde
Fields inherited from class org.apache.kafka.common.config.AbstractConfig
AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, CONFIG_PROVIDERS_CONFIG
-
Constructor Summary
ConstructorsConstructorDescriptionStreamsConfig
(Map<?, ?> props) Create a newStreamsConfig
using the given properties. -
Method Summary
Modifier and TypeMethodDescriptionstatic String
adminClientPrefix
(String adminClientProp) Prefix a property withADMIN_CLIENT_PREFIX
.static String
clientTagPrefix
(String clientTagKey) Prefix a client tag key withCLIENT_TAG_PREFIX
.static ConfigDef
Return a copy of the config definition.static String
consumerPrefix
(String consumerProp) Prefix a property withCONSUMER_PREFIX
.Deprecated.Since 4.0.Serde
<?> Return anconfigured
instance ofkey Serde class
.Deprecated.Since 4.0.Serde
<?> Return anconfigured
instance ofvalue Serde class
.getAdminConfigs
(String clientId) Get the configs for theadmin client
.Get the configured client tags set withCLIENT_TAG_PREFIX
prefix.getGlobalConsumerConfigs
(String clientId) Get the configs for theglobal consumer
.Return configured KafkaClientSuppliergetMainConsumerConfigs
(String groupId, String clientId, int threadIdx) Get the configs to themain consumer
.getProducerConfigs
(String clientId) Get the configs for theproducer
.getRestoreConsumerConfigs
(String clientId) Get the configs for therestore-consumer
.static String
globalConsumerPrefix
(String consumerProp) Prefix a property withGLOBAL_CONSUMER_PREFIX
.static void
static String
mainConsumerPrefix
(String consumerProp) Prefix a property withMAIN_CONSUMER_PREFIX
.static String
producerPrefix
(String producerProp) Prefix a property withPRODUCER_PREFIX
.static String
restoreConsumerPrefix
(String consumerProp) Prefix a property withRESTORE_CONSUMER_PREFIX
.static String
topicPrefix
(String topicProp) Prefix a property withTOPIC_PREFIX
used to provide default topic configs to be applied when creating internal topics.Methods inherited from class org.apache.kafka.common.config.AbstractConfig
documentationOf, equals, getBoolean, getClass, getConfiguredInstance, getConfiguredInstance, getConfiguredInstances, getConfiguredInstances, getConfiguredInstances, getDouble, getInt, getList, getLong, getPassword, getShort, getString, hashCode, ignore, logUnused, nonInternalValues, originals, originals, originalsStrings, originalsWithPrefix, originalsWithPrefix, typeOf, unused, values, valuesWithPrefixAllOrNothing, valuesWithPrefixOverride
-
Field Details
-
DUMMY_THREAD_INDEX
Deprecated.- See Also:
-
MAX_TASK_IDLE_MS_DISABLED
public static final long MAX_TASK_IDLE_MS_DISABLED- See Also:
-
MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE
public static final int MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE- See Also:
-
MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH
public static final int MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH- See Also:
-
MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH
public static final int MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH- See Also:
-
TOPIC_PREFIX
Prefix used to provide default topic configs to be applied when creating internal topics. These should be valid properties fromTopicConfig
. It is recommended to usetopicPrefix(String)
.- See Also:
-
CONSUMER_PREFIX
Prefix used to isolateconsumer
configs from other client configs. It is recommended to useconsumerPrefix(String)
to add this prefix toconsumer properties
.- See Also:
-
MAIN_CONSUMER_PREFIX
Prefix used to overrideconsumer
configs for the main consumer client from the general consumer client configs. The override precedence is the following (from highest to lowest precedence): 1. main.consumer.[config-name] 2. consumer.[config-name] 3. [config-name]- See Also:
-
RESTORE_CONSUMER_PREFIX
Prefix used to overrideconsumer
configs for the restore consumer client from the general consumer client configs. The override precedence is the following (from highest to lowest precedence): 1. restore.consumer.[config-name] 2. consumer.[config-name] 3. [config-name]- See Also:
-
GLOBAL_CONSUMER_PREFIX
Prefix used to overrideconsumer
configs for the global consumer client from the general consumer client configs. The override precedence is the following (from highest to lowest precedence): 1. global.consumer.[config-name] 2. consumer.[config-name] 3. [config-name]- See Also:
-
PRODUCER_PREFIX
Prefix used to isolateproducer
configs from other client configs. It is recommended to useproducerPrefix(String)
to add this prefix toproducer properties
.- See Also:
-
ADMIN_CLIENT_PREFIX
Prefix used to isolateadmin
configs from other client configs. It is recommended to useadminClientPrefix(String)
to add this prefix toadmin client properties
.- See Also:
-
CLIENT_TAG_PREFIX
Prefix used to add arbitrary tags to a Kafka Stream's instance as key-value pairs. Example: client.tag.zone=zone1 client.tag.cluster=cluster1- See Also:
-
NO_OPTIMIZATION
Config value for parameter"topology.optimization"
for disabling topology optimization- See Also:
-
OPTIMIZE
Config value for parameter"topology.optimization"
for enabling topology optimization- See Also:
-
REUSE_KTABLE_SOURCE_TOPICS
Config value for parameter"topology.optimization"
for enabling the specific optimization that reuses source topic as changelog topic for KTables.- See Also:
-
MERGE_REPARTITION_TOPICS
Config value for parameter"topology.optimization"
for enabling the specific optimization that merges duplicated repartition topics.- See Also:
-
SINGLE_STORE_SELF_JOIN
Config value for parameter"topology.optimization"
for enabling the optimization that optimizes inner stream-stream joins into self-joins when both arguments are the same stream.- See Also:
-
UPGRADE_FROM_0100
Config value for parameter"upgrade.from"
for upgrading an application from version0.10.0.x
. -
UPGRADE_FROM_0101
Config value for parameter"upgrade.from"
for upgrading an application from version0.10.1.x
. -
UPGRADE_FROM_0102
Config value for parameter"upgrade.from"
for upgrading an application from version0.10.2.x
. -
UPGRADE_FROM_0110
Config value for parameter"upgrade.from"
for upgrading an application from version0.11.0.x
. -
UPGRADE_FROM_10
Config value for parameter"upgrade.from"
for upgrading an application from version1.0.x
. -
UPGRADE_FROM_11
Config value for parameter"upgrade.from"
for upgrading an application from version1.1.x
. -
UPGRADE_FROM_20
Config value for parameter"upgrade.from"
for upgrading an application from version2.0.x
. -
UPGRADE_FROM_21
Config value for parameter"upgrade.from"
for upgrading an application from version2.1.x
. -
UPGRADE_FROM_22
Config value for parameter"upgrade.from"
for upgrading an application from version2.2.x
. -
UPGRADE_FROM_23
Config value for parameter"upgrade.from"
for upgrading an application from version2.3.x
. -
UPGRADE_FROM_24
Config value for parameter"upgrade.from"
for upgrading an application from version2.4.x
. -
UPGRADE_FROM_25
Config value for parameter"upgrade.from"
for upgrading an application from version2.5.x
. -
UPGRADE_FROM_26
Config value for parameter"upgrade.from"
for upgrading an application from version2.6.x
. -
UPGRADE_FROM_27
Config value for parameter"upgrade.from"
for upgrading an application from version2.7.x
. -
UPGRADE_FROM_28
Config value for parameter"upgrade.from"
for upgrading an application from version2.8.x
. -
UPGRADE_FROM_30
Config value for parameter"upgrade.from"
for upgrading an application from version3.0.x
. -
UPGRADE_FROM_31
Config value for parameter"upgrade.from"
for upgrading an application from version3.1.x
. -
UPGRADE_FROM_32
Config value for parameter"upgrade.from"
for upgrading an application from version3.2.x
. -
UPGRADE_FROM_33
Config value for parameter"upgrade.from"
for upgrading an application from version3.3.x
. -
UPGRADE_FROM_34
Config value for parameter"upgrade.from"
for upgrading an application from version3.4.x
. -
UPGRADE_FROM_35
Config value for parameter"upgrade.from"
for upgrading an application from version3.5.x
. -
UPGRADE_FROM_36
Config value for parameter"upgrade.from"
for upgrading an application from version3.6.x
. -
UPGRADE_FROM_37
Config value for parameter"upgrade.from"
for upgrading an application from version3.7.x
. -
UPGRADE_FROM_38
Config value for parameter"upgrade.from"
for upgrading an application from version3.8.x
. -
UPGRADE_FROM_39
Config value for parameter"upgrade.from"
for upgrading an application from version3.9.x
. -
AT_LEAST_ONCE
Config value for parameter"processing.guarantee"
for at-least-once processing guarantees.- See Also:
-
EXACTLY_ONCE_V2
Config value for parameter"processing.guarantee"
for exactly-once processing guarantees.Enabling exactly-once-v2 requires broker version 2.5 or higher.
- See Also:
-
RACK_AWARE_ASSIGNMENT_STRATEGY_NONE
- See Also:
-
RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC
- See Also:
-
RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY
- See Also:
-
METRICS_LATEST
Config value for parameter"built.in.metrics.version"
for the latest built-in metrics version.- See Also:
-
ACCEPTABLE_RECOVERY_LAG_CONFIG
acceptable.recovery.lag
- See Also:
-
APPLICATION_ID_CONFIG
application.id
- See Also:
-
APPLICATION_SERVER_CONFIG
application.server
- See Also:
-
BOOTSTRAP_SERVERS_CONFIG
bootstrap.servers
- See Also:
-
BUFFERED_RECORDS_PER_PARTITION_CONFIG
buffered.records.per.partition
- See Also:
-
BUFFERED_RECORDS_PER_PARTITION_DOC
Deprecated.- See Also:
-
BUILT_IN_METRICS_VERSION_CONFIG
built.in.metrics.version
- See Also:
-
CACHE_MAX_BYTES_BUFFERING_CONFIG
Deprecated.Since 3.4. Use"statestore.cache.max.bytes"
instead.cache.max.bytes.buffering
- See Also:
-
CACHE_MAX_BYTES_BUFFERING_DOC
Deprecated.- See Also:
-
CLIENT_ID_CONFIG
client.id
- See Also:
-
COMMIT_INTERVAL_MS_CONFIG
commit.interval.ms
- See Also:
-
CONNECTIONS_MAX_IDLE_MS_CONFIG
connections.max.idle.ms
- See Also:
-
DEFAULT_CLIENT_SUPPLIER_CONFIG
default.client.supplier
- See Also:
-
DEFAULT_CLIENT_SUPPLIER_DOC
Deprecated.- See Also:
-
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
Deprecated.Since 4.0. UseDESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
instead.default.deserialization.exception.handler
- See Also:
-
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC
Deprecated.- See Also:
-
DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
deserialization.exception.handler
- See Also:
-
DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG
Deprecated.Since 4.0. UsePRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG
instead.default.production.exception.handler
- See Also:
-
PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG
production.exception.handler
- See Also:
-
DEFAULT_DSL_STORE_CONFIG
Deprecated.Since 3.7. UseDSL_STORE_SUPPLIERS_CLASS_CONFIG
instead.default.dsl.store
- See Also:
-
DEFAULT_DSL_STORE_DOC
Deprecated.- See Also:
-
ROCKS_DB
Deprecated.- See Also:
-
IN_MEMORY
Deprecated.- See Also:
-
DEFAULT_DSL_STORE
Deprecated.- See Also:
-
DSL_STORE_SUPPLIERS_CLASS_CONFIG
dsl.store.suppliers.class
- See Also:
-
DEFAULT_KEY_SERDE_CLASS_CONFIG
default key.serde
- See Also:
-
DEFAULT_VALUE_SERDE_CLASS_CONFIG
default.value.serde
- See Also:
-
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
default.timestamp.extractor
- See Also:
-
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC
Deprecated.- See Also:
-
ENABLE_METRICS_PUSH_CONFIG
enable.metrics.push
- See Also:
-
ENABLE_METRICS_PUSH_DOC
Deprecated.- See Also:
-
LOG_SUMMARY_INTERVAL_MS_CONFIG
log.summary.interval.ms
- See Also:
-
MAX_TASK_IDLE_MS_CONFIG
max.task.idle.ms
- See Also:
-
MAX_TASK_IDLE_MS_DOC
Deprecated.- See Also:
-
MAX_WARMUP_REPLICAS_CONFIG
max.warmup.replicas
- See Also:
-
METADATA_MAX_AGE_CONFIG
metadata.max.age.ms
- See Also:
-
METRICS_NUM_SAMPLES_CONFIG
metrics.num.samples
- See Also:
-
METRICS_RECORDING_LEVEL_CONFIG
metrics.record.level
- See Also:
-
METRIC_REPORTER_CLASSES_CONFIG
metric.reporters
- See Also:
-
METRICS_SAMPLE_WINDOW_MS_CONFIG
metrics.sample.window.ms
- See Also:
-
NUM_STANDBY_REPLICAS_CONFIG
num.standby.replicas
- See Also:
-
NUM_STREAM_THREADS_CONFIG
num.stream.threads
- See Also:
-
POLL_MS_CONFIG
poll.ms
- See Also:
-
PROBING_REBALANCE_INTERVAL_MS_CONFIG
probing.rebalance.interval.ms
- See Also:
-
PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG
processing.exception.handler
- See Also:
-
PROCESSING_EXCEPTION_HANDLER_CLASS_DOC
Deprecated.- See Also:
-
PROCESSING_GUARANTEE_CONFIG
processing.guarantee
- See Also:
-
PROCESSOR_WRAPPER_CLASS_CONFIG
processor.wrapper.class
- See Also:
-
REPARTITION_PURGE_INTERVAL_MS_CONFIG
repartition.purge.interval.ms
- See Also:
-
RECEIVE_BUFFER_CONFIG
receive.buffer.bytes
- See Also:
-
RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG
rack.aware.assignment.non_overlap_cost
- See Also:
-
RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC
Deprecated. -
RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG
rack.aware.assignment.strategy
- See Also:
-
RACK_AWARE_ASSIGNMENT_STRATEGY_DOC
Deprecated.- See Also:
-
RACK_AWARE_ASSIGNMENT_TAGS_CONFIG
rack.aware.assignment.tags
- See Also:
-
RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG
rack.aware.assignment.traffic_cost
- See Also:
-
RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC
Deprecated. -
RECONNECT_BACKOFF_MS_CONFIG
reconnect.backoff.ms
- See Also:
-
RECONNECT_BACKOFF_MAX_MS_CONFIG
reconnect.backoff.max
- See Also:
-
REPLICATION_FACTOR_CONFIG
replication.factor
- See Also:
-
REQUEST_TIMEOUT_MS_CONFIG
request.timeout.ms
- See Also:
-
RETRY_BACKOFF_MS_CONFIG
retry.backoff.ms
- See Also:
-
ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
rocksdb.config.setter
- See Also:
-
SECURITY_PROTOCOL_CONFIG
security.protocol
- See Also:
-
SEND_BUFFER_CONFIG
send.buffer.bytes
- See Also:
-
STATE_CLEANUP_DELAY_MS_CONFIG
state.cleanup.delay
- See Also:
-
STATE_DIR_CONFIG
state.dir
- See Also:
-
STATESTORE_CACHE_MAX_BYTES_CONFIG
statestore.cache.max.bytes
- See Also:
-
STATESTORE_CACHE_MAX_BYTES_DOC
Deprecated.- See Also:
-
TASK_ASSIGNOR_CLASS_CONFIG
task.assignor.class
- See Also:
-
TASK_TIMEOUT_MS_CONFIG
task.timeout.ms
- See Also:
-
TASK_TIMEOUT_MS_DOC
Deprecated.- See Also:
-
UPGRADE_FROM_CONFIG
upgrade.from
- See Also:
-
TOPOLOGY_OPTIMIZATION_CONFIG
topology.optimization
- See Also:
-
WINDOWED_INNER_CLASS_SERDE
windowed.inner.class.serde
- See Also:
-
WINDOW_SIZE_MS_CONFIG
window.size.ms
- See Also:
-
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG
windowstore.changelog.additional.retention.ms
- See Also:
-
-
Constructor Details
-
StreamsConfig
Create a newStreamsConfig
using the given properties.- Parameters:
props
- properties that specify Kafka Streams and internal consumer/producer configuration
-
-
Method Details
-
consumerPrefix
Prefix a property withCONSUMER_PREFIX
. This is used to isolateconsumer configs
from other client configs.- Parameters:
consumerProp
- the consumer property to be masked- Returns:
CONSUMER_PREFIX
+consumerProp
-
mainConsumerPrefix
Prefix a property withMAIN_CONSUMER_PREFIX
. This is used to isolatemain consumer configs
from other client configs.- Parameters:
consumerProp
- the consumer property to be masked- Returns:
MAIN_CONSUMER_PREFIX
+consumerProp
-
restoreConsumerPrefix
Prefix a property withRESTORE_CONSUMER_PREFIX
. This is used to isolaterestore consumer configs
from other client configs.- Parameters:
consumerProp
- the consumer property to be masked- Returns:
RESTORE_CONSUMER_PREFIX
+consumerProp
-
clientTagPrefix
Prefix a client tag key withCLIENT_TAG_PREFIX
.- Parameters:
clientTagKey
- client tag key- Returns:
CLIENT_TAG_PREFIX
+clientTagKey
-
globalConsumerPrefix
Prefix a property withGLOBAL_CONSUMER_PREFIX
. This is used to isolateglobal consumer configs
from other client configs.- Parameters:
consumerProp
- the consumer property to be masked- Returns:
GLOBAL_CONSUMER_PREFIX
+consumerProp
-
producerPrefix
Prefix a property withPRODUCER_PREFIX
. This is used to isolateproducer configs
from other client configs.- Parameters:
producerProp
- the producer property to be masked- Returns:
- PRODUCER_PREFIX +
producerProp
-
adminClientPrefix
Prefix a property withADMIN_CLIENT_PREFIX
. This is used to isolateadmin configs
from other client configs.- Parameters:
adminClientProp
- the admin client property to be masked- Returns:
- ADMIN_CLIENT_PREFIX +
adminClientProp
-
topicPrefix
Prefix a property withTOPIC_PREFIX
used to provide default topic configs to be applied when creating internal topics.- Parameters:
topicProp
- the topic property to be masked- Returns:
- TOPIC_PREFIX +
topicProp
-
configDef
Return a copy of the config definition.- Returns:
- a copy of the config definition
-
getMainConsumerConfigs
Get the configs to themain consumer
. Properties using the prefixMAIN_CONSUMER_PREFIX
will be used in favor over the properties prefixed withCONSUMER_PREFIX
and the non-prefixed versions (read the override precedence ordering inMAIN_CONSUMER_PREFIX
) except in the case ofConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster. If not specified byMAIN_CONSUMER_PREFIX
, main consumer will share the general consumer configs prefixed byCONSUMER_PREFIX
.- Parameters:
groupId
- consumer groupIdclientId
- clientIdthreadIdx
- stream thread index- Returns:
- Map of the consumer configuration.
-
getRestoreConsumerConfigs
Get the configs for therestore-consumer
. Properties using the prefixRESTORE_CONSUMER_PREFIX
will be used in favor over the properties prefixed withCONSUMER_PREFIX
and the non-prefixed versions (read the override precedence ordering inRESTORE_CONSUMER_PREFIX
) except in the case ofConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster. If not specified byRESTORE_CONSUMER_PREFIX
, restore consumer will share the general consumer configs prefixed byCONSUMER_PREFIX
.- Parameters:
clientId
- clientId- Returns:
- Map of the restore consumer configuration.
-
getGlobalConsumerConfigs
Get the configs for theglobal consumer
. Properties using the prefixGLOBAL_CONSUMER_PREFIX
will be used in favor over the properties prefixed withCONSUMER_PREFIX
and the non-prefixed versions (read the override precedence ordering inGLOBAL_CONSUMER_PREFIX
) except in the case ofConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster. If not specified byGLOBAL_CONSUMER_PREFIX
, global consumer will share the general consumer configs prefixed byCONSUMER_PREFIX
.- Parameters:
clientId
- clientId- Returns:
- Map of the global consumer configuration.
-
getProducerConfigs
Get the configs for theproducer
. Properties using the prefixPRODUCER_PREFIX
will be used in favor over their non-prefixed versions except in the case ofProducerConfig.BOOTSTRAP_SERVERS_CONFIG
where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster.- Parameters:
clientId
- clientId- Returns:
- Map of the producer configuration.
-
getAdminConfigs
Get the configs for theadmin client
.- Parameters:
clientId
- clientId- Returns:
- Map of the admin client configuration.
-
getClientTags
Get the configured client tags set withCLIENT_TAG_PREFIX
prefix.- Returns:
- Map of the client tags.
-
verifyTopologyOptimizationConfigs
-
getKafkaClientSupplier
Return configured KafkaClientSupplier- Returns:
- Configured KafkaClientSupplier
-
defaultKeySerde
Return anconfigured
instance ofkey Serde class
.- Returns:
- a configured instance of key Serde class
-
defaultValueSerde
Return anconfigured
instance ofvalue Serde class
.- Returns:
- an configured instance of value Serde class
-
defaultTimestampExtractor
-
deserializationExceptionHandler
-
defaultDeserializationExceptionHandler
Deprecated.Since 4.0. UsedeserializationExceptionHandler()
instead. -
productionExceptionHandler
-
defaultProductionExceptionHandler
Deprecated.Since 4.0. UseproductionExceptionHandler()
instead. -
processingExceptionHandler
-
main
-