Data Types and Serialization
Every Kafka Streams application must provide SerDes (Serializer/Deserializer) for the data types of record keys and record values (e.g. java.lang.String) to materialize the data when necessary. Operations that require such SerDes information include: stream(), table(), to(), through(), groupByKey(), groupBy().
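As a minimal sketch of where explicit SerDes enter the DSL, the topology below reads with stream() and writes with to(), passing serdes through Consumed and Produced. The class name and the topic names "input-topic" and "output-topic" are hypothetical. The sketch assumes kafka-streams 2.0 or later, where Consumed lives in org.apache.kafka.streams.kstream; in 1.x the same class is org.apache.kafka.streams.Consumed.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

class ExplicitSerdesTopology {

    // Builds (but does not run) a topology; both topic names are hypothetical.
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // stream(): keys and values are read as Strings using explicit serdes.
        KStream<String, String> lines =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // mapValues() changes the value type from String to Integer ...
        KStream<String, Integer> lengths = lines.mapValues(value -> value.length());

        // ... so to() must be given an Integer value serde, not the String one.
        lengths.to("output-topic", Produced.with(Serdes.String(), Serdes.Integer()));

        return builder.build();
    }

    public static void main(String[] args) {
        // Describing the topology lists its source and sink nodes without contacting a broker.
        System.out.println(buildTopology().describe());
    }
}
```

The value serde passed to to() changes here because the value type changes inside the topology. A single application-wide default cannot describe both the String input and the Integer output.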
You can provide SerDes by using either of these methods:
- By setting default SerDes via a StreamsConfig instance.
- By specifying explicit SerDes when calling the appropriate API methods, thus overriding the defaults.
Table of Contents
- Configuring SerDes
- Overriding default SerDes
- Available SerDes
Configuring SerDes
SerDes specified in the Streams configuration via StreamsConfig are used as the default in your Kafka Streams application.

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;

    Properties settings = new Properties();
    // Required by StreamsConfig (placeholder values)
    settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // Default serde for keys of data records (here: built-in serde for String type)
    settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    // Default serde for values of data records (here: built-in serde for Long type)
    settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
    StreamsConfig config = new StreamsConfig(settings);
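To check which classes the defaults resolve to, StreamsConfig can instantiate the configured serdes itself. The sketch below assumes Kafka Streams 1.0 or later, which provides DEFAULT_KEY_SERDE_CLASS_CONFIG and defaultKeySerde()/defaultValueSerde(). The class name, application id, and bootstrap address are placeholders. StreamsConfig requires the latter two settings but does not contact the broker here.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

class DefaultSerdeLookup {

    static StreamsConfig buildConfig() {
        Properties settings = new Properties();
        // Required settings; both values are placeholders for this sketch.
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "serde-defaults-sketch");
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Class-typed settings accept Class objects as well as fully qualified class names.
        settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.LongSerde.class);
        return new StreamsConfig(settings);
    }

    public static void main(String[] args) {
        StreamsConfig config = buildConfig();
        // These calls create and configure instances of the configured serde classes.
        Serde<?> keySerde = config.defaultKeySerde();
        Serde<?> valueSerde = config.defaultValueSerde();
        System.out.println(keySerde.getClass().getSimpleName());   // StringSerde
        System.out.println(valueSerde.getClass().getSimpleName()); // LongSerde
    }
}
```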
Overriding default SerDes
You can also specify SerDes explicitly by passing them to the appropriate API methods, which overrides the default serde settings:
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;

    final Serde<String> stringSerde = Serdes.String();
    final Serde<Long> longSerde = Serdes.Long();

    // The stream userCountByRegion has type `String` for record keys (for region)
    // and type `Long` for record values (for user counts).
    KStream<String, Long> userCountByRegion = ...;
    userCountByRegion.to("RegionCountsTopic", Produced.with(stringSerde, longSerde));
To override SerDes selectively, i.e., keep the defaults for some fields (such as the record key) but not for others, leave out the serde for every field that should use the default settings:
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;

    // Use the default serializer for record keys (here: region as String) by not specifying the key serde,
    // but override the default serializer for record values (here: userCount as Long).
    final Serde<Long> longSerde = Serdes.Long();
    KStream<String, Long> userCountByRegion = ...;
    userCountByRegion.to("RegionCountsTopic", Produced.valueSerde(longSerde));
Available SerDes
Primitive and basic types
Apache Kafka includes several built-in serde implementations for Java primitives and basic types such as byte[] in its kafka-clients Maven artifact:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.2</version>
    </dependency>
This artifact provides the following serde implementations in the package org.apache.kafka.common.serialization, which you can use, for example, when defining default serializers in your Streams configuration.
    Data type    Serde
    byte[]       Serdes.ByteArray(), Serdes.Bytes() (see tip below)
    ByteBuffer   Serdes.ByteBuffer()
    Double       Serdes.Double()
    Integer      Serdes.Integer()
    Long         Serdes.Long()
    String       Serdes.String()
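The sketch below passes values through a serializer and back through the matching deserializer for a few of the built-in serdes listed above. It also shows the equality difference between byte[] and Bytes (org.apache.kafka.common.utils.Bytes) that the tip below describes. It assumes kafka-clients is on the classpath. The class name, the helper roundTrip, and the topic name are illustrative only.

```java
import java.util.Arrays;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;

class BuiltInSerdeRoundTrip {

    // Hypothetical helper: serialize a value, then deserialize it with the same serde.
    static <T> T roundTrip(Serde<T> serde, String topic, T value) {
        byte[] wire = serde.serializer().serialize(topic, value);
        return serde.deserializer().deserialize(topic, wire);
    }

    public static void main(String[] args) {
        String topic = "demo-topic"; // hypothetical; these built-in serdes ignore the topic

        System.out.println(roundTrip(Serdes.String(), topic, "hello"));              // hello
        System.out.println(roundTrip(Serdes.Long(), topic, 42L));                    // 42
        // A Long is encoded as a fixed 8-byte value.
        System.out.println(Serdes.Long().serializer().serialize(topic, 42L).length); // 8

        // byte[] inherits identity-based equals()/hashCode() from Object ...
        byte[] a = {1, 2, 3};
        byte[] b = {1, 2, 3};
        System.out.println(a.equals(b));         // false
        System.out.println(Arrays.equals(a, b)); // true
        // ... whereas Bytes compares the wrapped contents.
        System.out.println(Bytes.wrap(a).equals(Bytes.wrap(b)));                       // true
        System.out.println(roundTrip(Serdes.Bytes(), topic, Bytes.wrap(a)).equals(Bytes.wrap(b))); // true
    }
}
```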
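Tip
Bytes (org.apache.kafka.common.utils.Bytes) is a wrapper for Java's byte[] (byte array) that supports content-based equality and ordering semantics, whereas a plain byte[] compares by object identity in equals() and hashCode(). You may want to consider using Bytes instead of byte[] in your applications.
JSON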
The Kafka Streams code examples also include a basic serde implementation for JSON, made up of JsonPOJOSerializer and JsonPOJODeserializer. You can construct a unified JSON serde from these two classes via Serdes.serdeFrom(<serializerInstance>, <deserializerInstance>). The PageViewTypedDemo example demonstrates how to use this JSON serde.
Implementing custom SerDes
If you need to implement custom SerDes, your best starting point is to look at the source code of existing SerDes (see previous section). Typically, your workflow will be similar to:
- Write a serializer for your data type T by implementing org.apache.kafka.common.serialization.Serializer.
- Write a deserializer for T by implementing org.apache.kafka.common.serialization.Deserializer.
- Write a serde for T by implementing org.apache.kafka.common.serialization.Serde, either manually (see existing SerDes in the previous section) or by using helper functions in Serdes such as Serdes.serdeFrom(Serializer<T>, Deserializer<T>).
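The three steps above can be sketched for a small hypothetical data type. Point, PointSerializer, PointDeserializer, and PointSerdes are illustrative names, not Kafka classes. The fixed 8-byte encoding (two big-endian ints) is just one simple choice for this sketch. It assumes kafka-clients is on the classpath. configure() and close() are implemented explicitly because they are abstract in older client versions such as 1.0.

```java
import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical data type T: an immutable 2D point.
final class Point {
    final int x;
    final int y;

    Point(int x, int y) { this.x = x; this.y = y; }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Point)) return false;
        Point other = (Point) o;
        return x == other.x && y == other.y;
    }

    @Override
    public int hashCode() { return 31 * x + y; }
}

// Step 1: serializer, writing x then y as two big-endian 4-byte ints.
class PointSerializer implements Serializer<Point> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public byte[] serialize(String topic, Point data) {
        if (data == null) return null; // pass null values (e.g. tombstones) through unchanged
        return ByteBuffer.allocate(8).putInt(data.x).putInt(data.y).array();
    }

    @Override
    public void close() { }
}

// Step 2: deserializer, reading the two ints back in the same order.
class PointDeserializer implements Deserializer<Point> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public Point deserialize(String topic, byte[] data) {
        if (data == null) return null;
        if (data.length != 8) {
            throw new SerializationException("Expected 8 bytes for a Point but got " + data.length);
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        return new Point(buffer.getInt(), buffer.getInt());
    }

    @Override
    public void close() { }
}

// Step 3: combine both halves into a Serde with the Serdes.serdeFrom helper.
class PointSerdes {
    static Serde<Point> pointSerde() {
        return Serdes.serdeFrom(new PointSerializer(), new PointDeserializer());
    }

    public static void main(String[] args) {
        Serde<Point> serde = pointSerde();
        byte[] bytes = serde.serializer().serialize("points", new Point(3, -4));
        Point back = serde.deserializer().deserialize("points", bytes);
        System.out.println(back.x + "," + back.y); // 3,-4
    }
}
```

The resulting serde can be passed anywhere a built-in one is accepted, for example Produced.with(Serdes.String(), PointSerdes.pointSerde()).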