Class StreamsConfig

java.lang.Object
org.apache.kafka.common.config.AbstractConfig
org.apache.kafka.streams.StreamsConfig

public class StreamsConfig extends AbstractConfig
Configuration for a KafkaStreams instance. Can also be used to configure the Kafka Streams internal KafkaConsumer, KafkaProducer and Admin. To avoid consumer/producer/admin property conflicts, you should prefix those properties using consumerPrefix(String), producerPrefix(String) and adminClientPrefix(String), respectively.

Example:


 // potentially wrong: sets "metadata.max.age.ms" to 1 minute for producer AND consumer
 Properties streamsProperties = new Properties();
 streamsProperties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 60000);
 // or
 streamsProperties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 60000);

 // suggested:
 Properties streamsProperties = new Properties();
 // sets "metadata.max.age.ms" to 1 minute for consumer only
 streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), 60000);
 // sets "metadata.max.age.ms" to 1 minute for producer only
 streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.METADATA_MAX_AGE_CONFIG), 60000);

 StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
 
This instance can also be used to pass in custom configurations to different modules (e.g. passing a special config in your customized serde class). The consumer/producer/admin prefix can also be used to distinguish these custom config values passed to different clients with the same config name.

Example:

 Properties streamsProperties = new Properties();
 // sets "my.custom.config" to "foo" for consumer only
 streamsProperties.put(StreamsConfig.consumerPrefix("my.custom.config"), "foo");
 // sets "my.custom.config" to "bar" for producer only
 streamsProperties.put(StreamsConfig.producerPrefix("my.custom.config"), "bar");
 // sets "my.custom.config2" to "boom" for all clients universally
 streamsProperties.put("my.custom.config2", "boom");

 // as a result, inside producer's serde class configure(..) function,
 // users can now read both key-value pairs "my.custom.config" -> "bar"
 // and "my.custom.config2" -> "boom" from the config map
 StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
 
When increasing ProducerConfig.MAX_BLOCK_MS_CONFIG to be more resilient to unavailable brokers, you should also increase ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG using the following guidance:
     max.poll.interval.ms > max.block.ms
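
The guidance above can be checked before the application starts. The following is a minimal sketch using only java.util.Properties; the class name BlockingTimeoutCheck is hypothetical, and the keys assume that producerPrefix(String) and consumerPrefix(String) produce "producer." and "consumer." prefixed names and that both values are stored as strings.

```java
import java.util.Properties;

final class BlockingTimeoutCheck {
    // Assumed literal keys: client config names with the Streams producer/consumer prefixes.
    static final String MAX_BLOCK_MS = "producer.max.block.ms";
    static final String MAX_POLL_INTERVAL_MS = "consumer.max.poll.interval.ms";

    /** Returns true when max.poll.interval.ms is strictly greater than max.block.ms. */
    static boolean satisfiesGuidance(Properties props) {
        long maxBlockMs = Long.parseLong(props.getProperty(MAX_BLOCK_MS));
        long maxPollIntervalMs = Long.parseLong(props.getProperty(MAX_POLL_INTERVAL_MS));
        return maxPollIntervalMs > maxBlockMs;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(MAX_BLOCK_MS, "120000");         // 2 minutes
        props.setProperty(MAX_POLL_INTERVAL_MS, "600000"); // 10 minutes
        System.out.println(satisfiesGuidance(props));      // prints "true"
    }
}
```

The check throws NumberFormatException if either key is missing, which may be acceptable for a startup guard that expects both values to be set explicitly.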
 
Kafka Streams requires at least the following properties to be set:
  • "application.id"
  • "bootstrap.servers"
By default, Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
  • "group.id" (<application.id>) - Streams client will always use the application ID as the consumer group ID
  • "enable.auto.commit" (false) - Streams client will always disable/turn off auto committing
  • "partition.assignment.strategy" (StreamsPartitionAssignor) - Streams client will always use its own partition assignor
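
As a rough illustration of the required and fixed settings, the sketch below builds a minimal property set with "application.id" and "bootstrap.servers" and models two of the consumer settings that Streams pins. The class MinimalStreamsProperties, its methods, and the sample values "my-streams-app" and "localhost:9092" are hypothetical; this models the documented behavior and is not the library's code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

final class MinimalStreamsProperties {
    /** Builds the smallest property set: application.id and bootstrap.servers. */
    static Properties minimal(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        return props;
    }

    /**
     * Models the consumer settings that Streams fixes regardless of user input.
     * "partition.assignment.strategy" is left out because it needs the assignor class.
     */
    static Map<String, String> fixedConsumerSettings(Properties props) {
        Map<String, String> fixed = new HashMap<>();
        fixed.put("group.id", props.getProperty("application.id"));
        fixed.put("enable.auto.commit", "false");
        return fixed;
    }

    public static void main(String[] args) {
        Properties props = minimal("my-streams-app", "localhost:9092");
        // A user-supplied group.id would not survive: the application ID wins.
        props.setProperty("group.id", "ignored-group");
        System.out.println(fixedConsumerSettings(props).get("group.id")); // prints "my-streams-app"
    }
}
```

In a real application the Properties object from minimal(..) would be handed to new StreamsConfig(props) or to the KafkaStreams constructor.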
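If "processing.guarantee" is set to "exactly_once_v2", "exactly_once" (deprecated), or "exactly_once_beta" (deprecated), Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
  • "isolation.level" (read_committed) - Consumers will always read committed data only
  • "enable.idempotence" (true) - Producer will always have idempotency enabled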
See Also:
  • Field Details

    • DUMMY_THREAD_INDEX

      public static final int DUMMY_THREAD_INDEX
      See Also:
    • MAX_TASK_IDLE_MS_DISABLED

      public static final long MAX_TASK_IDLE_MS_DISABLED
      See Also:
    • MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE

      public static final int MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE
      See Also:
    • MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH

      public static final int MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH
      See Also:
    • MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH

      public static final int MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH
      See Also:
    • TOPIC_PREFIX

      public static final String TOPIC_PREFIX
      Prefix used to provide default topic configs to be applied when creating internal topics. These should be valid properties from TopicConfig. It is recommended to use topicPrefix(String).
      See Also:
    • CONSUMER_PREFIX

      public static final String CONSUMER_PREFIX
      Prefix used to isolate consumer configs from other client configs. It is recommended to use consumerPrefix(String) to add this prefix to consumer properties.
      See Also:
    • MAIN_CONSUMER_PREFIX

      public static final String MAIN_CONSUMER_PREFIX
      Prefix used to override consumer configs for the main consumer client from the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
        1. main.consumer.[config-name]
        2. consumer.[config-name]
        3. [config-name]
      See Also:
    • RESTORE_CONSUMER_PREFIX

      public static final String RESTORE_CONSUMER_PREFIX
      Prefix used to override consumer configs for the restore consumer client from the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
        1. restore.consumer.[config-name]
        2. consumer.[config-name]
        3. [config-name]
      See Also:
    • GLOBAL_CONSUMER_PREFIX

      public static final String GLOBAL_CONSUMER_PREFIX
      Prefix used to override consumer configs for the global consumer client from the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
        1. global.consumer.[config-name]
        2. consumer.[config-name]
        3. [config-name]
      See Also:
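
The three-level override precedence described for MAIN_CONSUMER_PREFIX, RESTORE_CONSUMER_PREFIX and GLOBAL_CONSUMER_PREFIX can be sketched as a simple ordered lookup. The class ConsumerConfigPrecedence and its resolve method are hypothetical helpers written for illustration; they model the documented ordering and are not how StreamsConfig is implemented.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class ConsumerConfigPrecedence {
    /**
     * Looks up configName for one consumer role ("main", "restore" or "global"),
     * checking "<role>.consumer.<name>", then "consumer.<name>", then "<name>".
     */
    static Optional<Object> resolve(Map<String, Object> props, String role, String configName) {
        String[] candidates = {
            role + ".consumer." + configName, // highest precedence
            "consumer." + configName,
            configName                        // lowest precedence
        };
        for (String key : candidates) {
            if (props.containsKey(key)) {
                return Optional.ofNullable(props.get(key));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("max.poll.records", 500);
        props.put("consumer.max.poll.records", 200);
        props.put("restore.consumer.max.poll.records", 1000);

        System.out.println(resolve(props, "restore", "max.poll.records")); // Optional[1000]
        System.out.println(resolve(props, "main", "max.poll.records"));    // Optional[200]
    }
}
```

Here the restore consumer picks up its own override, while the main consumer falls back to the general consumer setting because no "main.consumer." entry exists.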
    • PRODUCER_PREFIX

      public static final String PRODUCER_PREFIX
      Prefix used to isolate producer configs from other client configs. It is recommended to use producerPrefix(String) to add this prefix to producer properties.
      See Also:
    • ADMIN_CLIENT_PREFIX

      public static final String ADMIN_CLIENT_PREFIX
      Prefix used to isolate admin configs from other client configs. It is recommended to use adminClientPrefix(String) to add this prefix to admin client properties.
      See Also:
    • CLIENT_TAG_PREFIX

      public static final String CLIENT_TAG_PREFIX
      Prefix used to add arbitrary tags to a Kafka Streams instance as key-value pairs. Example:
        client.tag.zone=zone1
        client.tag.cluster=cluster1
      See Also:
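
To make the tag convention concrete, the sketch below collects every property whose name starts with "client.tag." into a map keyed by the bare tag name. ClientTagSketch is a hypothetical class that only models the naming scheme shown in the example; in an application, getClientTags() on a StreamsConfig instance is the supported way to read the tags.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

final class ClientTagSketch {
    // Literal prefix taken from the example "client.tag.zone=zone1".
    static final String CLIENT_TAG_PREFIX = "client.tag.";

    /** Returns tag key -> tag value for all string properties carrying the client tag prefix. */
    static Map<String, String> clientTags(Properties props) {
        Map<String, String> tags = new TreeMap<>(); // sorted for stable output
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(CLIENT_TAG_PREFIX)) {
                tags.put(name.substring(CLIENT_TAG_PREFIX.length()), props.getProperty(name));
            }
        }
        return tags;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("client.tag.zone", "zone1");
        props.setProperty("client.tag.cluster", "cluster1");
        props.setProperty("application.id", "my-streams-app"); // not a tag, ignored
        System.out.println(clientTags(props)); // {cluster=cluster1, zone=zone1}
    }
}
```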
    • TOPOLOGY_OPTIMIZATION_CONFIG

      public static final String TOPOLOGY_OPTIMIZATION_CONFIG
      See Also:
    • NO_OPTIMIZATION

      public static final String NO_OPTIMIZATION
      Config value for parameter "topology.optimization" for disabling topology optimization
      See Also:
    • OPTIMIZE

      public static final String OPTIMIZE
      Config value for parameter "topology.optimization" for enabling topology optimization
      See Also:
    • REUSE_KTABLE_SOURCE_TOPICS

      public static final String REUSE_KTABLE_SOURCE_TOPICS
      Config value for parameter "topology.optimization" for enabling the specific optimization that reuses source topic as changelog topic for KTables.
      See Also:
    • MERGE_REPARTITION_TOPICS

      public static final String MERGE_REPARTITION_TOPICS
      Config value for parameter "topology.optimization" for enabling the specific optimization that merges duplicated repartition topics.
      See Also:
    • SINGLE_STORE_SELF_JOIN

      public static final String SINGLE_STORE_SELF_JOIN
      Config value for parameter "topology.optimization" for enabling the optimization that optimizes inner stream-stream joins into self-joins when both arguments are the same stream.
      See Also:
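
One plausible way to interpret a "topology.optimization" value is sketched below. Everything here is an assumption made for illustration: the literal strings for the constants ("none", "all", "reuse.ktable.source.topics", "merge.repartition.topics", "single.store.self.join"), the comma-separated list syntax, and the class TopologyOptimizationSketch itself. It is not the implementation of verifyTopologyOptimizationConfigs(String).

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class TopologyOptimizationSketch {
    // Assumed literal values of NO_OPTIMIZATION, OPTIMIZE and the specific optimizations.
    static final String NO_OPTIMIZATION = "none";
    static final String OPTIMIZE = "all";
    static final List<String> SPECIFIC = Arrays.asList(
        "reuse.ktable.source.topics",
        "merge.repartition.topics",
        "single.store.self.join");

    /** Expands a config value into the set of individual optimizations it enables. */
    static Set<String> enabledOptimizations(String config) {
        List<String> parts = Arrays.asList(config.trim().split("\\s*,\\s*"));
        boolean blanket = parts.contains(NO_OPTIMIZATION) || parts.contains(OPTIMIZE);
        if (blanket && parts.size() > 1) {
            throw new IllegalArgumentException(
                "\"none\" and \"all\" cannot be combined with other values: " + config);
        }
        Set<String> enabled = new LinkedHashSet<>();
        if (parts.contains(OPTIMIZE)) {
            enabled.addAll(SPECIFIC);
        } else if (!parts.contains(NO_OPTIMIZATION)) {
            for (String part : parts) {
                if (!SPECIFIC.contains(part)) {
                    throw new IllegalArgumentException("Unknown optimization: " + part);
                }
                enabled.add(part);
            }
        }
        return enabled;
    }

    public static void main(String[] args) {
        System.out.println(enabledOptimizations("none").isEmpty()); // prints "true"
        System.out.println(enabledOptimizations("all").size());     // prints "3"
    }
}
```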
    • UPGRADE_FROM_0100

      public static final String UPGRADE_FROM_0100
      Config value for parameter "upgrade.from" for upgrading an application from version 0.10.0.x.
    • UPGRADE_FROM_0101

      public static final String UPGRADE_FROM_0101
      Config value for parameter "upgrade.from" for upgrading an application from version 0.10.1.x.
    • UPGRADE_FROM_0102

      public static final String UPGRADE_FROM_0102
      Config value for parameter "upgrade.from" for upgrading an application from version 0.10.2.x.
    • UPGRADE_FROM_0110

      public static final String UPGRADE_FROM_0110
      Config value for parameter "upgrade.from" for upgrading an application from version 0.11.0.x.
    • UPGRADE_FROM_10

      public static final String UPGRADE_FROM_10
      Config value for parameter "upgrade.from" for upgrading an application from version 1.0.x.
    • UPGRADE_FROM_11

      public static final String UPGRADE_FROM_11
      Config value for parameter "upgrade.from" for upgrading an application from version 1.1.x.
    • UPGRADE_FROM_20

      public static final String UPGRADE_FROM_20
      Config value for parameter "upgrade.from" for upgrading an application from version 2.0.x.
    • UPGRADE_FROM_21

      public static final String UPGRADE_FROM_21
      Config value for parameter "upgrade.from" for upgrading an application from version 2.1.x.
    • UPGRADE_FROM_22

      public static final String UPGRADE_FROM_22
      Config value for parameter "upgrade.from" for upgrading an application from version 2.2.x.
    • UPGRADE_FROM_23

      public static final String UPGRADE_FROM_23
      Config value for parameter "upgrade.from" for upgrading an application from version 2.3.x.
    • UPGRADE_FROM_24

      public static final String UPGRADE_FROM_24
      Config value for parameter "upgrade.from" for upgrading an application from version 2.4.x.
    • UPGRADE_FROM_25

      public static final String UPGRADE_FROM_25
      Config value for parameter "upgrade.from" for upgrading an application from version 2.5.x.
    • UPGRADE_FROM_26

      public static final String UPGRADE_FROM_26
      Config value for parameter "upgrade.from" for upgrading an application from version 2.6.x.
    • UPGRADE_FROM_27

      public static final String UPGRADE_FROM_27
      Config value for parameter "upgrade.from" for upgrading an application from version 2.7.x.
    • UPGRADE_FROM_28

      public static final String UPGRADE_FROM_28
      Config value for parameter "upgrade.from" for upgrading an application from version 2.8.x.
    • UPGRADE_FROM_30

      public static final String UPGRADE_FROM_30
      Config value for parameter "upgrade.from" for upgrading an application from version 3.0.x.
    • UPGRADE_FROM_31

      public static final String UPGRADE_FROM_31
      Config value for parameter "upgrade.from" for upgrading an application from version 3.1.x.
    • UPGRADE_FROM_32

      public static final String UPGRADE_FROM_32
      Config value for parameter "upgrade.from" for upgrading an application from version 3.2.x.
    • UPGRADE_FROM_33

      public static final String UPGRADE_FROM_33
      Config value for parameter "upgrade.from" for upgrading an application from version 3.3.x.
    • UPGRADE_FROM_34

      public static final String UPGRADE_FROM_34
      Config value for parameter "upgrade.from" for upgrading an application from version 3.4.x.
    • UPGRADE_FROM_35

      public static final String UPGRADE_FROM_35
      Config value for parameter "upgrade.from" for upgrading an application from version 3.5.x.
    • AT_LEAST_ONCE

      public static final String AT_LEAST_ONCE
      Config value for parameter "processing.guarantee" for at-least-once processing guarantees.
      See Also:
    • EXACTLY_ONCE

      @Deprecated public static final String EXACTLY_ONCE
      Deprecated.
      Since 3.0.0, will be removed in 4.0. Use "exactly_once_v2" instead.
      Config value for parameter "processing.guarantee" for exactly-once processing guarantees.

      Enabling exactly-once processing semantics requires broker version 0.11.0 or higher. If you enable this feature Kafka Streams will use more resources (like broker connections) compared to "at_least_once" and "exactly_once_v2".

      See Also:
    • EXACTLY_ONCE_BETA

      @Deprecated public static final String EXACTLY_ONCE_BETA
      Deprecated.
      Since 3.0.0, will be removed in 4.0. Use "exactly_once_v2" instead.
      Config value for parameter "processing.guarantee" for exactly-once processing guarantees.

      Enabling exactly-once (beta) requires broker version 2.5 or higher. If you enable this feature Kafka Streams will use fewer resources (like broker connections) compared to the EXACTLY_ONCE (deprecated) case.

      See Also:
    • EXACTLY_ONCE_V2

      public static final String EXACTLY_ONCE_V2
      Config value for parameter "processing.guarantee" for exactly-once processing guarantees.

      Enabling exactly-once-v2 requires broker version 2.5 or higher.

      See Also:
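
Because "exactly_once" and "exactly_once_beta" are deprecated in favor of "exactly_once_v2", existing property files may need a small rewrite. The sketch below shows one way to do that on a plain Properties object; ProcessingGuaranteeMigration is a hypothetical helper, and it assumes the value is stored as a string.

```java
import java.util.Properties;

final class ProcessingGuaranteeMigration {
    static final String PROCESSING_GUARANTEE = "processing.guarantee";

    /** Rewrites deprecated exactly-once values to "exactly_once_v2"; other values are left alone. */
    static void migrate(Properties props) {
        String current = props.getProperty(PROCESSING_GUARANTEE);
        if ("exactly_once".equals(current) || "exactly_once_beta".equals(current)) {
            props.setProperty(PROCESSING_GUARANTEE, "exactly_once_v2");
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(PROCESSING_GUARANTEE, "exactly_once_beta");
        migrate(props);
        System.out.println(props.getProperty(PROCESSING_GUARANTEE)); // prints "exactly_once_v2"
    }
}
```

Note that "exactly_once_v2" requires broker version 2.5 or higher, so a rewrite like this is only appropriate when the brokers meet that requirement.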
    • METRICS_LATEST

      public static final String METRICS_LATEST
      Config value for parameter "built.in.metrics.version" for the latest built-in metrics version.
      See Also:
    • ACCEPTABLE_RECOVERY_LAG_CONFIG

      public static final String ACCEPTABLE_RECOVERY_LAG_CONFIG
      acceptable.recovery.lag
      See Also:
    • APPLICATION_ID_CONFIG

      public static final String APPLICATION_ID_CONFIG
      application.id
      See Also:
    • APPLICATION_SERVER_CONFIG

      public static final String APPLICATION_SERVER_CONFIG
      application.server
      See Also:
    • BOOTSTRAP_SERVERS_CONFIG

      public static final String BOOTSTRAP_SERVERS_CONFIG
      bootstrap.servers
      See Also:
    • BUFFERED_RECORDS_PER_PARTITION_CONFIG

      public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG
      buffered.records.per.partition
      See Also:
    • BUFFERED_RECORDS_PER_PARTITION_DOC

      public static final String BUFFERED_RECORDS_PER_PARTITION_DOC
      See Also:
    • BUILT_IN_METRICS_VERSION_CONFIG

      public static final String BUILT_IN_METRICS_VERSION_CONFIG
      built.in.metrics.version
      See Also:
    • CACHE_MAX_BYTES_BUFFERING_CONFIG

      @Deprecated public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG
      Deprecated.
      cache.max.bytes.buffering
      See Also:
    • CACHE_MAX_BYTES_BUFFERING_DOC

      public static final String CACHE_MAX_BYTES_BUFFERING_DOC
      See Also:
    • STATESTORE_CACHE_MAX_BYTES_CONFIG

      public static final String STATESTORE_CACHE_MAX_BYTES_CONFIG
      statestore.cache.max.bytes
      See Also:
    • STATESTORE_CACHE_MAX_BYTES_DOC

      public static final String STATESTORE_CACHE_MAX_BYTES_DOC
      See Also:
    • CLIENT_ID_CONFIG

      public static final String CLIENT_ID_CONFIG
      client.id
      See Also:
    • COMMIT_INTERVAL_MS_CONFIG

      public static final String COMMIT_INTERVAL_MS_CONFIG
      commit.interval.ms
      See Also:
    • REPARTITION_PURGE_INTERVAL_MS_CONFIG

      public static final String REPARTITION_PURGE_INTERVAL_MS_CONFIG
      repartition.purge.interval.ms
      See Also:
    • CONNECTIONS_MAX_IDLE_MS_CONFIG

      public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG
      connections.max.idle.ms
      See Also:
    • DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG

      public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
      default.deserialization.exception.handler
      See Also:
    • DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC

      public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC
      See Also:
    • DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG

      public static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG
      default.production.exception.handler
      See Also:
    • DEFAULT_DSL_STORE_CONFIG

      public static final String DEFAULT_DSL_STORE_CONFIG
      default.dsl.store
      See Also:
    • DEFAULT_DSL_STORE_DOC

      public static final String DEFAULT_DSL_STORE_DOC
      See Also:
    • ROCKS_DB

      public static final String ROCKS_DB
      See Also:
    • IN_MEMORY

      public static final String IN_MEMORY
      See Also:
    • DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS

      @Deprecated public static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
      Deprecated.
      default.windowed.key.serde.inner
      See Also:
    • DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS

      @Deprecated public static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
      Deprecated.
      default.windowed.value.serde.inner
      See Also:
    • WINDOWED_INNER_CLASS_SERDE

      public static final String WINDOWED_INNER_CLASS_SERDE
      See Also:
    • DEFAULT_KEY_SERDE_CLASS_CONFIG

      public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG
      default.key.serde
      See Also:
    • DEFAULT_VALUE_SERDE_CLASS_CONFIG

      public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG
      default.value.serde
      See Also:
    • DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG

      public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
      default.timestamp.extractor
      See Also:
    • DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC

      public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC
      See Also:
    • MAX_TASK_IDLE_MS_CONFIG

      public static final String MAX_TASK_IDLE_MS_CONFIG
      max.task.idle.ms
      See Also:
    • MAX_TASK_IDLE_MS_DOC

      public static final String MAX_TASK_IDLE_MS_DOC
      See Also:
    • MAX_WARMUP_REPLICAS_CONFIG

      public static final String MAX_WARMUP_REPLICAS_CONFIG
      max.warmup.replicas
      See Also:
    • METADATA_MAX_AGE_CONFIG

      public static final String METADATA_MAX_AGE_CONFIG
      metadata.max.age.ms
      See Also:
    • METRICS_NUM_SAMPLES_CONFIG

      public static final String METRICS_NUM_SAMPLES_CONFIG
      metrics.num.samples
      See Also:
    • METRICS_RECORDING_LEVEL_CONFIG

      public static final String METRICS_RECORDING_LEVEL_CONFIG
      metrics.recording.level
      See Also:
    • METRIC_REPORTER_CLASSES_CONFIG

      public static final String METRIC_REPORTER_CLASSES_CONFIG
      metric.reporters
      See Also:
    • METRICS_SAMPLE_WINDOW_MS_CONFIG

      public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG
      metrics.sample.window.ms
      See Also:
    • AUTO_INCLUDE_JMX_REPORTER_CONFIG

      @Deprecated public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG
      Deprecated.
      auto.include.jmx.reporter
      See Also:
    • NUM_STANDBY_REPLICAS_CONFIG

      public static final String NUM_STANDBY_REPLICAS_CONFIG
      num.standby.replicas
      See Also:
    • NUM_STREAM_THREADS_CONFIG

      public static final String NUM_STREAM_THREADS_CONFIG
      num.stream.threads
      See Also:
    • POLL_MS_CONFIG

      public static final String POLL_MS_CONFIG
      poll.ms
      See Also:
    • PROBING_REBALANCE_INTERVAL_MS_CONFIG

      public static final String PROBING_REBALANCE_INTERVAL_MS_CONFIG
      probing.rebalance.interval.ms
      See Also:
    • PROCESSING_GUARANTEE_CONFIG

      public static final String PROCESSING_GUARANTEE_CONFIG
      processing.guarantee
      See Also:
    • RECEIVE_BUFFER_CONFIG

      public static final String RECEIVE_BUFFER_CONFIG
      receive.buffer.bytes
      See Also:
    • RACK_AWARE_ASSIGNMENT_TAGS_CONFIG

      public static final String RACK_AWARE_ASSIGNMENT_TAGS_CONFIG
      rack.aware.assignment.tags
      See Also:
    • RECONNECT_BACKOFF_MS_CONFIG

      public static final String RECONNECT_BACKOFF_MS_CONFIG
      reconnect.backoff.ms
      See Also:
    • RECONNECT_BACKOFF_MAX_MS_CONFIG

      public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG
      reconnect.backoff.max.ms
      See Also:
    • REPLICATION_FACTOR_CONFIG

      public static final String REPLICATION_FACTOR_CONFIG
      replication.factor
      See Also:
    • REQUEST_TIMEOUT_MS_CONFIG

      public static final String REQUEST_TIMEOUT_MS_CONFIG
      request.timeout.ms
      See Also:
    • RETRIES_CONFIG

      @Deprecated public static final String RETRIES_CONFIG
      Deprecated.
      since 2.7
      retries

      This config is ignored by Kafka Streams. Note that the internal clients (producer, admin) are still impacted by this config.

      See Also:
    • RETRY_BACKOFF_MS_CONFIG

      public static final String RETRY_BACKOFF_MS_CONFIG
      retry.backoff.ms
      See Also:
    • ROCKSDB_CONFIG_SETTER_CLASS_CONFIG

      public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
      rocksdb.config.setter
      See Also:
    • SECURITY_PROTOCOL_CONFIG

      public static final String SECURITY_PROTOCOL_CONFIG
      security.protocol
      See Also:
    • SEND_BUFFER_CONFIG

      public static final String SEND_BUFFER_CONFIG
      send.buffer.bytes
      See Also:
    • STATE_CLEANUP_DELAY_MS_CONFIG

      public static final String STATE_CLEANUP_DELAY_MS_CONFIG
      state.cleanup.delay.ms
      See Also:
    • STATE_DIR_CONFIG

      public static final String STATE_DIR_CONFIG
      state.dir
      See Also:
    • TASK_TIMEOUT_MS_CONFIG

      public static final String TASK_TIMEOUT_MS_CONFIG
      task.timeout.ms
      See Also:
    • TASK_TIMEOUT_MS_DOC

      public static final String TASK_TIMEOUT_MS_DOC
      See Also:
    • WINDOW_SIZE_MS_CONFIG

      public static final String WINDOW_SIZE_MS_CONFIG
      window.size.ms
      See Also:
    • UPGRADE_FROM_CONFIG

      public static final String UPGRADE_FROM_CONFIG
      upgrade.from
      See Also:
    • WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG

      public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG
      windowstore.changelog.additional.retention.ms
      See Also:
    • DEFAULT_CLIENT_SUPPLIER_CONFIG

      public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG
      default.client.supplier
      See Also:
    • DEFAULT_CLIENT_SUPPLIER_DOC

      public static final String DEFAULT_CLIENT_SUPPLIER_DOC
      See Also:
    • RACK_AWARE_ASSIGNMENT_STRATEGY_NONE

      public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE
      See Also:
    • RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC

      public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC
      See Also:
    • RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG

      public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG
      rack.aware.assignment.strategy
      See Also:
    • RACK_AWARE_ASSIGNMENT_STRATEGY_DOC

      public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC
      See Also:
    • RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG

      public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG
      See Also:
    • RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC

      public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC
    • RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG

      public static final String RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG
      See Also:
    • RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC

      public static final String RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC
    • TOPOLOGY_OPTIMIZATION

      @Deprecated public static final String TOPOLOGY_OPTIMIZATION
      Deprecated.
      since 2.7; use TOPOLOGY_OPTIMIZATION_CONFIG instead
      topology.optimization
      See Also:
  • Constructor Details

    • StreamsConfig

      public StreamsConfig(Map<?,?> props)
      Create a new StreamsConfig using the given properties.
      Parameters:
      props - properties that specify Kafka Streams and internal consumer/producer configuration
    • StreamsConfig

      protected StreamsConfig(Map<?,?> props, boolean doLog)
  • Method Details

    • consumerPrefix

      public static String consumerPrefix(String consumerProp)
      Prefix a property with CONSUMER_PREFIX. This is used to isolate consumer configs from other client configs.
      Parameters:
      consumerProp - the consumer property to be masked
      Returns:
      CONSUMER_PREFIX + consumerProp
    • mainConsumerPrefix

      public static String mainConsumerPrefix(String consumerProp)
      Prefix a property with MAIN_CONSUMER_PREFIX. This is used to isolate main consumer configs from other client configs.
      Parameters:
      consumerProp - the consumer property to be masked
      Returns:
      MAIN_CONSUMER_PREFIX + consumerProp
    • restoreConsumerPrefix

      public static String restoreConsumerPrefix(String consumerProp)
      Prefix a property with RESTORE_CONSUMER_PREFIX. This is used to isolate restore consumer configs from other client configs.
      Parameters:
      consumerProp - the consumer property to be masked
      Returns:
      RESTORE_CONSUMER_PREFIX + consumerProp
    • clientTagPrefix

      public static String clientTagPrefix(String clientTagKey)
      Prefix a client tag key with CLIENT_TAG_PREFIX.
      Parameters:
      clientTagKey - client tag key
      Returns:
      CLIENT_TAG_PREFIX + clientTagKey
    • globalConsumerPrefix

      public static String globalConsumerPrefix(String consumerProp)
      Prefix a property with GLOBAL_CONSUMER_PREFIX. This is used to isolate global consumer configs from other client configs.
      Parameters:
      consumerProp - the consumer property to be masked
      Returns:
      GLOBAL_CONSUMER_PREFIX + consumerProp
    • producerPrefix

      public static String producerPrefix(String producerProp)
      Prefix a property with PRODUCER_PREFIX. This is used to isolate producer configs from other client configs.
      Parameters:
      producerProp - the producer property to be masked
      Returns:
      PRODUCER_PREFIX + producerProp
    • adminClientPrefix

      public static String adminClientPrefix(String adminClientProp)
      Prefix a property with ADMIN_CLIENT_PREFIX. This is used to isolate admin configs from other client configs.
      Parameters:
      adminClientProp - the admin client property to be masked
      Returns:
      ADMIN_CLIENT_PREFIX + adminClientProp
    • topicPrefix

      public static String topicPrefix(String topicProp)
      Prefix a property with TOPIC_PREFIX used to provide default topic configs to be applied when creating internal topics.
      Parameters:
      topicProp - the topic property to be masked
      Returns:
      TOPIC_PREFIX + topicProp
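
Each prefix helper above returns the prefix constant concatenated with the property name. The sketch below reproduces that behavior for a few of them without depending on Kafka; PrefixSketch is a hypothetical class, and the literal prefix values ("consumer.", "main.consumer.", "producer.", "topic.") are assumptions about the constants rather than something this page states.

```java
final class PrefixSketch {
    // Assumed literal values of the corresponding StreamsConfig prefix constants.
    static final String CONSUMER_PREFIX = "consumer.";
    static final String MAIN_CONSUMER_PREFIX = "main.consumer.";
    static final String PRODUCER_PREFIX = "producer.";
    static final String TOPIC_PREFIX = "topic.";

    static String consumerPrefix(String consumerProp) {
        return CONSUMER_PREFIX + consumerProp;
    }

    static String mainConsumerPrefix(String consumerProp) {
        return MAIN_CONSUMER_PREFIX + consumerProp;
    }

    static String producerPrefix(String producerProp) {
        return PRODUCER_PREFIX + producerProp;
    }

    static String topicPrefix(String topicProp) {
        return TOPIC_PREFIX + topicProp;
    }

    public static void main(String[] args) {
        System.out.println(consumerPrefix("metadata.max.age.ms")); // consumer.metadata.max.age.ms
        System.out.println(topicPrefix("segment.bytes"));          // topic.segment.bytes
    }
}
```

Application code should call the real static helpers on StreamsConfig rather than hard-coding the prefix strings, so that it keeps working if the constants ever change.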
    • configDef

      public static ConfigDef configDef()
      Return a copy of the config definition.
      Returns:
      a copy of the config definition
    • postProcessParsedConfig

      protected Map<String,Object> postProcessParsedConfig(Map<String,Object> parsedValues)
      Description copied from class: AbstractConfig
      Called directly after user configs have been parsed (and thus default values have been set). This allows changing default values for "secondary defaults" if required.
      Overrides:
      postProcessParsedConfig in class AbstractConfig
      Parameters:
      parsedValues - unmodifiable map of current configuration
      Returns:
      a map of updates that should be applied to the configuration (will be validated to prevent bad updates)
    • getMainConsumerConfigs

      public Map<String,Object> getMainConsumerConfigs(String groupId, String clientId, int threadIdx)
      Get the configs for the main consumer. Properties using the prefix MAIN_CONSUMER_PREFIX take precedence over properties prefixed with CONSUMER_PREFIX and over the non-prefixed versions (see the override precedence ordering in MAIN_CONSUMER_PREFIX), except for ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, where the non-prefixed version is always used because Streams only supports reading from and writing to the same Kafka cluster. If a config is not specified with MAIN_CONSUMER_PREFIX, the main consumer shares the general consumer configs prefixed by CONSUMER_PREFIX.
      Parameters:
      groupId - consumer groupId
      clientId - clientId
      threadIdx - stream thread index
      Returns:
      Map of the consumer configuration.
    • getRestoreConsumerConfigs

      public Map<String,Object> getRestoreConsumerConfigs(String clientId)
      Get the configs for the restore consumer. Properties using the prefix RESTORE_CONSUMER_PREFIX take precedence over properties prefixed with CONSUMER_PREFIX and over the non-prefixed versions (see the override precedence ordering in RESTORE_CONSUMER_PREFIX), except for ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, where the non-prefixed version is always used because Streams only supports reading from and writing to the same Kafka cluster. If a config is not specified with RESTORE_CONSUMER_PREFIX, the restore consumer shares the general consumer configs prefixed by CONSUMER_PREFIX.
      Parameters:
      clientId - clientId
      Returns:
      Map of the restore consumer configuration.
    • getGlobalConsumerConfigs

      public Map<String,Object> getGlobalConsumerConfigs(String clientId)
      Get the configs for the global consumer. Properties using the prefix GLOBAL_CONSUMER_PREFIX take precedence over properties prefixed with CONSUMER_PREFIX and over the non-prefixed versions (see the override precedence ordering in GLOBAL_CONSUMER_PREFIX), except for ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, where the non-prefixed version is always used because Streams only supports reading from and writing to the same Kafka cluster. If a config is not specified with GLOBAL_CONSUMER_PREFIX, the global consumer shares the general consumer configs prefixed by CONSUMER_PREFIX.
      Parameters:
      clientId - clientId
      Returns:
      Map of the global consumer configuration.
    • getProducerConfigs

      public Map<String,Object> getProducerConfigs(String clientId)
      Get the configs for the producer. Properties using the prefix PRODUCER_PREFIX take precedence over their non-prefixed versions, except for ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, where the non-prefixed version is always used because Streams only supports reading from and writing to the same Kafka cluster.
      Parameters:
      clientId - clientId
      Returns:
      Map of the producer configuration.
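
The producer rule described above, including the "bootstrap.servers" exception, can be modeled as a per-key lookup. This is a minimal sketch under stated assumptions: ProducerConfigSketch and resolve are hypothetical names, "producer." is assumed to be the value of PRODUCER_PREFIX, and the real getProducerConfigs(String) builds a whole map rather than resolving one key at a time.

```java
import java.util.HashMap;
import java.util.Map;

final class ProducerConfigSketch {
    static final String PRODUCER_PREFIX = "producer."; // assumed value of PRODUCER_PREFIX
    static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

    /** Resolves one producer setting following the documented precedence. */
    static Object resolve(Map<String, Object> props, String name) {
        if (BOOTSTRAP_SERVERS.equals(name)) {
            // Exception: the non-prefixed bootstrap.servers is always used.
            return props.get(name);
        }
        Object prefixed = props.get(PRODUCER_PREFIX + name);
        return prefixed != null ? prefixed : props.get(name);
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "broker-a:9092");
        props.put("producer.bootstrap.servers", "broker-b:9092"); // ignored by the rule
        props.put("metadata.max.age.ms", 300000);
        props.put("producer.metadata.max.age.ms", 60000);

        System.out.println(resolve(props, "bootstrap.servers"));   // broker-a:9092
        System.out.println(resolve(props, "metadata.max.age.ms")); // 60000
    }
}
```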
    • getAdminConfigs

      public Map<String,Object> getAdminConfigs(String clientId)
      Get the configs for the admin client.
      Parameters:
      clientId - clientId
      Returns:
      Map of the admin client configuration.
    • getClientTags

      public Map<String,String> getClientTags()
      Get the configured client tags set with CLIENT_TAG_PREFIX prefix.
      Returns:
      Map of the client tags.
    • verifyTopologyOptimizationConfigs

      public static Set<String> verifyTopologyOptimizationConfigs(String config)
    • getKafkaClientSupplier

      public KafkaClientSupplier getKafkaClientSupplier()
      Return configured KafkaClientSupplier
      Returns:
      Configured KafkaClientSupplier
    • defaultKeySerde

      public Serde defaultKeySerde()
      Return a configured instance of key Serde class.
      Returns:
      a configured instance of key Serde class
    • defaultValueSerde

      public Serde defaultValueSerde()
      Return a configured instance of value Serde class.
      Returns:
      a configured instance of value Serde class
    • defaultTimestampExtractor

      public TimestampExtractor defaultTimestampExtractor()
    • defaultDeserializationExceptionHandler

      public DeserializationExceptionHandler defaultDeserializationExceptionHandler()
    • defaultProductionExceptionHandler

      public ProductionExceptionHandler defaultProductionExceptionHandler()
    • main

      public static void main(String[] args)