public interface ProducerInterceptor<K,V> extends Configurable
This class will get producer config properties via the configure() method, including the clientId assigned by KafkaProducer if not specified in the producer config. The interceptor implementation needs to be aware that it will be sharing the producer config namespace with other interceptors and serializers, and ensure that there are no conflicts.
Exceptions thrown by ProducerInterceptor methods will be caught and logged, but not propagated further. As a result, if the user configures the interceptor with the wrong key and value type parameters, the producer will not throw an exception; it will only log the errors.
ProducerInterceptor callbacks may be called from multiple threads. Interceptor implementations must ensure thread safety where needed.
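The shared config namespace described above can be illustrated with a minimal, stdlib-only sketch. It does not use Kafka classes: `InterceptorConfigSketch` and the key `audit.interceptor.tag` are hypothetical names chosen for illustration, while `client.id` is the producer's client id key. The `configure` method only mirrors the shape of `Configurable.configure(Map<String, ?>)`.

```java
import java.util.HashMap;
import java.util.Map;

// Stdlib-only sketch; a real interceptor would implement ProducerInterceptor<K,V>.
final class InterceptorConfigSketch {

    // Hypothetical interceptor-specific key. The prefix reduces the chance of
    // colliding with keys used by the producer, serializers or other interceptors.
    static final String TAG_CONFIG = "audit.interceptor.tag";

    private String clientId;
    private String tag;

    // Same shape as Configurable.configure(Map<String, ?>): the interceptor
    // receives the whole producer config and picks out what it needs.
    public void configure(Map<String, ?> configs) {
        Object id = configs.get("client.id");
        clientId = id == null ? "unknown" : id.toString();
        Object t = configs.get(TAG_CONFIG);
        tag = t == null ? "default" : t.toString();
    }

    String describe() {
        return clientId + "/" + tag;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("client.id", "orders-producer");
        configs.put(TAG_CONFIG, "eu-west");

        InterceptorConfigSketch interceptor = new InterceptorConfigSketch();
        interceptor.configure(configs);
        System.out.println(interceptor.describe()); // prints "orders-producer/eu-west"
    }
}
```

Prefixing custom keys with an interceptor-specific name is one possible way to avoid conflicts; the interface itself does not prescribe a naming scheme.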
Modifier and Type | Method and Description |
---|---|
void | close() This is called when interceptor is closed. |
void | onAcknowledgement(RecordMetadata metadata, Exception exception) This method is called when the record sent to the server has been acknowledged, or when sending the record fails before it gets sent to the server. |
ProducerRecord<K,V> | onSend(ProducerRecord<K,V> record) This is called from KafkaProducer.send(ProducerRecord) and KafkaProducer.send(ProducerRecord, Callback) methods, before key and value get serialized and partition is assigned (if partition is not specified in ProducerRecord). |
Methods inherited from interface Configurable: configure
ProducerRecord<K,V> onSend(ProducerRecord<K,V> record)
This is called from KafkaProducer.send(ProducerRecord) and KafkaProducer.send(ProducerRecord, Callback) methods, before key and value get serialized and partition is assigned (if partition is not specified in ProducerRecord).
This method is allowed to modify the record, in which case, the new record will be returned. The implication of modifying key/value is that partition assignment (if not specified in ProducerRecord) will be done based on modified key/value, not key/value from the client. Consequently, key and value transformation done in onSend() needs to be consistent: same key and value should mutate to the same (modified) key and value. Otherwise, log compaction would not work as expected.
Similarly, it is up to the interceptor implementation to ensure that the correct topic/partition is returned in ProducerRecord. Most often, it should be the same topic/partition as in 'record'.
Any exception thrown by this method will be caught by the caller and logged, but not propagated further.
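The consistency requirement on key transformation can be sketched with a small deterministic function. This is an illustration only and uses no Kafka classes: `KeyNormalizerSketch` and `normalizeKey` are hypothetical names, and trimming plus lower-casing is just one example of a transformation that always maps the same input key to the same output key.

```java
import java.util.Locale;

// Stdlib-only sketch of a key transformation that is safe to apply in onSend().
final class KeyNormalizerSketch {

    // Deterministic: depends only on the input key, so equal keys always map to
    // equal normalized keys and land in the same partition.
    static String normalizeKey(String key) {
        return key == null ? null : key.trim().toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(normalizeKey(" User-42 ")); // prints "user-42"
        System.out.println(normalizeKey("USER-42"));   // prints "user-42"
    }
}
```

By contrast, a transformation that mixes in a timestamp, a random suffix or other per-call state would map the same client key to different keys over time, which is the case that breaks log compaction.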
Since the producer may run multiple interceptors, a particular interceptor's onSend() callback will be called in the order specified by ProducerConfig.INTERCEPTOR_CLASSES_CONFIG. The first interceptor in the list gets the record passed from the client, the following interceptor is passed the record returned by the previous interceptor, and so on. Since interceptors are allowed to modify records, an interceptor may receive a record already modified by other interceptors. However, building a pipeline of mutating interceptors that depend on the output of the previous interceptor is discouraged, because of potential side effects when an interceptor fails to modify the record and throws an exception. If one of the interceptors in the list throws an exception from onSend(), the exception is caught and logged, and the next interceptor is called with the record returned by the last successful interceptor in the list, or otherwise with the record from the client.
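The chaining rule described above can be modelled in a few lines. The sketch below is a stdlib-only approximation, not the producer's actual code: records are plain strings, interceptors are `UnaryOperator<String>` values, and `InterceptorChainSketch`, `applyChain` and the error list (standing in for logging) are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Stdlib-only model of the onSend() chaining rule; it does not use Kafka classes.
final class InterceptorChainSketch {

    // Applies each interceptor in list order. If one throws, the exception is
    // recorded (standing in for logging) and the next interceptor receives the
    // record returned by the last successful interceptor, or the original record.
    static String applyChain(List<UnaryOperator<String>> interceptors,
                             String record,
                             List<String> errorLog) {
        String current = record;
        for (UnaryOperator<String> interceptor : interceptors) {
            try {
                current = interceptor.apply(current);
            } catch (RuntimeException e) {
                errorLog.add(e.getMessage());
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> chain = new ArrayList<>();
        chain.add(r -> r + "-a");
        chain.add(r -> { throw new IllegalStateException("interceptor b failed"); });
        chain.add(r -> r + "-c");

        List<String> errors = new ArrayList<>();
        String result = applyChain(chain, "record", errors);
        System.out.println(result + " " + errors); // prints "record-a-c [interceptor b failed]"
    }
}
```

The third interceptor sees `record-a`, not the output the second one was supposed to produce. This is why a pipeline whose stages depend on each other's modifications is fragile.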
record
- the record from the client or the record returned by the previous interceptor in the chain of interceptors.
void onAcknowledgement(RecordMetadata metadata, Exception exception)
This method is generally called just before the user callback is called, and in additional cases when KafkaProducer.send()
throws an exception.
Any exception thrown by this method will be ignored by the caller.
This method will generally execute in the background I/O thread, so the implementation should be reasonably fast. Otherwise, sending of messages from other threads could be delayed.
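A callback that runs on the I/O thread should do very little work and be safe under concurrent calls. The following stdlib-only sketch shows one way to keep it cheap, assuming the interceptor only needs counts. `AckCounterSketch` is a hypothetical class, and `Object` stands in for `RecordMetadata` so the example runs without Kafka on the classpath.

```java
import java.util.concurrent.atomic.LongAdder;

// Stdlib-only sketch of a fast, thread-safe acknowledgement callback.
final class AckCounterSketch {

    // LongAdder keeps increments cheap under contention from multiple threads.
    private final LongAdder acked = new LongAdder();
    private final LongAdder failed = new LongAdder();

    // Mirrors the shape of onAcknowledgement(metadata, exception):
    // a null exception means the record was acknowledged.
    void onAcknowledgement(Object metadata, Exception exception) {
        if (exception == null) {
            acked.increment();
        } else {
            failed.increment();
        }
    }

    long ackedCount() { return acked.sum(); }

    long failedCount() { return failed.sum(); }

    public static void main(String[] args) throws InterruptedException {
        AckCounterSketch counter = new AckCounterSketch();
        // Each thread reports 1000 results, every tenth one a failure.
        Runnable work = () -> {
            for (int i = 0; i < 1000; i++) {
                Exception error = i % 10 == 0 ? new RuntimeException("send failed") : null;
                counter.onAcknowledgement(new Object(), error);
            }
        };
        Thread t1 = new Thread(work);
        Thread t2 = new Thread(work);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(counter.ackedCount() + " acked, " + counter.failedCount() + " failed");
        // prints "1800 acked, 200 failed"
    }
}
```

Heavier work such as blocking I/O or remote calls is better handed off to another thread, so that the callback returns quickly.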
metadata
- The metadata for the record that was sent (i.e. the partition and offset). If an error occurred, metadata will contain only a valid topic and possibly a partition. If partition is not given in ProducerRecord and an error occurs before the partition gets assigned, then partition will be set to RecordMetadata.NO_PARTITION. The metadata may be null if the client passed a null record to KafkaProducer.send(ProducerRecord).
exception
- The exception thrown during processing of this record. Null if no error occurred.
void close()