Interface ErrorHandlerContext


public interface ErrorHandlerContext
This interface allows user code to inspect the context of a record that has failed during processing.

ErrorHandlerContext instances are passed into the DeserializationExceptionHandler, ProcessingExceptionHandler, or ProductionExceptionHandler, depending on which error occurred.
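
As an illustration, the following is a minimal sketch of a custom ProcessingExceptionHandler that reads the failure location from the ErrorHandlerContext. It assumes the handle(ErrorHandlerContext, Record, Exception) contract of recent Kafka Streams releases; the class name and log format are illustrative only.

    import java.util.Map;

    import org.apache.kafka.streams.errors.ErrorHandlerContext;
    import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
    import org.apache.kafka.streams.processor.api.Record;

    // Illustrative handler: logs where processing failed, using the ErrorHandlerContext,
    // and tells Kafka Streams to continue with the next record.
    public class LoggingProcessingExceptionHandler implements ProcessingExceptionHandler {

        @Override
        public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                                final Record<?, ?> record,
                                                final Exception exception) {
            // topic() may be null and partition()/offset() may be -1, e.g. for
            // records forwarded by a punctuation callback.
            System.err.printf(
                "Processing failed at task %s, node %s (topic=%s, partition=%d, offset=%d): %s%n",
                context.taskId(),
                context.processorNodeId(),
                context.topic(),
                context.partition(),
                context.offset(),
                exception.getMessage());
            return ProcessingHandlerResponse.CONTINUE;
        }

        @Override
        public void configure(final Map<String, ?> configs) {
            // No configuration needed for this sketch.
        }
    }

Such a handler would be registered through the Streams configuration for the processing exception handler; the other two handler types receive the same ErrorHandlerContext in their respective handle methods.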

  • Method Summary

    Modifier and Type    Method               Description
    Headers              headers()            Return the headers of the current source record; could be empty if no headers are available.
    long                 offset()             Return the offset of the current input record; could be -1 if it is not available.
    int                  partition()          Return the partition ID of the current input record; could be -1 if it is not available.
    String               processorNodeId()    Return the current processor node ID.
    TaskId               taskId()             Return the task ID.
    long                 timestamp()          Return the current timestamp; could be -1 if it is not available.
    String               topic()              Return the topic name of the current input record; could be null if it is not available.
  • Method Details

    • topic

      String topic()
      Return the topic name of the current input record; could be null if it is not available.

      For example, if this method is invoked within a punctuation callback, or while processing a record that was forwarded by a punctuation callback, the record won't have an associated topic. Another example is KTable.transformValues(ValueTransformerWithKeySupplier, String...) (and siblings), which do not always guarantee to provide a valid topic name, as they might be executed "out-of-band" due to internal optimizations applied by the Kafka Streams DSL. Additionally, when writing into a changelog topic, there is no associated input record, and thus no topic name is available.

      Returns:
      the topic name
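
      Because topic() may return null, handler code that records the failure source should guard against it. A minimal sketch, with a made-up helper name:

      import org.apache.kafka.streams.errors.ErrorHandlerContext;

      final class SourceDescriptions {

          // Hypothetical helper: renders the source topic, falling back to a marker
          // when the failure is not tied to an input topic (e.g. punctuation).
          static String describeSourceTopic(final ErrorHandlerContext context) {
              final String topic = context.topic();
              return topic != null ? topic : "<no-source-topic>";
          }
      }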
    • partition

      int partition()
      Return the partition ID of the current input record; could be -1 if it is not available.

      For example, if this method is invoked within a punctuation callback, or while processing a record that was forwarded by a punctuation callback, the record won't have an associated partition ID. Another example is KTable.transformValues(ValueTransformerWithKeySupplier, String...) (and siblings), which do not always guarantee to provide a valid partition ID, as they might be executed "out-of-band" due to internal optimizations applied by the Kafka Streams DSL. Additionally, when writing into a changelog topic, there is no associated input record, and thus no partition is available.

      Returns:
      the partition ID
    • offset

      long offset()
      Return the offset of the current input record; could be -1 if it is not available.

      For example, if this method is invoked within a punctuation callback, or while processing a record that was forwarded by a punctuation callback, the record won't have an associated offset. Another example is KTable.transformValues(ValueTransformerWithKeySupplier, String...) (and siblings), which do not always guarantee to provide a valid offset, as they might be executed "out-of-band" due to internal optimizations applied by the Kafka Streams DSL. Additionally, when writing into a changelog topic, there is no associated input record, and thus no offset is available.

      Returns:
      the offset
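
      Since both partition() and offset() use -1 as a "not available" sentinel, code that persists the failure position should treat -1 (and a null topic) as unknown rather than as a real position. A minimal sketch, with a hypothetical helper name:

      import org.apache.kafka.streams.errors.ErrorHandlerContext;

      final class FailurePositions {

          // Hypothetical helper: builds a "topic-partition@offset" string, treating
          // -1 and a null topic as an unknown position rather than a real one.
          static String describePosition(final ErrorHandlerContext context) {
              if (context.topic() == null || context.partition() == -1 || context.offset() == -1) {
                  return "<unknown-position>";
              }
              return context.topic() + "-" + context.partition() + "@" + context.offset();
          }
      }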
    • headers

      Headers headers()
      Return the headers of the current source record; could be empty if no headers are available.

      For example, if this method is invoked within a punctuation callback, or while processing a record that was forwarded by a punctuation callback, the record might not have any associated headers. Another example is KTable.transformValues(ValueTransformerWithKeySupplier, String...) (and siblings), which do not always guarantee to provide valid headers, as they might be executed "out-of-band" due to internal optimizations applied by the Kafka Streams DSL. Additionally, when writing into a changelog topic, there is no associated input record, and thus no headers are available.

      Returns:
      the headers
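
      Because the returned Headers may be empty, lookups should handle a missing header. A minimal sketch that reads a hypothetical correlation-id header:

      import java.nio.charset.StandardCharsets;

      import org.apache.kafka.common.header.Header;
      import org.apache.kafka.streams.errors.ErrorHandlerContext;

      final class CorrelationIds {

          // Hypothetical helper: reads a "correlation-id" header from the failed source
          // record, returning null when the Headers are empty or the header is absent.
          static String correlationId(final ErrorHandlerContext context) {
              final Header header = context.headers().lastHeader("correlation-id");
              return header == null || header.value() == null
                  ? null
                  : new String(header.value(), StandardCharsets.UTF_8);
          }
      }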
    • processorNodeId

      String processorNodeId()
      Return the current processor node ID.
      Returns:
      the processor node ID
    • taskId

      TaskId taskId()
      Return the task ID.
      Returns:
      the task ID
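
      Together with processorNodeId(), the task ID identifies where in the topology the failure happened, which is useful as a log or metric tag. A minimal sketch; the tag format is arbitrary:

      import org.apache.kafka.streams.errors.ErrorHandlerContext;

      final class FailureTags {

          // Hypothetical helper: combines the task ID and processor node ID into a
          // single tag, e.g. "0_1/KSTREAM-MAPVALUES-0000000002", for logs or metrics.
          static String failureTag(final ErrorHandlerContext context) {
              return context.taskId() + "/" + context.processorNodeId();
          }
      }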
    • timestamp

      long timestamp()
      Return the current timestamp; could be -1 if it is not available.

      For example, when writing into a changelog topic, there is no associated input record, and thus no timestamp is available.

      If it is triggered while processing a record streamed from the source processor, the timestamp is defined as the timestamp of the current input record; the timestamp is extracted from the ConsumerRecord by the TimestampExtractor. Note that an upstream Processor might have set a new timestamp by calling forward(record.withTimestamp(...)). In particular, some Kafka Streams DSL operators set result record timestamps explicitly, to guarantee deterministic results.

      If it is triggered while processing a record not generated by the source processor (for example, if this method is invoked from a punctuate callback):

        • In case of a PunctuationType.STREAM_TIME punctuation, the timestamp is defined as the current task's stream time, which is the largest timestamp of any record processed by the task so far.
        • In case of a PunctuationType.WALL_CLOCK_TIME punctuation, the timestamp is defined as the current system time.

      If it is triggered from a deserialization failure, the timestamp is defined as the timestamp of the rawRecord ConsumerRecord that failed to deserialize.

      Returns:
      the timestamp
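
      Given the -1 sentinel, a handler that timestamps its own failure reports may want a wall-clock fallback. A minimal sketch, with a hypothetical helper name:

      import org.apache.kafka.streams.errors.ErrorHandlerContext;

      final class FailureTimestamps {

          // Hypothetical helper: prefers the record timestamp from the context and
          // falls back to wall-clock time when the context reports -1 (not available).
          static long failureTimestamp(final ErrorHandlerContext context) {
              final long recordTimestamp = context.timestamp();
              return recordTimestamp != -1 ? recordTimestamp : System.currentTimeMillis();
          }
      }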