Class SinkTask
- All Implemented Interfaces:
Task
- Direct Known Subclasses:
MockSinkTask
,VerifiableSinkTask
put(Collection)
API, which should either write them to the downstream system or batch them for
later writing. Periodically, Connect will call flush(Map)
to ensure that batched records are
actually pushed to the downstream system.
Below we describe the lifecycle of a SinkTask.
- Initialization: SinkTasks are first initialized using
initialize(SinkTaskContext)
to prepare the task's context andstart(Map)
to accept configuration and start any services needed for processing. - Partition Assignment: After initialization, Connect will assign the task a set of partitions
using
open(Collection)
. These partitions are owned exclusively by this task until they have been closed withclose(Collection)
. - Record Processing: Once partitions have been opened for writing, Connect will begin forwarding
records from Kafka using the
put(Collection)
API. Periodically, Connect will ask the task to flush records usingflush(Map)
as described above. - Partition Rebalancing: Occasionally, Connect will need to change the assignment of this task.
When this happens, the currently assigned partitions will be closed with
close(Collection)
and the new assignment will be opened usingopen(Collection)
. - Shutdown: When the task needs to be shutdown, Connect will close active partitions (if there
are any) and stop the task using
stop()
-
Field Summary
-
Constructor Summary
-
Method Summary
Modifier and TypeMethodDescriptionvoid
close
(Collection<TopicPartition> partitions) The SinkTask uses this method to close writers for partitions that are no longer assigned to the SinkTask.void
flush
(Map<TopicPartition, OffsetAndMetadata> currentOffsets) Flush all records that have beenput(Collection)
for the specified topic-partitions.void
initialize
(SinkTaskContext context) Initialize the context of this task.void
onPartitionsAssigned
(Collection<TopicPartition> partitions) Deprecated.void
onPartitionsRevoked
(Collection<TopicPartition> partitions) Deprecated.Useclose(Collection)
instead for partition cleanup.void
open
(Collection<TopicPartition> partitions) The SinkTask uses this method to create writers for newly assigned partitions in case of partition rebalance.preCommit
(Map<TopicPartition, OffsetAndMetadata> currentOffsets) Pre-commit hook invoked prior to an offset commit.abstract void
put
(Collection<SinkRecord> records) Put the records in the sink.abstract void
Start the Task.abstract void
stop()
Perform any cleanup to stop this task.
-
Field Details
-
TOPICS_CONFIG
The configuration key that provides the list of topics that are inputs for this SinkTask.
- See Also:
-
TOPICS_REGEX_CONFIG
The configuration key that provides a regex specifying which topics to include as inputs for this SinkTask.
- See Also:
-
-
Constructor Details
-
SinkTask
public SinkTask()
-
-
Method Details
-
initialize
Initialize the context of this task. Note that the partition assignment will be empty until Connect has opened the partitions for writing withopen(Collection)
.- Parameters:
context
- The sink task's context
-
start
Start the Task. This should handle any configuration parsing and one-time setup of the task. -
put
Put the records in the sink. This should either write them to the downstream system or batch them for later writing. If this method returns before the records are written to the downstream system, the task must implementflush(Map)
orpreCommit(Map)
to ensure that offsets are only committed for records that have been written to the downstream system (hence avoiding data loss during failures).If this operation fails, the SinkTask may throw a
RetriableException
to indicate that the framework should attempt to retry the same call again. Other exceptions will cause the task to be stopped immediately.SinkTaskContext.timeout(long)
can be used to set the maximum time before the batch will be retried.- Parameters:
records
- the collection of records to send
-
flush
Flush all records that have beenput(Collection)
for the specified topic-partitions.- Parameters:
currentOffsets
- the current offset state as of the last call toput(Collection)
, provided for convenience but could also be determined by tracking all offsets included in theSinkRecord
s passed toput(java.util.Collection<org.apache.kafka.connect.sink.SinkRecord>)
. Note that the topic, partition and offset here correspond to the original Kafka topic partition and offset, before anytransformations
have been applied. These can be tracked by the task through theSinkRecord.originalTopic()
,SinkRecord.originalKafkaPartition()
andSinkRecord.originalKafkaOffset()
methods.
-
preCommit
public Map<TopicPartition,OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) Pre-commit hook invoked prior to an offset commit.The default implementation simply invokes
flush(Map)
and is thus able to assume allcurrentOffsets
are safe to commit.- Parameters:
currentOffsets
- the current offset state as of the last call toput(Collection)
, provided for convenience but could also be determined by tracking all offsets included in theSinkRecord
s passed toput(java.util.Collection<org.apache.kafka.connect.sink.SinkRecord>)
. Note that the topic, partition and offset here correspond to the original Kafka topic partition and offset, before anytransformations
have been applied. These can be tracked by the task through theSinkRecord.originalTopic()
,SinkRecord.originalKafkaPartition()
andSinkRecord.originalKafkaOffset()
methods.- Returns:
- an empty map if Connect-managed offset commit is not desired, otherwise a map of offsets by topic-partition that are safe to commit. Note that the returned topic-partition to offsets map should use the original Kafka topic partitions and offsets instead of the transformed values.
-
open
The SinkTask uses this method to create writers for newly assigned partitions in case of partition rebalance. This method will be called after partition re-assignment completes and before the SinkTask starts fetching data. Any errors raised from this method will cause the task to stop.Note that the topic partitions here correspond to the original Kafka topic partitions, before any
transformations
have been applied.- Parameters:
partitions
- The list of partitions that are now assigned to the task (may include partitions previously assigned to the task)
-
onPartitionsAssigned
Deprecated.Useopen(Collection)
for partition initialization. -
close
The SinkTask uses this method to close writers for partitions that are no longer assigned to the SinkTask. This method will be called before a rebalance operation starts and after the SinkTask stops fetching data. After being closed, Connect will not write any records to the task until a new set of partitions has been opened. Any errors raised from this method will cause the task to stop.Note that the topic partitions here correspond to the original Kafka topic partitions, before any
transformations
have been applied.- Parameters:
partitions
- The list of partitions that should be closed
-
onPartitionsRevoked
Deprecated.Useclose(Collection)
instead for partition cleanup. -
stop
public abstract void stop()Perform any cleanup to stop this task. In SinkTasks, this method is invoked only once outstanding calls to other methods have completed (e.g.,put(Collection)
has returned) and a finalflush(Map)
and offset commit has completed. Implementations of this method should only need to perform final cleanup operations, such as closing network connections to the sink system.
-
open(Collection)
for partition initialization.