Skip to content

Contact sales

By filling out this form and clicking submit, you acknowledge our privacy policy.
  • Labs icon Lab
  • A Cloud Guru
Google Cloud Platform icon
Labs

Working with Stream Processing in Kafka

Kafka streams provide the ability to perform powerful data processing operations against Kafka data in real-time. In this lab, we will have the opportunity to work with Kafka streams by building a Java application capable of transforming data about individual purchases into a running total of each item that was bought. In addition to general stream processing, this lab will provide hands-on experience with topics such as stateless and stateful transformations, data aggregation, and type conversion.

Google Cloud Platform icon
Labs

Path Info

Level
Clock icon Intermediate
Duration
Clock icon 1h 0m
Published
Clock icon Oct 18, 2019

Contact sales

By filling out this form and clicking submit, you acknowledge our privacy policy.

Table of Contents

  1. Challenge

    Clone the Starter Project from GitHub and Perform a Test Run

    1. Clone the starter project from GitHub:
    cd ~/
    git clone https://github.com/linuxacademy/content-ccdak-kafka-streams-lab.git
    
    1. Perform a test to ensure that the code can compile and run:
    cd content-ccdak-kafka-streams-lab/
    ./gradlew run
    

    The output should contain the message printed by the Main class: Hello, world!

  2. Challenge

    Implement and Run the Kafka Streams Application

    1. Access the build.gradle file that must be edited:
    vi build.gradle
    
    1. Add the following dependencies to the dependencies {...} section of the file:
    implementation 'org.apache.kafka:kafka-streams:2.2.1'
    implementation 'org.apache.kafka:kafka-clients:2.2.1'
    ...
    
    1. Implement our streams logic in Main.java:
    vi src/main/java/com/linuxacademy/ccdak/streams/Main.java
    

    Note: Remember to check the code comments for some helpful info, or the solution video for a more thorough explanation.

    Here is an example of Main.java:

      package com.linuxacademy.ccdak.streams;
    
      import java.util.Properties;
      import java.util.concurrent.CountDownLatch;
      import org.apache.kafka.common.serialization.Serdes;
      import org.apache.kafka.streams.KafkaStreams;
      import org.apache.kafka.streams.StreamsBuilder;
      import org.apache.kafka.streams.StreamsConfig;
      import org.apache.kafka.streams.Topology;
      import org.apache.kafka.streams.kstream.Grouped;
      import org.apache.kafka.streams.kstream.KStream;
      import org.apache.kafka.streams.kstream.KTable;
      import org.apache.kafka.streams.kstream.Produced;
    
      public class Main {
    
          public static void main(String[] args) {
              // Set up the configuration.
              final Properties props = new Properties();
              props.put(StreamsConfig.APPLICATION_ID_CONFIG, "inventory-data");
              props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
              props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
              // Since the input topic uses Strings for both key and value, set the default Serdes to String.
              props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
              props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    
              // Get the source stream.
              final StreamsBuilder builder = new StreamsBuilder();
              final KStream<String, String> source = builder.stream("inventory_purchases");
    
              // Convert the value from String to Integer. If the value is not a properly-formatted number, print a message and set it to 0.
              final KStream<String, Integer> integerValuesSource = source.mapValues(value -> {
                  try {
                      return Integer.valueOf(value);
                  } catch (NumberFormatException e) {
                      System.out.println("Unable to convert to Integer: \"" + value + "\"");
                      return 0;
                  }
              });
    
              // Group by the key and reduce to provide a total quantity for each key.
              final KTable<String, Integer> productCounts = integerValuesSource
                  .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
                  .reduce((total, newQuantity) -> total + newQuantity);
    
              // Output to the output topic.
              productCounts
                  .toStream()
                  .to("total_purchases", Produced.with(Serdes.String(), Serdes.Integer()));
    
              final Topology topology = builder.build();
              final KafkaStreams streams = new KafkaStreams(topology, props);
              // Print the topology to the console.
              System.out.println(topology.describe());
              final CountDownLatch latch = new CountDownLatch(1);
    
              // Attach a shutdown handler to catch control-c and terminate the application gracefully.
              Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
                  @Override
                  public void run() {
                      streams.close();
                      latch.countDown();
                  }
              });
    
              try {
                  streams.start();
                  latch.await();
              } catch (final Throwable e) {
                  System.out.println(e.getMessage());
                  System.exit(1);
              }
              System.exit(0);
          }
    
      }
    
    1. Run the code:
    ./gradlew run
    
    1. We can check the output of the application by consuming from the output topic:
    kafka-console-consumer --bootstrap-server localhost:9092 --topic total_purchases --from-beginning --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
    
    1. If we want to visually compare the output with the input, in a new terminal window, we can consume from the input topic with:
    kafka-console-consumer --bootstrap-server localhost:9092 --topic inventory_purchases --property print.key=true
    

The Cloud Content team comprises subject matter experts hyper focused on services offered by the leading cloud vendors (AWS, GCP, and Azure), as well as cloud-related technologies such as Linux and DevOps. The team is thrilled to share their knowledge to help you build modern tech solutions from the ground up, secure and optimize your environments, and so much more!

What's a lab?

Hands-on Labs are real environments created by industry experts to help you learn. These environments help you gain knowledge and experience, practice without compromising your system, test without risk, destroy without fear, and let you learn from your mistakes. Hands-on Labs: practice your skills before delivering in the real world.

Provided environment for hands-on practice

We will provide the credentials and environment necessary for you to practice right within your browser.

Guided walkthrough

Follow along with the author’s guided walkthrough and build something new in your provided environment!

Did you know?

On average, you retain 75% more of your learning if you get time for practice.

Start learning by doing today

View Plans