Writing Tests for a Kafka Streams Application

45 minutes
  • 2 Learning Objectives

About this Hands-on Lab

Kafka Streams applications provide powerful tools for data processing, but having to run them against a real Kafka cluster just to exercise and test your code can be frustrating. Luckily, Kafka provides a collection of test utilities that make testing easier and even allow you to unit test your Streams topologies without a running cluster. In this lab, we will work hands-on with these test utilities by building unit tests for an existing Kafka Streams application.

Learning Objectives

Successfully complete this lab by achieving the following learning objectives:

Clone the Starter Project from GitHub and Perform a Test Run
  1. Clone the starter project from GitHub:

    cd ~/
    git clone https://github.com/linuxacademy/content-ccdak-streams-tests-lab.git
  2. Perform a test run to make sure the code is able to compile and run:

    cd content-ccdak-streams-tests-lab
    ./gradlew test

    The code should compile, but the tests should fail since they are not implemented yet.

Implement the Unit Tests for MemberSignupsStream
  1. Edit the test class for MemberSignupsStream:

    vi src/test/java/com/linuxacademy/ccdak/streams/MemberSignupsStreamTest.java
  2. Implement the test_first_name test:

    @Test
    public void test_first_name() {
        // Verify that the stream accurately parses the first name from the value.
        ConsumerRecordFactory<Integer, String> factory = new ConsumerRecordFactory<>("member_signups", new IntegerSerializer(), new StringSerializer());
        ConsumerRecord<byte[], byte[]> record = factory.create("member_signups", 1, "Summers, Buffy");
        testDriver.pipeInput(record);
    
        ProducerRecord<Integer, String> outputRecord = testDriver.readOutput("member_signups_mail", new IntegerDeserializer(), new StringDeserializer());
    
        OutputVerifier.compareKeyValue(outputRecord, 1, "Buffy");
    }
  3. Implement the test_unknown_name_filter test:

    @Test
    public void test_unknown_name_filter() {
        // Verify that the stream filters out records with a value of UNKNOWN.
        ConsumerRecordFactory<Integer, String> factory = new ConsumerRecordFactory<>("member_signups", new IntegerSerializer(), new StringSerializer());
        ConsumerRecord<byte[], byte[]> record = factory.create("member_signups", 1, "UNKNOWN");
        testDriver.pipeInput(record);
    
        ProducerRecord<Integer, String> outputRecord = testDriver.readOutput("member_signups_mail", new IntegerDeserializer(), new StringDeserializer());
    
        Assert.assertNull(outputRecord);
    }
  4. Implement the test_empty_name_filter test:

    @Test
    public void test_empty_name_filter() {
        // Verify that the stream filters out records with an empty name value.
        ConsumerRecordFactory<Integer, String> factory = new ConsumerRecordFactory<>("member_signups", new IntegerSerializer(), new StringSerializer());
        ConsumerRecord<byte[], byte[]> record = factory.create("member_signups", 1, "");
        testDriver.pipeInput(record);
    
        ProducerRecord<Integer, String> outputRecord = testDriver.readOutput("member_signups_mail", new IntegerDeserializer(), new StringDeserializer());
    
        Assert.assertNull(outputRecord);
    }
  5. Run your tests and make sure that they pass:

    ./gradlew test

Additional Resources

Your supermarket company has a Kafka Streams application that processes messages created when customers sign up for a membership program. The application reads these incoming messages and produces data to an output topic that is used to send customers an email with information about their new membership account. This output topic is keyed by the member ID, and each record value is the customer's first name, formatted properly for the mailing.

The company is reviewing the codebase for compliance with good practices, and this Streams application has no unit tests. Your task is to write some unit tests for the application.

There is a project on GitHub that contains the code. Clone this project to the Dev server. The Streams application class is located at src/main/java/com/linuxacademy/ccdak/streams/MemberSignupsStream.java. You can find a test class at src/test/java/com/linuxacademy/ccdak/streams/MemberSignupsStreamTest.java. Edit the test class and implement your unit tests there. Test methods and some test fixtures are already set up in the class.

Here are some notes on the features of the application and the tests that need to be created:

  • test_first_name — The stream takes records which usually have customer names in the form LastName, FirstName. The stream parses the value in order to extract only the first name for the mailing. Test this functionality by producing a record with a value in the LastName, FirstName format and verifying that the output record has only the first name as its value.
  • test_unknown_name_filter — Some legacy systems are still producing records to the input topic with a value of UNKNOWN when the customer name is unknown. For now, we won't send these customers an email, so the stream filters these records out. Produce a record with a value of UNKNOWN and verify that there is no corresponding output record.
  • test_empty_name_filter — There are also some input systems that produce records to the input topic with an empty string as the value when the customer name is unknown. The streams application also filters out these records. Produce a record with an empty value and verify that there is no corresponding output record.
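The three behaviors described above can be sketched in plain Java, independent of Kafka, to make the expected input/output pairs concrete. This is only an illustration under stated assumptions: the class name FirstNameSketch and the method firstName are hypothetical, and the real MemberSignupsStream may implement its parsing and filtering differently (for example, with mapValues and filter steps in the topology).

```java
import java.util.Optional;

// Hypothetical stand-in for the value handling described in the notes above.
// It is NOT the lab's MemberSignupsStream code.
public class FirstNameSketch {

    // Returns the first name for a "LastName, FirstName" value, or an empty
    // Optional when the record would be filtered out (empty or UNKNOWN value).
    public static Optional<String> firstName(String value) {
        if (value == null || value.isEmpty() || value.equals("UNKNOWN")) {
            return Optional.empty();
        }
        int comma = value.indexOf(',');
        // Assumption: a value without a comma is passed through as-is.
        String first = comma >= 0 ? value.substring(comma + 1) : value;
        return Optional.of(first.trim());
    }

    public static void main(String[] args) {
        System.out.println(firstName("Summers, Buffy")); // prints Optional[Buffy]
        System.out.println(firstName("UNKNOWN"));        // prints Optional.empty
        System.out.println(firstName(""));               // prints Optional.empty
    }
}
```

Each of the three unit tests checks one of these cases end to end: an output record with value Buffy for the first, and no output record at all for the other two.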

If you get stuck, feel free to check out the solution video or the detailed instructions under each objective. Good luck!
