public class KafkaProducer<K,V> extends Object implements Producer<K,V>
The producer is thread safe, and sharing a single producer instance across threads will generally be faster than having multiple instances.
Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value pairs.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster. Failure to close the producer after use will leak these resources.
The send() method is asynchronous. When called, it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.
The acks config controls the criteria under which requests are considered complete. The "all" setting we have specified will result in blocking on the full commit of the record, the slowest but most durable setting.
If the request fails, the producer can automatically retry, though since we have specified retries as 0 it won't. Enabling retries also opens up the possibility of duplicates (see the documentation on message delivery semantics for details).
The producer maintains buffers of unsent records for each partition. These buffers are of a size specified by the batch.size config. Making this larger can result in more batching, but requires more memory (since we will generally have one of these buffers for each active partition).
By default a buffer is available to send immediately even if there is additional unused space in the buffer. However, if you want to reduce the number of requests you can set linger.ms to something greater than 0. This will instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will arrive to fill up the same batch. This is analogous to Nagle's algorithm in TCP. For example, in the code snippet above, likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. However, this setting would add 1 millisecond of latency to our request waiting for more records to arrive if we didn't fill up the buffer. Note that records that arrive close together in time will generally batch together even with linger.ms=0, so under heavy load batching will occur regardless of the linger configuration; however, setting this to something larger than 0 can lead to fewer, more efficient requests when not under maximal load at the cost of a small amount of latency.
The buffer.memory setting controls the total amount of memory available to the producer for buffering. If records are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is exhausted additional send calls will block. The maximum time to block is determined by max.block.ms, after which send() throws a TimeoutException.
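The batching and buffering settings described above can be collected in one helper. The following is a minimal sketch that uses only java.util.Properties, so it runs without a Kafka dependency. The class name, the method name, and the specific values (64 KB batches, 20 ms linger, 64 MB buffer, 5 s maximum block) are assumptions chosen for illustration, not recommendations. Values are given as strings, which the producer accepts for numeric settings.

```java
import java.util.Properties;

// Hypothetical helper: the name and the chosen values are illustrative assumptions.
final class ProducerConfigSketch {

    private ProducerConfigSketch() {}

    // Builds a configuration that trades a little latency for fewer, larger requests.
    static Properties throughputLeaning(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("acks", "all");
        props.setProperty("batch.size", "65536");       // per-partition batch buffer, in bytes
        props.setProperty("linger.ms", "20");           // wait up to 20 ms for a batch to fill
        props.setProperty("buffer.memory", "67108864"); // total buffering memory, in bytes
        props.setProperty("max.block.ms", "5000");      // upper bound on blocking in send()
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = throughputLeaning("localhost:9092");
        // Print the settings in a stable order for inspection.
        props.stringPropertyNames().stream().sorted()
             .forEach(name -> System.out.println(name + "=" + props.getProperty(name)));
    }
}
```

The resulting Properties object could be passed to the KafkaProducer(Properties) constructor. Whether larger batches and a longer linger actually help depends on the workload, so the values are best treated as a starting point for measurement.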
The key.serializer and value.serializer settings specify how to turn the key and value objects the user provides with their ProducerRecord into bytes. You can use the included ByteArraySerializer or StringSerializer for simple byte array or string types.
This client can communicate with brokers that are version 0.10.0 or newer. Older or newer brokers may not support certain client features. You will receive an UnsupportedVersionException when invoking an API that is not available with the running broker version.
Constructor and Description |
---|
KafkaProducer(Map<String,Object> configs): A producer is instantiated by providing a set of key-value pairs as configuration. |
KafkaProducer(Map<String,Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer): A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value Serializer. |
KafkaProducer(Properties properties): A producer is instantiated by providing a set of key-value pairs as configuration. |
KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer): A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value Serializer. |
Modifier and Type | Method and Description |
---|---|
void | close(): Close this producer. |
void | close(long timeout, TimeUnit timeUnit): This method waits up to timeout for the producer to complete the sending of all incomplete requests. |
void | flush(): Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records. |
Map<MetricName,? extends Metric> | metrics(): Get the full set of internal metrics maintained by the producer. |
List<PartitionInfo> | partitionsFor(String topic): Get the partition metadata for the given topic. |
Future<RecordMetadata> | send(ProducerRecord<K,V> record): Asynchronously send a record to a topic. |
Future<RecordMetadata> | send(ProducerRecord<K,V> record, Callback callback): Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged. |
public KafkaProducer(Map<String,Object> configs)
A producer is instantiated by providing a set of key-value pairs as configuration.
Parameters:
configs - The producer configs

public KafkaProducer(Map<String,Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer)
A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value Serializer. Valid configuration strings are listed in the producer configuration documentation. Values can be either strings or Objects of the appropriate type (for example a numeric configuration would accept either the string "42" or the integer 42).
Parameters:
configs - The producer configs
keySerializer - The serializer for key that implements Serializer. The configure() method won't be called in the producer when the serializer is passed in directly.
valueSerializer - The serializer for value that implements Serializer. The configure() method won't be called in the producer when the serializer is passed in directly.

public KafkaProducer(Properties properties)
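A producer is instantiated by providing a set of key-value pairs as configuration.
Parameters:
properties - The producer configs

public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer)
A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value Serializer. Valid configuration strings are listed in the producer configuration documentation.
Parameters:
properties - The producer configs
keySerializer - The serializer for key that implements Serializer. The configure() method won't be called in the producer when the serializer is passed in directly.
valueSerializer - The serializer for value that implements Serializer. The configure() method won't be called in the producer when the serializer is passed in directly.

public Future<RecordMetadata> send(ProducerRecord<K,V> record)
Asynchronously send a record to a topic. Equivalent to send(record, null). See send(ProducerRecord, Callback) for details.

public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)
Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.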
The send is asynchronous and this method will return immediately once the record has been stored in the buffer of records waiting to be sent. This allows sending many records in parallel without blocking to wait for the response after each one.
The result of the send is a RecordMetadata specifying the partition the record was sent to, the offset it was assigned and the timestamp of the record. If CreateTime is used by the topic, the timestamp will be the user provided timestamp or the record send time if the user did not specify a timestamp for the record. If LogAppendTime is used for the topic, the timestamp will be the Kafka broker local time when the message is appended.
Since the send call is asynchronous it returns a Future for the RecordMetadata that will be assigned to this record. Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
If you want to simulate a simple blocking call you can call the get() method immediately:
byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value);
producer.send(record).get();
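Because send() returns a standard java.util.concurrent.Future, the usual Future contract applies: a failed send surfaces from get() as an ExecutionException whose cause is the exception raised while sending. The sketch below shows one possible way to bound the wait and expose that cause. To stay runnable without a broker it uses a CompletableFuture as a stand-in for the future that send() would return; the class name, the method name, and the choice of IllegalStateException as the wrapper are all assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of the Kafka API.
final class BlockingSendSketch {

    private BlockingSendSketch() {}

    // Waits up to timeoutMs for the result and reports failures as unchecked exceptions.
    static <T> T await(Future<T> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // The cause is the exception that occurred while sending.
            throw new IllegalStateException("send failed: " + e.getCause(), e.getCause());
        } catch (TimeoutException e) {
            throw new IllegalStateException("no result within " + timeoutMs + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for producer.send(record): a future that has already completed.
        Future<String> ok = CompletableFuture.completedFuture("acknowledged");
        System.out.println(await(ok, 1000));

        // Stand-in for a send that failed.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("simulated send failure"));
        try {
            await(failed, 1000);
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getCause().getMessage());
        }
    }
}
```

Blocking on every record gives up the batching that makes the producer efficient, so a helper like this is probably best reserved for cases where each record must be confirmed before the next step.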
Fully non-blocking usage can make use of the Callback parameter to provide a callback that will be invoked when the request is complete.
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(record,
              new Callback() {
                  @Override
                  public void onCompletion(RecordMetadata metadata, Exception e) {
                      // A non-null exception means the send failed.
                      if (e != null) {
                          e.printStackTrace();
                      } else {
                          System.out.println("The offset of the record we just sent is: " + metadata.offset());
                      }
                  }
              });
Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the following example callback1 is guaranteed to execute before callback2:
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or they will delay the sending of messages from other threads. If you want to execute blocking or computationally expensive callbacks it is recommended to use your own Executor in the callback body to parallelize processing.
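One way to follow that recommendation is to have the callback do nothing except hand its arguments to an executor. The sketch below uses only JDK types so that it runs without a broker: the BiConsumer stands in for Callback.onCompletion(RecordMetadata, Exception), the Long argument stands in for the record's offset, and every class and method name is hypothetical.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical sketch; in real code the hand-off would sit inside Callback.onCompletion.
final class OffloadingCallbackSketch {

    private OffloadingCallbackSketch() {}

    // Returns a completion handler that only enqueues work, so the calling thread returns quickly.
    static BiConsumer<Long, Exception> offload(Executor executor,
                                               BiConsumer<Long, Exception> slowHandler) {
        return (offset, error) -> executor.execute(() -> slowHandler.accept(offset, error));
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        BiConsumer<Long, Exception> handler = offload(pool, (offset, error) -> {
            if (error != null) {
                error.printStackTrace();
            } else {
                System.out.println("processed offset " + offset
                        + " on " + Thread.currentThread().getName());
            }
        });

        handler.accept(42L, null); // simulates the producer invoking the callback

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Note that handing work to a pool with more than one thread gives up the per-partition ordering that callbacks have on the I/O thread. If that ordering matters, a single-threaded executor would preserve it at the cost of parallelism.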
Specified by:
send in interface Producer<K,V>
Parameters:
record - The record to send
callback - A user-supplied callback to execute when the record has been acknowledged by the server (null indicates no callback)
Throws:
InterruptException - If the thread is interrupted while blocked
SerializationException - If the key or value are not valid objects given the configured serializers
TimeoutException - If the time taken for fetching metadata or allocating memory for the record has surpassed max.block.ms.
KafkaException - If a Kafka related error occurs that does not belong to the public API exceptions.

public void flush()
Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records. The post-condition of flush() is that any previously sent record will have completed (e.g. Future.isDone() == true).
A request is considered completed when it is successfully acknowledged according to the acks configuration you have specified or else it results in an error.
Other threads can continue sending records while one thread is blocked waiting for a flush call to complete; however, no guarantee is made about the completion of records sent after the flush call begins.
This method can be useful when consuming from some input system and producing into Kafka. The flush() call gives a convenient way to ensure all previously sent messages have actually completed.
This example shows how to consume from one Kafka topic and produce to another Kafka topic:
for (ConsumerRecord<String, String> record : consumer.poll(100))
    producer.send(new ProducerRecord<String, String>("my-topic", record.key(), record.value()));
producer.flush();
consumer.commitSync();
Note that the above example may drop records if the produce request fails. If we want to ensure that this does not occur we need to set retries=<large_number> in our config.
Specified by:
flush in interface Producer<K,V>
Throws:
InterruptException - If the thread is interrupted while blocked

public List<PartitionInfo> partitionsFor(String topic)
Get the partition metadata for the given topic.
Specified by:
partitionsFor in interface Producer<K,V>
Throws:
InterruptException - If the thread is interrupted while blocked

public Map<MetricName,? extends Metric> metrics()
Get the full set of internal metrics maintained by the producer.

public void close()
Close this producer. This method is equivalent to close(Long.MAX_VALUE, TimeUnit.MILLISECONDS).
If close() is called from a Callback, a warning message will be logged and close(0, TimeUnit.MILLISECONDS) will be called instead. We do this because the sender thread would otherwise try to join itself and block forever.
Specified by:
close in interface Closeable
close in interface AutoCloseable
close in interface Producer<K,V>
Throws:
InterruptException - If the thread is interrupted while blocked

public void close(long timeout, TimeUnit timeUnit)
This method waits up to timeout for the producer to complete the sending of all incomplete requests.
If the producer is unable to complete all requests before the timeout expires, this method will fail any unsent and unacknowledged records immediately.
If invoked from within a Callback this method will not block and will be equivalent to close(0, TimeUnit.MILLISECONDS). This is done since no further sending will happen while blocking the I/O thread of the producer.
Specified by:
close in interface Producer<K,V>
Parameters:
timeout - The maximum time to wait for producer to complete any pending requests. The value should be non-negative. Specifying a timeout of zero means do not wait for pending send requests to complete.
timeUnit - The time unit for the timeout
Throws:
InterruptException - If the thread is interrupted while blocked
IllegalArgumentException - If the timeout is negative.