Class KafkaShareConsumer<K,V>

java.lang.Object
org.apache.kafka.clients.consumer.KafkaShareConsumer<K,V>
All Implemented Interfaces:
Closeable, AutoCloseable, ShareConsumer<K,V>

@Evolving public class KafkaShareConsumer<K,V> extends Object implements ShareConsumer<K,V>
A client that consumes records from a Kafka cluster using a share group.

This is an early access feature under development which is introduced by KIP-932. It is not suitable for production use until it is fully implemented and released.

Cross-Version Compatibility

This client can communicate with brokers that are a version that supports share groups. You will receive an UnsupportedVersionException when invoking an API that is not available on the running broker version.

Share Groups and Topic Subscriptions

Kafka uses the concept of share groups to allow a pool of consumers to cooperate on the work of consuming and processing records. All consumer instances sharing the same group.id will be part of the same share group.

Each consumer in a group can dynamically set the list of topics it wants to subscribe to using the subscribe(Collection) method. Kafka will deliver each message in the subscribed topics to one consumer in the share group. Unlike consumer groups, share groups balance the partitions between all members of the share group permitting multiple consumers to consume from the same partitions. This gives more flexible sharing of records than a consumer group, at the expense of record ordering.

Membership in a share group is maintained dynamically: if a consumer fails, the partitions assigned to it will be reassigned to other consumers in the same group. Similarly, if a new consumer joins the group, the partition assignment is re-evaluated and partitions can be moved from existing consumers to the new one. This is known as rebalancing the group and is discussed in more detail below. Group rebalancing is also used when new partitions are added to one of the subscribed topics. The group will automatically detect the new partitions through periodic metadata refreshes and assign them to the members of the group.

Conceptually, you can think of a share group as a single logical subscriber made up of multiple consumers. In fact, in other messaging systems, a share group is roughly equivalent to a durable shared subscription. You can have multiple share groups and consumer groups independently consuming from the same topics.

Detecting Consumer Failures

After subscribing to a set of topics, the consumer will automatically join the group when poll(Duration) is invoked. This method is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive records from the partitions it was assigned. Under the covers, the consumer sends periodic heartbeats to the broker. If the consumer crashes or is unable to send heartbeats for the duration of the share group's session time-out, then the consumer will be considered dead and its partitions will be reassigned.

It is also possible that the consumer could encounter a "livelock" situation where it is continuing to send heartbeats in the background, but no progress is being made. To prevent the consumer from holding onto its partitions indefinitely in this case, we provide a liveness detection mechanism using the max.poll.interval.ms setting. If you don't call poll at least as frequently as this, the client will proactively leave the share group. So to stay in the group, you must continue to call poll.

Record Delivery and Acknowledgement

When a consumer in a share-group fetches records using poll(Duration), it receives available records from any of the topic-partitions that match its subscriptions. Records are acquired for delivery to this consumer with a time-limited acquisition lock. While a record is acquired, it is not available for another consumer. By default, the lock duration is 30 seconds, but it can also be controlled using the group group.share.record.lock.duration.ms configuration parameter. The idea is that the lock is automatically released once the lock duration has elapsed, and then the record is available to be given to another consumer. The consumer which holds the lock can deal with it in the following ways:
  • The consumer can acknowledge successful processing of the record
  • The consumer can release the record, which makes the record available for another delivery attempt
  • The consumer can reject the record, which indicates that the record is unprocessable and does not make the record available for another delivery attempt
  • The consumer can do nothing, in which case the lock is automatically released when the lock duration has elapsed
The cluster limits the number of records acquired for consumers for each topic-partition in a share group. Once the limit is reached, fetching records will temporarily yield no further records until the number of acquired records reduces, as naturally happens when the locks time out. This limit is controlled by the broker configuration property group.share.record.lock.partition.limit. By limiting the duration of the acquisition lock and automatically releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures.

The consumer can choose to use implicit or explicit acknowledgement of the records it processes.

If the application calls acknowledge(ConsumerRecord, AcknowledgeType) for any record in the batch, it is using explicit acknowledgement. In this case:

  • The application calls commitSync() or commitAsync() which commits the acknowledgements to Kafka. If any records in the batch were not acknowledged, they remain acquired and will be presented to the application in response to a future poll.
  • The application calls poll(Duration) without committing first, which commits the acknowledgements to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgement. If any records in the batch were not acknowledged, they remain acquired and will be presented to the application in response to a future poll.
  • The application calls close() which attempts to commit any pending acknowledgements and releases any remaining acquired records.
If the application does not call acknowledge(ConsumerRecord, AcknowledgeType) for any record in the batch, it is using implicit acknowledgement. In this case:
  • The application calls commitSync() or commitAsync() which implicitly acknowledges all of the delivered records as processed successfully and commits the acknowledgements to Kafka.
  • The application calls poll(Duration) without committing, which also implicitly acknowledges all of the delivered records and commits the acknowledgements to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgements.
  • The application calls close() which releases any acquired records without acknowledgement.

The consumer guarantees that the records returned in the ConsumerRecords object for a specific topic-partition are in order of increasing offset. For each topic-partition, Kafka guarantees that acknowledgements for the records in a batch are performed atomically. This makes error handling significantly more straightforward because there can be one error code per partition.

Usage Examples

The share consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to demonstrate how to use them.

Acknowledging a batch of records (implicit acknowledgement)

This example demonstrates implicit acknowledgement using poll(Duration) to acknowledge the records which were delivered in the previous poll. All the records delivered are implicitly marked as successfully consumed and acknowledged synchronously with Kafka as the consumer fetches more records.
     Properties props = new Properties();
     props.setProperty("bootstrap.servers", "localhost:9092");
     props.setProperty("group.id", "test");
     props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
         for (ConsumerRecord<String, String> record : records) {
             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
             doProcessing(record);
         }
     }
 
Alternatively, you can use commitSync() or commitAsync() to commit the acknowledgements, but this is slightly less efficient because there is an additional request sent to Kafka.
     Properties props = new Properties();
     props.setProperty("bootstrap.servers", "localhost:9092");
     props.setProperty("group.id", "test");
     props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
         for (ConsumerRecord<String, String> record : records) {
             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
             doProcessing(record);
         }
         consumer.commitSync();
     }
 

Per-record acknowledgement (explicit acknowledgement)

This example demonstrates using different acknowledgement types depending on the outcome of processing the records.
     Properties props = new Properties();
     props.setProperty("bootstrap.servers", "localhost:9092");
     props.setProperty("group.id", "test");
     props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
         for (ConsumerRecord<String, String> record : records) {
             try {
                 doProcessing(record);
                 consumer.acknowledge(record, AcknowledgeType.ACCEPT);
             } catch (Exception e) {
                 consumer.acknowledge(record, AcknowledgeType.REJECT);
             }
         }
         consumer.commitSync();
     }
 
Each record processed is separately acknowledged using a call to acknowledge(ConsumerRecord, AcknowledgeType). The AcknowledgeType argument indicates whether the record was processed successfully or not. In this case, the bad records are rejected meaning that they’re not eligible for further delivery attempts. For a permanent error such as a semantic error, this is appropriate. For a transient error which might not affect a subsequent processing attempt, AcknowledgeType.RELEASE is more appropriate because the record remains eligible for further delivery attempts.

The calls to acknowledge(ConsumerRecord, AcknowledgeType) are simply updating local information in the consumer. It is only once commitSync() is called that the acknowledgements are committed by sending the new state information to Kafka.

Per-record acknowledgement, ending processing of the batch on an error (explicit acknowledgement)

This example demonstrates ending processing of a batch of records on the first error.
     Properties props = new Properties();
     props.setProperty("bootstrap.servers", "localhost:9092");
     props.setProperty("group.id", "test");
     props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
         for (ConsumerRecord<String, String> record : records) {
             try {
                 doProcessing(record);
                 consumer.acknowledge(record, AcknowledgeType.ACCEPT);
             } catch (Exception e) {
                 consumer.acknowledge(record, AcknowledgeType.REJECT);
                 break;
             }
         }
         consumer.commitSync();
     }
 
There are the following cases in this example:
  1. The batch contains no records, in which case the application just polls again. The call to commitSync() just does nothing because the batch was empty.
  2. All of the records in the batch are processed successfully. The calls to acknowledge(ConsumerRecord, AcknowledgeType) specifying AcknowledgeType.ACCEPT mark all records in the batch as successfully processed.
  3. One of the records encounters an exception. The call to acknowledge(ConsumerRecord, AcknowledgeType) specifying AcknowledgeType.REJECT rejects that record. Earlier records in the batch have already been marked as successfully processed. The call to commitSync() commits the acknowledgements, but the records after the failed record remain acquired as part of the same delivery attempt and will be presented to the application in response to another poll.

Reading Transactional Records

The way that share groups handle transactional records is controlled by the group.share.isolation.level configuration property. In a share group, the isolation level applies to the entire share group, not just individual consumers.

In read_uncommitted isolation level, the share group consumes all non-transactional and transactional records. The consumption is bounded by the high-water mark.

In read_committed isolation level (not yet supported), the share group only consumes non-transactional records and committed transactional records. The set of records which are eligible to become in-flight records are non-transactional records and committed transactional records only. The consumption is bounded by the last stable offset, so an open transaction blocks the progress of the share group with read_committed isolation level.

Multithreaded Processing

The consumer is NOT thread-safe. It is the responsibility of the user to ensure that multithreaded access is properly synchronized. Unsynchronized access will result in ConcurrentModificationException.

The only exception to this rule is wakeup() which can safely be used from an external thread to interrupt an active operation. In this case, a WakeupException will be thrown from the thread blocking on the operation. This can be used to shut down the consumer from another thread. The following snippet shows the typical pattern:

 public class KafkaShareConsumerRunner implements Runnable {
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final KafkaShareConsumer consumer;

     public KafkaShareConsumerRunner(KafkaShareConsumer consumer) {
       this.consumer = consumer;
     }

     @Override
     public void run() {
         try {
             consumer.subscribe(Arrays.asList("topic"));
             while (!closed.get()) {
                 ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
                 // Handle new records
             }
         } catch (WakeupException e) {
             // Ignore exception if closing
             if (!closed.get()) throw e;
         } finally {
             consumer.close();
         }
     }

     // Shutdown hook which can be called from a separate thread
     public void shutdown() {
         closed.set(true);
         consumer.wakeup();
     }
 }
 
Then in a separate thread, the consumer can be shutdown by setting the closed flag and waking up the consumer.
     closed.set(true);
     consumer.wakeup();
 

Note that while it is possible to use thread interrupts instead of wakeup() to abort a blocking operation (in which case, InterruptException will be raised), we discourage their use since they may cause a clean shutdown of the consumer to be aborted. Interrupts are mainly supported for those cases where using wakeup() is impossible, such as when a consumer thread is managed by code that is unaware of the Kafka client.

We have intentionally avoided implementing a particular threading model for processing. Various options for multithreaded processing are possible, of which the most straightforward is to dedicate a thread to each consumer.

  • Constructor Details

    • KafkaShareConsumer

      public KafkaShareConsumer(Map<String,Object> configs)
      A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings are documented here. Values can be either strings or objects of the appropriate type (for example a numeric configuration would accept either the string "42" or the integer 42).

      Valid configuration strings are documented at ConsumerConfig.

      Note: after creating a KafkaShareConsumer you must always close() it to avoid resource leaks.

      Parameters:
      configs - The consumer configs
    • KafkaShareConsumer

      public KafkaShareConsumer(Properties properties)
      A consumer is instantiated by providing a Properties object as configuration.

      Valid configuration strings are documented at ConsumerConfig.

      Note: after creating a KafkaShareConsumer you must always close() it to avoid resource leaks.

      Parameters:
      properties - The consumer configuration properties
    • KafkaShareConsumer

      public KafkaShareConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
      A consumer is instantiated by providing a Properties object as configuration, and a key and a value Deserializer.

      Valid configuration strings are documented at ConsumerConfig.

      Note: after creating a KafkaShareConsumer you must always close() it to avoid resource leaks.

      Parameters:
      properties - The consumer configuration properties
      keyDeserializer - The deserializer for key that implements Deserializer. The configure() method won't be called in the consumer when the deserializer is passed in directly.
      valueDeserializer - The deserializer for value that implements Deserializer. The configure() method won't be called in the consumer when the deserializer is passed in directly.
    • KafkaShareConsumer

      public KafkaShareConsumer(Map<String,Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
      A consumer is instantiated by providing a set of key-value pairs as configuration, and a key and a value Deserializer.

      Valid configuration strings are documented at ConsumerConfig.

      Note: after creating a KafkaShareConsumer you must always close() it to avoid resource leaks.

      Parameters:
      configs - The consumer configs
      keyDeserializer - The deserializer for key that implements Deserializer. The configure() method won't be called in the consumer when the deserializer is passed in directly.
      valueDeserializer - The deserializer for value that implements Deserializer. The configure() method won't be called in the consumer when the deserializer is passed in directly.
    • KafkaShareConsumer

      public KafkaShareConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
  • Method Details

    • subscription

      public Set<String> subscription()
      Get the current subscription. Will return the same topics used in the most recent call to subscribe(Collection), or an empty set if no such call has been made.
      Specified by:
      subscription in interface ShareConsumer<K,V>
      Returns:
      The set of topics currently subscribed to
      See Also:
    • subscribe

      public void subscribe(Collection<String> topics)
      Subscribe to the given list of topics to get dynamically assigned partitions. Topic subscriptions are not incremental. This list will replace the current assignment, if there is one. If the given list of topics is empty, it is treated the same as unsubscribe().

      As part of group management, the coordinator will keep track of the list of consumers that belong to a particular group and will trigger a rebalance operation if any one of the following events are triggered:

      • A member joins or leaves the share group
      • An existing member of the share group is shut down or fails
      • The number of partitions changes for any of the subscribed topics
      • A subscribed topic is created or deleted
      Specified by:
      subscribe in interface ShareConsumer<K,V>
      Parameters:
      topics - The list of topics to subscribe to
      Throws:
      IllegalArgumentException - if topics is null or contains null or empty elements
      KafkaException - for any other unrecoverable errors
      See Also:
    • unsubscribe

      public void unsubscribe()
      Unsubscribe from topics currently subscribed with subscribe(Collection).
      Specified by:
      unsubscribe in interface ShareConsumer<K,V>
      Throws:
      KafkaException - for any other unrecoverable errors
      See Also:
    • poll

      public ConsumerRecords<K,V> poll(Duration timeout)
      Fetch data for the topics specified using subscribe(Collection). It is an error to not have subscribed to any topics before polling for data.

      This method returns immediately if there are records available. Otherwise, it will await the passed timeout. If the timeout expires, an empty record set will be returned.

      Specified by:
      poll in interface ShareConsumer<K,V>
      Parameters:
      timeout - The maximum time to block (must not be greater than Long.MAX_VALUE milliseconds)
      Returns:
      map of topic to records since the last fetch for the subscribed list of topics
      Throws:
      AuthenticationException - if authentication fails. See the exception for more details
      AuthorizationException - if caller lacks Read access to any of the subscribed topics or to the share group. See the exception for more details
      IllegalArgumentException - if the timeout value is negative
      IllegalStateException - if the consumer is not subscribed to any topics
      ArithmeticException - if the timeout is greater than Long.MAX_VALUE milliseconds.
      InvalidTopicException - if the current subscription contains any invalid topic (per Topic.validate(String))
      WakeupException - if wakeup() is called before or while this method is called
      InterruptException - if the calling thread is interrupted before or while this method is called
      KafkaException - for any other unrecoverable errors
      See Also:
    • acknowledge

      public void acknowledge(ConsumerRecord<K,V> record)
      Acknowledge successful delivery of a record returned on the last poll(Duration) call. The acknowledgement is committed on the next commitSync(), commitAsync() or poll(Duration) call.
      Specified by:
      acknowledge in interface ShareConsumer<K,V>
      Parameters:
      record - The record to acknowledge
      Throws:
      IllegalStateException - if the record is not waiting to be acknowledged, or the consumer has already used implicit acknowledgement
      See Also:
    • acknowledge

      public void acknowledge(ConsumerRecord<K,V> record, AcknowledgeType type)
      Acknowledge delivery of a record returned on the last poll(Duration) call indicating whether it was processed successfully. The acknowledgement is committed on the next commitSync(), commitAsync() or poll(Duration) call. By using this method, the consumer is using explicit acknowledgement.
      Specified by:
      acknowledge in interface ShareConsumer<K,V>
      Parameters:
      record - The record to acknowledge
      type - The acknowledgement type which indicates whether it was processed successfully
      Throws:
      IllegalStateException - if the record is not waiting to be acknowledged, or the consumer has already used implicit acknowledgement
      See Also:
    • commitSync

      Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement, the acknowledgements to commit have been indicated using acknowledge(ConsumerRecord) or acknowledge(ConsumerRecord, AcknowledgeType). If the consumer is using implicit acknowledgement, all the records returned by the latest call to poll(Duration) are acknowledged.

      This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is encountered (in which case it is thrown to the caller), or the timeout specified by default.api.timeout.ms expires.

      Specified by:
      commitSync in interface ShareConsumer<K,V>
      Returns:
      A map of the results for each topic-partition for which delivery was acknowledged. If the acknowledgement failed for a topic-partition, an exception is present.
      Throws:
      WakeupException - if wakeup() is called before or while this method is called
      InterruptException - if the thread is interrupted while blocked
      KafkaException - for any other unrecoverable errors
      See Also:
    • commitSync

      public Map<TopicIdPartition,Optional<KafkaException>> commitSync(Duration timeout)
      Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement, the acknowledgements to commit have been indicated using acknowledge(ConsumerRecord) or acknowledge(ConsumerRecord, AcknowledgeType). If the consumer is using implicit acknowledgement, all the records returned by the latest call to poll(Duration) are acknowledged.

      This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is encountered (in which case it is thrown to the caller), or the timeout expires.

      Specified by:
      commitSync in interface ShareConsumer<K,V>
      Parameters:
      timeout - The maximum amount of time to await completion of the acknowledgement
      Returns:
      A map of the results for each topic-partition for which delivery was acknowledged. If the acknowledgement failed for a topic-partition, an exception is present.
      Throws:
      IllegalArgumentException - if the timeout is negative
      WakeupException - if wakeup() is called before or while this method is called
      InterruptException - if the thread is interrupted while blocked
      KafkaException - for any other unrecoverable errors
      See Also:
    • commitAsync

      public void commitAsync()
      Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement, the acknowledgements to commit have been indicated using acknowledge(ConsumerRecord) or acknowledge(ConsumerRecord, AcknowledgeType). If the consumer is using implicit acknowledgement, all the records returned by the latest call to poll(Duration) are acknowledged.
      Specified by:
      commitAsync in interface ShareConsumer<K,V>
      Throws:
      KafkaException - for any other unrecoverable errors
      See Also:
    • setAcknowledgementCommitCallback

      public void setAcknowledgementCommitCallback(AcknowledgementCommitCallback callback)
      Sets the acknowledgement commit callback which can be used to handle acknowledgement completion.
      Specified by:
      setAcknowledgementCommitCallback in interface ShareConsumer<K,V>
      Parameters:
      callback - The acknowledgement commit callback
      See Also:
    • clientInstanceId

      public Uuid clientInstanceId(Duration timeout)
      Determines the client's unique client instance ID used for telemetry. This ID is unique to this specific client instance and will not change after it is initially generated. The ID is useful for correlating client operations with telemetry sent to the broker and to its eventual monitoring destinations.

      If telemetry is enabled, this will first require a connection to the cluster to generate the unique client instance ID. This method waits up to timeout for the consumer client to complete the request.

      Client telemetry is controlled by the ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG configuration option.

      Specified by:
      clientInstanceId in interface ShareConsumer<K,V>
      Parameters:
      timeout - The maximum time to wait for consumer client to determine its client instance ID. The value must be non-negative. Specifying a timeout of zero means do not wait for the initial request to complete if it hasn't already.
      Returns:
      The client's assigned instance id used for metrics collection.
      Throws:
      IllegalArgumentException - if the timeout is negative
      IllegalStateException - if telemetry is not enabled
      WakeupException - if wakeup() is called before or while this method is called
      InterruptException - if the thread is interrupted while blocked
      KafkaException - if an unexpected error occurs while trying to determine the client instance ID, though this error does not necessarily imply the consumer client is otherwise unusable
    • metrics

      public Map<MetricName,? extends Metric> metrics()
      Get the metrics kept by the consumer
      Specified by:
      metrics in interface ShareConsumer<K,V>
      See Also:
    • close

      public void close()
      Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup. This will commit acknowledgements if possible within the default timeout. See close(Duration) for details. Note that wakeup() cannot be used to interrupt close.
      Specified by:
      close in interface AutoCloseable
      Specified by:
      close in interface Closeable
      Specified by:
      close in interface ShareConsumer<K,V>
      Throws:
      WakeupException - if wakeup() is called before or while this method is called
      InterruptException - if the thread is interrupted before or while this method is called
      KafkaException - for any other error during close
      See Also:
    • close

      public void close(Duration timeout)
      Tries to close the consumer cleanly within the specified timeout. This method waits up to timeout for the consumer to complete acknowledgements and leave the group. If the consumer is unable to complete acknowledgements and gracefully leave the group before the timeout expires, the consumer is force closed. Note that wakeup() cannot be used to interrupt close.
      Specified by:
      close in interface ShareConsumer<K,V>
      Parameters:
      timeout - The maximum time to wait for consumer to close gracefully. The value must be non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
      Throws:
      IllegalArgumentException - if the timeout is negative
      WakeupException - if wakeup() is called before or while this method is called
      InterruptException - if the thread is interrupted before or while this method is called
      KafkaException - for any other error during close
      See Also:
    • wakeup

      public void wakeup()
      Wake up the consumer. This method is thread-safe and is useful in particular to abort a long poll. The thread which is blocking in an operation will throw WakeupException. If no thread is blocking in a method which can throw WakeupException, the next call to such a method will raise it instead.
      Specified by:
      wakeup in interface ShareConsumer<K,V>
      See Also: