Class StreamsConfig


  • public class StreamsConfig
    extends AbstractConfig
    Configuration for a KafkaStreams instance. Can also be used to configure the Kafka Streams internal KafkaConsumer, KafkaProducer and Admin. To avoid consumer/producer/admin property conflicts, you should prefix those properties using consumerPrefix(String), producerPrefix(String) and adminClientPrefix(String), respectively.

    Example:

    
     // potentially wrong: sets "metadata.max.age.ms" to 1 minute for producer AND consumer
     Properties streamsProperties = new Properties();
     streamsProperties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 60000);
     // or
     streamsProperties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 60000);
    
     // suggested:
     Properties streamsProperties = new Properties();
     // sets "metadata.max.age.ms" to 1 minute for consumer only
     streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), 60000);
     // sets "metadata.max.age.ms" to 1 minute for producer only
     streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.METADATA_MAX_AGE_CONFIG), 60000);
    
     StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
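The same pattern applies to the internal admin client via adminClientPrefix(String); a minimal sketch (the config key and value below are illustrative only):

 Properties streamsProperties = new Properties();
 // sets "retries" for the internal admin client only
 streamsProperties.put(StreamsConfig.adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), 5);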
     
This instance can also be used to pass in custom configurations to different modules (e.g. passing a special config in your customized serde class). The consumer/producer/admin prefix can also be used to distinguish these custom config values passed to different clients with the same config name. Example:
    
     Properties streamsProperties = new Properties();
     // sets "my.custom.config" to "foo" for consumer only
     streamsProperties.put(StreamsConfig.consumerPrefix("my.custom.config"), "foo");
     // sets "my.custom.config" to "bar" for producer only
     streamsProperties.put(StreamsConfig.producerPrefix("my.custom.config"), "bar");
     // sets "my.custom.config2" to "boom" for all clients universally
     streamsProperties.put("my.custom.config2", "boom");
    
     // as a result, inside producer's serde class configure(..) function,
 // users can now read both key-value pairs "my.custom.config" -> "bar"
     // and "my.custom.config2" -> "boom" from the config map
     StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
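For illustration, such a serde could read these values in its configure() callback; the serializer class below is hypothetical and only sketches the idea:

 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import org.apache.kafka.common.serialization.Serializer;

 public class MyCustomSerializer implements Serializer<String> {
     private String customValue;

     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
         // in the producer's config map, "my.custom.config" resolves to "bar"
         // (the consumer-side deserializer would see "foo" instead)
         customValue = (String) configs.get("my.custom.config");
     }

     @Override
     public byte[] serialize(String topic, String data) {
         return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
     }
 }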
     
    When increasing ProducerConfig.MAX_BLOCK_MS_CONFIG to be more resilient to non-available brokers, you should also increase ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG using the following guidance:
         max.poll.interval.ms > max.block.ms
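    For example, with illustrative values that satisfy this guidance:

 Properties streamsProperties = new Properties();
 // producer may block for up to 1 minute
 streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 60000);
 // consumer poll interval stays above max.block.ms (values are illustrative only)
 streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 120000);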
     
    Kafka Streams requires at least the following properties to be set (a minimal sketch of this setup appears after the list of non-overridable properties below):
    • "application.id"
    • "bootstrap.servers"
    By default, Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
    • "group.id" (<application.id>) - Streams client will always use the application ID a consumer group ID
    • "enable.auto.commit" (false) - Streams client will always disable/turn off auto committing
    • "partition.assignment.strategy" (StreamsPartitionAssignor) - Streams client will always use its own partition assignor
    If "processing.guarantee" is set to "exactly_once", Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
    See Also:
    KafkaStreams(org.apache.kafka.streams.Topology, Properties), ConsumerConfig, ProducerConfig