Class StreamsConfig

java.lang.Object
org.apache.kafka.common.config.AbstractConfig
org.apache.kafka.streams.StreamsConfig

public class StreamsConfig extends AbstractConfig
Configuration for a KafkaStreams instance. Can also be used to configure the Kafka Streams internal KafkaConsumer, KafkaProducer and Admin. To avoid consumer/producer/admin property conflicts, you should prefix those properties using consumerPrefix(String), producerPrefix(String) and adminClientPrefix(String), respectively.

Example:


 // potentially wrong: sets "metadata.max.age.ms" to 1 minute for producer AND consumer
 Properties streamsProperties = new Properties();
 streamsProperties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 60000);
 // or
 streamsProperties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 60000);

 // suggested:
 Properties streamsProperties = new Properties();
 // sets "metadata.max.age.ms" to 1 minute for consumer only
 streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), 60000);
 // sets "metadata.max.age.ms" to 1 minute for producer only
 streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.METADATA_MAX_AGE_CONFIG), 60000);

 StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
 
This instance can also be used to pass in custom configurations to different modules (e.g. passing a special config in your customized serde class). The consumer/producer/admin prefix can also be used to distinguish these custom config values passed to different clients with the same config name. * Example:

 Properties streamsProperties = new Properties();
 // sets "my.custom.config" to "foo" for consumer only
 streamsProperties.put(StreamsConfig.consumerPrefix("my.custom.config"), "foo");
 // sets "my.custom.config" to "bar" for producer only
 streamsProperties.put(StreamsConfig.producerPrefix("my.custom.config"), "bar");
 // sets "my.custom.config2" to "boom" for all clients universally
 streamsProperties.put("my.custom.config2", "boom");

 // as a result, inside producer's serde class configure(..) function,
 // users can now read both key-value pairs "my.custom.config" -> "foo"
 // and "my.custom.config2" -> "boom" from the config map
 StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
 
When increasing ProducerConfig.MAX_BLOCK_MS_CONFIG to be more resilient to non-available brokers you should also increase ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG using the following guidance:
     max.poll.interval.ms > max.block.ms
 
Kafka Streams requires at least the following properties to be set: By default, Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
  • "group.id" (<application.id>) - Streams client will always use the application ID a consumer group ID
  • "enable.auto.commit" (false) - Streams client will always disable/turn off auto committing
  • "partition.assignment.strategy" (StreamsPartitionAssignor) - Streams client will always use its own partition assignor
If "processing.guarantee" is set to "exactly_once_v2", Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
See Also: