Package org.apache.kafka.streams
Class StreamsConfig
java.lang.Object
  org.apache.kafka.common.config.AbstractConfig
    org.apache.kafka.streams.StreamsConfig
public class StreamsConfig extends AbstractConfig
Configuration for a KafkaStreams instance. Can also be used to configure the Kafka Streams internal KafkaConsumer, KafkaProducer and Admin. To avoid consumer/producer/admin property conflicts, you should prefix those properties using consumerPrefix(String), producerPrefix(String) and adminClientPrefix(String), respectively.

Example:

    // potentially wrong: sets "metadata.max.age.ms" to 1 minute for producer AND consumer
    Properties streamsProperties = new Properties();
    streamsProperties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 60000);
    // or
    streamsProperties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 60000);

    // suggested:
    Properties streamsProperties = new Properties();
    // sets "metadata.max.age.ms" to 1 minute for consumer only
    streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), 60000);
    // sets "metadata.max.age.ms" to 1 minute for producer only
    streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.METADATA_MAX_AGE_CONFIG), 60000);

    StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);

This instance can also be used to pass in custom configurations to different modules (e.g. passing a special config in your customized serde class). The consumer/producer/admin prefix can also be used to distinguish these custom config values passed to different clients with the same config name.

Example:

    Properties streamsProperties = new Properties();
    // sets "my.custom.config" to "foo" for consumer only
    streamsProperties.put(StreamsConfig.consumerPrefix("my.custom.config"), "foo");
    // sets "my.custom.config" to "bar" for producer only
    streamsProperties.put(StreamsConfig.producerPrefix("my.custom.config"), "bar");
    // sets "my.custom.config2" to "boom" for all clients universally
    streamsProperties.put("my.custom.config2", "boom");

    // as a result, inside producer's serde class configure(..) function,
    // users can now read both key-value pairs "my.custom.config" -> "bar"
    // and "my.custom.config2" -> "boom" from the config map
    StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);

When increasing ProducerConfig.MAX_BLOCK_MS_CONFIG to be more resilient to unavailable brokers, you should also increase ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG using the following guidance:

    max.poll.interval.ms > max.block.ms

Kafka Streams requires at least the following properties to be set:
- "application.id"
- "bootstrap.servers"

By default, Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
- "group.id" (<application.id>) - Streams client will always use the application ID as the consumer group ID
- "enable.auto.commit" (false) - Streams client will always disable/turn off auto committing
- "partition.assignment.strategy" (StreamsPartitionAssignor) - Streams client will always use its own partition assignor

If "processing.guarantee" is set to "exactly_once", Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
- "isolation.level" (read_committed) - Consumers will always read committed data only
- "enable.idempotence" (true) - Producer will always have idempotency enabled
-
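The two requirements in the class overview (the required settings and the timeout guidance) can be checked before constructing a StreamsConfig. The following is a minimal, stdlib-only sketch. It assumes the required keys are "application.id" and "bootstrap.servers"; the class StreamsPropsCheck and both helper methods are hypothetical and are not part of Kafka Streams.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

final class StreamsPropsCheck {

    // Assumption: the two settings Kafka Streams requires, written as string literals.
    static final String[] REQUIRED = {"application.id", "bootstrap.servers"};

    /** Hypothetical helper: returns the required keys that are absent from props. */
    static List<String> missingRequired(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            if (props.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Hypothetical helper: checks the guidance max.poll.interval.ms > max.block.ms. */
    static boolean followsTimeoutGuidance(long maxPollIntervalMs, long maxBlockMs) {
        return maxPollIntervalMs > maxBlockMs;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("application.id", "my-streams-app");    // placeholder value
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder value
        System.out.println("missing required keys: " + missingRequired(props));
        System.out.println("timeout guidance met: " + followsTimeoutGuidance(600_000L, 120_000L));
    }
}
```

In an application, the two timeout values would typically come from the consumer-prefixed and producer-prefixed entries of the same Properties object.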
-
Nested Class Summary
Nested Classes
Modifier and Type   Class   Description
static class   StreamsConfig.InternalConfig
-
Field Summary
Fields
Modifier and Type   Field   Description
static String   ACCEPTABLE_RECOVERY_LAG_CONFIG   acceptable.recovery.lag
static String   ADMIN_CLIENT_PREFIX   Prefix used to isolate admin configs from other client configs.
static String   APPLICATION_ID_CONFIG   application.id
static String   APPLICATION_SERVER_CONFIG   application.server
static String   AT_LEAST_ONCE   Config value for parameter "processing.guarantee" for at-least-once processing guarantees.
static String   BOOTSTRAP_SERVERS_CONFIG   bootstrap.servers
static String   BUFFERED_RECORDS_PER_PARTITION_CONFIG   buffered.records.per.partition
static String   BUILT_IN_METRICS_VERSION_CONFIG   built.in.metrics.version
static String   CACHE_MAX_BYTES_BUFFERING_CONFIG   cache.max.bytes.buffering
static String   CLIENT_ID_CONFIG   client.id
static String   COMMIT_INTERVAL_MS_CONFIG   commit.interval.ms
static String   CONNECTIONS_MAX_IDLE_MS_CONFIG   connections.max.idle.ms
static String   CONSUMER_PREFIX   Prefix used to isolate consumer configs from other client configs.
static String   DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG   default.deserialization.exception.handler
static String   DEFAULT_KEY_SERDE_CLASS_CONFIG   default.key.serde
static String   DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG   default.production.exception.handler
static String   DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG   default.timestamp.extractor
static String   DEFAULT_VALUE_SERDE_CLASS_CONFIG   default.value.serde
static String   DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS   default.windowed.key.serde.inner
static String   DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS   default.windowed.value.serde.inner
static int   DUMMY_THREAD_INDEX
static String   EXACTLY_ONCE   Config value for parameter "processing.guarantee" for exactly-once processing guarantees.
static String   EXACTLY_ONCE_BETA   Config value for parameter "processing.guarantee" for exactly-once processing guarantees.
static String   GLOBAL_CONSUMER_PREFIX   Prefix used to override consumer configs for the global consumer client from the general consumer client configs.
static String   MAIN_CONSUMER_PREFIX   Prefix used to override consumer configs for the main consumer client from the general consumer client configs.
static String   MAX_TASK_IDLE_MS_CONFIG   max.task.idle.ms
static String   MAX_WARMUP_REPLICAS_CONFIG   max.warmup.replicas
static String   METADATA_MAX_AGE_CONFIG   metadata.max.age.ms
static String   METRIC_REPORTER_CLASSES_CONFIG   metric.reporters
static String   METRICS_0100_TO_24   Config value for parameter "built.in.metrics.version" for built-in metrics from version 0.10.0 to 2.4.
static String   METRICS_LATEST   Config value for parameter "built.in.metrics.version" for the latest built-in metrics version.
static String   METRICS_NUM_SAMPLES_CONFIG   metrics.num.samples
static String   METRICS_RECORDING_LEVEL_CONFIG   metrics.recording.level
static String   METRICS_SAMPLE_WINDOW_MS_CONFIG   metrics.sample.window.ms
static String   NO_OPTIMIZATION   Config value for parameter "topology.optimization" for disabling topology optimization
static String   NUM_STANDBY_REPLICAS_CONFIG   num.standby.replicas
static String   NUM_STREAM_THREADS_CONFIG   num.stream.threads
static String   OPTIMIZE   Config value for parameter "topology.optimization" for enabling topology optimization
static String   PARTITION_GROUPER_CLASS_CONFIG   Deprecated.
static String   POLL_MS_CONFIG   poll.ms
static String   PROBING_REBALANCE_INTERVAL_MS_CONFIG   probing.rebalance.interval.ms
static String   PROCESSING_GUARANTEE_CONFIG   processing.guarantee
static String   PRODUCER_PREFIX   Prefix used to isolate producer configs from other client configs.
static String   RECEIVE_BUFFER_CONFIG   receive.buffer.bytes
static String   RECONNECT_BACKOFF_MAX_MS_CONFIG   reconnect.backoff.max.ms
static String   RECONNECT_BACKOFF_MS_CONFIG   reconnect.backoff.ms
static String   REPLICATION_FACTOR_CONFIG   replication.factor
static String   REQUEST_TIMEOUT_MS_CONFIG   request.timeout.ms
static String   RESTORE_CONSUMER_PREFIX   Prefix used to override consumer configs for the restore consumer client from the general consumer client configs.
static String   RETRIES_CONFIG   Deprecated. since 2.7
static String   RETRY_BACKOFF_MS_CONFIG   retry.backoff.ms
static String   ROCKSDB_CONFIG_SETTER_CLASS_CONFIG   rocksdb.config.setter
static String   SECURITY_PROTOCOL_CONFIG   security.protocol
static String   SEND_BUFFER_CONFIG   send.buffer.bytes
static String   STATE_CLEANUP_DELAY_MS_CONFIG   state.cleanup.delay.ms
static String   STATE_DIR_CONFIG   state.dir
static String   TASK_TIMEOUT_MS_CONFIG   task.timeout.ms
static String   TASK_TIMEOUT_MS_DOC
static String   TOPIC_PREFIX   Prefix used to provide default topic configs to be applied when creating internal topics.
static String   TOPOLOGY_OPTIMIZATION   Deprecated. since 2.7; use TOPOLOGY_OPTIMIZATION_CONFIG instead
static String   TOPOLOGY_OPTIMIZATION_CONFIG   topology.optimization
static String   UPGRADE_FROM_0100   Config value for parameter "upgrade.from" for upgrading an application from version 0.10.0.x.
static String   UPGRADE_FROM_0101   Config value for parameter "upgrade.from" for upgrading an application from version 0.10.1.x.
static String   UPGRADE_FROM_0102   Config value for parameter "upgrade.from" for upgrading an application from version 0.10.2.x.
static String   UPGRADE_FROM_0110   Config value for parameter "upgrade.from" for upgrading an application from version 0.11.0.x.
static String   UPGRADE_FROM_10   Config value for parameter "upgrade.from" for upgrading an application from version 1.0.x.
static String   UPGRADE_FROM_11   Config value for parameter "upgrade.from" for upgrading an application from version 1.1.x.
static String   UPGRADE_FROM_20   Config value for parameter "upgrade.from" for upgrading an application from version 2.0.x.
static String   UPGRADE_FROM_21   Config value for parameter "upgrade.from" for upgrading an application from version 2.1.x.
static String   UPGRADE_FROM_22   Config value for parameter "upgrade.from" for upgrading an application from version 2.2.x.
static String   UPGRADE_FROM_23   Config value for parameter "upgrade.from" for upgrading an application from version 2.3.x.
static String   UPGRADE_FROM_CONFIG   upgrade.from
static String   WINDOW_SIZE_MS_CONFIG   window.size.ms
static String   WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG   windowstore.changelog.additional.retention.ms
Fields inherited from class org.apache.kafka.common.config.AbstractConfig
CONFIG_PROVIDERS_CONFIG
-
-
Constructor Summary
Constructors
Modifier   Constructor   Description
   StreamsConfig(Map<?,?> props)   Create a new StreamsConfig using the given properties.
protected   StreamsConfig(Map<?,?> props, boolean doLog)
-
Method Summary
-
Methods inherited from class org.apache.kafka.common.config.AbstractConfig
documentationOf, equals, get, getBoolean, getClass, getConfiguredInstance, getConfiguredInstance, getConfiguredInstances, getConfiguredInstances, getConfiguredInstances, getDouble, getInt, getList, getLong, getPassword, getShort, getString, hashCode, ignore, logUnused, nonInternalValues, originals, originals, originalsStrings, originalsWithPrefix, originalsWithPrefix, typeOf, unused, values, valuesWithPrefixAllOrNothing, valuesWithPrefixOverride
-
-
-
-
Field Detail
-
DUMMY_THREAD_INDEX
public static final int DUMMY_THREAD_INDEX
- See Also:
- Constant Field Values
-
TOPIC_PREFIX
public static final String TOPIC_PREFIX
Prefix used to provide default topic configs to be applied when creating internal topics. These should be valid properties from TopicConfig. It is recommended to use topicPrefix(String).
- See Also:
- Constant Field Values
-
CONSUMER_PREFIX
public static final String CONSUMER_PREFIX
Prefix used to isolate consumer configs from other client configs. It is recommended to use consumerPrefix(String) to add this prefix to consumer properties.
- See Also:
- Constant Field Values
-
MAIN_CONSUMER_PREFIX
public static final String MAIN_CONSUMER_PREFIX
Prefix used to override consumer configs for the main consumer client from the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
1. main.consumer.[config-name]
2. consumer.[config-name]
3. [config-name]
- See Also:
- Constant Field Values
-
RESTORE_CONSUMER_PREFIX
public static final String RESTORE_CONSUMER_PREFIX
Prefix used to override consumer configs for the restore consumer client from the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
1. restore.consumer.[config-name]
2. consumer.[config-name]
3. [config-name]
- See Also:
- Constant Field Values
-
GLOBAL_CONSUMER_PREFIX
public static final String GLOBAL_CONSUMER_PREFIX
Prefix used to override consumer configs for the global consumer client from the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
1. global.consumer.[config-name]
2. consumer.[config-name]
3. [config-name]
- See Also:
- Constant Field Values
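The precedence rules listed for the main, restore and global consumer prefixes can be read as a three-step lookup. The sketch below models that lookup with plain java.util types. The class ConsumerPrecedence and its resolve method are hypothetical illustrations and not the Kafka Streams implementation; "max.poll.records" is used only as an example consumer setting.

```java
import java.util.HashMap;
import java.util.Map;

final class ConsumerPrecedence {

    /**
     * Hypothetical helper: resolves configName for one consumer client.
     * clientPrefix is "main.consumer.", "restore.consumer." or "global.consumer.".
     */
    static Object resolve(Map<String, Object> props, String clientPrefix, String configName) {
        // 1. Highest precedence: the client-specific prefix.
        if (props.containsKey(clientPrefix + configName)) {
            return props.get(clientPrefix + configName);
        }
        // 2. Next: the general consumer prefix.
        if (props.containsKey("consumer." + configName)) {
            return props.get("consumer." + configName);
        }
        // 3. Lowest precedence: the non-prefixed name (null if absent).
        return props.get(configName);
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("max.poll.records", 100);
        props.put("consumer.max.poll.records", 200);
        props.put("main.consumer.max.poll.records", 300);

        System.out.println("main:    " + resolve(props, "main.consumer.", "max.poll.records"));
        System.out.println("restore: " + resolve(props, "restore.consumer.", "max.poll.records"));
    }
}
```

With these entries the main consumer resolves to the client-specific value, while the restore consumer falls back to the general consumer value because no restore-specific entry is present.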
-
PRODUCER_PREFIX
public static final String PRODUCER_PREFIX
Prefix used to isolate producer configs from other client configs. It is recommended to use producerPrefix(String) to add this prefix to producer properties.
- See Also:
- Constant Field Values
-
ADMIN_CLIENT_PREFIX
public static final String ADMIN_CLIENT_PREFIX
Prefix used to isolate admin configs from other client configs. It is recommended to use adminClientPrefix(String) to add this prefix to admin client properties.
- See Also:
- Constant Field Values
-
NO_OPTIMIZATION
public static final String NO_OPTIMIZATION
Config value for parameter "topology.optimization" for disabling topology optimization.
- See Also:
- Constant Field Values
-
OPTIMIZE
public static final String OPTIMIZE
Config value for parameter "topology.optimization" for enabling topology optimization.
- See Also:
- Constant Field Values
-
UPGRADE_FROM_0100
public static final String UPGRADE_FROM_0100
Config value for parameter "upgrade.from" for upgrading an application from version 0.10.0.x.
- See Also:
- Constant Field Values
-
UPGRADE_FROM_0101
public static final String UPGRADE_FROM_0101
Config value for parameter "upgrade.from" for upgrading an application from version 0.10.1.x.
- See Also:
- Constant Field Values
-
UPGRADE_FROM_0102
public static final String UPGRADE_FROM_0102
Config value for parameter "upgrade.from" for upgrading an application from version 0.10.2.x.
- See Also:
- Constant Field Values
-
UPGRADE_FROM_0110
public static final String UPGRADE_FROM_0110
Config value for parameter "upgrade.from" for upgrading an application from version 0.11.0.x.
- See Also:
- Constant Field Values
-
UPGRADE_FROM_10
public static final String UPGRADE_FROM_10
Config value for parameter "upgrade.from" for upgrading an application from version 1.0.x.
- See Also:
- Constant Field Values
-
UPGRADE_FROM_11
public static final String UPGRADE_FROM_11
Config value for parameter "upgrade.from" for upgrading an application from version 1.1.x.
- See Also:
- Constant Field Values
-
UPGRADE_FROM_20
public static final String UPGRADE_FROM_20
Config value for parameter "upgrade.from" for upgrading an application from version 2.0.x.
- See Also:
- Constant Field Values
-
UPGRADE_FROM_21
public static final String UPGRADE_FROM_21
Config value for parameter "upgrade.from" for upgrading an application from version 2.1.x.
- See Also:
- Constant Field Values
-
UPGRADE_FROM_22
public static final String UPGRADE_FROM_22
Config value for parameter "upgrade.from" for upgrading an application from version 2.2.x.
- See Also:
- Constant Field Values
-
UPGRADE_FROM_23
public static final String UPGRADE_FROM_23
Config value for parameter "upgrade.from" for upgrading an application from version 2.3.x.
- See Also:
- Constant Field Values
-
AT_LEAST_ONCE
public static final String AT_LEAST_ONCE
Config value for parameter "processing.guarantee" for at-least-once processing guarantees.
- See Also:
- Constant Field Values
-
EXACTLY_ONCE
public static final String EXACTLY_ONCE
Config value for parameter "processing.guarantee" for exactly-once processing guarantees.
Enabling exactly-once processing semantics requires broker version 0.11.0 or higher. If you enable this feature, Kafka Streams will use more resources (like broker connections) compared to the AT_LEAST_ONCE case.
- See Also:
- EXACTLY_ONCE_BETA, Constant Field Values
-
EXACTLY_ONCE_BETA
public static final String EXACTLY_ONCE_BETA
Config value for parameter "processing.guarantee" for exactly-once processing guarantees.
Enabling exactly-once (beta) requires broker version 2.5 or higher. If you enable this feature, Kafka Streams will use fewer resources (like broker connections) compared to the EXACTLY_ONCE case.
- See Also:
- Constant Field Values
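The broker version requirements stated for EXACTLY_ONCE (0.11.0 or higher) and EXACTLY_ONCE_BETA (2.5 or higher) can be turned into a simple pre-deployment check. The sketch below is illustrative only: the class GuaranteeByBroker and its methods are hypothetical, the broker version is passed in as two integers rather than discovered from a cluster, and it assumes "at_least_once" is usable on any broker, which the text above does not state.

```java
import java.util.ArrayList;
import java.util.List;

final class GuaranteeByBroker {

    /** Hypothetical helper: true if (major, minor) is at least (reqMajor, reqMinor). */
    static boolean atLeast(int major, int minor, int reqMajor, int reqMinor) {
        return major > reqMajor || (major == reqMajor && minor >= reqMinor);
    }

    /** Hypothetical helper: "processing.guarantee" values usable with the given broker version. */
    static List<String> usableGuarantees(int major, int minor) {
        List<String> usable = new ArrayList<>();
        usable.add("at_least_once");             // assumption: no broker requirement modeled
        if (atLeast(major, minor, 0, 11)) {
            usable.add("exactly_once");          // requires broker 0.11.0 or higher
        }
        if (atLeast(major, minor, 2, 5)) {
            usable.add("exactly_once_beta");     // requires broker 2.5 or higher
        }
        return usable;
    }

    public static void main(String[] args) {
        System.out.println("broker 0.10: " + usableGuarantees(0, 10));
        System.out.println("broker 2.4:  " + usableGuarantees(2, 4));
        System.out.println("broker 2.5:  " + usableGuarantees(2, 5));
    }
}
```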
-
METRICS_0100_TO_24
public static final String METRICS_0100_TO_24
Config value for parameter "built.in.metrics.version" for built-in metrics from version 0.10.0 to 2.4.
- See Also:
- Constant Field Values
-
METRICS_LATEST
public static final String METRICS_LATEST
Config value for parameter "built.in.metrics.version" for the latest built-in metrics version.
- See Also:
- Constant Field Values
-
ACCEPTABLE_RECOVERY_LAG_CONFIG
public static final String ACCEPTABLE_RECOVERY_LAG_CONFIG
acceptable.recovery.lag
- See Also:
- Constant Field Values
-
APPLICATION_ID_CONFIG
public static final String APPLICATION_ID_CONFIG
application.id
- See Also:
- Constant Field Values
-
APPLICATION_SERVER_CONFIG
public static final String APPLICATION_SERVER_CONFIG
application.server
- See Also:
- Constant Field Values
-
BOOTSTRAP_SERVERS_CONFIG
public static final String BOOTSTRAP_SERVERS_CONFIG
bootstrap.servers
- See Also:
- Constant Field Values
-
BUFFERED_RECORDS_PER_PARTITION_CONFIG
public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG
buffered.records.per.partition
- See Also:
- Constant Field Values
-
BUILT_IN_METRICS_VERSION_CONFIG
public static final String BUILT_IN_METRICS_VERSION_CONFIG
built.in.metrics.version
- See Also:
- Constant Field Values
-
CACHE_MAX_BYTES_BUFFERING_CONFIG
public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG
cache.max.bytes.buffering
- See Also:
- Constant Field Values
-
CLIENT_ID_CONFIG
public static final String CLIENT_ID_CONFIG
client.id
- See Also:
- Constant Field Values
-
COMMIT_INTERVAL_MS_CONFIG
public static final String COMMIT_INTERVAL_MS_CONFIG
commit.interval.ms
- See Also:
- Constant Field Values
-
CONNECTIONS_MAX_IDLE_MS_CONFIG
public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG
connections.max.idle.ms
- See Also:
- Constant Field Values
-
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
default.deserialization.exception.handler
- See Also:
- Constant Field Values
-
DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG
public static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG
default.production.exception.handler
- See Also:
- Constant Field Values
-
DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
public static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
default.windowed.key.serde.inner
- See Also:
- Constant Field Values
-
DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
public static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
default.windowed.value.serde.inner
- See Also:
- Constant Field Values
-
DEFAULT_KEY_SERDE_CLASS_CONFIG
public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG
default.key.serde
- See Also:
- Constant Field Values
-
DEFAULT_VALUE_SERDE_CLASS_CONFIG
public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG
default.value.serde
- See Also:
- Constant Field Values
-
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
default.timestamp.extractor
- See Also:
- Constant Field Values
-
MAX_TASK_IDLE_MS_CONFIG
public static final String MAX_TASK_IDLE_MS_CONFIG
max.task.idle.ms
- See Also:
- Constant Field Values
-
MAX_WARMUP_REPLICAS_CONFIG
public static final String MAX_WARMUP_REPLICAS_CONFIG
max.warmup.replicas
- See Also:
- Constant Field Values
-
METADATA_MAX_AGE_CONFIG
public static final String METADATA_MAX_AGE_CONFIG
metadata.max.age.ms
- See Also:
- Constant Field Values
-
METRICS_NUM_SAMPLES_CONFIG
public static final String METRICS_NUM_SAMPLES_CONFIG
metrics.num.samples
- See Also:
- Constant Field Values
-
METRICS_RECORDING_LEVEL_CONFIG
public static final String METRICS_RECORDING_LEVEL_CONFIG
metrics.recording.level
- See Also:
- Constant Field Values
-
METRIC_REPORTER_CLASSES_CONFIG
public static final String METRIC_REPORTER_CLASSES_CONFIG
metric.reporters
- See Also:
- Constant Field Values
-
METRICS_SAMPLE_WINDOW_MS_CONFIG
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG
metrics.sample.window.ms
- See Also:
- Constant Field Values
-
NUM_STANDBY_REPLICAS_CONFIG
public static final String NUM_STANDBY_REPLICAS_CONFIG
num.standby.replicas
- See Also:
- Constant Field Values
-
NUM_STREAM_THREADS_CONFIG
public static final String NUM_STREAM_THREADS_CONFIG
num.stream.threads
- See Also:
- Constant Field Values
-
POLL_MS_CONFIG
public static final String POLL_MS_CONFIG
poll.ms
- See Also:
- Constant Field Values
-
PROBING_REBALANCE_INTERVAL_MS_CONFIG
public static final String PROBING_REBALANCE_INTERVAL_MS_CONFIG
probing.rebalance.interval.ms
- See Also:
- Constant Field Values
-
PROCESSING_GUARANTEE_CONFIG
public static final String PROCESSING_GUARANTEE_CONFIG
processing.guarantee
- See Also:
- Constant Field Values
-
RECEIVE_BUFFER_CONFIG
public static final String RECEIVE_BUFFER_CONFIG
receive.buffer.bytes
- See Also:
- Constant Field Values
-
RECONNECT_BACKOFF_MS_CONFIG
public static final String RECONNECT_BACKOFF_MS_CONFIG
reconnect.backoff.ms
- See Also:
- Constant Field Values
-
RECONNECT_BACKOFF_MAX_MS_CONFIG
public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG
reconnect.backoff.max.ms
- See Also:
- Constant Field Values
-
REPLICATION_FACTOR_CONFIG
public static final String REPLICATION_FACTOR_CONFIG
replication.factor
- See Also:
- Constant Field Values
-
REQUEST_TIMEOUT_MS_CONFIG
public static final String REQUEST_TIMEOUT_MS_CONFIG
request.timeout.ms
- See Also:
- Constant Field Values
-
RETRIES_CONFIG
@Deprecated public static final String RETRIES_CONFIG
Deprecated. since 2.7
retries
This config is ignored by Kafka Streams. Note that the internal clients (producer, admin) are still impacted by this config.
- See Also:
- Constant Field Values
-
RETRY_BACKOFF_MS_CONFIG
public static final String RETRY_BACKOFF_MS_CONFIG
retry.backoff.ms
- See Also:
- Constant Field Values
-
ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
rocksdb.config.setter
- See Also:
- Constant Field Values
-
SECURITY_PROTOCOL_CONFIG
public static final String SECURITY_PROTOCOL_CONFIG
security.protocol
- See Also:
- Constant Field Values
-
SEND_BUFFER_CONFIG
public static final String SEND_BUFFER_CONFIG
send.buffer.bytes
- See Also:
- Constant Field Values
-
STATE_CLEANUP_DELAY_MS_CONFIG
public static final String STATE_CLEANUP_DELAY_MS_CONFIG
state.cleanup.delay.ms
- See Also:
- Constant Field Values
-
STATE_DIR_CONFIG
public static final String STATE_DIR_CONFIG
state.dir
- See Also:
- Constant Field Values
-
TASK_TIMEOUT_MS_CONFIG
public static final String TASK_TIMEOUT_MS_CONFIG
task.timeout.ms
- See Also:
- Constant Field Values
-
TASK_TIMEOUT_MS_DOC
public static final String TASK_TIMEOUT_MS_DOC
- See Also:
- Constant Field Values
-
TOPOLOGY_OPTIMIZATION_CONFIG
public static final String TOPOLOGY_OPTIMIZATION_CONFIG
topology.optimization
- See Also:
- Constant Field Values
-
WINDOW_SIZE_MS_CONFIG
public static final String WINDOW_SIZE_MS_CONFIG
window.size.ms
- See Also:
- Constant Field Values
-
UPGRADE_FROM_CONFIG
public static final String UPGRADE_FROM_CONFIG
upgrade.from
- See Also:
- Constant Field Values
-
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG
public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG
windowstore.changelog.additional.retention.ms
- See Also:
- Constant Field Values
-
PARTITION_GROUPER_CLASS_CONFIG
@Deprecated public static final String PARTITION_GROUPER_CLASS_CONFIG
Deprecated.
partition.grouper
- See Also:
- Constant Field Values
-
TOPOLOGY_OPTIMIZATION
@Deprecated public static final String TOPOLOGY_OPTIMIZATION
Deprecated. since 2.7; use TOPOLOGY_OPTIMIZATION_CONFIG instead
topology.optimization
- See Also:
- Constant Field Values
-
-
Constructor Detail
-
StreamsConfig
public StreamsConfig(Map<?,?> props)
Create a new StreamsConfig using the given properties.
- Parameters:
props - properties that specify Kafka Streams and internal consumer/producer configuration
-
StreamsConfig
protected StreamsConfig(Map<?,?> props, boolean doLog)
-
-
Method Detail
-
consumerPrefix
public static String consumerPrefix(String consumerProp)
Prefix a property with CONSUMER_PREFIX. This is used to isolate consumer configs from other client configs.
- Parameters:
consumerProp - the consumer property to be masked
- Returns:
- CONSUMER_PREFIX + consumerProp
-
mainConsumerPrefix
public static String mainConsumerPrefix(String consumerProp)
Prefix a property with MAIN_CONSUMER_PREFIX. This is used to isolate main consumer configs from other client configs.
- Parameters:
consumerProp - the consumer property to be masked
- Returns:
- MAIN_CONSUMER_PREFIX + consumerProp
-
restoreConsumerPrefix
public static String restoreConsumerPrefix(String consumerProp)
Prefix a property with RESTORE_CONSUMER_PREFIX. This is used to isolate restore consumer configs from other client configs.
- Parameters:
consumerProp - the consumer property to be masked
- Returns:
- RESTORE_CONSUMER_PREFIX + consumerProp
-
globalConsumerPrefix
public static String globalConsumerPrefix(String consumerProp)
Prefix a property with GLOBAL_CONSUMER_PREFIX. This is used to isolate global consumer configs from other client configs.
- Parameters:
consumerProp - the consumer property to be masked
- Returns:
- GLOBAL_CONSUMER_PREFIX + consumerProp
-
producerPrefix
public static String producerPrefix(String producerProp)
Prefix a property with PRODUCER_PREFIX. This is used to isolate producer configs from other client configs.
- Parameters:
producerProp - the producer property to be masked
- Returns:
- PRODUCER_PREFIX + producerProp
-
adminClientPrefix
public static String adminClientPrefix(String adminClientProp)
Prefix a property with ADMIN_CLIENT_PREFIX. This is used to isolate admin configs from other client configs.
- Parameters:
adminClientProp - the admin client property to be masked
- Returns:
- ADMIN_CLIENT_PREFIX + adminClientProp
-
topicPrefix
public static String topicPrefix(String topicProp)
Prefix a property with TOPIC_PREFIX, used to provide default topic configs to be applied when creating internal topics.
- Parameters:
topicProp - the topic property to be masked
- Returns:
- TOPIC_PREFIX + topicProp
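Each prefix method above returns the prefix constant concatenated with the given property name. The following stdlib-only sketch mirrors that behaviour with stand-in methods so the resulting keys are visible. The class PrefixSketch is hypothetical, and the literal prefix values ("consumer.", "producer.", "admin.", "topic.") are stated here as an assumption about the constants; in real code the StreamsConfig methods should be called instead.

```java
import java.util.Properties;

final class PrefixSketch {

    // Assumed literal values of the corresponding StreamsConfig constants.
    static final String CONSUMER_PREFIX = "consumer.";
    static final String PRODUCER_PREFIX = "producer.";
    static final String ADMIN_CLIENT_PREFIX = "admin.";
    static final String TOPIC_PREFIX = "topic.";

    // Stand-ins for the StreamsConfig static methods of the same names.
    static String consumerPrefix(String prop) { return CONSUMER_PREFIX + prop; }
    static String producerPrefix(String prop) { return PRODUCER_PREFIX + prop; }
    static String adminClientPrefix(String prop) { return ADMIN_CLIENT_PREFIX + prop; }
    static String topicPrefix(String prop) { return TOPIC_PREFIX + prop; }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Same client setting, different value per client.
        props.put(consumerPrefix("metadata.max.age.ms"), 60000);
        props.put(producerPrefix("metadata.max.age.ms"), 30000);
        // Default for internal topics; "segment.bytes" is a topic-level setting.
        props.put(topicPrefix("segment.bytes"), 1048576);

        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```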
-
configDef
public static ConfigDef configDef()
Return a copy of the config definition.
- Returns:
- a copy of the config definition
-
postProcessParsedConfig
protected Map<String,Object> postProcessParsedConfig(Map<String,Object> parsedValues)
Description copied from class: AbstractConfig
Called directly after user configs got parsed (and thus default values got set). This allows changing default values for "secondary defaults" if required.
- Overrides:
postProcessParsedConfig in class AbstractConfig
- Parameters:
parsedValues - unmodifiable map of current configuration
- Returns:
- a map of updates that should be applied to the configuration (will be validated to prevent bad updates)
-
getConsumerConfigs
@Deprecated public Map<String,Object> getConsumerConfigs(String groupId, String clientId)
Deprecated.
Get the configs for the consumer. Properties using the prefix CONSUMER_PREFIX will be used in preference to their non-prefixed versions, except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster.
- Parameters:
groupId - consumer groupId
clientId - clientId
- Returns:
- Map of the consumer configuration.
-
getMainConsumerConfigs
public Map<String,Object> getMainConsumerConfigs(String groupId, String clientId, int threadIdx)
Get the configs for the main consumer. Properties using the prefix MAIN_CONSUMER_PREFIX will be used in preference to the properties prefixed with CONSUMER_PREFIX and the non-prefixed versions (see the override precedence ordering in MAIN_CONSUMER_PREFIX), except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster. If not specified by MAIN_CONSUMER_PREFIX, the main consumer will share the general consumer configs prefixed by CONSUMER_PREFIX.
- Parameters:
groupId - consumer groupId
clientId - clientId
threadIdx - stream thread index
- Returns:
- Map of the consumer configuration.
-
getRestoreConsumerConfigs
public Map<String,Object> getRestoreConsumerConfigs(String clientId)
Get the configs for the restore consumer. Properties using the prefix RESTORE_CONSUMER_PREFIX will be used in preference to the properties prefixed with CONSUMER_PREFIX and the non-prefixed versions (see the override precedence ordering in RESTORE_CONSUMER_PREFIX), except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster. If not specified by RESTORE_CONSUMER_PREFIX, the restore consumer will share the general consumer configs prefixed by CONSUMER_PREFIX.
- Parameters:
clientId - clientId
- Returns:
- Map of the restore consumer configuration.
-
getGlobalConsumerConfigs
public Map<String,Object> getGlobalConsumerConfigs(String clientId)
Get the configs for the global consumer. Properties using the prefix GLOBAL_CONSUMER_PREFIX will be used in preference to the properties prefixed with CONSUMER_PREFIX and the non-prefixed versions (see the override precedence ordering in GLOBAL_CONSUMER_PREFIX), except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster. If not specified by GLOBAL_CONSUMER_PREFIX, the global consumer will share the general consumer configs prefixed by CONSUMER_PREFIX.
- Parameters:
clientId - clientId
- Returns:
- Map of the global consumer configuration.
-
getProducerConfigs
public Map<String,Object> getProducerConfigs(String clientId)
Get the configs for the producer. Properties using the prefix PRODUCER_PREFIX will be used in preference to their non-prefixed versions, except in the case of ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster.
- Parameters:
clientId - clientId
- Returns:
- Map of the producer configuration.
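The rule described for getProducerConfigs (a producer-prefixed entry wins over the non-prefixed entry, except for bootstrap.servers) can be modeled with plain maps. The sketch below is a simplified illustration under stated assumptions, not the actual method: ProducerViewSketch and producerView are hypothetical names, the "producer." literal is assumed to match PRODUCER_PREFIX, and entries prefixed for other clients are copied through unchanged rather than filtered.

```java
import java.util.HashMap;
import java.util.Map;

final class ProducerViewSketch {

    static final String PRODUCER_PREFIX = "producer.";          // assumed literal value
    static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

    /** Hypothetical helper: the producer's effective view of the supplied settings. */
    static Map<String, Object> producerView(Map<String, Object> originals) {
        Map<String, Object> view = new HashMap<>();
        // Pass 1: copy every entry that does not carry the producer prefix.
        // Simplification: entries prefixed for other clients are copied as-is.
        for (Map.Entry<String, Object> e : originals.entrySet()) {
            if (!e.getKey().startsWith(PRODUCER_PREFIX)) {
                view.put(e.getKey(), e.getValue());
            }
        }
        // Pass 2: producer-prefixed entries override the non-prefixed ones,
        // except bootstrap.servers, where the non-prefixed value is always kept.
        for (Map.Entry<String, Object> e : originals.entrySet()) {
            if (e.getKey().startsWith(PRODUCER_PREFIX)) {
                String name = e.getKey().substring(PRODUCER_PREFIX.length());
                if (!BOOTSTRAP_SERVERS.equals(name)) {
                    view.put(name, e.getValue());
                }
            }
        }
        return view;
    }

    public static void main(String[] args) {
        Map<String, Object> originals = new HashMap<>();
        originals.put("bootstrap.servers", "broker-a:9092");          // placeholder host
        originals.put("producer.bootstrap.servers", "broker-b:9092"); // ignored by the rule
        originals.put("metadata.max.age.ms", 60000);
        originals.put("producer.metadata.max.age.ms", 30000);

        Map<String, Object> view = producerView(originals);
        System.out.println("bootstrap.servers   = " + view.get("bootstrap.servers"));
        System.out.println("metadata.max.age.ms = " + view.get("metadata.max.age.ms"));
    }
}
```

The two passes keep the result independent of map iteration order: overrides are applied only after all non-prefixed entries are in place.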
-
getAdminConfigs
public Map<String,Object> getAdminConfigs(String clientId)
Get the configs for the admin client.
- Parameters:
clientId - clientId
- Returns:
- Map of the admin client configuration.
-
defaultKeySerde
public Serde defaultKeySerde()
Return a configured instance of the key Serde class.
- Returns:
- a configured instance of the key Serde class
-
defaultValueSerde
public Serde defaultValueSerde()
Return a configured instance of the value Serde class.
- Returns:
- a configured instance of the value Serde class
-
defaultTimestampExtractor
public TimestampExtractor defaultTimestampExtractor()
-
defaultDeserializationExceptionHandler
public DeserializationExceptionHandler defaultDeserializationExceptionHandler()
-
defaultProductionExceptionHandler
public ProductionExceptionHandler defaultProductionExceptionHandler()
-
main
public static void main(String[] args)
-
-