Kafka Streams applications provide powerful tools for data processing, but having to run them against a real Kafka cluster just to exercise and test your code can be frustrating. Luckily, Kafka provides a collection of test utilities that make testing easier, and they even let you unit test your Streams topologies. In this lab, we will work hands-on with these test utilities by building unit tests for an existing Kafka Streams application.
Learning Objectives
Successfully complete this lab by achieving the following learning objectives:
- Clone the Starter Project from GitHub and Perform a Test Run
Clone the starter project from GitHub:
cd ~/
git clone https://github.com/linuxacademy/content-ccdak-streams-tests-lab.git
Perform a test run to make sure the code is able to compile and run:
cd content-ccdak-streams-tests-lab
./gradlew test
The code should compile, but the tests should fail since they are not implemented yet.
- Implement the Unit Tests for the `MemberSignupsStream`
Edit the test class for `MemberSignupsStream`:
vi src/test/java/com/linuxacademy/ccdak/streams/MemberSignupsStreamTest.java
Implement the `test_first_name` test:
@Test
public void test_first_name() {
    // Verify that the stream accurately parses the first name from the value.
    ConsumerRecordFactory<Integer, String> factory = new ConsumerRecordFactory<>("member_signups", new IntegerSerializer(), new StringSerializer());
    ConsumerRecord<byte[], byte[]> record = factory.create("member_signups", 1, "Summers, Buffy");
    testDriver.pipeInput(record);

    ProducerRecord<Integer, String> outputRecord = testDriver.readOutput("member_signups_mail", new IntegerDeserializer(), new StringDeserializer());

    OutputVerifier.compareKeyValue(outputRecord, 1, "Buffy");
}
Implement the `test_unknown_name_filter` test:
@Test
public void test_unknown_name_filter() {
    // Verify that the stream filters out records whose value is "UNKNOWN".
    ConsumerRecordFactory<Integer, String> factory = new ConsumerRecordFactory<>("member_signups", new IntegerSerializer(), new StringSerializer());
    ConsumerRecord<byte[], byte[]> record = factory.create("member_signups", 1, "UNKNOWN");
    testDriver.pipeInput(record);

    ProducerRecord<Integer, String> outputRecord = testDriver.readOutput("member_signups_mail", new IntegerDeserializer(), new StringDeserializer());

    // No output record means the input was filtered out.
    Assert.assertNull(outputRecord);
}
Implement the `test_empty_name_filter` test:
@Test
public void test_empty_name_filter() {
    // Verify that the stream filters out records with an empty name value.
    ConsumerRecordFactory<Integer, String> factory = new ConsumerRecordFactory<>("member_signups", new IntegerSerializer(), new StringSerializer());
    ConsumerRecord<byte[], byte[]> record = factory.create("member_signups", 1, "");
    testDriver.pipeInput(record);

    ProducerRecord<Integer, String> outputRecord = testDriver.readOutput("member_signups_mail", new IntegerDeserializer(), new StringDeserializer());

    Assert.assertNull(outputRecord);
}
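Taken together, the three tests pin down what each input record should produce. As a rough illustration only, the plain-Java sketch below models that per-record behavior without any Kafka classes. The class `MemberSignupsLogicSketch` and the method `firstNameOrDrop` are hypothetical names, the "Last, First" value format is inferred from the test input, and dropping values that contain no comma is an added assumption. The lab's actual `MemberSignupsStream` may be written differently.

```java
import java.util.Optional;

// Hypothetical stand-in for the per-record behavior the tests expect.
// This is NOT the lab's MemberSignupsStream implementation.
class MemberSignupsLogicSketch {

    // Returns the first name from a "Last, First" value, or an empty
    // Optional when the record should be dropped.
    static Optional<String> firstNameOrDrop(String value) {
        // The tests expect empty values and the literal "UNKNOWN" to be filtered out.
        if (value == null || value.isEmpty() || value.equals("UNKNOWN")) {
            return Optional.empty();
        }
        String[] parts = value.split(",");
        if (parts.length < 2) {
            // Assumption: a value without a comma carries no first name.
            return Optional.empty();
        }
        return Optional.of(parts[1].trim());
    }

    public static void main(String[] args) {
        System.out.println(firstNameOrDrop("Summers, Buffy")); // prints Optional[Buffy]
        System.out.println(firstNameOrDrop("UNKNOWN"));        // prints Optional.empty
        System.out.println(firstNameOrDrop(""));               // prints Optional.empty
    }
}
```

In the real topology, an empty result corresponds to no record reaching `member_signups_mail`, which is why the two filter tests assert that `readOutput` returns null.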
Run your tests and make sure that they pass:
./gradlew test