Kafka Streams lets us perform powerful data processing operations on Kafka data in real time. In this lab, we will work with Kafka Streams by building a Java application that transforms records of individual purchases into a running total for each item bought. Along the way, the lab provides hands-on experience with stateless and stateful transformations, data aggregation, and type conversion.
Learning Objectives
Successfully complete this lab by achieving the following learning objectives:
- Clone the Starter Project from GitHub and Perform a Test Run
Clone the starter project from GitHub:
cd ~/
git clone https://github.com/linuxacademy/content-ccdak-kafka-streams-lab.git
Perform a test run to confirm that the code compiles and runs:
cd content-ccdak-kafka-streams-lab/
./gradlew run
The output should contain the message printed by the Main class:
Hello, world!
- Implement and Run the Kafka Streams Application
Open the build.gradle file for editing:
vi build.gradle
Add the following dependencies to the dependencies {...} section of the file:
implementation 'org.apache.kafka:kafka-streams:2.2.1'
implementation 'org.apache.kafka:kafka-clients:2.2.1'
...
Implement our streams logic in Main.java:
vi src/main/java/com/linuxacademy/ccdak/streams/Main.java
Note: Remember to check the code comments for some helpful info, or the solution video for a more thorough explanation.
Here is an example of Main.java:
package com.linuxacademy.ccdak.streams;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class Main {

    public static void main(String[] args) {
        // Set up the configuration.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "inventory-data");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        // Since the input topic uses Strings for both key and value, set the default Serdes to String.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Get the source stream.
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> source = builder.stream("inventory_purchases");

        // Convert the value from String to Integer. If the value is not a
        // properly-formatted number, print a message and set it to 0.
        final KStream<String, Integer> integerValuesSource = source.mapValues(value -> {
            try {
                return Integer.valueOf(value);
            } catch (NumberFormatException e) {
                System.out.println("Unable to convert to Integer: \"" + value + "\"");
                return 0;
            }
        });

        // Group by the key and reduce to provide a total quantity for each key.
        final KTable<String, Integer> productCounts = integerValuesSource
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
            .reduce((total, newQuantity) -> total + newQuantity);

        // Output to the output topic.
        productCounts
            .toStream()
            .to("total_purchases", Produced.with(Serdes.String(), Serdes.Integer()));

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        // Print the topology to the console.
        System.out.println(topology.describe());
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach a shutdown handler to catch control-c and terminate the application gracefully.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.out.println(e.getMessage());
            System.exit(1);
        }
        System.exit(0);
    }
}
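To see what the topology above computes without a running broker, the same two steps can be sketched in plain Java. This is only an illustration and not part of the starter project: the class name RunningTotalSketch, the helpers parseQuantity and runningTotals, and the sample records are all hypothetical. A HashMap stands in for the state store that Kafka Streams manages behind reduce(). Emitting one updated total per input record is an approximation of the behavior when the record cache is set to 0, as it is in the configuration above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RunningTotalSketch {

    // Stateless step (mirrors mapValues): parse the value, falling back to 0 on bad input.
    public static int parseQuantity(String value) {
        try {
            return Integer.parseInt(value);
        } catch (NumberFormatException e) {
            return 0;
        }
    }

    // Stateful step (mirrors groupByKey + reduce): keep one total per key and
    // record the updated total after every input record.
    public static List<String> runningTotals(List<String[]> records) {
        Map<String, Integer> totals = new HashMap<>(); // hypothetical stand-in for the state store
        List<String> emitted = new ArrayList<>();
        for (String[] record : records) {
            String key = record[0];
            int quantity = parseQuantity(record[1]);
            // The first value for a key becomes the total; later values are added to it.
            int total = totals.merge(key, quantity, Integer::sum);
            emitted.add(key + "\t" + total);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Hypothetical sample input: {item, quantity} pairs, one of them malformed.
        List<String[]> records = Arrays.asList(
            new String[] {"apples", "3"},
            new String[] {"pears", "oops"},
            new String[] {"apples", "4"});
        runningTotals(records).forEach(System.out::println);
    }
}
```

With these sample records, the total for apples goes from 3 to 7, and the malformed pears quantity is counted as 0, which matches the fallback in the mapValues step.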
Run the code:
./gradlew run
We can check the output of the application by consuming from the output topic:
kafka-console-consumer --bootstrap-server localhost:9092 --topic total_purchases --from-beginning --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
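The value.deserializer property in the command above matters because the application writes its totals with Serdes.Integer(), which stores each value as four big-endian bytes rather than as text. Without the override, the consumer would likely print those bytes as unreadable characters. The sketch below reproduces that byte layout with java.nio.ByteBuffer so that it runs without Kafka on the classpath. The class and method names are hypothetical, and the code imitates the encoding rather than calling Kafka's own serializer.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class IntegerEncodingSketch {

    // Encode an int as four big-endian bytes (big-endian is ByteBuffer's default order).
    public static byte[] encode(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // Decode four big-endian bytes back into an int.
    public static int decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        byte[] raw = encode(7);
        // The raw bytes are not the character '7', so printing them as text is not useful.
        System.out.println("bytes: " + Arrays.toString(raw));
        System.out.println("as integer: " + decode(raw));
    }
}
```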
To compare the output with the input, we can consume from the input topic in a new terminal window:
kafka-console-consumer --bootstrap-server localhost:9092 --topic inventory_purchases --property print.key=true