Kafka consumers provide the ability to process data that is stored in Kafka topics. Since you can write consumer code using the Consumer API, it is possible to build consumers that can do practically anything with your Kafka data. In this lab, you will have the opportunity to build a simple consumer that reads from a Kafka topic and writes data to a file on the disk. This lab will give you some hands-on experience with Kafka consumers. Hopefully, it will also spark your imagination about other tasks you might be able to accomplish with Kafka consumers!
Learning Objectives
Successfully complete this lab by achieving the following learning objectives:
- Clone the Starter Project and Run It to Make Sure Everything Is Working
Clone the starter project into the
home
directory:cd ~/ git clone https://github.com/linuxacademy/content-ccdak-kafka-consumer-lab.git
Run the code to ensure it works before modifying it:
cd content-ccdak-kafka-consumer-lab/ ./gradlew run
Note: We should see a
Hello, World!
message in the output.
- Implement the Consumer and Run It to Verify That It Works as Expected
Edit the main class:
vi src/main/java/com/linuxacademy/ccdak/consumer/Main.java
Implement the consumer according to the provided specification:
package com.linuxacademy.ccdak.consumer; import java.io.BufferedWriter; import java.io.FileWriter; import java.io.IOException; import java.time.Duration; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class Main { public static void main(String[] args) { Properties props = new Properties(); props.setProperty("bootstrap.servers", "localhost:9092"); props.setProperty("group.id", "group1"); props.setProperty("enable.auto.commit", "true"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("inventory_purchases")); try { BufferedWriter writer = new BufferedWriter(new FileWriter("/home/cloud_user/output/output.dat", true)); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { String recordString = "key=" + record.key() + ", value=" + record.value() + ", topic=" + record.topic() + ", partition=" + record.partition() + ", offset=" + record.offset(); System.out.println(recordString); writer.write(recordString + "n"); } consumer.commitSync(); writer.flush(); } } catch (IOException e) { throw new RuntimeException(e); } } }
Execute the program:
./gradlew run
Verify that data is appearing in the output file:
cat /home/cloud_user/output/output.dat