public class StreamsConfig extends AbstractConfig
Configuration for a KafkaStreams instance.
Can also be used to configure the Kafka Streams internal KafkaConsumer, KafkaProducer and AdminClient.
To avoid consumer/producer/admin property conflicts, you should prefix those properties using
consumerPrefix(String), producerPrefix(String) and adminClientPrefix(String), respectively.
Example:
// potentially wrong: sets "metadata.max.age.ms" to 1 minute for producer AND consumer
Properties streamsProperties = new Properties();
streamsProperties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 60000);
// or
streamsProperties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 60000);
// suggested:
Properties streamsProperties = new Properties();
// sets "metadata.max.age.ms" to 1 minute for consumer only
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), 60000);
// sets "metadata.max.age.ms" to 1 minute for producer only
streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.METADATA_MAX_AGE_CONFIG), 60000);
StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
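To make the effect of the prefix methods concrete, the following self-contained sketch shows the property keys that end up stored. It does not depend on the Kafka libraries: the `consumerPrefix` and `producerPrefix` helpers are local stand-ins for the `StreamsConfig` methods, and the literal prefix values `"consumer."` and `"producer."` are assumptions about what `CONSUMER_PREFIX` and `PRODUCER_PREFIX` contain. The class name and the value 30000 are illustrative only.

```java
import java.util.Properties;

final class PrefixedKeysSketch {
    // Assumed literal values of StreamsConfig.CONSUMER_PREFIX and StreamsConfig.PRODUCER_PREFIX.
    static final String CONSUMER_PREFIX = "consumer.";
    static final String PRODUCER_PREFIX = "producer.";

    // Local stand-ins for StreamsConfig.consumerPrefix(String) and producerPrefix(String).
    static String consumerPrefix(String prop) {
        return CONSUMER_PREFIX + prop;
    }

    static String producerPrefix(String prop) {
        return PRODUCER_PREFIX + prop;
    }

    // Builds properties that give each client its own "metadata.max.age.ms".
    static Properties build() {
        Properties props = new Properties();
        props.put(consumerPrefix("metadata.max.age.ms"), 60000); // consumer only
        props.put(producerPrefix("metadata.max.age.ms"), 30000); // producer only
        return props;
    }

    public static void main(String[] args) {
        // Print each stored key so the prefixes are visible.
        build().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Because the two keys differ, neither setting leaks into the other client, which is the conflict the unprefixed form risks.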
This instance can also be used to pass in custom configurations to different modules (e.g. passing a special config in your customized serde class).
The consumer/producer/admin prefix can also be used to distinguish these custom config values passed to different clients with the same config name.
Example:
Properties streamsProperties = new Properties();
// sets "my.custom.config" to "foo" for consumer only
streamsProperties.put(StreamsConfig.consumerPrefix("my.custom.config"), "foo");
// sets "my.custom.config" to "bar" for producer only
streamsProperties.put(StreamsConfig.producerPrefix("my.custom.config"), "bar");
// sets "my.custom.config2" to "boom" for all clients universally
streamsProperties.put("my.custom.config2", "boom");
// as a result, inside producer's serde class configure(..) function,
// users can now read both key-value pairs "my.custom.config" -> "bar"
// and "my.custom.config2" -> "boom" from the config map
StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
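One way to picture how prefixed and non-prefixed entries combine for a single client is the simplified model below. It is a sketch under stated assumptions, not the Kafka Streams implementation: `resolveFor` is a hypothetical helper, the prefix literals `"consumer."`, `"producer."` and `"admin."` are assumed values, and the model simply copies every non-prefixed entry and then lets entries carrying the client's own prefix override them.

```java
import java.util.HashMap;
import java.util.Map;

final class ClientConfigResolutionSketch {
    // Assumed literal values of the client prefixes.
    static final String[] CLIENT_PREFIXES = {"consumer.", "producer.", "admin."};

    static boolean hasClientPrefix(String key) {
        for (String prefix : CLIENT_PREFIXES) {
            if (key.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    // Hypothetical helper: computes the config map one client would see.
    static Map<String, Object> resolveFor(String clientPrefix, Map<String, Object> all) {
        Map<String, Object> resolved = new HashMap<>();
        // Step 1: entries without any client prefix apply to every client.
        for (Map.Entry<String, Object> e : all.entrySet()) {
            if (!hasClientPrefix(e.getKey())) {
                resolved.put(e.getKey(), e.getValue());
            }
        }
        // Step 2: entries with this client's prefix win; the prefix is stripped.
        for (Map.Entry<String, Object> e : all.entrySet()) {
            if (e.getKey().startsWith(clientPrefix)) {
                resolved.put(e.getKey().substring(clientPrefix.length()), e.getValue());
            }
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, Object> all = new HashMap<>();
        all.put("consumer.my.custom.config", "foo");
        all.put("producer.my.custom.config", "bar");
        all.put("my.custom.config2", "boom");
        System.out.println(resolveFor("producer.", all));
        System.out.println(resolveFor("consumer.", all));
    }
}
```

Under this model the producer-side map holds "my.custom.config" -> "bar" and "my.custom.config2" -> "boom", while the consumer-side map holds "my.custom.config" -> "foo" and the same universal entry.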
When increasing both ProducerConfig.RETRIES_CONFIG and ProducerConfig.MAX_BLOCK_MS_CONFIG to be more resilient to unavailable brokers, you should also
consider increasing ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG using the following guidance:
max.poll.interval.ms > min(max.block.ms, (retries + 1) * request.timeout.ms)
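The guidance above is plain arithmetic, so it can be checked before the values are put into the configuration. The sketch below is illustrative only: the class and method names are hypothetical, and the numbers in `main` are example values rather than recommended settings.

```java
final class PollIntervalGuidance {
    // Computes min(max.block.ms, (retries + 1) * request.timeout.ms).
    // The multiplication is done in long arithmetic so large retry counts do not overflow int.
    static long lowerBoundMs(long maxBlockMs, int retries, long requestTimeoutMs) {
        return Math.min(maxBlockMs, (retries + 1L) * requestTimeoutMs);
    }

    // True when max.poll.interval.ms is strictly greater than the bound above.
    static boolean satisfiesGuidance(long maxPollIntervalMs, long maxBlockMs,
                                     int retries, long requestTimeoutMs) {
        return maxPollIntervalMs > lowerBoundMs(maxBlockMs, retries, requestTimeoutMs);
    }

    public static void main(String[] args) {
        long maxBlockMs = 120_000L;       // example value for max.block.ms
        int retries = 10;                 // example value for retries
        long requestTimeoutMs = 30_000L;  // example value for request.timeout.ms

        // min(120000, 11 * 30000) = min(120000, 330000) = 120000
        System.out.println(lowerBoundMs(maxBlockMs, retries, requestTimeoutMs));
        // 300000 > 120000, so this candidate max.poll.interval.ms follows the guidance
        System.out.println(satisfiesGuidance(300_000L, maxBlockMs, retries, requestTimeoutMs));
    }
}
```

Note that the comparison is strict: a max.poll.interval.ms exactly equal to the computed bound does not satisfy the guidance.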
Kafka Streams requires at least the following properties to be set:
- "application.id"
- "bootstrap.servers"
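As a rough illustration of a minimal configuration, the sketch below builds a Properties object and checks it for required keys. It assumes the required properties are "application.id" and "bootstrap.servers" (the keys behind APPLICATION_ID_CONFIG and BOOTSTRAP_SERVERS_CONFIG). The `missingRequired` helper is hypothetical and not part of Kafka Streams, and the application id and broker address are placeholders.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

final class RequiredPropsSketch {
    // Assumed set of required keys.
    static final String[] REQUIRED = {"application.id", "bootstrap.servers"};

    // Hypothetical pre-check: returns the required keys that are absent.
    static List<String> missingRequired(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            if (!props.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    // Smallest property set that passes the check above.
    static Properties minimal() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");     // placeholder application id
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
        return props;
    }

    public static void main(String[] args) {
        System.out.println(missingRequired(minimal()));         // prints "[]"
        System.out.println(missingRequired(new Properties()));  // prints "[application.id, bootstrap.servers]"
    }
}
```

In real code the same keys would normally be written with the StreamsConfig constants rather than string literals.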

By default, Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
- "enable.auto.commit" (false) - Streams client will always disable/turn off auto committing

If "processing.guarantee" is set to "exactly_once", Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
- "isolation.level" (read_committed) - Consumers will always read committed data only
- "enable.idempotence" (true) - Producer will always have idempotency enabled
- "max.in.flight.requests.per.connection" (1) - Producer will always have one in-flight request per connection

| Modifier and Type | Class | Description |
|---|---|---|
| static class | StreamsConfig.InternalConfig | |
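
The non-overridable settings described before the nested class table can be modelled as a small lookup. This is a simplified sketch, not the library's logic: `forcedSettings` and `effective` are hypothetical helpers, values are kept as strings, and consumer and producer settings are merged into one map for brevity even though they apply to different clients.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

final class LockedSettingsSketch {
    // Settings that are fixed for the given "processing.guarantee" value.
    static Map<String, String> forcedSettings(String processingGuarantee) {
        Map<String, String> forced = new LinkedHashMap<>();
        forced.put("enable.auto.commit", "false"); // always forced
        if ("exactly_once".equals(processingGuarantee)) {
            forced.put("isolation.level", "read_committed");
            forced.put("enable.idempotence", "true");
            forced.put("max.in.flight.requests.per.connection", "1");
        }
        return forced;
    }

    // User settings with the forced values applied on top.
    static Map<String, String> effective(Map<String, String> userSettings,
                                         String processingGuarantee) {
        Map<String, String> result = new HashMap<>(userSettings);
        result.putAll(forcedSettings(processingGuarantee));
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> user = new HashMap<>();
        user.put("enable.auto.commit", "true"); // attempted override
        System.out.println(effective(user, "at_least_once").get("enable.auto.commit")); // prints "false"
        System.out.println(forcedSettings("exactly_once").keySet());
    }
}
```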

| Modifier and Type | Field | Description |
|---|---|---|
| static java.lang.String | ADMIN_CLIENT_PREFIX | Prefix used to isolate admin configs from other client configs. |
| static java.lang.String | APPLICATION_ID_CONFIG | application.id |
| static java.lang.String | APPLICATION_SERVER_CONFIG | application.server |
| static java.lang.String | AT_LEAST_ONCE | Config value for parameter "processing.guarantee" for at-least-once processing guarantees. |
| static java.lang.String | BOOTSTRAP_SERVERS_CONFIG | bootstrap.servers |
| static java.lang.String | BUFFERED_RECORDS_PER_PARTITION_CONFIG | buffered.records.per.partition |
| static java.lang.String | CACHE_MAX_BYTES_BUFFERING_CONFIG | cache.max.bytes.buffering |
| static java.lang.String | CLIENT_ID_CONFIG | client.id |
| static java.lang.String | COMMIT_INTERVAL_MS_CONFIG | commit.interval.ms |
| static java.lang.String | CONNECTIONS_MAX_IDLE_MS_CONFIG | connections.max.idle.ms |
| static java.lang.String | CONSUMER_PREFIX | Prefix used to isolate consumer configs from other client configs. |
| static java.lang.String | DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG | default.deserialization.exception.handler |
| static java.lang.String | DEFAULT_KEY_SERDE_CLASS_CONFIG | default.key.serde |
| static java.lang.String | DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG | default.timestamp.extractor |
| static java.lang.String | DEFAULT_VALUE_SERDE_CLASS_CONFIG | default.value.serde |
| static java.lang.String | EXACTLY_ONCE | Config value for parameter "processing.guarantee" for exactly-once processing guarantees. |
| static java.lang.String | KEY_SERDE_CLASS_CONFIG | Deprecated. Use DEFAULT_KEY_SERDE_CLASS_CONFIG instead. |
| static java.lang.String | METADATA_MAX_AGE_CONFIG | metadata.max.age.ms |
| static java.lang.String | METRIC_REPORTER_CLASSES_CONFIG | metric.reporters |
| static java.lang.String | METRICS_NUM_SAMPLES_CONFIG | metrics.num.samples |
| static java.lang.String | METRICS_RECORDING_LEVEL_CONFIG | metrics.recording.level |
| static java.lang.String | METRICS_SAMPLE_WINDOW_MS_CONFIG | metrics.sample.window.ms |
| static java.lang.String | NUM_STANDBY_REPLICAS_CONFIG | num.standby.replicas |
| static java.lang.String | NUM_STREAM_THREADS_CONFIG | num.stream.threads |
| static java.lang.String | PARTITION_GROUPER_CLASS_CONFIG | partition.grouper |
| static java.lang.String | POLL_MS_CONFIG | poll.ms |
| static java.lang.String | PROCESSING_GUARANTEE_CONFIG | processing.guarantee |
| static java.lang.String | PRODUCER_PREFIX | Prefix used to isolate producer configs from other client configs. |
| static java.lang.String | RECEIVE_BUFFER_CONFIG | receive.buffer.bytes |
| static java.lang.String | RECONNECT_BACKOFF_MAX_MS_CONFIG | reconnect.backoff.max.ms |
| static java.lang.String | RECONNECT_BACKOFF_MS_CONFIG | reconnect.backoff.ms |
| static java.lang.String | REPLICATION_FACTOR_CONFIG | replication.factor |
| static java.lang.String | REQUEST_TIMEOUT_MS_CONFIG | request.timeout.ms |
| static java.lang.String | RETRIES_CONFIG | retries |
| static java.lang.String | RETRY_BACKOFF_MS_CONFIG | retry.backoff.ms |
| static java.lang.String | ROCKSDB_CONFIG_SETTER_CLASS_CONFIG | rocksdb.config.setter |
| static java.lang.String | SECURITY_PROTOCOL_CONFIG | security.protocol |
| static java.lang.String | SEND_BUFFER_CONFIG | send.buffer.bytes |
| static java.lang.String | STATE_CLEANUP_DELAY_MS_CONFIG | state.cleanup.delay.ms |
| static java.lang.String | STATE_DIR_CONFIG | state.dir |
| static java.lang.String | TIMESTAMP_EXTRACTOR_CLASS_CONFIG | Deprecated. Use DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG instead. |
| static java.lang.String | TOPIC_PREFIX | Prefix used to provide default topic configs to be applied when creating internal topics. |
| static java.lang.String | VALUE_SERDE_CLASS_CONFIG | Deprecated. Use DEFAULT_VALUE_SERDE_CLASS_CONFIG instead. |
| static java.lang.String | WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG | windowstore.changelog.additional.retention.ms |
| static java.lang.String | ZOOKEEPER_CONNECT_CONFIG | Deprecated. Kafka Streams does not use Zookeeper anymore and this parameter will be ignored. |

| Constructor | Description |
|---|---|
| StreamsConfig(java.util.Map<?,?> props) | Create a new StreamsConfig using the given properties. |

| Modifier and Type | Method | Description |
|---|---|---|
| static java.lang.String | adminClientPrefix(java.lang.String adminClientProp) | Prefix a property with ADMIN_CLIENT_PREFIX. |
| static ConfigDef | configDef() | Return a copy of the config definition. |
| static java.lang.String | consumerPrefix(java.lang.String consumerProp) | Prefix a property with CONSUMER_PREFIX. |
| DeserializationExceptionHandler | defaultDeserializationExceptionHandler() | |
| Serde | defaultKeySerde() | Return a configured instance of key Serde class. |
| ProductionExceptionHandler | defaultProductionExceptionHandler() | |
| TimestampExtractor | defaultTimestampExtractor() | |
| Serde | defaultValueSerde() | Return a configured instance of value Serde class. |
| java.util.Map<java.lang.String,java.lang.Object> | getAdminConfigs(java.lang.String clientId) | Get the configs for the admin client. |
| java.util.Map<java.lang.String,java.lang.Object> | getConsumerConfigs(java.lang.String groupId, java.lang.String clientId) | Get the configs for the consumer. |
| java.util.Map<java.lang.String,java.lang.Object> | getProducerConfigs(java.lang.String clientId) | Get the configs for the producer. |
| java.util.Map<java.lang.String,java.lang.Object> | getRestoreConsumerConfigs(java.lang.String clientId) | Get the configs for the restore-consumer. |
| Serde | keySerde() | Deprecated. |
| static void | main(java.lang.String[] args) | |
| protected java.util.Map<java.lang.String,java.lang.Object> | postProcessParsedConfig(java.util.Map<java.lang.String,java.lang.Object> parsedValues) | Called directly after user configs got parsed (and thus default values got set). |
| static java.lang.String | producerPrefix(java.lang.String producerProp) | Prefix a property with PRODUCER_PREFIX. |
| static java.lang.String | topicPrefix(java.lang.String topicProp) | Prefix a property with TOPIC_PREFIX used to provide default topic configs to be applied when creating internal topics. |
| Serde | valueSerde() | Deprecated. |

Methods inherited from class AbstractConfig: equals, get, getBoolean, getClass, getConfiguredInstance, getConfiguredInstances, getConfiguredInstances, getConfiguredInstances, getDouble, getInt, getList, getLong, getPassword, getShort, getString, hashCode, ignore, logUnused, originals, originalsStrings, originalsWithPrefix, originalsWithPrefix, typeOf, unused, values, valuesWithPrefixAllOrNothing, valuesWithPrefixOverride

public static final java.lang.String TOPIC_PREFIX
Prefix used to provide default topic configs to be applied when creating internal topics (see TopicConfig). It is recommended to use topicPrefix(String).

public static final java.lang.String CONSUMER_PREFIX
Prefix used to isolate consumer configs from other client configs. It is recommended to use consumerPrefix(String) to add this prefix to consumer properties.

public static final java.lang.String PRODUCER_PREFIX
Prefix used to isolate producer configs from other client configs. It is recommended to use producerPrefix(String) to add this prefix to producer properties.

public static final java.lang.String ADMIN_CLIENT_PREFIX
Prefix used to isolate admin configs from other client configs. It is recommended to use adminClientPrefix(String) to add this prefix to admin client properties.

public static final java.lang.String AT_LEAST_ONCE
Config value for parameter "processing.guarantee" for at-least-once processing guarantees.

public static final java.lang.String EXACTLY_ONCE
Config value for parameter "processing.guarantee" for exactly-once processing guarantees.

public static final java.lang.String APPLICATION_ID_CONFIG
application.id

public static final java.lang.String APPLICATION_SERVER_CONFIG
application.server

public static final java.lang.String BOOTSTRAP_SERVERS_CONFIG
bootstrap.servers

public static final java.lang.String BUFFERED_RECORDS_PER_PARTITION_CONFIG
buffered.records.per.partition

public static final java.lang.String CACHE_MAX_BYTES_BUFFERING_CONFIG
cache.max.bytes.buffering

public static final java.lang.String CLIENT_ID_CONFIG
client.id

public static final java.lang.String COMMIT_INTERVAL_MS_CONFIG
commit.interval.ms

public static final java.lang.String CONNECTIONS_MAX_IDLE_MS_CONFIG
connections.max.idle.ms

public static final java.lang.String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
default.deserialization.exception.handler

public static final java.lang.String DEFAULT_KEY_SERDE_CLASS_CONFIG
default.key.serde

public static final java.lang.String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
default.timestamp.extractor

public static final java.lang.String DEFAULT_VALUE_SERDE_CLASS_CONFIG
default.value.serde

@Deprecated public static final java.lang.String KEY_SERDE_CLASS_CONFIG
Deprecated. Use DEFAULT_KEY_SERDE_CLASS_CONFIG instead.
key.serde

public static final java.lang.String METADATA_MAX_AGE_CONFIG
metadata.max.age.ms

public static final java.lang.String METRICS_NUM_SAMPLES_CONFIG
metrics.num.samples

public static final java.lang.String METRICS_RECORDING_LEVEL_CONFIG
metrics.recording.level

public static final java.lang.String METRIC_REPORTER_CLASSES_CONFIG
metric.reporters

public static final java.lang.String METRICS_SAMPLE_WINDOW_MS_CONFIG
metrics.sample.window.ms

public static final java.lang.String NUM_STANDBY_REPLICAS_CONFIG
num.standby.replicas

public static final java.lang.String NUM_STREAM_THREADS_CONFIG
num.stream.threads

public static final java.lang.String PARTITION_GROUPER_CLASS_CONFIG
partition.grouper

public static final java.lang.String POLL_MS_CONFIG
poll.ms

public static final java.lang.String PROCESSING_GUARANTEE_CONFIG
processing.guarantee

public static final java.lang.String RECEIVE_BUFFER_CONFIG
receive.buffer.bytes

public static final java.lang.String RECONNECT_BACKOFF_MS_CONFIG
reconnect.backoff.ms

public static final java.lang.String RECONNECT_BACKOFF_MAX_MS_CONFIG
reconnect.backoff.max.ms

public static final java.lang.String REPLICATION_FACTOR_CONFIG
replication.factor

public static final java.lang.String REQUEST_TIMEOUT_MS_CONFIG
request.timeout.ms

public static final java.lang.String RETRIES_CONFIG
retries

public static final java.lang.String RETRY_BACKOFF_MS_CONFIG
retry.backoff.ms

public static final java.lang.String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
rocksdb.config.setter

public static final java.lang.String SECURITY_PROTOCOL_CONFIG
security.protocol

public static final java.lang.String SEND_BUFFER_CONFIG
send.buffer.bytes

public static final java.lang.String STATE_CLEANUP_DELAY_MS_CONFIG
state.cleanup.delay.ms

public static final java.lang.String STATE_DIR_CONFIG
state.dir

@Deprecated public static final java.lang.String TIMESTAMP_EXTRACTOR_CLASS_CONFIG
Deprecated. Use DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG instead.
timestamp.extractor

@Deprecated public static final java.lang.String VALUE_SERDE_CLASS_CONFIG
Deprecated. Use DEFAULT_VALUE_SERDE_CLASS_CONFIG instead.
value.serde

public static final java.lang.String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG
windowstore.changelog.additional.retention.ms

@Deprecated public static final java.lang.String ZOOKEEPER_CONNECT_CONFIG
Deprecated. Kafka Streams does not use Zookeeper anymore and this parameter will be ignored.
zookeeper.connect

public StreamsConfig(java.util.Map<?,?> props)
Create a new StreamsConfig using the given properties.
Parameters: props - properties that specify Kafka Streams and internal consumer/producer configuration

public static java.lang.String consumerPrefix(java.lang.String consumerProp)
Prefix a property with CONSUMER_PREFIX. This is used to isolate consumer configs from other client configs.
Parameters: consumerProp - the consumer property to be masked
Returns: CONSUMER_PREFIX + consumerProp

public static java.lang.String producerPrefix(java.lang.String producerProp)
Prefix a property with PRODUCER_PREFIX. This is used to isolate producer configs from other client configs.
Parameters: producerProp - the producer property to be masked
Returns: PRODUCER_PREFIX + producerProp

public static java.lang.String adminClientPrefix(java.lang.String adminClientProp)
Prefix a property with ADMIN_CLIENT_PREFIX. This is used to isolate admin configs from other client configs.
Parameters: adminClientProp - the admin client property to be masked
Returns: ADMIN_CLIENT_PREFIX + adminClientProp

public static java.lang.String topicPrefix(java.lang.String topicProp)
Prefix a property with TOPIC_PREFIX used to provide default topic configs to be applied when creating internal topics.
Parameters: topicProp - the topic property to be masked
Returns: TOPIC_PREFIX + topicProp

public static ConfigDef configDef()
Return a copy of the config definition.

protected java.util.Map<java.lang.String,java.lang.Object> postProcessParsedConfig(java.util.Map<java.lang.String,java.lang.Object> parsedValues)
Called directly after user configs got parsed (and thus default values got set).
Description copied from class: AbstractConfig
Overrides: postProcessParsedConfig in class AbstractConfig
Parameters: parsedValues - unmodifiable map of current configuration

public java.util.Map<java.lang.String,java.lang.Object> getConsumerConfigs(java.lang.String groupId, java.lang.String clientId)
Get the configs for the consumer.
Properties using the prefix CONSUMER_PREFIX will be used in preference to their non-prefixed versions,
except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, where we always use the non-prefixed
version as we only support reading/writing from/to the same Kafka Cluster.
Parameters: groupId - consumer groupId; clientId - clientId

public java.util.Map<java.lang.String,java.lang.Object> getRestoreConsumerConfigs(java.lang.String clientId)
Get the configs for the restore-consumer.
Properties using the prefix CONSUMER_PREFIX will be used in preference to their non-prefixed versions,
except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, where we always use the non-prefixed
version as we only support reading/writing from/to the same Kafka Cluster.
Parameters: clientId - clientId

public java.util.Map<java.lang.String,java.lang.Object> getProducerConfigs(java.lang.String clientId)
Get the configs for the producer.
Properties using the prefix PRODUCER_PREFIX will be used in preference to their non-prefixed versions,
except in the case of ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, where we always use the non-prefixed
version as we only support reading/writing from/to the same Kafka Cluster.
Parameters: clientId - clientId

public java.util.Map<java.lang.String,java.lang.Object> getAdminConfigs(java.lang.String clientId)
Get the configs for the admin client.
Parameters: clientId - clientId

@Deprecated public Serde keySerde()
Deprecated. Return a configured instance of key Serde class. Use defaultKeySerde() instead.

public Serde defaultKeySerde()
Return a configured instance of key Serde class.

@Deprecated public Serde valueSerde()
Deprecated. Return a configured instance of value Serde class. Use defaultValueSerde() instead.

public Serde defaultValueSerde()
Return a configured instance of value Serde class.

public TimestampExtractor defaultTimestampExtractor()
public DeserializationExceptionHandler defaultDeserializationExceptionHandler()
public ProductionExceptionHandler defaultProductionExceptionHandler()
public static void main(java.lang.String[] args)