Class SinkTask

java.lang.Object
org.apache.kafka.connect.sink.SinkTask
All Implemented Interfaces:
Task
Direct Known Subclasses:
MockSinkTask, VerifiableSinkTask

public abstract class SinkTask extends Object implements Task
SinkTask is a Task that takes records loaded from Kafka and sends them to another system. Each task instance is assigned a set of partitions by the Connect framework and will handle all records received from those partitions. As records are fetched from Kafka, they will be passed to the sink task using the put(Collection) API, which should either write them to the downstream system or batch them for later writing. Periodically, Connect will call flush(Map) to ensure that batched records are actually pushed to the downstream system.

Below we describe the lifecycle of a SinkTask.

  1. Initialization: SinkTasks are first initialized using initialize(SinkTaskContext) to prepare the task's context and start(Map) to accept configuration and start any services needed for processing.
  2. Partition Assignment: After initialization, Connect will assign the task a set of partitions using open(Collection). These partitions are owned exclusively by this task until they have been closed with close(Collection).
  3. Record Processing: Once partitions have been opened for writing, Connect will begin forwarding records from Kafka using the put(Collection) API. Periodically, Connect will ask the task to flush records using flush(Map) as described above.
  4. Partition Rebalancing: Occasionally, Connect will need to change the assignment of this task. When this happens, the currently assigned partitions will be closed with close(Collection) and the new assignment will be opened using open(Collection).
  5. Shutdown: When the task needs to be shut down, Connect will close active partitions (if there are any) and stop the task using stop().
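To make this lifecycle concrete, below is a minimal sketch of a SinkTask implementation that batches records in put(Collection) and pushes them downstream in flush(Map). The class name ExampleSinkTask and the writeBatch call are hypothetical placeholders; a real task would delegate to a client for the target system.

  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;

  import org.apache.kafka.clients.consumer.OffsetAndMetadata;
  import org.apache.kafka.common.TopicPartition;
  import org.apache.kafka.connect.sink.SinkRecord;
  import org.apache.kafka.connect.sink.SinkTask;

  public class ExampleSinkTask extends SinkTask {

      // Records buffered since the last flush, keyed by their original topic-partition.
      private final Map<TopicPartition, List<SinkRecord>> buffers = new HashMap<>();

      @Override
      public String version() {
          return "1.0.0"; // typically the connector's version string
      }

      @Override
      public void start(Map<String, String> props) {
          // Parse configuration and create any clients needed to reach the sink system.
      }

      @Override
      public void open(Collection<TopicPartition> partitions) {
          // Allocate per-partition state for the newly assigned partitions.
          for (TopicPartition tp : partitions)
              buffers.putIfAbsent(tp, new ArrayList<>());
      }

      @Override
      public void put(Collection<SinkRecord> records) {
          // Batch records for later writing; flush(Map) pushes them downstream.
          for (SinkRecord record : records) {
              TopicPartition tp =
                  new TopicPartition(record.originalTopic(), record.originalKafkaPartition());
              buffers.get(tp).add(record);
          }
      }

      @Override
      public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
          // Write every buffered record to the downstream system before returning.
          for (List<SinkRecord> batch : buffers.values()) {
              // writeBatch(batch);  // hypothetical blocking write to the sink system
              batch.clear();
          }
      }

      @Override
      public void close(Collection<TopicPartition> partitions) {
          // Release state for partitions this task no longer owns.
          for (TopicPartition tp : partitions)
              buffers.remove(tp);
      }

      @Override
      public void stop() {
          // Final cleanup, e.g. closing network connections to the sink system.
          buffers.clear();
      }
  }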
  • Field Details

    • TOPICS_CONFIG

      public static final String TOPICS_CONFIG

      The configuration key that provides the list of topics that are inputs for this SinkTask.

    • TOPICS_REGEX_CONFIG

      public static final String TOPICS_REGEX_CONFIG

      The configuration key that provides a regex specifying which topics to include as inputs for this SinkTask.

  • Constructor Details

    • SinkTask

      public SinkTask()
  • Method Details

    • initialize

      public void initialize(SinkTaskContext context)
      Initialize the context of this task. Note that the partition assignment will be empty until Connect has opened the partitions for writing with open(Collection).
      Parameters:
      context - The sink task's context
    • start

      public abstract void start(Map<String,String> props)
      Start the Task. This should handle any configuration parsing and one-time setup of the task.
      Specified by:
      start in interface Task
      Parameters:
      props - initial configuration
    • put

      public abstract void put(Collection<SinkRecord> records)
      Put the records in the sink. This should either write them to the downstream system or batch them for later writing. If this method returns before the records are written to the downstream system, the task must implement flush(Map) or preCommit(Map) to ensure that offsets are only committed for records that have been written to the downstream system (hence avoiding data loss during failures).

      If this operation fails, the SinkTask may throw a RetriableException to indicate that the framework should attempt to retry the same call again. Other exceptions will cause the task to be stopped immediately. SinkTaskContext.timeout(long) can be used to set the maximum time before the batch will be retried.

      Parameters:
      records - the collection of records to send
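      As an illustration of the retry contract, the fragment below would live inside a SinkTask subclass such as the sketch above; the writer field is a hypothetical client for the downstream system that is assumed to throw IOException on transient failures.

        @Override
        public void put(Collection<SinkRecord> records) {
            try {
                for (SinkRecord record : records)
                    writer.write(record);  // hypothetical client for the downstream system
            } catch (IOException e) {
                // Treat the failure as transient: wait five seconds, then have Connect
                // redeliver the same batch by throwing RetriableException.
                context.timeout(5_000);
                throw new RetriableException("Write to the downstream system failed", e);
            }
        }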
    • flush

      public void flush(Map<TopicPartition,OffsetAndMetadata> currentOffsets)
      Flush all records that have been put(Collection) for the specified topic-partitions.
      Parameters:
      currentOffsets - the current offset state as of the last call to put(Collection), provided for convenience but could also be determined by tracking all offsets included in the SinkRecords passed to put(java.util.Collection<org.apache.kafka.connect.sink.SinkRecord>). Note that the topic, partition and offset here correspond to the original Kafka topic partition and offset, before any transformations have been applied. These can be tracked by the task through the SinkRecord.originalTopic(), SinkRecord.originalKafkaPartition() and SinkRecord.originalKafkaOffset() methods.
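      For example, a task that buffers per partition might flush as sketched below; buffers and writeBatch are the hypothetical field and call from the sketch above, and currentOffsets is keyed by the original (pre-transformation) Kafka topic-partitions.

        @Override
        public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : currentOffsets.entrySet()) {
                List<SinkRecord> batch = buffers.get(entry.getKey());
                if (batch == null || batch.isEmpty())
                    continue;
                // writeBatch(batch);  // hypothetical blocking write; must complete before returning
                batch.clear();
            }
        }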
    • preCommit

      public Map<TopicPartition,OffsetAndMetadata> preCommit(Map<TopicPartition,OffsetAndMetadata> currentOffsets)

      Pre-commit hook invoked prior to an offset commit.

      The default implementation simply invokes flush(Map) and is thus able to assume all currentOffsets are safe to commit.

      Parameters:
      currentOffsets - the current offset state as of the last call to put(Collection), provided for convenience but could also be determined by tracking all offsets included in the SinkRecords passed to put(java.util.Collection<org.apache.kafka.connect.sink.SinkRecord>). Note that the topic, partition and offset here correspond to the original Kafka topic partition and offset, before any transformations have been applied. These can be tracked by the task through the SinkRecord.originalTopic(), SinkRecord.originalKafkaPartition() and SinkRecord.originalKafkaOffset() methods.
      Returns:
      an empty map if Connect-managed offset commit is not desired, otherwise a map of offsets by topic-partition that are safe to commit. Note that the returned topic-partition to offsets map should use the original Kafka topic partitions and offsets instead of the transformed values.
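      For a task that writes asynchronously, preCommit can report only what is known to be durable. The sketch below assumes a hypothetical writer client that tracks the last confirmed original Kafka offset per partition.

        @Override
        public Map<TopicPartition, OffsetAndMetadata> preCommit(
                Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
            Map<TopicPartition, OffsetAndMetadata> safeToCommit = new HashMap<>();
            for (TopicPartition tp : currentOffsets.keySet()) {
                Long confirmed = writer.lastConfirmedOffset(tp);  // hypothetical per-partition high-water mark
                if (confirmed != null) {
                    // The committed offset is the offset of the next record to deliver, hence +1.
                    safeToCommit.put(tp, new OffsetAndMetadata(confirmed + 1));
                }
            }
            return safeToCommit;  // an empty map indicates Connect-managed offset commit is not desired
        }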
    • open

      public void open(Collection<TopicPartition> partitions)
      The SinkTask uses this method to create writers for newly assigned partitions, e.g. after a partition rebalance. This method will be called after partition re-assignment completes and before the SinkTask starts fetching data. Any errors raised from this method will cause the task to stop.

      Note that the topic partitions here correspond to the original Kafka topic partitions, before any transformations have been applied.

      Parameters:
      partitions - The list of partitions that are now assigned to the task (may include partitions previously assigned to the task)
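      For instance, a task that keeps one writer per partition might open assignments as sketched below; the writers map and the PartitionWriter type are hypothetical.

        @Override
        public void open(Collection<TopicPartition> partitions) {
            // The assignment may repeat partitions this task already owns, so creation is idempotent.
            for (TopicPartition tp : partitions)
                writers.computeIfAbsent(tp, PartitionWriter::new);  // PartitionWriter is hypothetical
        }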
    • onPartitionsAssigned

      @Deprecated public void onPartitionsAssigned(Collection<TopicPartition> partitions)
      Deprecated.
      Use open(Collection) for partition initialization.
    • close

      public void close(Collection<TopicPartition> partitions)
      The SinkTask uses this method to close writers for partitions that are no longer assigned to the SinkTask. This method will be called before a rebalance operation starts and after the SinkTask stops fetching data. After being closed, Connect will not write any records to the task until a new set of partitions has been opened. Any errors raised from this method will cause the task to stop.

      Note that the topic partitions here correspond to the original Kafka topic partitions, before any transformations have been applied.

      Parameters:
      partitions - The list of partitions that should be closed
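      The matching close for the open sketch above releases the per-partition writers; writers and PartitionWriter remain hypothetical.

        @Override
        public void close(Collection<TopicPartition> partitions) {
            for (TopicPartition tp : partitions) {
                PartitionWriter writer = writers.remove(tp);
                if (writer != null)
                    writer.close();  // hypothetical: push any remaining buffered records and release resources
            }
        }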
    • onPartitionsRevoked

      @Deprecated public void onPartitionsRevoked(Collection<TopicPartition> partitions)
      Deprecated.
      Use close(Collection) instead for partition cleanup.
    • stop

      public abstract void stop()
      Perform any cleanup to stop this task. In SinkTasks, this method is invoked only once outstanding calls to other methods have completed (e.g., put(Collection) has returned) and a final flush(Map) and offset commit have completed. Implementations of this method should only need to perform final cleanup operations, such as closing network connections to the sink system.
      Specified by:
      stop in interface Task