Package org.apache.kafka.connect.sink
Class SinkTask
java.lang.Object
org.apache.kafka.connect.sink.SinkTask
- All Implemented Interfaces:
Task
public abstract class SinkTask extends Object implements Task
SinkTask is a Task that takes records loaded from Kafka and sends them to another system. Each task
instance is assigned a set of partitions by the Connect framework and will handle all records received
from those partitions. As records are fetched from Kafka, they will be passed to the sink task using the
put(Collection) API, which should either write them to the downstream system or batch them for
later writing. Periodically, Connect will call flush(Map) to ensure that batched records are
actually pushed to the downstream system.
Below we describe the lifecycle of a SinkTask.
- Initialization: SinkTasks are first initialized using initialize(SinkTaskContext) to prepare the task's context and start(Map) to accept configuration and start any services needed for processing.
- Partition Assignment: After initialization, Connect will assign the task a set of partitions using open(Collection). These partitions are owned exclusively by this task until they have been closed with close(Collection).
- Record Processing: Once partitions have been opened for writing, Connect will begin forwarding records from Kafka using the put(Collection) API. Periodically, Connect will ask the task to flush records using flush(Map) as described above.
- Partition Rebalancing: Occasionally, Connect will need to change the assignment of this task. When this happens, the currently assigned partitions will be closed with close(Collection) and the new assignment will be opened using open(Collection).
- Shutdown: When the task needs to be shut down, Connect will close active partitions (if there are any) and stop the task using stop().
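The call order described in the lifecycle above can be traced with a small simulation. This is only a sketch under stated assumptions: MiniSinkTask, TracingTask and LifecycleDemo are hypothetical stand-ins that mirror the SinkTask method names with simplified types (plain strings for partitions and records), so the example runs without the Connect library. It is not how the framework itself is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in mirroring the SinkTask method names with simplified
// types (String partitions, String records). Not the Connect API itself.
abstract class MiniSinkTask {
    void initialize(String context) {}
    abstract void start(Map<String, String> props);
    void open(List<String> partitions) {}
    abstract void put(List<String> records);
    void flush(Map<String, Long> currentOffsets) {}
    void close(List<String> partitions) {}
    abstract void stop();
}

// Records every callback so the order of lifecycle calls can be inspected.
class TracingTask extends MiniSinkTask {
    final List<String> calls = new ArrayList<>();
    @Override void initialize(String context) { calls.add("initialize"); }
    @Override void start(Map<String, String> props) { calls.add("start"); }
    @Override void open(List<String> partitions) { calls.add("open" + partitions); }
    @Override void put(List<String> records) { calls.add("put" + records); }
    @Override void flush(Map<String, Long> currentOffsets) { calls.add("flush"); }
    @Override void close(List<String> partitions) { calls.add("close" + partitions); }
    @Override void stop() { calls.add("stop"); }
}

class LifecycleDemo {
    // Drives the task in the documented order: initialization, partition
    // assignment, record processing, one rebalance, then shutdown.
    static List<String> run() {
        TracingTask task = new TracingTask();
        task.initialize("ctx");                          // Initialization
        task.start(Map.of("topics", "orders"));
        task.open(List.of("orders-0", "orders-1"));      // Partition Assignment
        task.put(List.of("r1", "r2"));                   // Record Processing
        task.flush(Map.of("orders-0", 2L));
        task.close(List.of("orders-0", "orders-1"));     // Partition Rebalancing
        task.open(List.of("orders-0"));
        task.put(List.of("r3"));
        task.flush(Map.of("orders-0", 3L));
        task.close(List.of("orders-0"));                 // Shutdown
        task.stop();
        return task.calls;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In a real task the same method names would be overridden on SinkTask itself, with TopicPartition, SinkRecord and OffsetAndMetadata in place of the simplified types used here.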
-
Field Summary
Fields
Modifier and Type, Field, Description:
- protected SinkTaskContext context
- static String TOPICS_CONFIG: The configuration key that provides the list of topics that are inputs for this SinkTask.
- static String TOPICS_REGEX_CONFIG: The configuration key that provides a regex specifying which topics to include as inputs for this SinkTask.
-
Constructor Summary
Constructors
Constructor, Description:
- SinkTask()
-
Method Summary
Modifier and Type, Method, Description:
- void close(Collection<TopicPartition> partitions): The SinkTask uses this method to close writers for partitions that are no longer assigned to the SinkTask.
- void flush(Map<TopicPartition,OffsetAndMetadata> currentOffsets): Flush all records that have been passed to put(Collection) for the specified topic-partitions.
- void initialize(SinkTaskContext context): Initialize the context of this task.
- void onPartitionsAssigned(Collection<TopicPartition> partitions): Deprecated. Use open(Collection) for partition initialization.
- void onPartitionsRevoked(Collection<TopicPartition> partitions): Deprecated. Use close(Collection) instead for partition cleanup.
- void open(Collection<TopicPartition> partitions): The SinkTask uses this method to create writers for newly assigned partitions in case of partition rebalance.
- Map<TopicPartition,OffsetAndMetadata> preCommit(Map<TopicPartition,OffsetAndMetadata> currentOffsets): Pre-commit hook invoked prior to an offset commit.
- abstract void put(Collection<SinkRecord> records): Put the records in the sink.
- abstract void start(Map<String,String> props): Start the Task.
- abstract void stop(): Perform any cleanup to stop this task.
-
Field Details
-
TOPICS_CONFIG
The configuration key that provides the list of topics that are inputs for this SinkTask.
- See Also:
- Constant Field Values
-
TOPICS_REGEX_CONFIG
The configuration key that provides a regex specifying which topics to include as inputs for this SinkTask.
- See Also:
- Constant Field Values
-
context
-
-
Constructor Details
-
SinkTask
public SinkTask()
-
-
Method Details
-
initialize
Initialize the context of this task. Note that the partition assignment will be empty until Connect has opened the partitions for writing with open(Collection).
- Parameters:
context - The sink task's context
-
start
Start the Task. This should handle any configuration parsing and one-time setup of the task.
-
put
Put the records in the sink. Usually this should send the records to the sink asynchronously and immediately return. If this operation fails, the SinkTask may throw a RetriableException to indicate that the framework should attempt to retry the same call again. Other exceptions will cause the task to be stopped immediately. SinkTaskContext.timeout(long) can be used to set the maximum time before the batch will be retried.
- Parameters:
records - the set of records to send
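The retry contract of put can be illustrated with a simplified model. This is a sketch, not the framework's code: RetriableFailure is a hypothetical stand-in for RetriableException, FlakySink is a hypothetical sink that fails a fixed number of times, and the maxAttempts cap in deliver is an assumption added only so the demo terminates.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Connect's RetriableException.
class RetriableFailure extends RuntimeException {
    RetriableFailure(String message) { super(message); }
}

// Hypothetical sink whose first `failures` calls to put fail transiently.
class FlakySink {
    final List<String> written = new ArrayList<>();
    private int failuresLeft;

    FlakySink(int failures) { this.failuresLeft = failures; }

    void put(List<String> records) {
        if (failuresLeft > 0) {
            failuresLeft--;
            // Signals that the caller should retry the same call again.
            throw new RetriableFailure("downstream temporarily unavailable");
        }
        written.addAll(records);
    }
}

class PutRetryDemo {
    // Simplified model of the caller's side: the same batch is passed to put
    // again after a retriable failure. Any other exception propagates, which
    // corresponds to the task being stopped. maxAttempts is a demo-only cap.
    static int deliver(FlakySink sink, List<String> batch, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                sink.put(batch);
                return attempt;
            } catch (RetriableFailure e) {
                // Retriable: loop around and redeliver the same batch.
            }
        }
        throw new IllegalStateException("batch not delivered after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        FlakySink sink = new FlakySink(2);
        int attempts = deliver(sink, List.of("a", "b"), 5);
        System.out.println("delivered after " + attempts + " attempts: " + sink.written);
    }
}
```

Because the same batch is redelivered in full, a put implementation that throws after partially writing a batch may see some records twice, so writes that tolerate duplicates are the safer design.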
-
flush
Flush all records that have been passed to put(Collection) for the specified topic-partitions.
- Parameters:
currentOffsets - the current offset state as of the last call to put(Collection), provided for convenience but could also be determined by tracking all offsets included in the SinkRecords passed to put(Collection).
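The division of labor between put and flush, and the note that offsets can be tracked from the records themselves, can be sketched as follows. MiniRecord and BufferingSink are hypothetical stand-ins, not Connect classes, and the in-memory downstream list stands in for an external system. The sketch records the last flushed offset plus one per partition, following the Kafka convention that a committed offset names the next record to read.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a SinkRecord: partition name, offset and value.
final class MiniRecord {
    final String partition;
    final long offset;
    final String value;

    MiniRecord(String partition, long offset, String value) {
        this.partition = partition;
        this.offset = offset;
        this.value = value;
    }
}

class BufferingSink {
    private final List<MiniRecord> buffer = new ArrayList<>();
    final List<String> downstream = new ArrayList<>();         // stands in for the external system
    final Map<String, Long> flushedOffsets = new HashMap<>();  // next offset to read, per partition

    // put only batches the records and returns immediately.
    void put(List<MiniRecord> records) {
        buffer.addAll(records);
    }

    // flush pushes everything batched so far and tracks the offsets itself,
    // instead of relying on an offsets map supplied by the caller.
    void flush() {
        for (MiniRecord r : buffer) {
            downstream.add(r.value);
            flushedOffsets.merge(r.partition, r.offset + 1, Long::max);
        }
        buffer.clear();
    }

    int buffered() {
        return buffer.size();
    }
}

class FlushDemo {
    public static void main(String[] args) {
        BufferingSink sink = new BufferingSink();
        sink.put(List.of(
                new MiniRecord("orders-0", 4, "a"),
                new MiniRecord("orders-0", 5, "b"),
                new MiniRecord("orders-1", 9, "c")));
        System.out.println("buffered before flush: " + sink.buffered());
        sink.flush();
        System.out.println("downstream after flush: " + sink.downstream);
        System.out.println("flushed offsets: " + sink.flushedOffsets);
    }
}
```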
-
preCommit
public Map<TopicPartition,OffsetAndMetadata> preCommit(Map<TopicPartition,OffsetAndMetadata> currentOffsets)
Pre-commit hook invoked prior to an offset commit. The default implementation simply invokes flush(Map) and is thus able to assume all currentOffsets are safe to commit.
- Parameters:
currentOffsets - the current offset state as of the last call to put(Collection), provided for convenience but could also be determined by tracking all offsets included in the SinkRecords passed to put(Collection).
- Returns:
an empty map if Connect-managed offset commit is not desired, otherwise a map of offsets by topic-partition that are safe to commit.
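A task that writes asynchronously may not be able to treat every offset in currentOffsets as safe. One possible shape for an overriding preCommit, shown with hypothetical stand-in types (String partitions and Long offsets instead of TopicPartition and OffsetAndMetadata), is to return only what the downstream system has acknowledged. The acknowledge method and the PreCommitSketch class are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class PreCommitSketch {
    // Highest acknowledged position per partition (next offset to read).
    private final Map<String, Long> acked = new HashMap<>();

    // Hypothetical callback invoked when the downstream confirms a write.
    void acknowledge(String partition, long nextOffset) {
        acked.merge(partition, nextOffset, Long::max);
    }

    // Mirrors the shape of preCommit: given the offsets delivered via put,
    // return only those that are safe to commit. Partitions with no
    // acknowledged writes are left out of the result.
    Map<String, Long> preCommit(Map<String, Long> currentOffsets) {
        Map<String, Long> safe = new HashMap<>();
        for (Map.Entry<String, Long> entry : currentOffsets.entrySet()) {
            Long ackedOffset = acked.get(entry.getKey());
            if (ackedOffset != null) {
                // Never report more than what was actually delivered.
                safe.put(entry.getKey(), Math.min(ackedOffset, entry.getValue()));
            }
        }
        return safe;
    }

    public static void main(String[] args) {
        PreCommitSketch task = new PreCommitSketch();
        task.acknowledge("orders-0", 5L);
        Map<String, Long> current = new HashMap<>();
        current.put("orders-0", 8L);
        current.put("orders-1", 3L);
        System.out.println("safe to commit: " + task.preCommit(current));
    }
}
```

The default behavior described above corresponds to flushing first and then returning currentOffsets unchanged; a sketch like this one trades that simplicity for the ability to commit while writes are still in flight.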
-
open
The SinkTask uses this method to create writers for newly assigned partitions in case of partition rebalance. This method will be called after partition re-assignment completes and before the SinkTask starts fetching data. Note that any errors raised from this method will cause the task to stop.
- Parameters:
partitions - The list of partitions that are now assigned to the task (may include partitions previously assigned to the task)
-
onPartitionsAssigned
Deprecated. Use open(Collection) for partition initialization.
-
close
The SinkTask uses this method to close writers for partitions that are no longer assigned to the SinkTask. This method will be called before a rebalance operation starts and after the SinkTask stops fetching data. After the partitions are closed, Connect will not write any records to the task until a new set of partitions has been opened. Note that any errors raised from this method will cause the task to stop.
- Parameters:
partitions - The list of partitions that should be closed
-
onPartitionsRevoked
Deprecated. Use close(Collection) instead for partition cleanup.
-
stop
public abstract void stop()
Perform any cleanup to stop this task. In SinkTasks, this method is invoked only once outstanding calls to other methods have completed (e.g., put(Collection) has returned) and a final flush(Map) and offset commit have completed. Implementations of this method should only need to perform final cleanup operations, such as closing network connections to the sink system.