Introduction

Kafka Streams

The easiest way to write mission-critical real-time applications and microservices

Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka’s server-side cluster technology.
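
Concretely, a Streams application is just a standard Java program with a main method. A minimal sketch (the topic names, application id, and broker address are illustrative placeholders, not from this page) that copies records from one topic to another while dropping empty values:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class PassThroughApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pass-through-app");   // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder broker
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input-topic")                          // read from Kafka
               .filter((key, value) -> value != null && !value.isEmpty())      // drop empty values
               .to("output-topic");                                            // write back to Kafka
        new KafkaStreams(builder.build(), props).start();                      // runs like any Java app
    }
}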

Tour of the Streams API

  1. Intro to Streams
  2. Creating a Streams Application
  3. Transforming Data Pt. 1
  4. Transforming Data Pt. 2


Why you’ll love using Kafka Streams!

  • Elastic, highly scalable, fault-tolerant
  • Deploy to containers, VMs, bare metal, cloud
  • Equally viable for small, medium, & large use cases
  • Fully integrated with Kafka security
  • Write standard Java and Scala applications
  • Exactly-once processing semantics (see the configuration sketch after this list)
  • No separate processing cluster required
  • Develop on Mac, Linux, Windows
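
Enabling exactly-once processing, for instance, is a single configuration switch rather than separate infrastructure. A minimal sketch, assuming Kafka 2.4 APIs (the application id and broker address are placeholders):

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class ExactlyOnceConfig {
    public static Properties props() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");        // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); // placeholder broker
        // One setting turns on exactly-once processing (requires brokers 0.11.0+).
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        // Standby replicas keep warm copies of local state for faster failover.
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        return props;
    }
}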

Write your first app


Kafka Streams use cases

The New York Times uses Apache Kafka and the Kafka Streams API to store and distribute published content, in real time, to the various applications and systems that make it available to readers.

Pinterest uses Apache Kafka and the Kafka Streams API at large scale to power the real-time, predictive budgeting system of their advertising infrastructure. With Kafka Streams, spend predictions are more accurate than ever.

As the leading online fashion retailer in Europe, Zalando uses Kafka as an ESB (Enterprise Service Bus), which helps us in transitioning from a monolithic to a microservices architecture. Using Kafka for processing event streams enables our technical team to do near-real-time business intelligence.

LINE uses Apache Kafka as a central datahub for our services to communicate with one another. Hundreds of billions of messages are produced daily and are used to execute various business logic, threat detection, search indexing, and data analysis. LINE leverages Kafka Streams to reliably transform and filter topics into sub-topics that consumers can consume efficiently, while retaining easy maintainability thanks to its sophisticated yet minimal code base.

Rabobank is one of the three largest banks in the Netherlands. Its digital nervous system, the Business Event Bus, is powered by Apache Kafka. It is used by an increasing number of financial processes and services, one of which is Rabo Alerts. This service alerts customers in real time upon financial events and is built using Kafka Streams.

We use Apache Kafka (and Kafka Streams) to collect and ingest all of our game service logs (including analytics, server, and access logs). Apache Kafka has been one of the core components of our data pipeline since early 2015.

ironSource powers the growth of the world's top games, using Apache Kafka as the backbone infrastructure for the async messaging of millions of events per second that run through their industry-leading game growth platform. In addition, ironSource uses the Kafka Streams API to handle multiple real-time use cases, such as budget management, monitoring, and alerting.

Kpow is an enterprise-grade toolkit that provides a rich, data-oriented UI and secure API for Apache Kafka. It's designed to give engineers deep visibility and control over their Kafka clusters, Schema Registry, Kafka Connect, and Kafka Streams applications. Offering multiple deployment options for cloud or on-premise environments, Kpow works with any Kafka distribution and integrates with enterprise security systems for authentication and role-based access.

La Redoute, the digital platform for families, uses Kafka as a central nervous system to decouple its applications through business events. It enables a decentralized, event-driven architecture that brings near-real-time data reporting, analytics, and emerging AI pipelines, combining Kafka Connect, Kafka Streams, and KSQL.

Nuuly, a clothing rental subscription from the Urban Outfitters family of brands, uses Kafka as a central nervous system to integrate our front-end customer experience with real-time inventory management and operations at our distribution center. Nuuly relies on Kafka Streams and Kafka Connect, coupled with data science and machine learning to provide in-the-moment business intelligence and to tailor a personalized rental experience to our customers.

Recursion uses Kafka Streams to power its data pipeline for its drug discovery efforts. Kafka is used to coordinate various services across the company. For more information about the use case see this Kafka Summit talk.

Salesforce adopted Apache Kafka to implement a pub/sub architecture system and to securely add an enterprise-ready, event-driven layer to our multi-tenant system. With Kafka as the central nervous system of our microservices architecture, Kafka Streams applications perform a variety of operations to generate useful real-time insights for our customers.

At Schrödinger, Kafka powers our physics-based computational platform by feeding data into our predictive modeling, data analytics, and collaboration services, thus enabling rapid exploration of chemical space.

More specifically, Kafka is used as a distributed, high-speed event bus, while Kafka Connect and Kafka Streams are the basic components of our streaming Change Data Capture framework, used by LiveDesign, our enterprise informatics solution.

Currently, Schrödinger processes billions of molecules per week and our Kafka-powered data pipeline enables us to scale our architecture easily and push this even further.

Hello Kafka Streams

The code examples below implement a WordCount application that is elastic, highly scalable, fault-tolerant, stateful, and ready to run in production at large scale. The same application is shown three times: in Java 8+ using lambda expressions, in Java 7 using anonymous classes, and in Scala.

Java 8+, using lambda expressions:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApplication {

    public static void main(final String[] args) throws Exception {
        // Application id, brokers to connect to, and default serdes.
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        KTable<String, Long> wordCounts = textLines
            // Split each line into lowercase words.
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            // Re-key each record by the word, then count occurrences per word;
            // the counts are kept in a local state store named "counts-store".
            .groupBy((key, word) -> word)
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
        // Write the changelog of counts to an output topic.
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }

}


Java 7, using anonymous classes instead of lambdas:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApplication {

    public static void main(final String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        KTable<String, Long> wordCounts = textLines
            // Split each line into lowercase words.
            .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                @Override
                public Iterable<String> apply(String textLine) {
                    return Arrays.asList(textLine.toLowerCase().split("\\W+"));
                }
            })
            // Re-key each record by the word itself.
            .groupBy(new KeyValueMapper<String, String, String>() {
                @Override
                public String apply(String key, String word) {
                    return word;
                }
            })
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));

        wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }

}
Scala:

import java.time.Duration
import java.util.Properties

import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

object WordCountApplication extends App {
  import Serdes._

  val props: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
    p
  }

  val builder: StreamsBuilder = new StreamsBuilder
  val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
  val wordCounts: KTable[String, Long] = textLines
    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
    .groupBy((_, word) => word)
    .count()(Materialized.as("counts-store"))
  wordCounts.toStream.to("WordsWithCountsTopic")

  val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
  streams.start()

  // Close the Streams instance cleanly on JVM shutdown, waiting up to 10 seconds.
  sys.ShutdownHookThread {
    streams.close(Duration.ofSeconds(10))
  }
}
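
Note that the Scala example registers a shutdown hook so the application closes cleanly on exit; the Java examples above omit that step. A minimal Java equivalent, as a sketch assuming the streams instance from the examples above:

import java.time.Duration;

import org.apache.kafka.streams.KafkaStreams;

public class ShutdownHookExample {
    // Call this after streams.start(): a JVM shutdown hook closes the
    // instance cleanly on SIGTERM/Ctrl-C, flushing local state stores and
    // committing offsets, waiting at most 10 seconds.
    static void closeOnShutdown(final KafkaStreams streams) {
        Runtime.getRuntime().addShutdownHook(
            new Thread(() -> streams.close(Duration.ofSeconds(10))));
    }
}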