Class KafkaShareConsumer<K,V>
- All Implemented Interfaces: Closeable, AutoCloseable, ShareConsumer<K,V>
This is an early access feature under development, introduced by KIP-932. It is not suitable for production use until it is fully implemented and released.
Cross-Version Compatibility
This client can communicate with brokers that are running a version which supports share groups. You will receive an UnsupportedVersionException when invoking an API that is not available on the running broker version.
Share Groups and Topic Subscriptions
Kafka uses the concept of share groups to allow a pool of consumers to cooperate on the work of consuming and processing records. All consumer instances sharing the same group.id will be part of the same share group.
Each consumer in a group can dynamically set the list of topics it wants to subscribe to using the subscribe(Collection) method. Kafka will deliver each message in the subscribed topics to one consumer in the share group. Unlike consumer groups, share groups balance the partitions between all members of the share group, permitting multiple consumers to consume from the same partitions. This gives more flexible sharing of records than a consumer group, at the expense of record ordering.
Membership in a share group is maintained dynamically: if a consumer fails, the partitions assigned to it will be reassigned to other consumers in the same group. Similarly, if a new consumer joins the group, the partition assignment is re-evaluated and partitions can be moved from existing consumers to the new one. This is known as rebalancing the group and is discussed in more detail below. Group rebalancing is also used when new partitions are added to one of the subscribed topics. The group will automatically detect the new partitions through periodic metadata refreshes and assign them to the members of the group.
Conceptually, you can think of a share group as a single logical subscriber made up of multiple consumers. In fact, in other messaging systems, a share group is roughly equivalent to a durable shared subscription. You can have multiple share groups and consumer groups independently consuming from the same topics.
Detecting Consumer Failures
After subscribing to a set of topics, the consumer will automatically join the group when poll(Duration) is invoked. This method is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive records from the partitions it was assigned. Under the covers, the consumer sends periodic heartbeats to the broker. If the consumer crashes or is unable to send heartbeats for the duration of the share group's session timeout, then the consumer will be considered dead and its partitions will be reassigned.
It is also possible that the consumer could encounter a "livelock" situation where it is continuing to send heartbeats
in the background, but no progress is being made. To prevent the consumer from holding onto its partitions
indefinitely in this case, we provide a liveness detection mechanism using the max.poll.interval.ms
setting.
If you don't call poll at least as frequently as this, the client will proactively leave the share group.
So to stay in the group, you must continue to call poll.
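For example, an application whose record processing is slow can lengthen this interval when it configures the consumer. A minimal sketch, assuming the standard client configuration properties shown in the usage examples below (the five-minute value is purely illustrative):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Allow up to 5 minutes between calls to poll before the client proactively leaves the share group
props.setProperty("max.poll.interval.ms", "300000");
KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);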
Record Delivery and Acknowledgement
When a consumer in a share group fetches records using poll(Duration), it receives available records from any of the topic-partitions that match its subscriptions. Records are acquired for delivery to this consumer with a time-limited acquisition lock. While a record is acquired, it is not available for another consumer. By default, the lock duration is 30 seconds, but it can also be controlled using the group configuration parameter group.share.record.lock.duration.ms. The idea is that the lock is automatically released once the lock duration has elapsed, and then the record is available to be given to another consumer. The consumer which holds the lock can deal with it in the following ways:
- The consumer can acknowledge successful processing of the record
- The consumer can release the record, which makes the record available for another delivery attempt
- The consumer can reject the record, which indicates that the record is unprocessable and does not make the record available for another delivery attempt
- The consumer can do nothing, in which case the lock is automatically released when the lock duration has elapsed
The number of records acquired for the share group on each topic-partition is limited using the group.share.record.lock.partition.limit configuration parameter. By limiting the duration of the acquisition lock and automatically releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures.
The consumer can choose to use implicit or explicit acknowledgement of the records it processes.
If the application calls acknowledge(ConsumerRecord, AcknowledgeType) for any record in the batch, it is using explicit acknowledgement. In this case:
- The application calls commitSync() or commitAsync(), which commits the acknowledgements to Kafka. If any records in the batch were not acknowledged, they remain acquired and will be presented to the application in response to a future poll.
- The application calls poll(Duration) without committing first, which commits the acknowledgements to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgement. If any records in the batch were not acknowledged, they remain acquired and will be presented to the application in response to a future poll.
- The application calls close(), which attempts to commit any pending acknowledgements and releases any remaining acquired records.
If the application does not call acknowledge(ConsumerRecord, AcknowledgeType) for any record in the batch, it is using implicit acknowledgement. In this case:
- The application calls commitSync() or commitAsync(), which implicitly acknowledges all of the delivered records as processed successfully and commits the acknowledgements to Kafka.
- The application calls poll(Duration) without committing, which also implicitly acknowledges all of the delivered records and commits the acknowledgements to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgements.
- The application calls close(), which releases any acquired records without acknowledgement.
The consumer guarantees that the records returned in the ConsumerRecords
object for a specific topic-partition
are in order of increasing offset. For each topic-partition, Kafka guarantees that acknowledgements for the records
in a batch are performed atomically. This makes error handling significantly more straightforward because there can be
one error code per partition.
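As an illustration of this ordering guarantee, the records returned by a single poll can be walked partition by partition using the standard ConsumerRecords accessors. This is a minimal sketch: consumer is assumed to be a KafkaShareConsumer<String, String> configured as in the usage examples below, and doProcessing is a placeholder for application logic.

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (TopicPartition tp : records.partitions()) {
    // Within each topic-partition, records are presented in order of increasing offset
    for (ConsumerRecord<String, String> record : records.records(tp)) {
        doProcessing(record);
    }
}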
Usage Examples
The share consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to demonstrate how to use them.
Acknowledging a batch of records (implicit acknowledgement)
This example demonstrates implicit acknowledgement using poll(Duration) to acknowledge the records which were delivered in the previous poll. All the records delivered are implicitly marked as successfully consumed and acknowledged synchronously with Kafka as the consumer fetches more records.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
consumer.subscribe(Arrays.asList("foo"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        doProcessing(record);
    }
}

Alternatively, you can use commitSync() or commitAsync() to commit the acknowledgements, but this is slightly less efficient because there is an additional request sent to Kafka.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
consumer.subscribe(Arrays.asList("foo"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        doProcessing(record);
    }
    consumer.commitSync();
}
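If the application wants to observe when its acknowledgements are actually committed, for example when relying on the asynchronous commit performed by poll(Duration) or commitAsync(), it can register an acknowledgement commit callback before polling. This sketch assumes the AcknowledgementCommitCallback interface described by KIP-932, whose onComplete method receives the acknowledged offsets per topic-partition and an exception if the commit failed:

consumer.setAcknowledgementCommitCallback((offsets, exception) -> {
    if (exception != null) {
        // The acknowledgements for these offsets were not committed successfully
        System.err.println("Acknowledgement commit failed for " + offsets + ": " + exception);
    } else {
        System.out.println("Acknowledgements committed for " + offsets);
    }
});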
Per-record acknowledgement (explicit acknowledgement)
This example demonstrates using different acknowledgement types depending on the outcome of processing the records.

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
consumer.subscribe(Arrays.asList("foo"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        try {
            doProcessing(record);
            consumer.acknowledge(record, AcknowledgeType.ACCEPT);
        } catch (Exception e) {
            consumer.acknowledge(record, AcknowledgeType.REJECT);
        }
    }
    consumer.commitSync();
}

Each record processed is separately acknowledged using a call to acknowledge(ConsumerRecord, AcknowledgeType).
The AcknowledgeType argument indicates whether the record was processed successfully or not. In this case, the bad records are rejected, meaning that they're not eligible for further delivery attempts. For a permanent error such as a semantic error, this is appropriate. For a transient error which might not affect a subsequent processing attempt, AcknowledgeType.RELEASE is more appropriate because the record remains eligible for further delivery attempts.
The calls to acknowledge(ConsumerRecord, AcknowledgeType) are simply updating local information in the consumer. It is only once commitSync() is called that the acknowledgements are committed by sending the new state information to Kafka.
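For example, an application that can distinguish transient failures from permanent ones might release the former and reject the latter within the processing loop of the explicit acknowledgement example above. This is a sketch: TransientException and the classification of errors are illustrative and not part of the Kafka API.

for (ConsumerRecord<String, String> record : records) {
    try {
        doProcessing(record);
        consumer.acknowledge(record, AcknowledgeType.ACCEPT);
    } catch (TransientException e) {
        // The record remains eligible for a further delivery attempt
        consumer.acknowledge(record, AcknowledgeType.RELEASE);
    } catch (Exception e) {
        // The record is unprocessable and will not be delivered again
        consumer.acknowledge(record, AcknowledgeType.REJECT);
    }
}
consumer.commitSync();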
Per-record acknowledgement, ending processing of the batch on an error (explicit acknowledgement)
This example demonstrates ending processing of a batch of records on the first error.

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
consumer.subscribe(Arrays.asList("foo"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        try {
            doProcessing(record);
            consumer.acknowledge(record, AcknowledgeType.ACCEPT);
        } catch (Exception e) {
            consumer.acknowledge(record, AcknowledgeType.REJECT);
            break;
        }
    }
    consumer.commitSync();
}

There are the following cases in this example:
- The batch contains no records, in which case the application just polls again. The call to commitSync() just does nothing because the batch was empty.
- All of the records in the batch are processed successfully. The calls to acknowledge(ConsumerRecord, AcknowledgeType) specifying AcknowledgeType.ACCEPT mark all records in the batch as successfully processed.
- One of the records encounters an exception. The call to acknowledge(ConsumerRecord, AcknowledgeType) specifying AcknowledgeType.REJECT rejects that record. Earlier records in the batch have already been marked as successfully processed. The call to commitSync() commits the acknowledgements, but the records after the failed record remain acquired as part of the same delivery attempt and will be presented to the application in response to another poll.
Reading Transactional Records
The way that share groups handle transactional records is controlled by the group.share.isolation.level configuration property. In a share group, the isolation level applies to the entire share group, not just individual consumers.
In read_uncommitted isolation level, the share group consumes all non-transactional and transactional records. The consumption is bounded by the high-water mark.
In read_committed isolation level (not yet supported), the share group only consumes non-transactional records and committed transactional records. The set of records which are eligible to become in-flight records are non-transactional records and committed transactional records only. The consumption is bounded by the last stable offset, so an open transaction blocks the progress of the share group with read_committed isolation level.
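The isolation level is a group configuration rather than a consumer property. As a sketch, and assuming that group.share.isolation.level can be altered like other group configurations through the Admin client using a GROUP config resource, it could be set as follows:

Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (Admin admin = Admin.create(adminProps)) {
    ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "test");
    AlterConfigOp op = new AlterConfigOp(
        new ConfigEntry("group.share.isolation.level", "read_uncommitted"),
        AlterConfigOp.OpType.SET);
    admin.incrementalAlterConfigs(Map.of(group, List.of(op))).all().get();
} catch (InterruptedException | ExecutionException e) {
    // Handle a failure to alter the group configuration
    e.printStackTrace();
}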
Multithreaded Processing
The consumer is NOT thread-safe. It is the responsibility of the user to ensure that multithreaded access is properly synchronized. Unsynchronized access will result in ConcurrentModificationException.
The only exception to this rule is wakeup()
which can safely be used from an external thread to
interrupt an active operation. In this case, a WakeupException
will be
thrown from the thread blocking on the operation. This can be used to shut down the consumer from another thread.
The following snippet shows the typical pattern:
public class KafkaShareConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaShareConsumer consumer;

    public KafkaShareConsumerRunner(KafkaShareConsumer consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(Arrays.asList("topic"));
            while (!closed.get()) {
                ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
                // Handle new records
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }

    // Shutdown hook which can be called from a separate thread
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}

Then in a separate thread, the consumer can be shut down by setting the closed flag and waking up the consumer.

closed.set(true);
consumer.wakeup();
Note that while it is possible to use thread interrupts instead of wakeup()
to abort a blocking operation
(in which case, InterruptException
will be raised), we discourage their use since they may cause a clean
shutdown of the consumer to be aborted. Interrupts are mainly supported for those cases where using wakeup()
is impossible, such as when a consumer thread is managed by code that is unaware of the Kafka client.
We have intentionally avoided implementing a particular threading model for processing. Various options for multithreaded processing are possible, of which the most straightforward is to dedicate a thread to each consumer.
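For example, the one-thread-per-consumer option can be built directly on the KafkaShareConsumerRunner shown above. This is a sketch: the number of consumers and the props used to construct each KafkaShareConsumer are illustrative.

int numConsumers = 3;
ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
List<KafkaShareConsumerRunner> runners = new ArrayList<>();
for (int i = 0; i < numConsumers; i++) {
    // Each runner owns its consumer; a consumer instance is never shared between threads
    KafkaShareConsumerRunner runner = new KafkaShareConsumerRunner(new KafkaShareConsumer<>(props));
    runners.add(runner);
    executor.submit(runner);
}

// On shutdown, wake each consumer and then stop the executor
runners.forEach(KafkaShareConsumerRunner::shutdown);
executor.shutdown();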
-
Constructor Summary
- KafkaShareConsumer(Map<String, Object> configs): A consumer is instantiated by providing a set of key-value pairs as configuration.
- KafkaShareConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer): A consumer is instantiated by providing a set of key-value pairs as configuration, and a key and a value Deserializer.
- KafkaShareConsumer(Properties properties): A consumer is instantiated by providing a Properties object as configuration.
- KafkaShareConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer): A consumer is instantiated by providing a Properties object as configuration, and a key and a value Deserializer.
- KafkaShareConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
-
Method Summary
- acknowledge(ConsumerRecord<K, V> record): Acknowledge successful delivery of a record returned on the last poll(Duration) call.
- acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type): Acknowledge delivery of a record returned on the last poll(Duration) call indicating whether it was processed successfully.
- clientInstanceId(Duration timeout): Determines the client's unique client instance ID used for telemetry.
- close(): Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup.
- close(Duration timeout): Tries to close the consumer cleanly within the specified timeout.
- commitAsync(): Commit the acknowledgements for the records returned.
- commitSync(): Commit the acknowledgements for the records returned.
- commitSync(Duration timeout): Commit the acknowledgements for the records returned.
- metrics(): Get the metrics kept by the consumer.
- poll(Duration timeout): Fetch data for the topics specified using subscribe(Collection).
- setAcknowledgementCommitCallback(AcknowledgementCommitCallback callback): Sets the acknowledgement commit callback which can be used to handle acknowledgement completion.
- subscribe(Collection<String> topics): Subscribe to the given list of topics to get dynamically assigned partitions.
- subscription(): Get the current subscription.
- unsubscribe(): Unsubscribe from topics currently subscribed with subscribe(Collection).
- wakeup(): Wake up the consumer.
-
Constructor Details
-
KafkaShareConsumer
public KafkaShareConsumer(Map<String, Object> configs)
A consumer is instantiated by providing a set of key-value pairs as configuration. Values can be either strings or objects of the appropriate type (for example a numeric configuration would accept either the string "42" or the integer 42). Valid configuration strings are documented at ConsumerConfig.
Note: after creating a KafkaShareConsumer you must always close() it to avoid resource leaks.
- Parameters:
configs - The consumer configs
-
KafkaShareConsumer
public KafkaShareConsumer(Properties properties)
A consumer is instantiated by providing a Properties object as configuration. Valid configuration strings are documented at ConsumerConfig.
Note: after creating a KafkaShareConsumer you must always close() it to avoid resource leaks.
- Parameters:
properties - The consumer configuration properties
-
KafkaShareConsumer
public KafkaShareConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
A consumer is instantiated by providing a Properties object as configuration, and a key and a value Deserializer. Valid configuration strings are documented at ConsumerConfig.
Note: after creating a KafkaShareConsumer you must always close() it to avoid resource leaks.
- Parameters:
properties - The consumer configuration properties
keyDeserializer - The deserializer for key that implements Deserializer. The configure() method won't be called in the consumer when the deserializer is passed in directly.
valueDeserializer - The deserializer for value that implements Deserializer. The configure() method won't be called in the consumer when the deserializer is passed in directly.
-
KafkaShareConsumer
public KafkaShareConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
A consumer is instantiated by providing a set of key-value pairs as configuration, and a key and a value Deserializer. Valid configuration strings are documented at ConsumerConfig.
Note: after creating a KafkaShareConsumer you must always close() it to avoid resource leaks.
- Parameters:
configs - The consumer configs
keyDeserializer - The deserializer for key that implements Deserializer. The configure() method won't be called in the consumer when the deserializer is passed in directly.
valueDeserializer - The deserializer for value that implements Deserializer. The configure() method won't be called in the consumer when the deserializer is passed in directly.
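As an illustration, constructing a consumer with this constructor might look like the following sketch. The configuration values match the usage examples above, and the StringDeserializer instances are passed in directly, so their configure() methods are not called.

Map<String, Object> configs = Map.of(
    "bootstrap.servers", "localhost:9092",
    "group.id", "test");
KafkaShareConsumer<String, String> consumer =
    new KafkaShareConsumer<>(configs, new StringDeserializer(), new StringDeserializer());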
-
KafkaShareConsumer
public KafkaShareConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
-
-
Method Details
-
subscription
Get the current subscription. Will return the same topics used in the most recent call to subscribe(Collection), or an empty set if no such call has been made.
- Specified by: subscription in interface ShareConsumer<K,V>
- Returns: The set of topics currently subscribed to
- See Also:
-
subscribe
Subscribe to the given list of topics to get dynamically assigned partitions. Topic subscriptions are not incremental. This list will replace the current assignment, if there is one. If the given list of topics is empty, it is treated the same as unsubscribe().
As part of group management, the coordinator will keep track of the list of consumers that belong to a particular group and will trigger a rebalance operation if any one of the following events occurs:
- A member joins or leaves the share group
- An existing member of the share group is shut down or fails
- The number of partitions changes for any of the subscribed topics
- A subscribed topic is created or deleted
- Specified by: subscribe in interface ShareConsumer<K,V>
- Parameters: topics - The list of topics to subscribe to
- Throws:
IllegalArgumentException - if topics is null or contains null or empty elements
KafkaException - for any other unrecoverable errors
- See Also:
-
unsubscribe
public void unsubscribe()
Unsubscribe from topics currently subscribed with subscribe(Collection).
- Specified by: unsubscribe in interface ShareConsumer<K,V>
- Throws:
KafkaException - for any other unrecoverable errors
- See Also:
-
poll
Fetch data for the topics specified using subscribe(Collection). It is an error to not have subscribed to any topics before polling for data.
This method returns immediately if there are records available. Otherwise, it will await the passed timeout. If the timeout expires, an empty record set will be returned.
- Specified by: poll in interface ShareConsumer<K,V>
- Parameters: timeout - The maximum time to block (must not be greater than Long.MAX_VALUE milliseconds)
- Returns: map of topic to records since the last fetch for the subscribed list of topics
- Throws:
AuthenticationException - if authentication fails. See the exception for more details
AuthorizationException - if caller lacks Read access to any of the subscribed topics or to the share group. See the exception for more details
IllegalArgumentException - if the timeout value is negative
IllegalStateException - if the consumer is not subscribed to any topics
ArithmeticException - if the timeout is greater than Long.MAX_VALUE milliseconds
InvalidTopicException - if the current subscription contains any invalid topic (per Topic.validate(String))
WakeupException - if wakeup() is called before or while this method is called
InterruptException - if the calling thread is interrupted before or while this method is called
KafkaException - for any other unrecoverable errors
- See Also:
-
acknowledge
Acknowledge successful delivery of a record returned on the last poll(Duration) call. The acknowledgement is committed on the next commitSync(), commitAsync() or poll(Duration) call.
- Specified by: acknowledge in interface ShareConsumer<K,V>
- Parameters: record - The record to acknowledge
- Throws:
IllegalStateException - if the record is not waiting to be acknowledged, or the consumer has already used implicit acknowledgement
- See Also:
-
acknowledge
Acknowledge delivery of a record returned on the last poll(Duration) call indicating whether it was processed successfully. The acknowledgement is committed on the next commitSync(), commitAsync() or poll(Duration) call. By using this method, the consumer is using explicit acknowledgement.
- Specified by: acknowledge in interface ShareConsumer<K,V>
- Parameters:
record - The record to acknowledge
type - The acknowledgement type which indicates whether it was processed successfully
- Throws:
IllegalStateException - if the record is not waiting to be acknowledged, or the consumer has already used implicit acknowledgement
- See Also:
-
commitSync
Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement, the acknowledgements to commit have been indicated using acknowledge(ConsumerRecord) or acknowledge(ConsumerRecord, AcknowledgeType). If the consumer is using implicit acknowledgement, all the records returned by the latest call to poll(Duration) are acknowledged.
This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is encountered (in which case it is thrown to the caller), or the timeout specified by default.api.timeout.ms expires.
- Specified by: commitSync in interface ShareConsumer<K,V>
- Returns: A map of the results for each topic-partition for which delivery was acknowledged. If the acknowledgement failed for a topic-partition, an exception is present.
- Throws:
WakeupException - if wakeup() is called before or while this method is called
InterruptException - if the thread is interrupted while blocked
KafkaException - for any other unrecoverable errors
- See Also:
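As noted in the return description, the result contains an entry per topic-partition whose acknowledgements were committed. A sketch of inspecting the results, assuming the map is keyed by TopicIdPartition with an Optional exception value as described above:

Map<TopicIdPartition, Optional<KafkaException>> results = consumer.commitSync();
results.forEach((tp, error) ->
    error.ifPresent(e -> System.err.println("Acknowledgement failed for " + tp + ": " + e)));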
-
commitSync
Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement, the acknowledgements to commit have been indicated using acknowledge(ConsumerRecord) or acknowledge(ConsumerRecord, AcknowledgeType). If the consumer is using implicit acknowledgement, all the records returned by the latest call to poll(Duration) are acknowledged.
This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is encountered (in which case it is thrown to the caller), or the timeout expires.
- Specified by: commitSync in interface ShareConsumer<K,V>
- Parameters: timeout - The maximum amount of time to await completion of the acknowledgement
- Returns: A map of the results for each topic-partition for which delivery was acknowledged. If the acknowledgement failed for a topic-partition, an exception is present.
- Throws:
IllegalArgumentException - if the timeout is negative
WakeupException - if wakeup() is called before or while this method is called
InterruptException - if the thread is interrupted while blocked
KafkaException - for any other unrecoverable errors
- See Also:
-
commitAsync
public void commitAsync()
Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement, the acknowledgements to commit have been indicated using acknowledge(ConsumerRecord) or acknowledge(ConsumerRecord, AcknowledgeType). If the consumer is using implicit acknowledgement, all the records returned by the latest call to poll(Duration) are acknowledged.
- Specified by: commitAsync in interface ShareConsumer<K,V>
- Throws:
KafkaException - for any other unrecoverable errors
- See Also:
-
setAcknowledgementCommitCallback
Sets the acknowledgement commit callback which can be used to handle acknowledgement completion.
- Specified by: setAcknowledgementCommitCallback in interface ShareConsumer<K,V>
- Parameters: callback - The acknowledgement commit callback
- See Also:
-
clientInstanceId
Determines the client's unique client instance ID used for telemetry. This ID is unique to this specific client instance and will not change after it is initially generated. The ID is useful for correlating client operations with telemetry sent to the broker and to its eventual monitoring destinations.
If telemetry is enabled, this will first require a connection to the cluster to generate the unique client instance ID. This method waits up to timeout for the consumer client to complete the request.
Client telemetry is controlled by the ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG configuration option.
- Specified by: clientInstanceId in interface ShareConsumer<K,V>
- Parameters: timeout - The maximum time to wait for consumer client to determine its client instance ID. The value must be non-negative. Specifying a timeout of zero means do not wait for the initial request to complete if it hasn't already.
- Returns: The client's assigned instance id used for metrics collection.
- Throws:
IllegalArgumentException - if the timeout is negative
IllegalStateException - if telemetry is not enabled
WakeupException - if wakeup() is called before or while this method is called
InterruptException - if the thread is interrupted while blocked
KafkaException - if an unexpected error occurs while trying to determine the client instance ID, though this error does not necessarily imply the consumer client is otherwise unusable
-
metrics
Get the metrics kept by the consumer
- Specified by: metrics in interface ShareConsumer<K,V>
- See Also:
-
close
public void close()
Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup. This will commit acknowledgements if possible within the default timeout. See close(Duration) for details. Note that wakeup() cannot be used to interrupt close.
- Specified by: close in interface AutoCloseable
- Specified by: close in interface Closeable
- Specified by: close in interface ShareConsumer<K,V>
- Throws:
WakeupException - if wakeup() is called before or while this method is called
InterruptException - if the thread is interrupted before or while this method is called
KafkaException - for any other error during close
- See Also:
-
close
Tries to close the consumer cleanly within the specified timeout. This method waits up to timeout for the consumer to complete acknowledgements and leave the group. If the consumer is unable to complete acknowledgements and gracefully leave the group before the timeout expires, the consumer is force closed. Note that wakeup() cannot be used to interrupt close.
- Specified by: close in interface ShareConsumer<K,V>
- Parameters: timeout - The maximum time to wait for consumer to close gracefully. The value must be non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
- Throws:
IllegalArgumentException - if the timeout is negative
WakeupException - if wakeup() is called before or while this method is called
InterruptException - if the thread is interrupted before or while this method is called
KafkaException - for any other error during close
- See Also:
-
wakeup
public void wakeup()
Wake up the consumer. This method is thread-safe and is useful in particular to abort a long poll. The thread which is blocking in an operation will throw WakeupException. If no thread is blocking in a method which can throw WakeupException, the next call to such a method will raise it instead.
- Specified by: wakeup in interface ShareConsumer<K,V>
- See Also:
-