Working with Stream Processing in Kafka

1 hour • 2 Learning Objectives

About this Hands-on Lab

Kafka Streams provides the ability to perform powerful data processing operations on Kafka data in real time. In this lab, we will have the opportunity to work with Kafka Streams by building a Java application that transforms data about individual purchases into a running total of each item bought. In addition to general stream processing, this lab provides hands-on experience with topics such as stateless and stateful transformations, data aggregation, and type conversion.

Learning Objectives

Successfully complete this lab by achieving the following learning objectives:

Clone the Starter Project from GitHub and Perform a Test Run
  1. Clone the starter project from GitHub:

    cd ~/
    git clone https://github.com/linuxacademy/content-ccdak-kafka-streams-lab.git
  2. Perform a test to ensure that the code can compile and run:

    cd content-ccdak-kafka-streams-lab/
    ./gradlew run

    The output should contain the message printed by the Main class: Hello, world!
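
The starter Main class is only a placeholder at this point. A minimal sketch of what it likely contains is shown below (hypothetical; the actual starter code in the repository may differ slightly):

    package com.linuxacademy.ccdak.streams;

    public class Main {

        public static void main(String[] args) {
            // Placeholder logic; the lab replaces this with the streams topology.
            System.out.println("Hello, world!");
        }

    }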

Implement and Run the Kafka Streams Application
  1. Access the build.gradle file that must be edited:

    vi build.gradle
  2. Add the following dependencies to the dependencies {...} section of the file (for reference, a sketch of the complete build.gradle appears after this list):

    implementation 'org.apache.kafka:kafka-streams:2.2.1'
    implementation 'org.apache.kafka:kafka-clients:2.2.1'
    ...
  3. Implement our streams logic in Main.java:

    vi src/main/java/com/linuxacademy/ccdak/streams/Main.java

    Note: Remember to check the code comments for some helpful info, or the solution video for a more thorough explanation.

    Here is an example of Main.java:

    package com.linuxacademy.ccdak.streams;
    
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Produced;
    
    public class Main {
    
        public static void main(String[] args) {
            // Set up the configuration.
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "inventory-data");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
            // Since the input topic uses Strings for both key and value, set the default Serdes to String.
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    
            // Get the source stream.
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, String> source = builder.stream("inventory_purchases");
    
            // Convert the value from String to Integer. If the value is not a properly-formatted number, print a message and set it to 0.
            final KStream<String, Integer> integerValuesSource = source.mapValues(value -> {
                try {
                    return Integer.valueOf(value);
                } catch (NumberFormatException e) {
                    System.out.println("Unable to convert to Integer: "" + value + """);
                    return 0;
                }
            });
    
            // Group by the key and reduce to provide a total quantity for each key.
            final KTable<String, Integer> productCounts = integerValuesSource
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
                .reduce((total, newQuantity) -> total + newQuantity);
    
            // Output to the output topic.
            productCounts
                .toStream()
                .to("total_purchases", Produced.with(Serdes.String(), Serdes.Integer()));
    
            final Topology topology = builder.build();
            final KafkaStreams streams = new KafkaStreams(topology, props);
            // Print the topology to the console.
            System.out.println(topology.describe());
            final CountDownLatch latch = new CountDownLatch(1);
    
            // Attach a shutdown handler to catch control-c and terminate the application gracefully.
            Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });
    
            try {
                streams.start();
                latch.await();
            } catch (final Throwable e) {
                System.out.println(e.getMessage());
                System.exit(1);
            }
            System.exit(0);
        }
    
    }
  4. Run the code:

    ./gradlew run
  5. We can check the output of the application by consuming from the output topic:

    kafka-console-consumer --bootstrap-server localhost:9092 --topic total_purchases --from-beginning --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
  6. To visually compare the output with the input, consume from the input topic in a new terminal window:

    kafka-console-consumer --bootstrap-server localhost:9092 --topic inventory_purchases --property print.key=true
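
For reference, after step 2 your build.gradle might look roughly like the sketch below. Everything other than the two implementation lines is an assumption about what the starter project already provides (plugin, repository, and main class declarations), so treat it as a sanity check rather than something to copy over your existing file:

    plugins {
        id 'application'
    }

    repositories {
        mavenCentral()
    }

    dependencies {
        // Added for this lab:
        implementation 'org.apache.kafka:kafka-streams:2.2.1'
        implementation 'org.apache.kafka:kafka-clients:2.2.1'
        // Any dependencies already present in the starter file stay as they are.
    }

    mainClassName = 'com.linuxacademy.ccdak.streams.Main'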

Additional Resources

Your supermarket company is working toward using Kafka to automate some aspects of inventory management. Currently, they are trying to use Kafka to keep track of purchases so that inventory systems can be updated as items are bought. So far, the company has created a Kafka topic where they are publishing information about the types and quantities of items being purchased.

When a customer makes a purchase, a record is published to the inventory_purchases topic for each item type (e.g., "apples"). These records have the item type as the key and the quantity purchased in the transaction as the value. An example record would look like this, indicating that a customer bought five apples:

apples:5

Your task is to build a Kafka Streams application that takes the data about individual item purchases and outputs a running total of the quantity purchased for each item type. The output topic is called total_purchases. So, for example, with the following input from inventory_purchases:

apples:5
oranges:2
apples:3

Your streams application should output the following to total_purchases:

apples:8
oranges:2

Be sure to output the total quantity as an Integer. Note that the input topic has the item quantity serialized as a String, so you will need to work around this using type conversion.
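
If you would like to generate input of your own and watch the totals change, you can publish key:value records with the console producer. This command is not part of the lab's provided data set, and depending on your Kafka version the broker address flag may be --bootstrap-server rather than --broker-list:

    kafka-console-producer --broker-list localhost:9092 --topic inventory_purchases --property parse.key=true --property key.separator=:

Each line you then type, such as apples:5, is published with apples as the key and 5 as the (String) value, which is exactly the format the streams application expects.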

To get started, use the starter project located at https://github.com/linuxacademy/content-ccdak-kafka-streams-lab. This GitHub project also contains an end-state branch with an example solution for the lab.

You should be able to perform all work on the Broker 1 server.

If you get stuck, feel free to check out the solution video, or the detailed instructions under each objective. Good luck!
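
Optionally, if you want to sanity-check the aggregation logic without producing to a live topic, Kafka Streams provides a TopologyTestDriver. The sketch below is not part of the lab: it assumes you add the org.apache.kafka:kafka-streams-test-utils:2.2.1 dependency, the class name TopologySmokeTest is purely illustrative, and it rebuilds the same topology as Main.java (minus the error handling) before feeding it the example records from the scenario:

    package com.linuxacademy.ccdak.streams;

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.TopologyTestDriver;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.test.ConsumerRecordFactory;

    public class TopologySmokeTest {

        public static void main(String[] args) {
            // Build the same topology as Main.java: parse the quantity, group by key, keep a running total.
            // (The NumberFormatException handling is omitted here because the sample inputs are well formed.)
            final StreamsBuilder builder = new StreamsBuilder();
            builder.<String, String>stream("inventory_purchases")
                .mapValues(value -> Integer.valueOf(value))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
                .reduce((total, newQuantity) -> total + newQuantity)
                .toStream()
                .to("total_purchases", Produced.with(Serdes.String(), Serdes.Integer()));
            final Topology topology = builder.build();

            // The driver needs the same kind of configuration, but no running broker (the address is a dummy).
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "inventory-data-test");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            final TopologyTestDriver driver = new TopologyTestDriver(topology, props);

            // Pipe in the example records from the scenario.
            final ConsumerRecordFactory<String, String> factory =
                    new ConsumerRecordFactory<>("inventory_purchases", new StringSerializer(), new StringSerializer());
            driver.pipeInput(factory.create("inventory_purchases", "apples", "5"));
            driver.pipeInput(factory.create("inventory_purchases", "oranges", "2"));
            driver.pipeInput(factory.create("inventory_purchases", "apples", "3"));

            // Read the running totals from the output topic until it is empty.
            ProducerRecord<String, Integer> record;
            while ((record = driver.readOutput("total_purchases", new StringDeserializer(), new IntegerDeserializer())) != null) {
                System.out.println(record.key() + ": " + record.value());
            }

            driver.close();
        }

    }

Because caching is disabled, each input record should print an updated total, ending with apples at 8 and oranges at 2, matching the expected output above.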
