public class StreamsConfig extends AbstractConfig
KafkaStreams
instance.
Can also be used to configure the Kafka Streams internal KafkaConsumer
, KafkaProducer
and AdminClient
.
To avoid consumer/producer/admin property conflicts, you should prefix those properties using
consumerPrefix(String)
, producerPrefix(String)
and adminClientPrefix(String)
, respectively.
Example:
// potentially wrong: sets "metadata.max.age.ms" to 1 minute for producer AND consumer
Properties streamsProperties = new Properties();
streamsProperties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 60000);
// or
streamsProperties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 60000);
// suggested:
Properties streamsProperties = new Properties();
// sets "metadata.max.age.ms" to 1 minute for consumer only
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), 60000);
// sets "metadata.max.age.ms" to 1 minute for producer only
streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.METADATA_MAX_AGE_CONFIG), 60000);
StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
This instance can also be used to pass in custom configurations to different modules (e.g. passing a special config in your customized serde class).
The consumer/producer/admin prefix can also be used to distinguish these custom config values passed to different clients with the same config name.
* Example:
Properties streamsProperties = new Properties();
// sets "my.custom.config" to "foo" for consumer only
streamsProperties.put(StreamsConfig.consumerPrefix("my.custom.config"), "foo");
// sets "my.custom.config" to "bar" for producer only
streamsProperties.put(StreamsConfig.producerPrefix("my.custom.config"), "bar");
// sets "my.custom.config2" to "boom" for all clients universally
streamsProperties.put("my.custom.config2", "boom");
// as a result, inside producer's serde class configure(..) function,
// users can now read both key-value pairs "my.custom.config" -> "foo"
// and "my.custom.config2" -> "boom" from the config map
StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
When increasing both ProducerConfig.RETRIES_CONFIG
and ProducerConfig.MAX_BLOCK_MS_CONFIG
to be more resilient to non-available brokers you should also
consider increasing ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG
using the following guidance:
max.poll.interval.ms > min ( max.block.ms, (retries +1) * request.timeout.ms )Kafka Streams requires at least the following properties to be set: By default, Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
"enable.auto.commit"
(false) - Streams client will always disable/turn off auto committing"processing.guarantee"
is set to "exactly_once"
, Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
"isolation.level"
(read_committed) - Consumers will always read committed data only"enable.idempotence"
(true) - Producer will always have idempotency enabled"max.in.flight.requests.per.connection"
(1) - Producer will always have one in-flight request per connectionModifier and Type | Class | Description |
---|---|---|
static class |
StreamsConfig.InternalConfig |
Modifier and Type | Field | Description |
---|---|---|
static java.lang.String |
ADMIN_CLIENT_PREFIX |
Prefix used to isolate
admin configs from other client configs. |
static java.lang.String |
APPLICATION_ID_CONFIG |
application.id |
static java.lang.String |
APPLICATION_SERVER_CONFIG |
user.endpoint |
static java.lang.String |
AT_LEAST_ONCE |
Config value for parameter
"processing.guarantee" for at-least-once processing guarantees. |
static java.lang.String |
BOOTSTRAP_SERVERS_CONFIG |
bootstrap.servers |
static java.lang.String |
BUFFERED_RECORDS_PER_PARTITION_CONFIG |
buffered.records.per.partition |
static java.lang.String |
CACHE_MAX_BYTES_BUFFERING_CONFIG |
cache.max.bytes.buffering |
static java.lang.String |
CLIENT_ID_CONFIG |
client.id |
static java.lang.String |
COMMIT_INTERVAL_MS_CONFIG |
commit.interval.ms |
static java.lang.String |
CONNECTIONS_MAX_IDLE_MS_CONFIG |
connections.max.idle.ms |
static java.lang.String |
CONSUMER_PREFIX |
Prefix used to isolate
consumer configs from other client configs. |
static java.lang.String |
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG |
default.deserialization.exception.handler |
static java.lang.String |
DEFAULT_KEY_SERDE_CLASS_CONFIG |
default key.serde |
static java.lang.String |
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG |
default timestamp.extractor |
static java.lang.String |
DEFAULT_VALUE_SERDE_CLASS_CONFIG |
default value.serde |
static java.lang.String |
EXACTLY_ONCE |
Config value for parameter
"processing.guarantee" for exactly-once processing guarantees. |
static java.lang.String |
KEY_SERDE_CLASS_CONFIG |
Deprecated.
Use
DEFAULT_KEY_SERDE_CLASS_CONFIG instead. |
static java.lang.String |
METADATA_MAX_AGE_CONFIG |
metadata.max.age.ms |
static java.lang.String |
METRIC_REPORTER_CLASSES_CONFIG |
metric.reporters |
static java.lang.String |
METRICS_NUM_SAMPLES_CONFIG |
metrics.num.samples |
static java.lang.String |
METRICS_RECORDING_LEVEL_CONFIG |
metrics.record.level |
static java.lang.String |
METRICS_SAMPLE_WINDOW_MS_CONFIG |
metrics.sample.window.ms |
static java.lang.String |
NUM_STANDBY_REPLICAS_CONFIG |
num.standby.replicas |
static java.lang.String |
NUM_STREAM_THREADS_CONFIG |
num.stream.threads |
static java.lang.String |
PARTITION_GROUPER_CLASS_CONFIG |
partition.grouper |
static java.lang.String |
POLL_MS_CONFIG |
poll.ms |
static java.lang.String |
PROCESSING_GUARANTEE_CONFIG |
processing.guarantee |
static java.lang.String |
PRODUCER_PREFIX |
Prefix used to isolate
producer configs from other client configs. |
static java.lang.String |
RECEIVE_BUFFER_CONFIG |
receive.buffer.bytes |
static java.lang.String |
RECONNECT_BACKOFF_MAX_MS_CONFIG |
reconnect.backoff.max |
static java.lang.String |
RECONNECT_BACKOFF_MS_CONFIG |
reconnect.backoff.ms |
static java.lang.String |
REPLICATION_FACTOR_CONFIG |
replication.factor |
static java.lang.String |
REQUEST_TIMEOUT_MS_CONFIG |
request.timeout.ms |
static java.lang.String |
RETRIES_CONFIG |
retries |
static java.lang.String |
RETRY_BACKOFF_MS_CONFIG |
retry.backoff.ms |
static java.lang.String |
ROCKSDB_CONFIG_SETTER_CLASS_CONFIG |
rocksdb.config.setter |
static java.lang.String |
SECURITY_PROTOCOL_CONFIG |
security.protocol |
static java.lang.String |
SEND_BUFFER_CONFIG |
send.buffer.bytes |
static java.lang.String |
STATE_CLEANUP_DELAY_MS_CONFIG |
state.cleanup.delay |
static java.lang.String |
STATE_DIR_CONFIG |
state.dir |
static java.lang.String |
TIMESTAMP_EXTRACTOR_CLASS_CONFIG |
Deprecated.
Use
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG instead. |
static java.lang.String |
TOPIC_PREFIX |
Prefix used to provide default topic configs to be applied when creating internal topics.
|
static java.lang.String |
VALUE_SERDE_CLASS_CONFIG |
Deprecated.
Use
DEFAULT_VALUE_SERDE_CLASS_CONFIG instead. |
static java.lang.String |
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG |
windowstore.changelog.additional.retention.ms |
static java.lang.String |
ZOOKEEPER_CONNECT_CONFIG |
Deprecated.
Kakfa Streams does not use Zookeeper anymore and this parameter will be ignored.
|
Constructor | Description |
---|---|
StreamsConfig(java.util.Map<?,?> props) |
Create a new
StreamsConfig using the given properties. |
Modifier and Type | Method | Description |
---|---|---|
static java.lang.String |
adminClientPrefix(java.lang.String adminClientProp) |
Prefix a property with
ADMIN_CLIENT_PREFIX . |
static ConfigDef |
configDef() |
Return a copy of the config definition.
|
static java.lang.String |
consumerPrefix(java.lang.String consumerProp) |
Prefix a property with
CONSUMER_PREFIX . |
DeserializationExceptionHandler |
defaultDeserializationExceptionHandler() |
|
Serde |
defaultKeySerde() |
Return an
configured instance of key Serde
class . |
ProductionExceptionHandler |
defaultProductionExceptionHandler() |
|
TimestampExtractor |
defaultTimestampExtractor() |
|
Serde |
defaultValueSerde() |
Return an
configured instance of value
Serde class . |
java.util.Map<java.lang.String,java.lang.Object> |
getAdminConfigs(java.lang.String clientId) |
Get the configs for the
admin client . |
java.util.Map<java.lang.String,java.lang.Object> |
getConsumerConfigs(java.lang.String groupId,
java.lang.String clientId) |
Get the configs to the
consumer . |
java.util.Map<java.lang.String,java.lang.Object> |
getProducerConfigs(java.lang.String clientId) |
Get the configs for the
producer . |
java.util.Map<java.lang.String,java.lang.Object> |
getRestoreConsumerConfigs(java.lang.String clientId) |
Get the configs for the
restore-consumer . |
Serde |
keySerde() |
Deprecated.
|
static void |
main(java.lang.String[] args) |
|
protected java.util.Map<java.lang.String,java.lang.Object> |
postProcessParsedConfig(java.util.Map<java.lang.String,java.lang.Object> parsedValues) |
Called directly after user configs got parsed (and thus default values got set).
|
static java.lang.String |
producerPrefix(java.lang.String producerProp) |
Prefix a property with
PRODUCER_PREFIX . |
static java.lang.String |
topicPrefix(java.lang.String topicProp) |
Prefix a property with
TOPIC_PREFIX
used to provide default topic configs to be applied when creating internal topics. |
Serde |
valueSerde() |
Deprecated.
|
equals, get, getBoolean, getClass, getConfiguredInstance, getConfiguredInstances, getConfiguredInstances, getConfiguredInstances, getDouble, getInt, getList, getLong, getPassword, getShort, getString, hashCode, ignore, logUnused, originals, originalsStrings, originalsWithPrefix, originalsWithPrefix, typeOf, unused, values, valuesWithPrefixAllOrNothing, valuesWithPrefixOverride
public static final java.lang.String TOPIC_PREFIX
TopicConfig
.
It is recommended to use topicPrefix(String)
.public static final java.lang.String CONSUMER_PREFIX
consumer
configs from other client configs.
It is recommended to use consumerPrefix(String)
to add this prefix to consumer
properties
.public static final java.lang.String PRODUCER_PREFIX
producer
configs from other client configs.
It is recommended to use producerPrefix(String)
to add this prefix to producer
properties
.public static final java.lang.String ADMIN_CLIENT_PREFIX
admin
configs from other client configs.
It is recommended to use adminClientPrefix(String)
to add this prefix to producer
properties
.public static final java.lang.String AT_LEAST_ONCE
"processing.guarantee"
for at-least-once processing guarantees.public static final java.lang.String EXACTLY_ONCE
"processing.guarantee"
for exactly-once processing guarantees.public static final java.lang.String APPLICATION_ID_CONFIG
application.id
public static final java.lang.String APPLICATION_SERVER_CONFIG
user.endpoint
public static final java.lang.String BOOTSTRAP_SERVERS_CONFIG
bootstrap.servers
public static final java.lang.String BUFFERED_RECORDS_PER_PARTITION_CONFIG
buffered.records.per.partition
public static final java.lang.String CACHE_MAX_BYTES_BUFFERING_CONFIG
cache.max.bytes.buffering
public static final java.lang.String CLIENT_ID_CONFIG
client.id
public static final java.lang.String COMMIT_INTERVAL_MS_CONFIG
commit.interval.ms
public static final java.lang.String CONNECTIONS_MAX_IDLE_MS_CONFIG
connections.max.idle.ms
public static final java.lang.String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
default.deserialization.exception.handler
public static final java.lang.String DEFAULT_KEY_SERDE_CLASS_CONFIG
default key.serde
public static final java.lang.String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
default timestamp.extractor
public static final java.lang.String DEFAULT_VALUE_SERDE_CLASS_CONFIG
default value.serde
@Deprecated public static final java.lang.String KEY_SERDE_CLASS_CONFIG
DEFAULT_KEY_SERDE_CLASS_CONFIG
instead.key.serde
public static final java.lang.String METADATA_MAX_AGE_CONFIG
metadata.max.age.ms
public static final java.lang.String METRICS_NUM_SAMPLES_CONFIG
metrics.num.samples
public static final java.lang.String METRICS_RECORDING_LEVEL_CONFIG
metrics.record.level
public static final java.lang.String METRIC_REPORTER_CLASSES_CONFIG
metric.reporters
public static final java.lang.String METRICS_SAMPLE_WINDOW_MS_CONFIG
metrics.sample.window.ms
public static final java.lang.String NUM_STANDBY_REPLICAS_CONFIG
num.standby.replicas
public static final java.lang.String NUM_STREAM_THREADS_CONFIG
num.stream.threads
public static final java.lang.String PARTITION_GROUPER_CLASS_CONFIG
partition.grouper
public static final java.lang.String POLL_MS_CONFIG
poll.ms
public static final java.lang.String PROCESSING_GUARANTEE_CONFIG
processing.guarantee
public static final java.lang.String RECEIVE_BUFFER_CONFIG
receive.buffer.bytes
public static final java.lang.String RECONNECT_BACKOFF_MS_CONFIG
reconnect.backoff.ms
public static final java.lang.String RECONNECT_BACKOFF_MAX_MS_CONFIG
reconnect.backoff.max
public static final java.lang.String REPLICATION_FACTOR_CONFIG
replication.factor
public static final java.lang.String REQUEST_TIMEOUT_MS_CONFIG
request.timeout.ms
public static final java.lang.String RETRIES_CONFIG
retries
public static final java.lang.String RETRY_BACKOFF_MS_CONFIG
retry.backoff.ms
public static final java.lang.String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
rocksdb.config.setter
public static final java.lang.String SECURITY_PROTOCOL_CONFIG
security.protocol
public static final java.lang.String SEND_BUFFER_CONFIG
send.buffer.bytes
public static final java.lang.String STATE_CLEANUP_DELAY_MS_CONFIG
state.cleanup.delay
public static final java.lang.String STATE_DIR_CONFIG
state.dir
@Deprecated public static final java.lang.String TIMESTAMP_EXTRACTOR_CLASS_CONFIG
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
instead.timestamp.extractor
@Deprecated public static final java.lang.String VALUE_SERDE_CLASS_CONFIG
DEFAULT_VALUE_SERDE_CLASS_CONFIG
instead.value.serde
public static final java.lang.String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG
windowstore.changelog.additional.retention.ms
@Deprecated public static final java.lang.String ZOOKEEPER_CONNECT_CONFIG
zookeeper.connect
public StreamsConfig(java.util.Map<?,?> props)
StreamsConfig
using the given properties.props
- properties that specify Kafka Streams and internal consumer/producer configurationpublic static java.lang.String consumerPrefix(java.lang.String consumerProp)
CONSUMER_PREFIX
. This is used to isolate consumer configs
from other client configs.consumerProp
- the consumer property to be maskedCONSUMER_PREFIX
+ consumerProp
public static java.lang.String producerPrefix(java.lang.String producerProp)
PRODUCER_PREFIX
. This is used to isolate producer configs
from other client configs.producerProp
- the producer property to be maskedproducerProp
public static java.lang.String adminClientPrefix(java.lang.String adminClientProp)
ADMIN_CLIENT_PREFIX
. This is used to isolate admin configs
from other client configs.adminClientProp
- the admin client property to be maskedadminClientProp
public static java.lang.String topicPrefix(java.lang.String topicProp)
TOPIC_PREFIX
used to provide default topic configs to be applied when creating internal topics.topicProp
- the topic property to be maskedtopicProp
public static ConfigDef configDef()
protected java.util.Map<java.lang.String,java.lang.Object> postProcessParsedConfig(java.util.Map<java.lang.String,java.lang.Object> parsedValues)
AbstractConfig
postProcessParsedConfig
in class AbstractConfig
parsedValues
- unmodifiable map of current configurationpublic java.util.Map<java.lang.String,java.lang.Object> getConsumerConfigs(java.lang.String groupId, java.lang.String clientId)
consumer
.
Properties using the prefix CONSUMER_PREFIX
will be used in favor over their non-prefixed versions
except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
where we always use the non-prefixed
version as we only support reading/writing from/to the same Kafka Cluster.groupId
- consumer groupIdclientId
- clientIdpublic java.util.Map<java.lang.String,java.lang.Object> getRestoreConsumerConfigs(java.lang.String clientId)
restore-consumer
.
Properties using the prefix CONSUMER_PREFIX
will be used in favor over their non-prefixed versions
except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
where we always use the non-prefixed
version as we only support reading/writing from/to the same Kafka Cluster.clientId
- clientIdpublic java.util.Map<java.lang.String,java.lang.Object> getProducerConfigs(java.lang.String clientId)
producer
.
Properties using the prefix PRODUCER_PREFIX
will be used in favor over their non-prefixed versions
except in the case of ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
where we always use the non-prefixed
version as we only support reading/writing from/to the same Kafka Cluster.clientId
- clientIdpublic java.util.Map<java.lang.String,java.lang.Object> getAdminConfigs(java.lang.String clientId)
admin client
.clientId
- clientId@Deprecated public Serde keySerde()
configured
instance of key Serde
class
. This method is deprecated. Use defaultKeySerde()
method instead.public Serde defaultKeySerde()
configured
instance of key Serde
class
.@Deprecated public Serde valueSerde()
configured
instance of value
Serde class
. This method is deprecated. Use defaultValueSerde()
instead.public Serde defaultValueSerde()
configured
instance of value
Serde class
.public TimestampExtractor defaultTimestampExtractor()
public DeserializationExceptionHandler defaultDeserializationExceptionHandler()
public ProductionExceptionHandler defaultProductionExceptionHandler()
public static void main(java.lang.String[] args)