Package org.apache.kafka.streams
Class StreamsConfig
java.lang.Object
org.apache.kafka.common.config.AbstractConfig
org.apache.kafka.streams.StreamsConfig
public class StreamsConfig extends AbstractConfig
Configuration for a KafkaStreams instance. Can also be used to configure the Kafka Streams internal KafkaConsumer, KafkaProducer and Admin. To avoid consumer/producer/admin property conflicts, you should prefix those properties using consumerPrefix(String), producerPrefix(String) and adminClientPrefix(String), respectively.
Example:
// potentially wrong: sets "metadata.max.age.ms" to 1 minute for producer AND consumer
Properties streamsProperties = new Properties();
streamsProperties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 60000);
// or
streamsProperties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 60000);
// suggested:
Properties streamsProperties = new Properties();
// sets "metadata.max.age.ms" to 1 minute for consumer only
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), 60000);
// sets "metadata.max.age.ms" to 1 minute for producer only
streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.METADATA_MAX_AGE_CONFIG), 60000);
StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
This instance can also be used to pass in custom configurations to different modules (e.g. passing a special config in your customized serde class).
The consumer/producer/admin prefix can also be used to distinguish these custom config values passed to different clients with the same config name.
Example:
Properties streamsProperties = new Properties();
// sets "my.custom.config" to "foo" for consumer only
streamsProperties.put(StreamsConfig.consumerPrefix("my.custom.config"), "foo");
// sets "my.custom.config" to "bar" for producer only
streamsProperties.put(StreamsConfig.producerPrefix("my.custom.config"), "bar");
// sets "my.custom.config2" to "boom" for all clients universally
streamsProperties.put("my.custom.config2", "boom");
// as a result, inside the producer's serde class configure(..) function,
// users can now read both key-value pairs "my.custom.config" -> "bar"
// and "my.custom.config2" -> "boom" from the config map
StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
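To make the effect of the prefixes concrete, the following is a minimal sketch of how a per-client view of the properties could be derived. It uses plain java.util maps rather than StreamsConfig itself. The class name ClientViewSketch, the method viewFor, and the literal prefix strings "consumer.", "producer." and "admin." are assumptions made for illustration, and the maps handed to the real clients may contain additional entries.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only; this is not the StreamsConfig implementation.
final class ClientViewSketch {
    // Assumed literal values of the consumer/producer/admin prefixes.
    static final List<String> CLIENT_PREFIXES =
            Arrays.asList("consumer.", "producer.", "admin.");

    // Returns the shared (unprefixed) entries, overlaid with the entries that
    // carry the given client prefix, with that prefix stripped from the key.
    static Map<String, Object> viewFor(Map<String, Object> all, String clientPrefix) {
        Map<String, Object> view = new HashMap<>();
        for (Map.Entry<String, Object> e : all.entrySet()) {
            String key = e.getKey();
            boolean prefixed = CLIENT_PREFIXES.stream().anyMatch(key::startsWith);
            if (!prefixed) {
                view.put(key, e.getValue());
            }
        }
        for (Map.Entry<String, Object> e : all.entrySet()) {
            String key = e.getKey();
            if (key.startsWith(clientPrefix)) {
                view.put(key.substring(clientPrefix.length()), e.getValue());
            }
        }
        return view;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("consumer.my.custom.config", "foo");
        props.put("producer.my.custom.config", "bar");
        props.put("my.custom.config2", "boom");
        System.out.println("producer view: " + viewFor(props, "producer."));
        System.out.println("consumer view: " + viewFor(props, "consumer."));
    }
}
```

In this model the producer view maps "my.custom.config" to "bar" and the consumer view maps it to "foo", while both see "my.custom.config2" as "boom".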
When increasing ProducerConfig.MAX_BLOCK_MS_CONFIG to be more resilient to non-available brokers, you should also increase ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG using the following guidance:
max.poll.interval.ms > max.block.ms
Kafka Streams requires at least the following properties to be set:
- "application.id"
- "bootstrap.servers"
By default, Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
- "group.id" (<application.id>) - Streams client will always use the application ID as the consumer group ID
- "enable.auto.commit" (false) - Streams client will always disable/turn off auto committing
- "partition.assignment.strategy" (StreamsPartitionAssignor) - Streams client will always use its own partition assignor
If "processing.guarantee" is set to "exactly_once_v2", "exactly_once" (deprecated), or "exactly_once_beta" (deprecated), Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
- "isolation.level" (read_committed) - Consumers will always read committed data only
- "enable.idempotence" (true) - Producer will always have idempotency enabled
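As a rough illustration of the non-overridable properties listed above, the sketch below flags user-supplied values that differ from the values Streams enforces. It is a plain-Java model, not the validation StreamsConfig performs; the class LockedPropertiesSketch and its methods are hypothetical names, and the enforced values are simply copied from the lists above.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Illustrative model only; the enforced values mirror the lists above.
final class LockedPropertiesSketch {
    // Properties that Streams controls, keyed by property name.
    static Map<String, String> lockedValues(String applicationId, boolean exactlyOnce) {
        Map<String, String> locked = new LinkedHashMap<>();
        locked.put("group.id", applicationId);
        locked.put("enable.auto.commit", "false");
        locked.put("partition.assignment.strategy", "StreamsPartitionAssignor");
        if (exactlyOnce) {
            // Only enforced when an exactly-once processing guarantee is selected.
            locked.put("isolation.level", "read_committed");
            locked.put("enable.idempotence", "true");
        }
        return locked;
    }

    // Names of user-supplied properties whose value differs from the enforced one.
    static Set<String> conflicts(Map<String, ?> userProps, String applicationId, boolean exactlyOnce) {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, String> e : lockedValues(applicationId, exactlyOnce).entrySet()) {
            Object userValue = userProps.get(e.getKey());
            if (userValue != null && !e.getValue().equals(String.valueOf(userValue))) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> user = new HashMap<>();
        user.put("enable.auto.commit", true);
        user.put("isolation.level", "read_uncommitted");
        System.out.println(conflicts(user, "my-app", false));
        System.out.println(conflicts(user, "my-app", true));
    }
}
```

A check like this could run before constructing the configuration to surface settings that would otherwise be silently replaced.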
-
Nested Class Summary
Nested Classes
Modifier and Type    Class    Description
static class
StreamsConfig.InternalConfig
-
Field Summary
Fields
Modifier and Type    Field    Description
static String
ACCEPTABLE_RECOVERY_LAG_CONFIG
acceptable.recovery.lag
static String
ADMIN_CLIENT_PREFIX
Prefix used to isolate admin configs from other client configs.
static String
APPLICATION_ID_CONFIG
application.id
static String
APPLICATION_SERVER_CONFIG
application.server
static String
AT_LEAST_ONCE
Config value for parameter "processing.guarantee" for at-least-once processing guarantees.
static String
BOOTSTRAP_SERVERS_CONFIG
bootstrap.servers
static String
BUFFERED_RECORDS_PER_PARTITION_CONFIG
buffered.records.per.partition
static String
BUILT_IN_METRICS_VERSION_CONFIG
built.in.metrics.version
static String
CACHE_MAX_BYTES_BUFFERING_CONFIG
cache.max.bytes.buffering
static String
CLIENT_ID_CONFIG
client.id
static String
COMMIT_INTERVAL_MS_CONFIG
commit.interval.ms
static String
CONNECTIONS_MAX_IDLE_MS_CONFIG
connections.max.idle.ms
static String
CONSUMER_PREFIX
Prefix used to isolate consumer configs from other client configs.
static String
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
default.deserialization.exception.handler
static String
DEFAULT_KEY_SERDE_CLASS_CONFIG
default.key.serde
static String
DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG
default.production.exception.handler
static String
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
default.timestamp.extractor
static String
DEFAULT_VALUE_SERDE_CLASS_CONFIG
default.value.serde
static String
DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
Deprecated.
static String
DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
Deprecated.
static int
DUMMY_THREAD_INDEX
static String
EXACTLY_ONCE
Deprecated. Since 3.0.0, will be removed in 4.0.
static String
EXACTLY_ONCE_BETA
Deprecated. Since 3.0.0, will be removed in 4.0.
static String
EXACTLY_ONCE_V2
Config value for parameter "processing.guarantee" for exactly-once processing guarantees.
static String
GLOBAL_CONSUMER_PREFIX
Prefix used to override consumer configs for the global consumer client from the general consumer client configs.
static String
MAIN_CONSUMER_PREFIX
Prefix used to override consumer configs for the main consumer client from the general consumer client configs.
static String
MAX_TASK_IDLE_MS_CONFIG
max.task.idle.ms
static long
MAX_TASK_IDLE_MS_DISABLED
static String
MAX_WARMUP_REPLICAS_CONFIG
max.warmup.replicas
static String
METADATA_MAX_AGE_CONFIG
metadata.max.age.ms
static String
METRIC_REPORTER_CLASSES_CONFIG
metric.reporters
static String
METRICS_LATEST
Config value for parameter "built.in.metrics.version" for the latest built-in metrics version.
static String
METRICS_NUM_SAMPLES_CONFIG
metrics.num.samples
static String
METRICS_RECORDING_LEVEL_CONFIG
metrics.recording.level
static String
METRICS_SAMPLE_WINDOW_MS_CONFIG
metrics.sample.window.ms
static String
NO_OPTIMIZATION
Config value for parameter "topology.optimization" for disabling topology optimization.
static String
NUM_STANDBY_REPLICAS_CONFIG
num.standby.replicas
static String
NUM_STREAM_THREADS_CONFIG
num.stream.threads
static String
OPTIMIZE
Config value for parameter "topology.optimization" for enabling topology optimization.
static String
POLL_MS_CONFIG
poll.ms
static String
PROBING_REBALANCE_INTERVAL_MS_CONFIG
probing.rebalance.interval.ms
static String
PROCESSING_GUARANTEE_CONFIG
processing.guarantee
static String
PRODUCER_PREFIX
Prefix used to isolate producer configs from other client configs.
static String
RECEIVE_BUFFER_CONFIG
receive.buffer.bytes
static String
RECONNECT_BACKOFF_MAX_MS_CONFIG
reconnect.backoff.max.ms
static String
RECONNECT_BACKOFF_MS_CONFIG
reconnect.backoff.ms
static String
REPLICATION_FACTOR_CONFIG
replication.factor
static String
REQUEST_TIMEOUT_MS_CONFIG
request.timeout.ms
static String
RESTORE_CONSUMER_PREFIX
Prefix used to override consumer configs for the restore consumer client from the general consumer client configs.
static String
RETRIES_CONFIG
Deprecated. Since 2.7.
static String
RETRY_BACKOFF_MS_CONFIG
retry.backoff.ms
static String
ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
rocksdb.config.setter
static String
SECURITY_PROTOCOL_CONFIG
security.protocol
static String
SEND_BUFFER_CONFIG
send.buffer.bytes
static String
STATE_CLEANUP_DELAY_MS_CONFIG
state.cleanup.delay.ms
static String
STATE_DIR_CONFIG
state.dir
static String
TASK_TIMEOUT_MS_CONFIG
task.timeout.ms
static String
TASK_TIMEOUT_MS_DOC
static String
TOPIC_PREFIX
Prefix used to provide default topic configs to be applied when creating internal topics.
static String
TOPOLOGY_OPTIMIZATION
Deprecated. Since 2.7; use TOPOLOGY_OPTIMIZATION_CONFIG instead.
static String
TOPOLOGY_OPTIMIZATION_CONFIG
topology.optimization
static String
UPGRADE_FROM_0100
Config value for parameter "upgrade.from" for upgrading an application from version 0.10.0.x.
static String
UPGRADE_FROM_0101
Config value for parameter "upgrade.from" for upgrading an application from version 0.10.1.x.
static String
UPGRADE_FROM_0102
Config value for parameter "upgrade.from" for upgrading an application from version 0.10.2.x.
static String
UPGRADE_FROM_0110
Config value for parameter "upgrade.from" for upgrading an application from version 0.11.0.x.
static String
UPGRADE_FROM_10
Config value for parameter "upgrade.from" for upgrading an application from version 1.0.x.
static String
UPGRADE_FROM_11
Config value for parameter "upgrade.from" for upgrading an application from version 1.1.x.
static String
UPGRADE_FROM_20
Config value for parameter "upgrade.from" for upgrading an application from version 2.0.x.
static String
UPGRADE_FROM_21
Config value for parameter "upgrade.from" for upgrading an application from version 2.1.x.
static String
UPGRADE_FROM_22
Config value for parameter "upgrade.from" for upgrading an application from version 2.2.x.
static String
UPGRADE_FROM_23
Config value for parameter "upgrade.from" for upgrading an application from version 2.3.x.
static String
UPGRADE_FROM_CONFIG
upgrade.from
static String
WINDOW_SIZE_MS_CONFIG
window.size.ms
static String
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG
windowstore.changelog.additional.retention.ms
static String
WINDOWED_INNER_CLASS_SERDE
Fields inherited from class org.apache.kafka.common.config.AbstractConfig
CONFIG_PROVIDERS_CONFIG
-
Constructor Summary
Constructors
Modifier    Constructor    Description
StreamsConfig(Map<?,?> props)
Create a new StreamsConfig using the given properties.
protected
StreamsConfig(Map<?,?> props, boolean doLog)
-
Method Summary
Modifier and Type    Method    Description
static String
adminClientPrefix(String adminClientProp)
Prefix a property with ADMIN_CLIENT_PREFIX.
static ConfigDef
configDef()
Return a copy of the config definition.
static String
consumerPrefix(String consumerProp)
Prefix a property with CONSUMER_PREFIX.
DeserializationExceptionHandler
defaultDeserializationExceptionHandler()
Serde
defaultKeySerde()
Return a configured instance of the key Serde class.
ProductionExceptionHandler
defaultProductionExceptionHandler()
TimestampExtractor
defaultTimestampExtractor()
Serde
defaultValueSerde()
Return a configured instance of the value Serde class.
Map<String,Object>
getAdminConfigs(String clientId)
Get the configs for the admin client.
Map<String,Object>
getGlobalConsumerConfigs(String clientId)
Get the configs for the global consumer.
Map<String,Object>
getMainConsumerConfigs(String groupId, String clientId, int threadIdx)
Get the configs for the main consumer.
Map<String,Object>
getProducerConfigs(String clientId)
Get the configs for the producer.
Map<String,Object>
getRestoreConsumerConfigs(String clientId)
Get the configs for the restore-consumer.
static String
globalConsumerPrefix(String consumerProp)
Prefix a property with GLOBAL_CONSUMER_PREFIX.
static void
main(String[] args)
static String
mainConsumerPrefix(String consumerProp)
Prefix a property with MAIN_CONSUMER_PREFIX.
protected Map<String,Object>
postProcessParsedConfig(Map<String,Object> parsedValues)
Called directly after user configs got parsed (and thus default values got set).
static String
producerPrefix(String producerProp)
Prefix a property with PRODUCER_PREFIX.
static String
restoreConsumerPrefix(String consumerProp)
Prefix a property with RESTORE_CONSUMER_PREFIX.
static String
topicPrefix(String topicProp)
Prefix a property with TOPIC_PREFIX used to provide default topic configs to be applied when creating internal topics.
Methods inherited from class org.apache.kafka.common.config.AbstractConfig
documentationOf, equals, get, getBoolean, getClass, getConfiguredInstance, getConfiguredInstance, getConfiguredInstances, getConfiguredInstances, getConfiguredInstances, getDouble, getInt, getList, getLong, getPassword, getShort, getString, hashCode, ignore, logUnused, nonInternalValues, originals, originals, originalsStrings, originalsWithPrefix, originalsWithPrefix, typeOf, unused, values, valuesWithPrefixAllOrNothing, valuesWithPrefixOverride
-
Field Details
-
DUMMY_THREAD_INDEX
public static final int DUMMY_THREAD_INDEX
- See Also:
- Constant Field Values
-
MAX_TASK_IDLE_MS_DISABLED
public static final long MAX_TASK_IDLE_MS_DISABLED
- See Also:
- Constant Field Values
-
TOPIC_PREFIX
Prefix used to provide default topic configs to be applied when creating internal topics. These should be valid properties from TopicConfig. It is recommended to use topicPrefix(String).
- See Also:
- Constant Field Values
-
CONSUMER_PREFIX
Prefix used to isolate consumer configs from other client configs. It is recommended to use consumerPrefix(String) to add this prefix to consumer properties.
- See Also:
- Constant Field Values
-
MAIN_CONSUMER_PREFIX
Prefix used to override consumer configs for the main consumer client from the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
1. main.consumer.[config-name]
2. consumer.[config-name]
3. [config-name]
- See Also:
- Constant Field Values
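The override precedence described for MAIN_CONSUMER_PREFIX can be sketched as a three-step lookup. This is a simplified model over a plain map, not the code StreamsConfig runs; MainConsumerPrecedenceSketch and resolve are hypothetical names, and "max.poll.records" is used only as a sample consumer property.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Illustrative model only; this is not the StreamsConfig implementation.
final class MainConsumerPrecedenceSketch {
    // Looks up a config name from highest to lowest precedence:
    // main.consumer.[config-name], consumer.[config-name], [config-name].
    static Optional<Object> resolve(Map<String, Object> props, String configName) {
        String[] candidates = {
            "main.consumer." + configName,
            "consumer." + configName,
            configName
        };
        for (String key : candidates) {
            if (props.containsKey(key)) {
                return Optional.ofNullable(props.get(key));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("max.poll.records", 100);
        props.put("consumer.max.poll.records", 200);
        props.put("main.consumer.max.poll.records", 300);
        // The main-consumer-specific value takes precedence over the other two.
        System.out.println(resolve(props, "max.poll.records"));
    }
}
```

The same shape would apply to the restore and global consumers by swapping the first candidate for restore.consumer.[config-name] or global.consumer.[config-name].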
-
RESTORE_CONSUMER_PREFIX
Prefix used to override consumer configs for the restore consumer client from the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
1. restore.consumer.[config-name]
2. consumer.[config-name]
3. [config-name]
- See Also:
- Constant Field Values
-
GLOBAL_CONSUMER_PREFIX
Prefix used to override consumer configs for the global consumer client from the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
1. global.consumer.[config-name]
2. consumer.[config-name]
3. [config-name]
- See Also:
- Constant Field Values
-
PRODUCER_PREFIX
Prefix used to isolate producer configs from other client configs. It is recommended to use producerPrefix(String) to add this prefix to producer properties.
- See Also:
- Constant Field Values
-
ADMIN_CLIENT_PREFIX
Prefix used to isolate admin configs from other client configs. It is recommended to use adminClientPrefix(String) to add this prefix to admin client properties.
- See Also:
- Constant Field Values
-
NO_OPTIMIZATION
Config value for parameter "topology.optimization" for disabling topology optimization.
- See Also:
- Constant Field Values
-
OPTIMIZE
Config value for parameter "topology.optimization" for enabling topology optimization.
- See Also:
- Constant Field Values
-
UPGRADE_FROM_0100
Config value for parameter "upgrade.from" for upgrading an application from version 0.10.0.x.
- See Also:
- Constant Field Values
-
UPGRADE_FROM_0101
Config value for parameter "upgrade.from" for upgrading an application from version 0.10.1.x.
- See Also:
- Constant Field Values
-
UPGRADE_FROM_0102
Config value for parameter "upgrade.from" for upgrading an application from version 0.10.2.x.
- See Also:
- Constant Field Values
-
UPGRADE_FROM_0110
Config value for parameter "upgrade.from" for upgrading an application from version 0.11.0.x.
- See Also:
- Constant Field Values
-
UPGRADE_FROM_10
Config value for parameter "upgrade.from" for upgrading an application from version 1.0.x.
- See Also:
- Constant Field Values
-
UPGRADE_FROM_11
Config value for parameter "upgrade.from" for upgrading an application from version 1.1.x.
- See Also:
- Constant Field Values
-
UPGRADE_FROM_20
Config value for parameter "upgrade.from" for upgrading an application from version 2.0.x.
- See Also:
- Constant Field Values
-
UPGRADE_FROM_21
Config value for parameter "upgrade.from" for upgrading an application from version 2.1.x.
- See Also:
- Constant Field Values
-
UPGRADE_FROM_22
Config value for parameter "upgrade.from" for upgrading an application from version 2.2.x.
- See Also:
- Constant Field Values
-
UPGRADE_FROM_23
Config value for parameter "upgrade.from" for upgrading an application from version 2.3.x.
- See Also:
- Constant Field Values
-
AT_LEAST_ONCE
Config value for parameter "processing.guarantee" for at-least-once processing guarantees.
- See Also:
- Constant Field Values
-
EXACTLY_ONCE
Deprecated. Since 3.0.0, will be removed in 4.0. Use "exactly_once_v2" instead.
Config value for parameter "processing.guarantee" for exactly-once processing guarantees.
Enabling exactly-once processing semantics requires broker version 0.11.0 or higher. If you enable this feature, Kafka Streams will use more resources (like broker connections) compared to "at_least_once" and "exactly_once_v2".
- See Also:
- Constant Field Values
-
EXACTLY_ONCE_BETA
Deprecated. Since 3.0.0, will be removed in 4.0. Use "exactly_once_v2" instead.
Config value for parameter "processing.guarantee" for exactly-once processing guarantees.
Enabling exactly-once (beta) requires broker version 2.5 or higher. If you enable this feature, Kafka Streams will use fewer resources (like broker connections) compared to the EXACTLY_ONCE (deprecated) case.
- See Also:
- Constant Field Values
-
EXACTLY_ONCE_V2
Config value for parameter "processing.guarantee" for exactly-once processing guarantees.
Enabling exactly-once-v2 requires broker version 2.5 or higher.
- See Also:
- Constant Field Values
-
METRICS_LATEST
Config value for parameter "built.in.metrics.version" for the latest built-in metrics version.
- See Also:
- Constant Field Values
-
ACCEPTABLE_RECOVERY_LAG_CONFIG
acceptable.recovery.lag
- See Also:
- Constant Field Values
-
APPLICATION_ID_CONFIG
application.id
- See Also:
- Constant Field Values
-
APPLICATION_SERVER_CONFIG
application.server
- See Also:
- Constant Field Values
-
BOOTSTRAP_SERVERS_CONFIG
bootstrap.servers
- See Also:
- Constant Field Values
-
BUFFERED_RECORDS_PER_PARTITION_CONFIG
buffered.records.per.partition
- See Also:
- Constant Field Values
-
BUILT_IN_METRICS_VERSION_CONFIG
built.in.metrics.version
- See Also:
- Constant Field Values
-
CACHE_MAX_BYTES_BUFFERING_CONFIG
cache.max.bytes.buffering
- See Also:
- Constant Field Values
-
CLIENT_ID_CONFIG
client.id
- See Also:
- Constant Field Values
-
COMMIT_INTERVAL_MS_CONFIG
commit.interval.ms
- See Also:
- Constant Field Values
-
CONNECTIONS_MAX_IDLE_MS_CONFIG
connections.max.idle.ms
- See Also:
- Constant Field Values
-
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
default.deserialization.exception.handler
- See Also:
- Constant Field Values
-
DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG
default.production.exception.handler
- See Also:
- Constant Field Values
-
DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
Deprecated.
default.windowed.key.serde.inner
- See Also:
- Constant Field Values
-
DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
Deprecated.
default.windowed.value.serde.inner
- See Also:
- Constant Field Values
-
WINDOWED_INNER_CLASS_SERDE
- See Also:
- Constant Field Values
-
DEFAULT_KEY_SERDE_CLASS_CONFIG
default.key.serde
- See Also:
- Constant Field Values
-
DEFAULT_VALUE_SERDE_CLASS_CONFIG
default.value.serde
- See Also:
- Constant Field Values
-
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
default.timestamp.extractor
- See Also:
- Constant Field Values
-
MAX_TASK_IDLE_MS_CONFIG
max.task.idle.ms
- See Also:
- Constant Field Values
-
MAX_WARMUP_REPLICAS_CONFIG
max.warmup.replicas
- See Also:
- Constant Field Values
-
METADATA_MAX_AGE_CONFIG
metadata.max.age.ms
- See Also:
- Constant Field Values
-
METRICS_NUM_SAMPLES_CONFIG
metrics.num.samples
- See Also:
- Constant Field Values
-
METRICS_RECORDING_LEVEL_CONFIG
metrics.recording.level
- See Also:
- Constant Field Values
-
METRIC_REPORTER_CLASSES_CONFIG
metric.reporters
- See Also:
- Constant Field Values
-
METRICS_SAMPLE_WINDOW_MS_CONFIG
metrics.sample.window.ms
- See Also:
- Constant Field Values
-
NUM_STANDBY_REPLICAS_CONFIG
num.standby.replicas
- See Also:
- Constant Field Values
-
NUM_STREAM_THREADS_CONFIG
num.stream.threads
- See Also:
- Constant Field Values
-
POLL_MS_CONFIG
poll.ms
- See Also:
- Constant Field Values
-
PROBING_REBALANCE_INTERVAL_MS_CONFIG
probing.rebalance.interval.ms
- See Also:
- Constant Field Values
-
PROCESSING_GUARANTEE_CONFIG
processing.guarantee
- See Also:
- Constant Field Values
-
RECEIVE_BUFFER_CONFIG
receive.buffer.bytes
- See Also:
- Constant Field Values
-
RECONNECT_BACKOFF_MS_CONFIG
reconnect.backoff.ms
- See Also:
- Constant Field Values
-
RECONNECT_BACKOFF_MAX_MS_CONFIG
reconnect.backoff.max.ms
- See Also:
- Constant Field Values
-
REPLICATION_FACTOR_CONFIG
replication.factor
- See Also:
- Constant Field Values
-
REQUEST_TIMEOUT_MS_CONFIG
request.timeout.ms
- See Also:
- Constant Field Values
-
RETRIES_CONFIG
Deprecated. Since 2.7.
retries
This config is ignored by Kafka Streams. Note that the internal clients (producer, admin) are still impacted by this config.
- See Also:
- Constant Field Values
-
RETRY_BACKOFF_MS_CONFIG
retry.backoff.ms
- See Also:
- Constant Field Values
-
ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
rocksdb.config.setter
- See Also:
- Constant Field Values
-
SECURITY_PROTOCOL_CONFIG
security.protocol
- See Also:
- Constant Field Values
-
SEND_BUFFER_CONFIG
send.buffer.bytes
- See Also:
- Constant Field Values
-
STATE_CLEANUP_DELAY_MS_CONFIG
state.cleanup.delay.ms
- See Also:
- Constant Field Values
-
STATE_DIR_CONFIG
state.dir
- See Also:
- Constant Field Values
-
TASK_TIMEOUT_MS_CONFIG
task.timeout.ms
- See Also:
- Constant Field Values
-
TASK_TIMEOUT_MS_DOC
- See Also:
- Constant Field Values
-
TOPOLOGY_OPTIMIZATION_CONFIG
topology.optimization
- See Also:
- Constant Field Values
-
WINDOW_SIZE_MS_CONFIG
window.size.ms
- See Also:
- Constant Field Values
-
UPGRADE_FROM_CONFIG
upgrade.from
- See Also:
- Constant Field Values
-
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG
windowstore.changelog.additional.retention.ms
- See Also:
- Constant Field Values
-
TOPOLOGY_OPTIMIZATION
Deprecated. Since 2.7; use TOPOLOGY_OPTIMIZATION_CONFIG instead.
topology.optimization
- See Also:
- Constant Field Values
-
-
Constructor Details
-
StreamsConfig
Create a new StreamsConfig using the given properties.
- Parameters:
props - properties that specify Kafka Streams and internal consumer/producer configuration
-
StreamsConfig
-
-
Method Details
-
consumerPrefix
Prefix a property with CONSUMER_PREFIX. This is used to isolate consumer configs from other client configs.
- Parameters:
consumerProp - the consumer property to be masked
- Returns:
- CONSUMER_PREFIX + consumerProp
-
mainConsumerPrefix
Prefix a property with MAIN_CONSUMER_PREFIX. This is used to isolate main consumer configs from other client configs.
- Parameters:
consumerProp - the consumer property to be masked
- Returns:
- MAIN_CONSUMER_PREFIX + consumerProp
-
restoreConsumerPrefix
Prefix a property with RESTORE_CONSUMER_PREFIX. This is used to isolate restore consumer configs from other client configs.
- Parameters:
consumerProp - the consumer property to be masked
- Returns:
- RESTORE_CONSUMER_PREFIX + consumerProp
-
globalConsumerPrefix
Prefix a property with GLOBAL_CONSUMER_PREFIX. This is used to isolate global consumer configs from other client configs.
- Parameters:
consumerProp - the consumer property to be masked
- Returns:
- GLOBAL_CONSUMER_PREFIX + consumerProp
-
producerPrefix
Prefix a property with PRODUCER_PREFIX. This is used to isolate producer configs from other client configs.
- Parameters:
producerProp - the producer property to be masked
- Returns:
- PRODUCER_PREFIX + producerProp
-
adminClientPrefix
Prefix a property with ADMIN_CLIENT_PREFIX. This is used to isolate admin configs from other client configs.
- Parameters:
adminClientProp - the admin client property to be masked
- Returns:
- ADMIN_CLIENT_PREFIX + adminClientProp
-
topicPrefix
Prefix a property with TOPIC_PREFIX used to provide default topic configs to be applied when creating internal topics.
- Parameters:
topicProp - the topic property to be masked
- Returns:
- TOPIC_PREFIX + topicProp
-
configDef
Return a copy of the config definition.
- Returns:
- a copy of the config definition
-
postProcessParsedConfig
Description copied from class: AbstractConfig
Called directly after user configs got parsed (and thus default values got set). This allows changing default values for "secondary defaults" if required.
- Overrides:
postProcessParsedConfig in class AbstractConfig
- Parameters:
parsedValues - unmodifiable map of current configuration
- Returns:
- a map of updates that should be applied to the configuration (will be validated to prevent bad updates)
-
getMainConsumerConfigs
Get the configs for the main consumer. Properties using the prefix MAIN_CONSUMER_PREFIX will be used in favor over the properties prefixed with CONSUMER_PREFIX and the non-prefixed versions (read the override precedence ordering in MAIN_CONSUMER_PREFIX), except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster. If not specified by MAIN_CONSUMER_PREFIX, the main consumer will share the general consumer configs prefixed by CONSUMER_PREFIX.
- Parameters:
groupId - consumer groupId
clientId - clientId
threadIdx - stream thread index
- Returns:
- Map of the consumer configuration.
-
getRestoreConsumerConfigs
Get the configs for the restore-consumer. Properties using the prefix RESTORE_CONSUMER_PREFIX will be used in favor over the properties prefixed with CONSUMER_PREFIX and the non-prefixed versions (read the override precedence ordering in RESTORE_CONSUMER_PREFIX), except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster. If not specified by RESTORE_CONSUMER_PREFIX, the restore consumer will share the general consumer configs prefixed by CONSUMER_PREFIX.
- Parameters:
clientId - clientId
- Returns:
- Map of the restore consumer configuration.
-
getGlobalConsumerConfigs
Get the configs for the global consumer. Properties using the prefix GLOBAL_CONSUMER_PREFIX will be used in favor over the properties prefixed with CONSUMER_PREFIX and the non-prefixed versions (read the override precedence ordering in GLOBAL_CONSUMER_PREFIX), except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster. If not specified by GLOBAL_CONSUMER_PREFIX, the global consumer will share the general consumer configs prefixed by CONSUMER_PREFIX.
- Parameters:
clientId - clientId
- Returns:
- Map of the global consumer configuration.
-
getProducerConfigs
Get the configs for the producer. Properties using the prefix PRODUCER_PREFIX will be used in favor over their non-prefixed versions, except in the case of ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster.
- Parameters:
clientId - clientId
- Returns:
- Map of the producer configuration.
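The bootstrap.servers exception described for getProducerConfigs can be modelled with a small lookup. This is a sketch over a plain map under stated assumptions, not the method's actual implementation: ProducerLookupSketch and producerValue are hypothetical names, "producer." is assumed to be the literal value of PRODUCER_PREFIX, and "linger.ms" is only a sample producer property.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model only; this is not the StreamsConfig implementation.
final class ProducerLookupSketch {
    // Prefers the producer-prefixed value, except for bootstrap.servers,
    // where the non-prefixed value is always used.
    static Object producerValue(Map<String, Object> props, String configName) {
        String prefixed = "producer." + configName; // assumed prefix value
        if (!"bootstrap.servers".equals(configName) && props.containsKey(prefixed)) {
            return props.get(prefixed);
        }
        return props.get(configName);
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "broker-a:9092");
        props.put("producer.bootstrap.servers", "broker-b:9092");
        props.put("linger.ms", 5);
        props.put("producer.linger.ms", 50);
        System.out.println(producerValue(props, "bootstrap.servers"));
        System.out.println(producerValue(props, "linger.ms"));
    }
}
```

In this model the prefixed linger.ms wins, while the prefixed bootstrap.servers entry is ignored because all clients are expected to talk to the same cluster.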
-
getAdminConfigs
Get the configs for the admin client.
- Parameters:
clientId - clientId
- Returns:
- Map of the admin client configuration.
-
defaultKeySerde
Return a configured instance of the key Serde class.
- Returns:
- a configured instance of the key Serde class
-
defaultValueSerde
Return a configured instance of the value Serde class.
- Returns:
- a configured instance of the value Serde class
-
defaultTimestampExtractor
-
defaultDeserializationExceptionHandler
-
defaultProductionExceptionHandler
-
main
-