Class StreamsConfig

java.lang.Object
org.apache.kafka.common.config.AbstractConfig
org.apache.kafka.streams.StreamsConfig

public class StreamsConfig extends AbstractConfig
Configuration for a KafkaStreams instance. Can also be used to configure the Kafka Streams internal KafkaConsumer, KafkaProducer and Admin. To avoid consumer/producer/admin property conflicts, you should prefix those properties using consumerPrefix(String), producerPrefix(String) and adminClientPrefix(String), respectively.

Example:


 // potentially wrong: sets "metadata.max.age.ms" to 1 minute for producer AND consumer
 Properties streamsProperties = new Properties();
 streamsProperties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 60000);
 // or
 streamsProperties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 60000);

 // suggested:
 Properties streamsProperties = new Properties();
 // sets "metadata.max.age.ms" to 1 minute for consumer only
 streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), 60000);
 // sets "metadata.max.age.ms" to 1 minute for producer only
 streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.METADATA_MAX_AGE_CONFIG), 60000);

 StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
 
This instance can also be used to pass in custom configurations to different modules (e.g. passing a special config in your customized serde class). The consumer/producer/admin prefix can also be used to distinguish these custom config values passed to different clients with the same config name. * Example:

 Properties streamsProperties = new Properties();
 // sets "my.custom.config" to "foo" for consumer only
 streamsProperties.put(StreamsConfig.consumerPrefix("my.custom.config"), "foo");
 // sets "my.custom.config" to "bar" for producer only
 streamsProperties.put(StreamsConfig.producerPrefix("my.custom.config"), "bar");
 // sets "my.custom.config2" to "boom" for all clients universally
 streamsProperties.put("my.custom.config2", "boom");

 // as a result, inside producer's serde class configure(..) function,
 // users can now read both key-value pairs "my.custom.config" -> "foo"
 // and "my.custom.config2" -> "boom" from the config map
 StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
 
When increasing ProducerConfig.MAX_BLOCK_MS_CONFIG to be more resilient to non-available brokers you should also increase ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG using the following guidance:
     max.poll.interval.ms > max.block.ms
 
Kafka Streams requires at least the following properties to be set: By default, Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
  • "group.id" (<application.id>) - Streams client will always use the application ID a consumer group ID
  • "enable.auto.commit" (false) - Streams client will always disable/turn off auto committing
  • "partition.assignment.strategy" (StreamsPartitionAssignor) - Streams client will always use its own partition assignor
If "processing.guarantee" is set to "exactly_once_v2", "exactly_once" (deprecated), or "exactly_once_beta" (deprecated), Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
See Also:
  • Field Details

  • Constructor Details

    • StreamsConfig

      public StreamsConfig(Map<?,?> props)
      Create a new StreamsConfig using the given properties.
      Parameters:
      props - properties that specify Kafka Streams and internal consumer/producer configuration
    • StreamsConfig

      protected StreamsConfig(Map<?,?> props, boolean doLog)
  • Method Details

    • consumerPrefix

      public static String consumerPrefix(String consumerProp)
      Prefix a property with CONSUMER_PREFIX. This is used to isolate consumer configs from other client configs.
      Parameters:
      consumerProp - the consumer property to be masked
      Returns:
      CONSUMER_PREFIX + consumerProp
    • mainConsumerPrefix

      public static String mainConsumerPrefix(String consumerProp)
      Prefix a property with MAIN_CONSUMER_PREFIX. This is used to isolate main consumer configs from other client configs.
      Parameters:
      consumerProp - the consumer property to be masked
      Returns:
      MAIN_CONSUMER_PREFIX + consumerProp
    • restoreConsumerPrefix

      public static String restoreConsumerPrefix(String consumerProp)
      Prefix a property with RESTORE_CONSUMER_PREFIX. This is used to isolate restore consumer configs from other client configs.
      Parameters:
      consumerProp - the consumer property to be masked
      Returns:
      RESTORE_CONSUMER_PREFIX + consumerProp
    • globalConsumerPrefix

      public static String globalConsumerPrefix(String consumerProp)
      Prefix a property with GLOBAL_CONSUMER_PREFIX. This is used to isolate global consumer configs from other client configs.
      Parameters:
      consumerProp - the consumer property to be masked
      Returns:
      GLOBAL_CONSUMER_PREFIX + consumerProp
    • producerPrefix

      public static String producerPrefix(String producerProp)
      Prefix a property with PRODUCER_PREFIX. This is used to isolate producer configs from other client configs.
      Parameters:
      producerProp - the producer property to be masked
      Returns:
      PRODUCER_PREFIX + producerProp
    • adminClientPrefix

      public static String adminClientPrefix(String adminClientProp)
      Prefix a property with ADMIN_CLIENT_PREFIX. This is used to isolate admin configs from other client configs.
      Parameters:
      adminClientProp - the admin client property to be masked
      Returns:
      ADMIN_CLIENT_PREFIX + adminClientProp
    • topicPrefix

      public static String topicPrefix(String topicProp)
      Prefix a property with TOPIC_PREFIX used to provide default topic configs to be applied when creating internal topics.
      Parameters:
      topicProp - the topic property to be masked
      Returns:
      TOPIC_PREFIX + topicProp
    • configDef

      public static ConfigDef configDef()
      Return a copy of the config definition.
      Returns:
      a copy of the config definition
    • postProcessParsedConfig

      protected Map<String,Object> postProcessParsedConfig(Map<String,Object> parsedValues)
      Description copied from class: AbstractConfig
      Called directly after user configs got parsed (and thus default values got set). This allows to change default values for "secondary defaults" if required.
      Overrides:
      postProcessParsedConfig in class AbstractConfig
      Parameters:
      parsedValues - unmodifiable map of current configuration
      Returns:
      a map of updates that should be applied to the configuration (will be validated to prevent bad updates)
    • getMainConsumerConfigs

      public Map<String,Object> getMainConsumerConfigs(String groupId, String clientId, int threadIdx)
      Get the configs to the main consumer. Properties using the prefix MAIN_CONSUMER_PREFIX will be used in favor over the properties prefixed with CONSUMER_PREFIX and the non-prefixed versions (read the override precedence ordering in MAIN_CONSUMER_PREFIX except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster. If not specified by MAIN_CONSUMER_PREFIX, main consumer will share the general consumer configs prefixed by CONSUMER_PREFIX.
      Parameters:
      groupId - consumer groupId
      clientId - clientId
      threadIdx - stream thread index
      Returns:
      Map of the consumer configuration.
    • getRestoreConsumerConfigs

      public Map<String,Object> getRestoreConsumerConfigs(String clientId)
      Get the configs for the restore-consumer. Properties using the prefix RESTORE_CONSUMER_PREFIX will be used in favor over the properties prefixed with CONSUMER_PREFIX and the non-prefixed versions (read the override precedence ordering in RESTORE_CONSUMER_PREFIX except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster. If not specified by RESTORE_CONSUMER_PREFIX, restore consumer will share the general consumer configs prefixed by CONSUMER_PREFIX.
      Parameters:
      clientId - clientId
      Returns:
      Map of the restore consumer configuration.
    • getGlobalConsumerConfigs

      public Map<String,Object> getGlobalConsumerConfigs(String clientId)
      Get the configs for the global consumer. Properties using the prefix GLOBAL_CONSUMER_PREFIX will be used in favor over the properties prefixed with CONSUMER_PREFIX and the non-prefixed versions (read the override precedence ordering in GLOBAL_CONSUMER_PREFIX except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster. If not specified by GLOBAL_CONSUMER_PREFIX, global consumer will share the general consumer configs prefixed by CONSUMER_PREFIX.
      Parameters:
      clientId - clientId
      Returns:
      Map of the global consumer configuration.
    • getProducerConfigs

      public Map<String,Object> getProducerConfigs(String clientId)
      Get the configs for the producer. Properties using the prefix PRODUCER_PREFIX will be used in favor over their non-prefixed versions except in the case of ProducerConfig.BOOTSTRAP_SERVERS_CONFIG where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster.
      Parameters:
      clientId - clientId
      Returns:
      Map of the producer configuration.
    • getAdminConfigs

      public Map<String,Object> getAdminConfigs(String clientId)
      Get the configs for the admin client.
      Parameters:
      clientId - clientId
      Returns:
      Map of the admin client configuration.
    • defaultKeySerde

      public Serde defaultKeySerde()
      Return an configured instance of key Serde class.
      Returns:
      an configured instance of key Serde class
    • defaultValueSerde

      public Serde defaultValueSerde()
      Return an configured instance of value Serde class.
      Returns:
      an configured instance of value Serde class
    • defaultTimestampExtractor

      public TimestampExtractor defaultTimestampExtractor()
    • defaultDeserializationExceptionHandler

      public DeserializationExceptionHandler defaultDeserializationExceptionHandler()
    • defaultProductionExceptionHandler

      public ProductionExceptionHandler defaultProductionExceptionHandler()
    • main

      public static void main(String[] args)