public class StreamsConfig extends AbstractConfig
Configuration for a KafkaStreams instance.
Can also be used to configure the Kafka Streams internal KafkaConsumer and KafkaProducer.
To avoid consumer/producer property conflicts, you should prefix those properties using
consumerPrefix(String) and producerPrefix(String), respectively.
Example:
// potentially wrong: sets "metadata.max.age.ms" to 1 minute for producer AND consumer
Properties streamsProperties = new Properties();
streamsProperties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 60000);
// or
streamsProperties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 60000);
// suggested:
Properties streamsProperties = new Properties();
// sets "metadata.max.age.ms" to 1 minute for consumer only
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), 60000);
// sets "metadata.max.age.ms" to 1 minute for producer only
streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.METADATA_MAX_AGE_CONFIG), 60000);
StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
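
The prefixes matter because, as the method details for getConsumerConfigs and getProducerConfigs state, a prefixed property is used in favor of its non-prefixed version, except for "bootstrap.servers". The following is a minimal, library-free sketch of that rule, not the actual StreamsConfig implementation. The class PrefixResolutionSketch, the helper resolveFor, the prefix values "consumer." and "producer.", and the broker addresses are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class PrefixResolutionSketch {
    // Assumed prefix values; the text only names the constants.
    static final String CONSUMER_PREFIX = "consumer.";
    static final String PRODUCER_PREFIX = "producer.";
    static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

    /** Hypothetical helper: the settings one internal client would end up with. */
    static Map<String, Object> resolveFor(String prefix, Map<String, Object> props) {
        Map<String, Object> resolved = new HashMap<>();
        // First pass: non-prefixed entries apply to every client.
        for (Map.Entry<String, Object> e : props.entrySet()) {
            String key = e.getKey();
            if (!key.startsWith(CONSUMER_PREFIX) && !key.startsWith(PRODUCER_PREFIX)) {
                resolved.put(key, e.getValue());
            }
        }
        // Second pass: entries carrying this client's prefix override them,
        // except bootstrap.servers, which always keeps the non-prefixed value.
        for (Map.Entry<String, Object> e : props.entrySet()) {
            String key = e.getKey();
            if (key.startsWith(prefix)) {
                String stripped = key.substring(prefix.length());
                if (!stripped.equals(BOOTSTRAP_SERVERS)) {
                    resolved.put(stripped, e.getValue());
                }
            }
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("metadata.max.age.ms", 300000);                   // applies to both clients
        props.put(CONSUMER_PREFIX + "metadata.max.age.ms", 60000);  // consumer only
        props.put("bootstrap.servers", "broker-a:9092");            // hypothetical address
        props.put(CONSUMER_PREFIX + "bootstrap.servers", "broker-b:9092");

        System.out.println(resolveFor(CONSUMER_PREFIX, props).get("metadata.max.age.ms")); // 60000
        System.out.println(resolveFor(PRODUCER_PREFIX, props).get("metadata.max.age.ms")); // 300000
        System.out.println(resolveFor(CONSUMER_PREFIX, props).get("bootstrap.servers"));   // broker-a:9092
    }
}
```

In this sketch the consumer sees its own 1 minute value, the producer keeps the shared value, and the prefixed "bootstrap.servers" entry is ignored, which mirrors the single-cluster restriction described for the get*Configs methods.
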
Kafka Streams requires at least the properties "application.id" and
"bootstrap.servers" to be set.
Furthermore, "enable.auto.commit" must not be enabled; Kafka Streams
disables it by default.

| Modifier and Type | Class and Description |
|---|---|
| static class | StreamsConfig.InternalConfig |

| Constructor and Description |
|---|
| StreamsConfig(Map<?,?> props) Create a new StreamsConfig using the given properties. |

| Modifier and Type | Method and Description |
|---|---|
| static ConfigDef | configDef() Return a copy of the config definition. |
| static String | consumerPrefix(String consumerProp) Prefix a property with CONSUMER_PREFIX. |
| Map<String,Object> | getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) Get the configs for the consumer. |
| Map<String,Object> | getProducerConfigs(String clientId) Get the configs for the producer. |
| Map<String,Object> | getRestoreConsumerConfigs(String clientId) Get the configs for the restore-consumer. |
| Serde | keySerde() Return a configured instance of the key Serde class. |
| static void | main(String[] args) |
| static String | producerPrefix(String producerProp) Prefix a property with PRODUCER_PREFIX. |
| Serde | valueSerde() Return a configured instance of the value Serde class. |

Inherited methods: equals, get, getBoolean, getClass, getConfiguredInstance, getConfiguredInstances, getConfiguredInstances, getDouble, getInt, getList, getLong, getPassword, getShort, getString, hashCode, ignore, logUnused, originals, originalsStrings, originalsWithPrefix, unused, values

public static final String CONSUMER_PREFIX
Prefix used to isolate consumer configs from producer configs.
It is recommended to use consumerPrefix(String) to add this prefix to consumer
properties.

public static final String PRODUCER_PREFIX
Prefix used to isolate producer configs from consumer configs.
It is recommended to use producerPrefix(String) to add this prefix to producer
properties.

public static final String STATE_DIR_CONFIG
state.dir

@Deprecated public static final String ZOOKEEPER_CONNECT_CONFIG
zookeeper.connect

public static final String COMMIT_INTERVAL_MS_CONFIG
commit.interval.ms

public static final String POLL_MS_CONFIG
poll.ms

public static final String NUM_STREAM_THREADS_CONFIG
num.stream.threads

public static final String NUM_STANDBY_REPLICAS_CONFIG
num.standby.replicas

public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG
buffered.records.per.partition

public static final String STATE_CLEANUP_DELAY_MS_CONFIG
state.cleanup.delay.ms

public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG
timestamp.extractor

public static final String PARTITION_GROUPER_CLASS_CONFIG
partition.grouper

public static final String APPLICATION_ID_CONFIG
application.id

public static final String REPLICATION_FACTOR_CONFIG
replication.factor

public static final String KEY_SERDE_CLASS_CONFIG
key.serde

public static final String VALUE_SERDE_CLASS_CONFIG
value.serde

public static final String APPLICATION_SERVER_CONFIG
application.server

public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG
metrics.sample.window.ms

public static final String METRICS_NUM_SAMPLES_CONFIG
metrics.num.samples

public static final String METRICS_RECORDING_LEVEL_CONFIG
metrics.recording.level

public static final String METRIC_REPORTER_CLASSES_CONFIG
metric.reporters

public static final String BOOTSTRAP_SERVERS_CONFIG
bootstrap.servers

public static final String CLIENT_ID_CONFIG
client.id

public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
rocksdb.config.setter

public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG
windowstore.changelog.additional.retention.ms

public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG
cache.max.bytes.buffering

public static final String SECURITY_PROTOCOL_CONFIG
public static final String DEFAULT_SECURITY_PROTOCOL
public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG
public static final String RETRY_BACKOFF_MS_CONFIG
public static final String METADATA_MAX_AGE_CONFIG
public static final String RECONNECT_BACKOFF_MS_CONFIG
public static final String SEND_BUFFER_CONFIG
public static final String RECEIVE_BUFFER_CONFIG
public static final String REQUEST_TIMEOUT_MS_CONFIG
public StreamsConfig(Map<?,?> props)
Create a new StreamsConfig using the given properties.
Parameters:
  props - properties that specify Kafka Streams and internal consumer/producer configuration

public static String consumerPrefix(String consumerProp)
Prefix a property with CONSUMER_PREFIX. This is used to isolate consumer configs
from producer configs.
Parameters:
  consumerProp - the consumer property to be masked
Returns:
  CONSUMER_PREFIX + consumerProp

public static String producerPrefix(String producerProp)
Prefix a property with PRODUCER_PREFIX. This is used to isolate producer configs
from consumer configs.
Parameters:
  producerProp - the producer property to be masked
Returns:
  PRODUCER_PREFIX + producerProp

public static ConfigDef configDef()
Return a copy of the config definition.

public Map<String,Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) throws ConfigException
Get the configs for the consumer.
Properties using the prefix CONSUMER_PREFIX take precedence over their non-prefixed versions,
except for ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, where the non-prefixed version is always used
because only reading from and writing to the same Kafka cluster is supported.
Parameters:
  streamThread - the StreamThread creating a consumer
  groupId - consumer groupId
  clientId - clientId
Throws:
  ConfigException - if "enable.auto.commit" was set to false by the user

public Map<String,Object> getRestoreConsumerConfigs(String clientId) throws ConfigException
Get the configs for the restore-consumer.
Properties using the prefix CONSUMER_PREFIX take precedence over their non-prefixed versions,
except for ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, where the non-prefixed version is always used
because only reading from and writing to the same Kafka cluster is supported.
Parameters:
  clientId - clientId
Throws:
  ConfigException - if "enable.auto.commit" was set to false by the user

public Map<String,Object> getProducerConfigs(String clientId)
Get the configs for the producer.
Properties using the prefix PRODUCER_PREFIX take precedence over their non-prefixed versions,
except for ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, where the non-prefixed version is always used
because only reading from and writing to the same Kafka cluster is supported.
Parameters:
  clientId - clientId

public Serde keySerde()
Return a configured instance of the key Serde class.

public Serde valueSerde()
Return a configured instance of the value Serde class.

public static void main(String[] args)