Interface ErrorHandlerContext
ErrorHandlerContext instances are passed into DeserializationExceptionHandler, ProcessingExceptionHandler, or ProductionExceptionHandler, depending on what error occurred.
Method Summary
Modifier and Type    Method               Description
Headers              headers()            Return the headers of the current source record; could be an empty header if it is not available.
long                 offset()             Return the offset of the current input record; could be -1 if it is not available.
int                  partition()          Return the partition ID of the current input record; could be -1 if it is not available.
String               processorNodeId()    Return the current processor node ID.
TaskId               taskId()             Return the task ID.
long                 timestamp()          Return the current timestamp; could be -1 if it is not available.
String               topic()              Return the topic name of the current input record; could be null if it is not available.
Method Details
topic
String topic()
Return the topic name of the current input record; could be null if it is not available.

For example, if this method is invoked within a punctuation callback, or while processing a record that was forwarded by a punctuation callback, the record won't have an associated topic. Another example is KTable.transformValues(ValueTransformerWithKeySupplier, String...) (and siblings), which do not always guarantee to provide a valid topic name, as they might be executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL. Additionally, when writing into a changelog topic, there is no associated input record, and thus no topic name is available.

Returns: the topic name
partition
int partition()
Return the partition ID of the current input record; could be -1 if it is not available.

For example, if this method is invoked within a punctuation callback, or while processing a record that was forwarded by a punctuation callback, the record won't have an associated partition ID. Another example is KTable.transformValues(ValueTransformerWithKeySupplier, String...) (and siblings), which do not always guarantee to provide a valid partition ID, as they might be executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL. Additionally, when writing into a changelog topic, there is no associated input record, and thus no partition is available.

Returns: the partition ID
offset
long offset()
Return the offset of the current input record; could be -1 if it is not available.

For example, if this method is invoked within a punctuation callback, or while processing a record that was forwarded by a punctuation callback, the record won't have an associated offset. Another example is KTable.transformValues(ValueTransformerWithKeySupplier, String...) (and siblings), which do not always guarantee to provide a valid offset, as they might be executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL. Additionally, when writing into a changelog topic, there is no associated input record, and thus no offset is available.

Returns: the offset
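The "not available" values described for topic(), partition(), and offset() (null, -1, and -1) are easy to overlook when building a log message. The following is a minimal sketch of one defensive way to format a source position. It does not use the Kafka Streams library: PositionView, of, and describe are hypothetical names introduced only for illustration, and PositionView mirrors just the three accessors documented above so that the example compiles on its own.

```java
final class SourcePosition {

    /** Hypothetical stand-in that mirrors only topic(), partition(), and offset(). */
    interface PositionView {
        String topic();   // may be null when no input record is associated
        int partition();  // may be -1 when not available
        long offset();    // may be -1 when not available
    }

    /** Hypothetical helper that builds a fixed-value view for demonstration. */
    static PositionView of(final String topic, final int partition, final long offset) {
        return new PositionView() {
            @Override public String topic() { return topic; }
            @Override public int partition() { return partition; }
            @Override public long offset() { return offset; }
        };
    }

    /** Formats the position, or returns a placeholder if any part is unavailable. */
    static String describe(PositionView ctx) {
        if (ctx.topic() == null || ctx.partition() < 0 || ctx.offset() < 0) {
            return "<no source position>";
        }
        return ctx.topic() + "-" + ctx.partition() + "@" + ctx.offset();
    }

    public static void main(String[] args) {
        System.out.println(describe(of("orders", 3, 42L)));  // prints "orders-3@42"
        System.out.println(describe(of(null, -1, -1L)));     // prints "<no source position>"
    }
}
```

In a real handler the same checks would presumably be applied directly to the context passed in. Treating a partially available position as unavailable is a design choice of this sketch, not something the documentation prescribes.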
headers
Headers headers()
Return the headers of the current source record; could be an empty header if it is not available.

For example, if this method is invoked within a punctuation callback, or while processing a record that was forwarded by a punctuation callback, the record might not have any associated headers. Another example is KTable.transformValues(ValueTransformerWithKeySupplier, String...) (and siblings), which do not always guarantee to provide valid headers, as they might be executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL. Additionally, when writing into a changelog topic, there is no associated input record, and thus no headers are available.

Returns: the headers
processorNodeId
String processorNodeId()
Return the current processor node ID.

Returns: the processor node ID
taskId
TaskId taskId()
Return the task ID.

Returns: the task ID
timestamp
long timestamp()
Return the current timestamp; could be -1 if it is not available.

For example, when writing into a changelog topic, there is no associated input record, and thus no timestamp is available.

If it is triggered while processing a record streamed from the source processor, timestamp is defined as the timestamp of the current input record; the timestamp is extracted from ConsumerRecord by TimestampExtractor. Note that an upstream Processor might have set a new timestamp by calling forward(record.withTimestamp(...)). In particular, some Kafka Streams DSL operators set result record timestamps explicitly, to guarantee deterministic results.

If it is triggered while processing a record generated not from the source processor (for example, if this method is invoked from the punctuate call):
- In case of PunctuationType.STREAM_TIME, timestamp is defined as the current task's stream time, which is defined as the largest timestamp of any record processed by the task.
- In case of PunctuationType.WALL_CLOCK_TIME, timestamp is defined as the current system time.

If it is triggered from a deserialization failure, timestamp is defined as the timestamp of the current rawRecord ConsumerRecord.

Returns: the timestamp
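Because timestamp() can return -1, it may help to convert the raw long into an optional value before formatting or comparing it. The sketch below is self-contained and does not touch the Kafka Streams API. ContextTimestamps and toInstant are hypothetical names, the value is assumed to be in epoch milliseconds (the usual convention for Kafka record timestamps), and treating every negative value as "not available" is an assumption of this example.

```java
import java.time.Instant;
import java.util.Optional;

final class ContextTimestamps {

    /**
     * Converts a raw timestamp into an Instant.
     * Assumes epoch milliseconds; negative values (including the documented -1)
     * are treated as "not available" and yield an empty Optional.
     */
    static Optional<Instant> toInstant(long timestamp) {
        if (timestamp < 0) {
            return Optional.empty();
        }
        return Optional.of(Instant.ofEpochMilli(timestamp));
    }

    public static void main(String[] args) {
        System.out.println(toInstant(-1L));  // prints "Optional.empty"
        System.out.println(toInstant(0L));   // prints "Optional[1970-01-01T00:00:00Z]"
    }
}
```

A caller would pass the result of timestamp() to toInstant and decide explicitly what to log when the Optional is empty, rather than printing a date derived from -1.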