java.io.Closeable
, java.lang.AutoCloseable
, Consumer<K,V>
public class KafkaConsumer<K,V> extends java.lang.Object implements Consumer<K,V>
This client transparently handles the failure of Kafka brokers, and transparently adapts as topic partitions it fetches migrate within the cluster. This client also interacts with the broker to allow groups of consumers to load balance consumption using consumer groups.
The consumer maintains TCP connections to the necessary brokers to fetch data. Failure to close the consumer after use will leak these connections. The consumer is not thread-safe. See Multi-threaded Processing for more details.
UnsupportedVersionException
when invoking an API that is not available on the running broker version.
The position
of the consumer gives the offset of the next record that will be given
out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances
every time the consumer receives messages in a call to poll(long)
.
The committed position
is the last offset that has been stored securely. Should the
process fail and restart, this is the offset that the consumer will recover to. The consumer can either automatically commit
offsets periodically; or it can choose to control this committed position manually by calling one of the commit APIs
(e.g. commitSync
and commitAsync
).
This distinction gives the consumer control over when a record is considered consumed. It is discussed in further detail below.
group.id
will be part of the same consumer group.
Each consumer in a group can dynamically set the list of topics it wants to subscribe to through one of the
subscribe
APIs. Kafka will deliver each message in the
subscribed topics to one process in each consumer group. This is achieved by balancing the partitions between all
members in the consumer group so that each partition is assigned to exactly one consumer in the group. So if there
is a topic with four partitions, and a consumer group with two processes, each process would consume from two partitions.
Membership in a consumer group is maintained dynamically: if a process fails, the partitions assigned to it will
be reassigned to other consumers in the same group. Similarly, if a new consumer joins the group, partitions will be moved
from existing consumers to the new one. This is known as rebalancing the group and is discussed in more
detail below. Group rebalancing is also used when new partitions are added
to one of the subscribed topics or when a new topic matching a subscribed regex
is created. The group will automatically detect the new partitions through periodic metadata refreshes and
assign them to members of the group.
Conceptually you can think of a consumer group as being a single logical subscriber that happens to be made up of multiple processes. As a multi-subscriber system, Kafka naturally supports having any number of consumer groups for a given topic without duplicating data (additional consumers are actually quite cheap).
This is a slight generalization of the functionality that is common in messaging systems. To get semantics similar to a queue in a traditional messaging system all processes would be part of a single consumer group and hence record delivery would be balanced over the group like with a queue. Unlike a traditional messaging system, though, you can have multiple such groups. To get semantics similar to pub-sub in a traditional messaging system each process would have its own consumer group, so each process would subscribe to all the records published to the topic.
In addition, when group reassignment happens automatically, consumers can be notified through a ConsumerRebalanceListener
,
which allows them to finish necessary application-level logic such as state cleanup, manual offset
commits, etc. See Storing Offsets Outside Kafka for more details.
It is also possible for the consumer to manually assign specific partitions
(similar to the older "simple" consumer) using assign(Collection)
. In this case, dynamic partition
assignment and consumer group coordination will be disabled.
poll(long)
is
invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer
will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers,
the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for
a duration of session.timeout.ms
, then the consumer will be considered dead and its partitions will
be reassigned.
It is also possible that the consumer could encounter a "livelock" situation where it is continuing
to send heartbeats, but no progress is being made. To prevent the consumer from holding onto its partitions
indefinitely in this case, we provide a liveness detection mechanism using the max.poll.interval.ms
setting. Basically if you don't call poll at least as frequently as the configured max interval,
then the client will proactively leave the group so that another consumer can take over its partitions. When this happens,
you may see an offset commit failure (as indicated by a CommitFailedException
thrown from a call to commitSync()
).
This is a safety mechanism which guarantees that only active members of the group are able to commit offsets.
So to stay in the group, you must continue to call poll.
The consumer provides two configuration settings to control the behavior of the poll loop:
max.poll.interval.ms
: By increasing the interval between expected polls, you can give
the consumer more time to handle a batch of records returned from poll(long)
. The drawback
is that increasing this value may delay a group rebalance since the consumer will only join the rebalance
inside the call to poll. You can use this setting to bound the time to finish a rebalance, but
you risk slower progress if the consumer cannot actually call poll
often enough.max.poll.records
: Use this setting to limit the total records returned from a single
call to poll. This can make it easier to predict the maximum that must be handled within each poll
interval. By tuning this value, you may be able to reduce the poll interval, which will reduce the
impact of group rebalancing.
For use cases where message processing time varies unpredictably, neither of these options may be sufficient.
The recommended way to handle these cases is to move message processing to another thread, which allows
the consumer to continue calling poll
while the processor is still working. Some care must be taken
to ensure that committed offsets do not get ahead of the actual position. Typically, you must disable automatic
commits and manually commit processed offsets for records only after the thread has finished handling them
(depending on the delivery semantics you need). Note also that you will need to pause
the partition so that no new records are received from poll until after thread has finished handling those
previously returned.
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }The connection to the cluster is bootstrapped by specifying a list of one or more brokers to contact using the configuration
>bootstrap.servers
. This list is just used to discover the rest of the brokers in the
cluster and need not be an exhaustive list of servers in the cluster (though you may want to specify more than one in
case there are servers down when the client is connecting).
Setting enable.auto.commit
means that offsets are committed automatically with a frequency controlled by
the config auto.commit.interval.ms
.
In this example the consumer is subscribing to the topics foo and bar as part of a group of consumers
called test as configured with group.id
.
The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we are saying that our record's key and value will just be simple strings.
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); final int minBatchSize = 200; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { insertIntoDb(buffer); consumer.commitSync(); buffer.clear(); } }In this example we will consume a batch of records and batch them up in memory. When we have enough records batched, we will insert them into a database. If we allowed offsets to auto commit as in the previous example, records would be considered consumed after they were returned to the user in
poll
. It would then be possible
for our process to fail after batching the records, but before they had been inserted into the database.
To avoid this, we will manually commit the offsets only after the corresponding records have been inserted into the database. This gives us exact control of when a record is considered consumed. This raises the opposite possibility: the process could fail in the interval after the insert into the database but before the commit (even though this would likely just be a few milliseconds, it is a possibility). In this case the process that took over consumption would consume from last committed offset and would repeat the insert of the last batch of data. Used in this way Kafka provides what is often called "at-least-once" delivery guarantees, as each record will likely be delivered one time but in failure cases could be duplicated.
Note: Using automatic offset commits can also give you "at-least-once" delivery, but the requirement is that
you must consume all data returned from each call to poll(long)
before any subsequent calls, or before
closing
the consumer. If you fail to do either of these, it is possible for the committed offset
to get ahead of the consumed position, which results in missing records. The advantage of using manual offset
control is that you have direct control over when a record is considered "consumed."
The above example uses commitSync
to mark all received records as committed. In some cases
you may wish to have even finer control over which records have been committed by specifying an offset explicitly.
In the example below we commit offset after we finish handling the records in each partition.
try { while(running) { ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(record.offset() + ": " + record.value()); } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } } finally { consumer.close(); }Note: The committed offset should always be the offset of the next message that your application will read. Thus, when calling
commitSync(offsets)
you should add one to the offset of the last message processed.
To use this mode, instead of subscribing to the topic using subscribe
, you just call
assign(Collection)
with the full list of partitions that you want to consume.
String topic = "foo"; TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1));Once assigned, you can call
poll
in a loop, just as in the preceding examples to consume
records. The group that the consumer specifies is still used for committing offsets, but now the set of partitions
will only change with another call to assign
. Manual partition assignment does
not use group coordination, so consumer failures will not cause assigned partitions to be rebalanced. Each consumer
acts independently even if it shares a groupId with another consumer. To avoid offset commit conflicts, you should
usually ensure that the groupId is unique for each consumer instance.
Note that it isn't possible to mix manual partition assignment (i.e. using assign
)
with dynamic partition assignment through topic subscription (i.e. using subscribe
).
Here are a couple of examples of this type of usage:
Each record comes with its own offset, so to manage your own offset you just need to do the following:
enable.auto.commit=false
ConsumerRecord
to save your position.
seek(TopicPartition, long)
.
This type of usage is simplest when the partition assignment is also done manually (this would be likely in the
search index use case described above). If the partition assignment is done automatically special care is
needed to handle the case where partition assignments change. This can be done by providing a
ConsumerRebalanceListener
instance in the call to subscribe(Collection, ConsumerRebalanceListener)
and subscribe(Pattern, ConsumerRebalanceListener)
.
For example, when partitions are taken from a consumer the consumer will want to commit its offset for those partitions by
implementing ConsumerRebalanceListener.onPartitionsRevoked(Collection)
. When partitions are assigned to a
consumer, the consumer will want to look up the offset for those new partitions and correctly initialize the consumer
to that position by implementing ConsumerRebalanceListener.onPartitionsAssigned(Collection)
.
Another common use for ConsumerRebalanceListener
is to flush any caches the application maintains for
partitions that are moved elsewhere.
There are several instances where manually controlling the consumer's position can be useful.
One case is for time-sensitive record processing it may make sense for a consumer that falls far enough behind to not attempt to catch up processing all records, but rather just skip to the most recent records.
Another use case is for a system that maintains local state as described in the previous section. In such a system the consumer will want to initialize its position on start-up to whatever is contained in the local store. Likewise if the local state is destroyed (say because the disk is lost) the state may be recreated on a new machine by re-consuming all the data and recreating the state (assuming that Kafka is retaining sufficient history).
Kafka allows specifying the position using seek(TopicPartition, long)
to specify the new position. Special
methods for seeking to the earliest and latest offset the server maintains are also available (
seekToBeginning(Collection)
and seekToEnd(Collection)
respectively).
One of such cases is stream processing, where processor fetches from two topics and performs the join on these two streams. When one of the topics is long lagging behind the other, the processor would like to pause fetching from the ahead topic in order to get the lagging stream to catch up. Another example is bootstraping upon consumer starting up where there are a lot of history data to catch up, the applications usually want to get the latest data on some of the topics before consider fetching other topics.
Kafka supports dynamic controlling of consumption flows by using pause(Collection)
and resume(Collection)
to pause the consumption on the specified assigned partitions and resume the consumption
on the specified paused partitions respectively in the future poll(long)
calls.
Transactions were introduced in Kafka 0.11.0 wherein applications can write to multiple topics and partitions atomically.
In order for this to work, consumers reading from these partitions should be configured to only read committed data.
This can be achieved by setting the isolation.level=read_committed
in the consumer's configuration.
In read_committed
mode, the consumer will read only those transactional messages which have been
successfully committed. It will continue to read non-transactional messages as before. There is no client-side
buffering in read_committed
mode. Instead, the end offset of a partition for a read_committed
consumer would be the offset of the first message in the partition belonging to an open transaction. This offset
is known as the 'Last Stable Offset'(LSO).
A read_committed
consumer will only read up to the LSO and filter out any transactional
messages which have been aborted. The LSO also affects the behavior of seekToEnd(Collection)
and
endOffsets(Collection)
for read_committed
consumers, details of which are in each method's documentation.
Finally, the fetch lag metrics are also adjusted to be relative to the LSO for read_committed
consumers.
Partitions with transactional messages will include commit or abort markers which indicate the result of a transaction.
There markers are not returned to applications, yet have an offset in the log. As a result, applications reading from
topics with transactional messages will see gaps in the consumed offsets. These missing messages would be the transaction
markers, and they are filtered out for consumers in both isolation levels. Additionally, applications using
read_committed
consumers may also see gaps due to aborted transactions, since those messages would not
be returned by the consumer and yet would have valid offsets.
ConcurrentModificationException
.
The only exception to this rule is wakeup()
, which can safely be used from an external thread to
interrupt an active operation. In this case, a WakeupException
will be
thrown from the thread blocking on the operation. This can be used to shutdown the consumer from another thread.
The following snippet shows the typical pattern:
public class KafkaConsumerRunner implements Runnable { private final AtomicBoolean closed = new AtomicBoolean(false); private final KafkaConsumer consumer; public void run() { try { consumer.subscribe(Arrays.asList("topic")); while (!closed.get()) { ConsumerRecords records = consumer.poll(10000); // Handle new records } } catch (WakeupException e) { // Ignore exception if closing if (!closed.get()) throw e; } finally { consumer.close(); } } // Shutdown hook which can be called from a separate thread public void shutdown() { closed.set(true); consumer.wakeup(); } }Then in a separate thread, the consumer can be shutdown by setting the closed flag and waking up the consumer.
closed.set(true); consumer.wakeup();
Note that while it is possible to use thread interrupts instead of wakeup()
to abort a blocking operation
(in which case, InterruptException
will be raised), we discourage their use since they may cause a clean
shutdown of the consumer to be aborted. Interrupts are mainly supported for those cases where using wakeup()
is impossible, e.g. when a consumer thread is managed by code that is unaware of the Kafka client.
We have intentionally avoided implementing a particular threading model for processing. This leaves several options for implementing multi-threaded processing of records.
ConsumerRecords
instances to a blocking queue consumed by a pool of processor threads that actually handle
the record processing.
This option likewise has pros and cons:
Constructor | Description |
---|---|
KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs) |
A consumer is instantiated by providing a set of key-value pairs as configuration.
|
KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) |
A consumer is instantiated by providing a set of key-value pairs as configuration, and a key and a value
Deserializer . |
KafkaConsumer(java.util.Properties properties) |
A consumer is instantiated by providing a
Properties object as configuration. |
KafkaConsumer(java.util.Properties properties,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) |
A consumer is instantiated by providing a
Properties object as configuration, and a
key and a value Deserializer . |
Modifier and Type | Method | Description |
---|---|---|
void |
assign(java.util.Collection<TopicPartition> partitions) |
Manually assign a list of partitions to this consumer.
|
java.util.Set<TopicPartition> |
assignment() |
Get the set of partitions currently assigned to this consumer.
|
java.util.Map<TopicPartition,java.lang.Long> |
beginningOffsets(java.util.Collection<TopicPartition> partitions) |
Get the first offset for the given partitions.
|
void |
close() |
Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup.
|
void |
close(long timeout,
java.util.concurrent.TimeUnit timeUnit) |
Tries to close the consumer cleanly within the specified timeout.
|
void |
commitAsync() |
Commit offsets returned on the last
poll() for all the subscribed list of topics and partition. |
void |
commitAsync(java.util.Map<TopicPartition,OffsetAndMetadata> offsets,
OffsetCommitCallback callback) |
Commit the specified offsets for the specified list of topics and partitions to Kafka.
|
void |
commitAsync(OffsetCommitCallback callback) |
Commit offsets returned on the last
poll() for the subscribed list of topics and partitions. |
void |
commitSync() |
Commit offsets returned on the last
poll() for all the subscribed list of topics and partitions. |
void |
commitSync(java.util.Map<TopicPartition,OffsetAndMetadata> offsets) |
Commit the specified offsets for the specified list of topics and partitions.
|
OffsetAndMetadata |
committed(TopicPartition partition) |
Get the last committed offset for the given partition (whether the commit happened by this process or
another).
|
java.util.Map<TopicPartition,java.lang.Long> |
endOffsets(java.util.Collection<TopicPartition> partitions) |
Get the last offset for the given partitions.
|
java.util.Map<java.lang.String,java.util.List<PartitionInfo>> |
listTopics() |
Get metadata about partitions for all topics that the user is authorized to view.
|
java.util.Map<MetricName,? extends Metric> |
metrics() |
Get the metrics kept by the consumer
|
java.util.Map<TopicPartition,OffsetAndTimestamp> |
offsetsForTimes(java.util.Map<TopicPartition,java.lang.Long> timestampsToSearch) |
Look up the offsets for the given partitions by timestamp.
|
java.util.List<PartitionInfo> |
partitionsFor(java.lang.String topic) |
Get metadata about the partitions for a given topic.
|
void |
pause(java.util.Collection<TopicPartition> partitions) |
Suspend fetching from the requested partitions.
|
java.util.Set<TopicPartition> |
paused() |
Get the set of partitions that were previously paused by a call to
pause(Collection) . |
ConsumerRecords<K,V> |
poll(long timeout) |
Fetch data for the topics or partitions specified using one of the subscribe/assign APIs.
|
long |
position(TopicPartition partition) |
Get the offset of the next record that will be fetched (if a record with that offset exists).
|
void |
resume(java.util.Collection<TopicPartition> partitions) |
Resume specified partitions which have been paused with
pause(Collection) . |
void |
seek(TopicPartition partition,
long offset) |
Overrides the fetch offsets that the consumer will use on the next
poll(timeout) . |
void |
seekToBeginning(java.util.Collection<TopicPartition> partitions) |
Seek to the first offset for each of the given partitions.
|
void |
seekToEnd(java.util.Collection<TopicPartition> partitions) |
Seek to the last offset for each of the given partitions.
|
void |
subscribe(java.util.Collection<java.lang.String> topics) |
Subscribe to the given list of topics to get dynamically assigned partitions.
|
void |
subscribe(java.util.Collection<java.lang.String> topics,
ConsumerRebalanceListener listener) |
Subscribe to the given list of topics to get dynamically
assigned partitions.
|
void |
subscribe(java.util.regex.Pattern pattern) |
Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
|
void |
subscribe(java.util.regex.Pattern pattern,
ConsumerRebalanceListener listener) |
Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
|
java.util.Set<java.lang.String> |
subscription() |
Get the current subscription.
|
void |
unsubscribe() |
Unsubscribe from topics currently subscribed with
subscribe(Collection) or subscribe(Pattern) . |
void |
wakeup() |
Wakeup the consumer.
|
public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
Valid configuration strings are documented at ConsumerConfig
.
Note: after creating a KafkaConsumer
you must always close()
it to avoid resource leaks.
configs
- The consumer configspublic KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
Deserializer
.
Valid configuration strings are documented at ConsumerConfig
.
Note: after creating a KafkaConsumer
you must always close()
it to avoid resource leaks.
configs
- The consumer configskeyDeserializer
- The deserializer for key that implements Deserializer
. The configure() method
won't be called in the consumer when the deserializer is passed in directly.valueDeserializer
- The deserializer for value that implements Deserializer
. The configure() method
won't be called in the consumer when the deserializer is passed in directly.public KafkaConsumer(java.util.Properties properties)
Properties
object as configuration.
Valid configuration strings are documented at ConsumerConfig
.
Note: after creating a KafkaConsumer
you must always close()
it to avoid resource leaks.
properties
- The consumer configuration propertiespublic KafkaConsumer(java.util.Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
Properties
object as configuration, and a
key and a value Deserializer
.
Valid configuration strings are documented at ConsumerConfig
.
Note: after creating a KafkaConsumer
you must always close()
it to avoid resource leaks.
properties
- The consumer configuration propertieskeyDeserializer
- The deserializer for key that implements Deserializer
. The configure() method
won't be called in the consumer when the deserializer is passed in directly.valueDeserializer
- The deserializer for value that implements Deserializer
. The configure() method
won't be called in the consumer when the deserializer is passed in directly.public java.util.Set<TopicPartition> assignment()
assign(Collection)
then this will simply return the same partitions that
were assigned. If topic subscription was used, then this will give the set of topic partitions currently assigned
to the consumer (which may be none if the assignment hasn't happened yet, or the partitions are in the
process of getting reassigned).assignment
in interface Consumer<K,V>
assignment()
public java.util.Set<java.lang.String> subscription()
subscribe(Collection, ConsumerRebalanceListener)
, or an empty set if no such call has been made.subscription
in interface Consumer<K,V>
subscription()
public void subscribe(java.util.Collection<java.lang.String> topics, ConsumerRebalanceListener listener)
assign(Collection)
.
If the given list of topics is empty, it is treated the same as unsubscribe()
.
As part of group management, the consumer will keep track of the list of consumers that belong to a particular group and will trigger a rebalance operation if any one of the following events are triggered:
When any of these events are triggered, the provided listener will be invoked first to indicate that
the consumer's assignment has been revoked, and then again when the new assignment has been received.
Note that rebalances will only occur during an active call to poll(long)
, so callbacks will
also only be invoked during that time.
The provided listener will immediately override any listener set in a previous call to subscribe.
It is guaranteed, however, that the partitions revoked/assigned through this interface are from topics
subscribed in this call. See ConsumerRebalanceListener
for more details.
subscribe
in interface Consumer<K,V>
topics
- The list of topics to subscribe tolistener
- Non-null listener instance to get notifications on partition assignment/revocation for the
subscribed topicsjava.lang.IllegalArgumentException
- If topics is null or contains null or empty elements, or if listener is nulljava.lang.IllegalStateException
- If subscribe()
is called previously with pattern, or assign is called
previously (without a subsequent call to unsubscribe()
), or if not
configured at-least one partition assignment strategysubscribe(Collection, ConsumerRebalanceListener)
public void subscribe(java.util.Collection<java.lang.String> topics)
assign(Collection)
.
If the given list of topics is empty, it is treated the same as unsubscribe()
.
This is a short-hand for subscribe(Collection, ConsumerRebalanceListener)
, which
uses a no-op listener. If you need the ability to seek to particular offsets, you should prefer
subscribe(Collection, ConsumerRebalanceListener)
, since group rebalances will cause partition offsets
to be reset. You should also provide your own listener if you are doing your own offset
management since the listener gives you an opportunity to commit offsets before a rebalance finishes.
subscribe
in interface Consumer<K,V>
topics
- The list of topics to subscribe tojava.lang.IllegalArgumentException
- If topics is null or contains null or empty elementsjava.lang.IllegalStateException
- If subscribe()
is called previously with pattern, or assign is called
previously (without a subsequent call to unsubscribe()
), or if not
configured at-least one partition assignment strategysubscribe(Collection)
public void subscribe(java.util.regex.Pattern pattern, ConsumerRebalanceListener listener)
metadata.max.age.ms
configuration: by lowering
the max metadata age, the consumer will refresh metadata more often and check for matching topics.
See subscribe(Collection, ConsumerRebalanceListener)
for details on the
use of the ConsumerRebalanceListener
. Generally rebalances are triggered when there
is a change to the topics matching the provided pattern and when consumer group membership changes.
Group rebalances only take place during an active call to poll(long)
.
subscribe
in interface Consumer<K,V>
pattern
- Pattern to subscribe tolistener
- Non-null listener instance to get notifications on partition assignment/revocation for the
subscribed topicsjava.lang.IllegalArgumentException
- If pattern or listener is nulljava.lang.IllegalStateException
- If subscribe()
is called previously with topics, or assign is called
previously (without a subsequent call to unsubscribe()
), or if not
configured at-least one partition assignment strategysubscribe(Pattern, ConsumerRebalanceListener)
public void subscribe(java.util.regex.Pattern pattern)
This is a short-hand for subscribe(Pattern, ConsumerRebalanceListener)
, which
uses a no-op listener. If you need the ability to seek to particular offsets, you should prefer
subscribe(Pattern, ConsumerRebalanceListener)
, since group rebalances will cause partition offsets
to be reset. You should also provide your own listener if you are doing your own offset
management since the listener gives you an opportunity to commit offsets before a rebalance finishes.
subscribe
in interface Consumer<K,V>
pattern
- Pattern to subscribe tojava.lang.IllegalArgumentException
- If pattern is nulljava.lang.IllegalStateException
- If subscribe()
is called previously with topics, or assign is called
previously (without a subsequent call to unsubscribe()
), or if not
configured at-least one partition assignment strategysubscribe(Pattern)
public void unsubscribe()
subscribe(Collection)
or subscribe(Pattern)
.
This also clears any partitions directly assigned through assign(Collection)
.unsubscribe
in interface Consumer<K,V>
unsubscribe()
public void assign(java.util.Collection<TopicPartition> partitions)
If the given list of topic partitions is empty, it is treated the same as unsubscribe()
.
Manual topic assignment through this method does not use the consumer's group management
functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
metadata change. Note that it is not possible to use both manual partition assignment with assign(Collection)
and group assignment with subscribe(Collection, ConsumerRebalanceListener)
.
If auto-commit is enabled, an async commit (based on the old assignment) will be triggered before the new assignment replaces the old one.
assign
in interface Consumer<K,V>
partitions
- The list of partitions to assign this consumerjava.lang.IllegalArgumentException
- If partitions is null or contains null or empty topicsjava.lang.IllegalStateException
- If subscribe()
is called previously with topics or pattern
(without a subsequent call to unsubscribe()
)assign(Collection)
public ConsumerRecords<K,V> poll(long timeout)
On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last
consumed offset can be manually set through seek(TopicPartition, long)
or automatically set as the last committed
offset for the subscribed list of partitions
poll
in interface Consumer<K,V>
timeout
- The time, in milliseconds, spent waiting in poll if data is not available in the buffer.
If 0, returns immediately with any records that are available currently in the buffer, else returns empty.
Must not be negative.InvalidOffsetException
- if the offset for a partition or set of
partitions is undefined or out of range and no offset reset policy has been configuredWakeupException
- if wakeup()
is called before or while this
function is calledInterruptException
- if the calling thread is interrupted before or while
this function is calledAuthenticationException
- if authentication fails. See the exception for more detailsAuthorizationException
- if caller lacks Read access to any of the subscribed
topics or to the configured groupId. See the exception for more detailsKafkaException
- for any other unrecoverable errors (e.g. invalid groupId or
session timeout, errors deserializing key/value pairs, or any new error cases in future versions)java.lang.IllegalArgumentException
- if the timeout value is negativejava.lang.IllegalStateException
- if the consumer is not subscribed to any topics or manually assigned any
partitions to consume frompoll(long)
public void commitSync()
poll()
for all the subscribed list of topics and partitions.
This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used.
This is a synchronous commits and will block until either the commit succeeds or an unrecoverable error is encountered (in which case it is thrown to the caller).
Note that asynchronous offset commits sent previously with the commitAsync(OffsetCommitCallback)
(or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
commitSync
in interface Consumer<K,V>
CommitFailedException
- if the commit failed and cannot be retried.
This can only occur if you are using automatic group management with subscribe(Collection)
,
or if there is an active group with the same groupId which is using group management.WakeupException
- if wakeup()
is called before or while this
function is calledInterruptException
- if the calling thread is interrupted before or while
this function is calledAuthenticationException
- if authentication fails. See the exception for more detailsAuthorizationException
- if not authorized to the topic or to the
configured groupId. See the exception for more detailsKafkaException
- for any other unrecoverable errors (e.g. if offset metadata
is too large or if the topic does not exist).commitSync()
public void commitSync(java.util.Map<TopicPartition,OffsetAndMetadata> offsets)
This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used. The committed offset should be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.
This is a synchronous commits and will block until either the commit succeeds or an unrecoverable error is encountered (in which case it is thrown to the caller).
Note that asynchronous offset commits sent previously with the commitAsync(OffsetCommitCallback)
(or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
commitSync
in interface Consumer<K,V>
offsets
- A map of offsets by partition with associated metadataCommitFailedException
- if the commit failed and cannot be retried.
This can only occur if you are using automatic group management with subscribe(Collection)
,
or if there is an active group with the same groupId which is using group management.WakeupException
- if wakeup()
is called before or while this
function is calledInterruptException
- if the calling thread is interrupted before or while
this function is calledAuthenticationException
- if authentication fails. See the exception for more detailsAuthorizationException
- if not authorized to the topic or to the
configured groupId. See the exception for more detailsjava.lang.IllegalArgumentException
- if the committed offset is negativeKafkaException
- for any other unrecoverable errors (e.g. if offset metadata
is too large or if the topic does not exist).commitSync(Map)
public void commitAsync()
poll()
for all the subscribed list of topics and partition.
Same as commitAsync(null)
commitAsync
in interface Consumer<K,V>
commitAsync()
public void commitAsync(OffsetCommitCallback callback)
poll()
for the subscribed list of topics and partitions.
This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used.
This is an asynchronous call and will not block. Any errors encountered are either passed to the callback (if provided) or discarded.
Offsets committed through multiple calls to this API are guaranteed to be sent in the same order as
the invocations. Corresponding commit callbacks are also invoked in the same order. Additionally note that
offsets committed through this API are guaranteed to complete before a subsequent call to commitSync()
(and variants) returns.
commitAsync
in interface Consumer<K,V>
callback
- Callback to invoke when the commit completescommitAsync(OffsetCommitCallback)
public void commitAsync(java.util.Map<TopicPartition,OffsetAndMetadata> offsets, OffsetCommitCallback callback)
This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used. The committed offset should be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.
This is an asynchronous call and will not block. Any errors encountered are either passed to the callback (if provided) or discarded.
Offsets committed through multiple calls to this API are guaranteed to be sent in the same order as
the invocations. Corresponding commit callbacks are also invoked in the same order. Additionally note that
offsets committed through this API are guaranteed to complete before a subsequent call to commitSync()
(and variants) returns.
commitAsync
in interface Consumer<K,V>
offsets
- A map of offsets by partition with associate metadata. This map will be copied internally, so it
is safe to mutate the map after returning.callback
- Callback to invoke when the commit completescommitAsync(Map, OffsetCommitCallback)
public void seek(TopicPartition partition, long offset)
poll(timeout)
. If this API
is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsetsseek
in interface Consumer<K,V>
java.lang.IllegalArgumentException
- if the provided TopicPartition is not assigned to this consumer
or if provided offset is negativeseek(TopicPartition, long)
public void seekToBeginning(java.util.Collection<TopicPartition> partitions)
poll(long)
or position(TopicPartition)
are called.
If no partitions are provided, seek to the first offset for all of the currently assigned partitions.seekToBeginning
in interface Consumer<K,V>
java.lang.IllegalArgumentException
- if partitions
is null
or the provided TopicPartition is not assigned to this consumerseekToBeginning(Collection)
public void seekToEnd(java.util.Collection<TopicPartition> partitions)
poll(long)
or position(TopicPartition)
are called.
If no partitions are provided, seek to the final offset for all of the currently assigned partitions.
If isolation.level=read_committed
, the end offset will be the Last Stable Offset, i.e., the offset
of the first message with an open transaction.
seekToEnd
in interface Consumer<K,V>
java.lang.IllegalArgumentException
- if partitions
is null
or the provided TopicPartition is not assigned to this consumerseekToEnd(Collection)
public long position(TopicPartition partition)
This call will block until either the position could be determined or an unrecoverable error is encountered (in which case it is thrown to the caller).
position
in interface Consumer<K,V>
partition
- The partition to get the position forjava.lang.IllegalArgumentException
- if the provided TopicPartition is not assigned to this consumerInvalidOffsetException
- if no offset is currently defined for
the partitionWakeupException
- if wakeup()
is called before or while this
function is calledInterruptException
- if the calling thread is interrupted before or while
this function is calledAuthenticationException
- if authentication fails. See the exception for more detailsAuthorizationException
- if not authorized to the topic or to the
configured groupId. See the exception for more detailsKafkaException
- for any other unrecoverable errorsposition(TopicPartition)
public OffsetAndMetadata committed(TopicPartition partition)
This call will block to do a remote call to get the latest committed offsets from the server.
committed
in interface Consumer<K,V>
partition
- The partition to checkWakeupException
- if wakeup()
is called before or while this
function is calledInterruptException
- if the calling thread is interrupted before or while
this function is calledAuthenticationException
- if authentication fails. See the exception for more detailsAuthorizationException
- if not authorized to the topic or to the
configured groupId. See the exception for more detailsKafkaException
- for any other unrecoverable errorscommitted(TopicPartition)
public java.util.Map<MetricName,? extends Metric> metrics()
public java.util.List<PartitionInfo> partitionsFor(java.lang.String topic)
partitionsFor
in interface Consumer<K,V>
topic
- The topic to get partition metadata forWakeupException
- if wakeup()
is called before or while this
function is calledInterruptException
- if the calling thread is interrupted before or while
this function is calledAuthenticationException
- if authentication fails. See the exception for more detailsAuthorizationException
- if not authorized to the specified topic. See the exception for more detailsTimeoutException
- if the topic metadata could not be fetched before
expiration of the configured request timeoutKafkaException
- for any other unrecoverable errorspartitionsFor(String)
public java.util.Map<java.lang.String,java.util.List<PartitionInfo>> listTopics()
listTopics
in interface Consumer<K,V>
WakeupException
- if wakeup()
is called before or while this
function is calledInterruptException
- if the calling thread is interrupted before or while
this function is calledTimeoutException
- if the topic metadata could not be fetched before
expiration of the configured request timeoutKafkaException
- for any other unrecoverable errorslistTopics()
public void pause(java.util.Collection<TopicPartition> partitions)
poll(long)
will not return
any records from these partitions until they have been resumed using resume(Collection)
.
Note that this method does not affect partition subscription. In particular, it does not cause a group
rebalance when automatic assignment is used.pause
in interface Consumer<K,V>
partitions
- The partitions which should be pausedjava.lang.IllegalStateException
- if one of the provided partitions is not assigned to this consumerpause(Collection)
public void resume(java.util.Collection<TopicPartition> partitions)
pause(Collection)
. New calls to
poll(long)
will return records from these partitions if there are any to be fetched.
If the partitions were not previously paused, this method is a no-op.resume
in interface Consumer<K,V>
partitions
- The partitions which should be resumedjava.lang.IllegalStateException
- if one of the provided partitions is not assigned to this consumerresume(Collection)
public java.util.Set<TopicPartition> paused()
pause(Collection)
.public java.util.Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(java.util.Map<TopicPartition,java.lang.Long> timestampsToSearch)
offsetsForTimes
in interface Consumer<K,V>
timestampsToSearch
- the mapping from partition to the timestamp to look up.null
will be returned for the partition if there is no
such message.AuthenticationException
- if authentication fails. See the exception for more detailsAuthorizationException
- if not authorized to the topic(s). See the exception for more detailsjava.lang.IllegalArgumentException
- if the target timestamp is negativeTimeoutException
- if the offset metadata could not be fetched before
expiration of the configured request.timeout.ms
UnsupportedVersionException
- if the broker does not support looking up
the offsets by timestampoffsetsForTimes(java.util.Map)
public java.util.Map<TopicPartition,java.lang.Long> beginningOffsets(java.util.Collection<TopicPartition> partitions)
This method does not change the current consumer position of the partitions.
beginningOffsets
in interface Consumer<K,V>
partitions
- the partitions to get the earliest offsets.AuthenticationException
- if authentication fails. See the exception for more detailsAuthorizationException
- if not authorized to the topic(s). See the exception for more detailsTimeoutException
- if the offsets could not be fetched before
expiration of the configured request.timeout.ms
seekToBeginning(Collection)
public java.util.Map<TopicPartition,java.lang.Long> endOffsets(java.util.Collection<TopicPartition> partitions)
This method does not change the current consumer position of the partitions.
When isolation.level=read_committed
the last offset will be the Last Stable Offset (LSO).
This is the offset of the first message with an open transaction. The LSO moves forward as transactions
are completed.
endOffsets
in interface Consumer<K,V>
partitions
- the partitions to get the end offsets.AuthenticationException
- if authentication fails. See the exception for more detailsAuthorizationException
- if not authorized to the topic(s). See the exception for more detailsTimeoutException
- if the offsets could not be fetched before
expiration of the configured request.timeout.ms
seekToEnd(Collection)
public void close()
close(long, TimeUnit)
for details. Note that wakeup()
cannot be used to interrupt close.close
in interface java.lang.AutoCloseable
close
in interface java.io.Closeable
close
in interface Consumer<K,V>
InterruptException
- if the calling thread is interrupted
before or while this function is calledKafkaException
- for any other error during closeclose()
public void close(long timeout, java.util.concurrent.TimeUnit timeUnit)
timeout
for the consumer to complete pending commits and leave the group.
If auto-commit is enabled, this will commit the current offsets if possible within the
timeout. If the consumer is unable to complete offset commits and gracefully leave the group
before the timeout expires, the consumer is force closed. Note that wakeup()
cannot be
used to interrupt close.close
in interface Consumer<K,V>
timeout
- The maximum time to wait for consumer to close gracefully. The value must be
non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.timeUnit
- The time unit for the timeout
java.lang.IllegalArgumentException
- If the timeout
is negative.InterruptException
- If the thread is interrupted before or while this function is calledKafkaException
- for any other error during closeclose(long, TimeUnit)
public void wakeup()
WakeupException
.
If no thread is blocking in a method which can throw WakeupException
, the next call to such a method will raise it instead.