org.apache.kafka.clients.producer
Class KafkaProducer<K,V>

java.lang.Object
  extended by org.apache.kafka.clients.producer.KafkaProducer<K,V>
All Implemented Interfaces:
java.io.Closeable, Producer<K,V>

public class KafkaProducer<K,V>
extends java.lang.Object
implements Producer<K,V>

A Kafka client that publishes records to the Kafka cluster.

The producer is thread safe and should generally be shared among all threads for best performance.

The producer manages a single background thread that does I/O, as well as a TCP connection to each of the brokers it needs to communicate with. Failing to close the producer after use will leak these resources.


Constructor Summary
KafkaProducer(java.util.Map<java.lang.String,java.lang.Object> configs)
          A producer is instantiated by providing a set of key-value pairs as configuration.
KafkaProducer(java.util.Map<java.lang.String,java.lang.Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer)
          A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value Serializer.
KafkaProducer(java.util.Properties properties)
          A producer is instantiated by providing a set of key-value pairs as configuration.
KafkaProducer(java.util.Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer)
          A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value Serializer.
 
Method Summary
 void close()
          Close this producer.
 java.util.Map<MetricName,? extends Metric> metrics()
          Return a map of metrics maintained by the producer.
 java.util.List<PartitionInfo> partitionsFor(java.lang.String topic)
          Get a list of partitions for the given topic for custom partition assignment.
 java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record)
          Asynchronously send a record to a topic.
 java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)
          Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

KafkaProducer

public KafkaProducer(java.util.Map<java.lang.String,java.lang.Object> configs)
A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings are documented here. Values can be either strings or Objects of the appropriate type (for example a numeric configuration would accept either the string "42" or the integer 42).
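
As a rough illustration of the mixed string/typed values described above, the following sketch builds such a map using only JDK classes. The broker address, the particular keys chosen (bootstrap.servers, retries, batch.size, key.serializer, value.serializer) and the class name ProducerConfigSketch are assumptions made for the example; consult the configuration documentation for the keys your client version accepts. The constructor call is left as a comment so the sketch compiles without the Kafka client on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: building the configuration map handed to the KafkaProducer constructor. */
class ProducerConfigSketch {

    /** Builds a config map that mixes string values and typed values. */
    static Map<String, Object> buildConfigs() {
        Map<String, Object> configs = new HashMap<String, Object>();
        // Assumed broker address; replace with your own cluster.
        configs.put("bootstrap.servers", "localhost:9092");
        // Assumed serializer settings, needed when no Serializer instances are passed in.
        configs.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // A numeric setting given as a string...
        configs.put("retries", "3");
        // ...and another given as an Integer; either form should be accepted.
        configs.put("batch.size", Integer.valueOf(16384));
        return configs;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = buildConfigs();
        // With the Kafka client on the classpath, the map would be passed on like this:
        // KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(configs);
        System.out.println(configs.size() + " settings prepared");
    }
}
```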

Parameters:
configs - The producer configs

KafkaProducer

public KafkaProducer(java.util.Map<java.lang.String,java.lang.Object> configs,
                     Serializer<K> keySerializer,
                     Serializer<V> valueSerializer)
A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value Serializer. Valid configuration strings are documented here. Values can be either strings or Objects of the appropriate type (for example a numeric configuration would accept either the string "42" or the integer 42).

Parameters:
configs - The producer configs
keySerializer - The serializer for keys; must implement Serializer. Its configure() method is not called when the serializer is passed in directly.
valueSerializer - The serializer for values; must implement Serializer. Its configure() method is not called when the serializer is passed in directly.

KafkaProducer

public KafkaProducer(java.util.Properties properties)
A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings are documented here.

Parameters:
properties - The producer configs

KafkaProducer

public KafkaProducer(java.util.Properties properties,
                     Serializer<K> keySerializer,
                     Serializer<V> valueSerializer)
A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value Serializer. Valid configuration strings are documented here.

Parameters:
properties - The producer configs
keySerializer - The serializer for keys; must implement Serializer. Its configure() method is not called when the serializer is passed in directly.
valueSerializer - The serializer for values; must implement Serializer. Its configure() method is not called when the serializer is passed in directly.

Method Detail

send

public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record)
Asynchronously send a record to a topic. Equivalent to send(record, null).

Specified by:
send in interface Producer<K,V>
Parameters:
record - The record to be sent
Returns:
A future which will eventually contain the response information

send

public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record,
                                                        Callback callback)
Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.

The send is asynchronous and this method will return immediately once the record has been stored in the buffer of records waiting to be sent. This allows sending many records in parallel without blocking to wait for the response after each one.

The result of the send is a RecordMetadata specifying the partition the record was sent to and the offset it was assigned.

Since the send call is asynchronous it returns a Future for the RecordMetadata that will be assigned to this record. Invoking get() on this future will result in the metadata for the record or throw any exception that occurred while sending the record.

If you want to simulate a simple blocking call you can do the following:

producer.send(new ProducerRecord<byte[],byte[]>("the-topic", "key".getBytes(), "value".getBytes())).get();
 

Those desiring fully non-blocking usage can make use of the Callback parameter to provide a callback that will be invoked when the request is complete.

ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", "key".getBytes(), "value".getBytes());
producer.send(record,
              new Callback() {
                  public void onCompletion(RecordMetadata metadata, Exception e) {
                      // metadata is null when the send failed, so only read it on success
                      if (e != null)
                          e.printStackTrace();
                      else
                          System.out.println("The offset of the record we just sent is: " + metadata.offset());
                  }
              });
 
Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the following example callback1 is guaranteed to execute before callback2:

producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
 

Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or they will delay the sending of messages from other threads. If you want to execute blocking or computationally expensive callbacks it is recommended to use your own Executor in the callback body to parallelize processing.
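
One possible shape for handing slow work to your own Executor is sketched below. So that it compiles without the Kafka client, it uses a hypothetical stand-in interface, CompletionHandler, with the same two-argument shape as Callback.onCompletion (a Long offset takes the place of RecordMetadata); the names CallbackOffloadSketch, offload and runOnce are likewise invented for the example. In real code the wrapper would implement Callback instead.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Sketch: keep the producer's I/O thread free by re-dispatching callback work. */
class CallbackOffloadSketch {

    /** Hypothetical stand-in for Callback; offset takes the place of RecordMetadata. */
    interface CompletionHandler {
        void onCompletion(Long offset, Exception e);
    }

    /** Wraps a slow handler so the calling thread only enqueues a task and returns. */
    static CompletionHandler offload(final CompletionHandler slow, final ExecutorService executor) {
        return new CompletionHandler() {
            public void onCompletion(final Long offset, final Exception e) {
                executor.execute(new Runnable() {
                    public void run() {
                        slow.onCompletion(offset, e);
                    }
                });
            }
        };
    }

    /** Invokes a wrapped handler once and returns the name of the thread that ran it. */
    static String runOnce() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        final AtomicReference<String> threadName = new AtomicReference<String>();
        final CountDownLatch done = new CountDownLatch(1);
        CompletionHandler slow = new CompletionHandler() {
            public void onCompletion(Long offset, Exception e) {
                threadName.set(Thread.currentThread().getName());
                done.countDown();
            }
        };
        try {
            // The caller here plays the role of the producer's I/O thread.
            offload(slow, executor).onCompletion(Long.valueOf(42L), null);
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return threadName.get();
    }

    public static void main(String[] args) {
        System.out.println("handler ran on: " + runOnce());
    }
}
```

The wrapper itself does nothing but enqueue a task, so the thread that invokes it returns almost immediately. Note that once work is handed to a multi-threaded executor, the per-partition ordering of callbacks no longer carries over to the offloaded work.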

The producer manages a buffer of records waiting to be sent. This buffer has a hard limit on its size, which is controlled by the configuration total.memory.bytes. If send() is called faster than the I/O thread can transfer data to the brokers, the buffer will eventually run out of space. The default behavior in this case is to block the send call until the I/O thread catches up and more buffer space is available. However, in cases where non-blocking usage is desired, the setting block.on.buffer.full=false will cause the producer to throw an exception instead when buffer memory is exhausted.
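
A minimal sketch of opting into the non-blocking behavior through the Properties-based constructor might look like the following. Only block.on.buffer.full comes from the text above; the broker address and the class name BufferConfigSketch are assumptions for the example, and the constructor call is left as a comment so the sketch compiles with the JDK alone.

```java
import java.util.Properties;

/** Sketch: configuration that makes send() fail fast when the buffer is full. */
class BufferConfigSketch {

    /** Returns properties with blocking on a full buffer switched off. */
    static Properties nonBlockingProps() {
        Properties props = new Properties();
        // Assumed broker address; replace with your own cluster.
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Throw from send() instead of blocking when buffer memory is exhausted.
        props.setProperty("block.on.buffer.full", "false");
        return props;
    }

    public static void main(String[] args) {
        Properties props = nonBlockingProps();
        // With the Kafka client on the classpath:
        // KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
        System.out.println(props.getProperty("block.on.buffer.full"));
    }
}
```

With this setting, callers of send() should be prepared to catch the resulting exception and decide whether to drop the record, retry later, or slow down.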

Specified by:
send in interface Producer<K,V>
Parameters:
record - The record to send
callback - A user-supplied callback to execute when the record has been acknowledged by the server (null indicates no callback)

partitionsFor

public java.util.List<PartitionInfo> partitionsFor(java.lang.String topic)
Description copied from interface: Producer
Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change over time so this list should not be cached.

Specified by:
partitionsFor in interface Producer<K,V>

metrics

public java.util.Map<MetricName,? extends Metric> metrics()
Description copied from interface: Producer
Return a map of metrics maintained by the producer.

Specified by:
metrics in interface Producer<K,V>

close

public void close()
Close this producer. This method blocks until all in-flight requests complete.

Specified by:
close in interface java.io.Closeable
Specified by:
close in interface Producer<K,V>