Confluent Schema Registry lets you serialize and deserialize complex data objects, as well as manage and enforce contracts between producers and consumers. In this hands-on lab, you will work with Confluent Schema Registry by building a full application around it: you will create a schema, then build both a producer and a consumer that use the schema to serialize and deserialize data.
Learning Objectives
Successfully complete this lab by achieving the following learning objectives:
- Clone the Starter Project and Run It to Make Sure Everything Is Working
Clone the starter project into the home directory:

cd ~/
git clone https://github.com/linuxacademy/content-ccdak-schema-registry-lab.git
Run the code to ensure it works before modifying it:

cd content-ccdak-schema-registry-lab/
./gradlew runProducer
./gradlew runConsumer
Note: We should see a Hello, world! message in the output for both the producer and the consumer.
- Implement the Producer and Consumer Using an Avro Schema
Create the directory for Avro schemas. The project's Gradle build generates Java classes from any .avsc files placed under src/main/avro:
mkdir -p src/main/avro/com/linuxacademy/ccdak/schemaregistry
Create a schema definition for purchases:
vi src/main/avro/com/linuxacademy/ccdak/schemaregistry/Purchase.avsc
{ "namespace": "com.linuxacademy.ccdak.schemaregistry", "type": "record", "name": "Purchase", "fields": [ {"name": "id", "type": "int"}, {"name": "product", "type": "string"}, {"name": "quantity", "type": "int"} ] }
Implement the producer:
vi src/main/java/com/linuxacademy/ccdak/schemaregistry/ProducerMain.java
package com.linuxacademy.ccdak.schemaregistry;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerMain {

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        // Serialize keys as plain strings and values as Avro, using Schema Registry
        // to register and look up the value schema.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

        KafkaProducer<String, Purchase> producer = new KafkaProducer<String, Purchase>(props);

        // Publish two Purchase records, keyed by the purchase id.
        Purchase apples = new Purchase(1, "apples", 17);
        producer.send(new ProducerRecord<String, Purchase>("inventory_purchases", apples.getId().toString(), apples));

        Purchase oranges = new Purchase(2, "oranges", 5);
        producer.send(new ProducerRecord<String, Purchase>("inventory_purchases", oranges.getId().toString(), oranges));

        producer.close();
    }

}
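Note that with the serializer's default subject naming strategy (TopicNameStrategy), the first send registers the Purchase schema in Schema Registry under the subject inventory_purchases-value; subsequent sends reuse the cached schema ID rather than re-registering.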
Implement the consumer:
vi src/main/java/com/linuxacademy/ccdak/schemaregistry/ConsumerMain.java
package com.linuxacademy.ccdak.schemaregistry;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerMain {

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        // Deserialize keys as strings and values as Avro; the specific-reader setting
        // makes the deserializer return the generated Purchase class.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

        KafkaConsumer<String, Purchase> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("inventory_purchases"));

        try {
            // Append each record to the output file as well as printing it to the console.
            BufferedWriter writer = new BufferedWriter(new FileWriter("/home/cloud_user/output/output.txt", true));
            while (true) {
                final ConsumerRecords<String, Purchase> records = consumer.poll(Duration.ofMillis(100));
                for (final ConsumerRecord<String, Purchase> record : records) {
                    final String key = record.key();
                    final Purchase value = record.value();
                    String outputString = "key=" + key + ", value=" + value;
                    System.out.println(outputString);
                    writer.write(outputString + "\n");
                }
                writer.flush();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

}
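The SPECIFIC_AVRO_READER_CONFIG setting is what makes the deserializer return instances of the generated Purchase class. If it were left unset, KafkaAvroDeserializer would return Avro GenericRecord objects, with fields accessed by name. A minimal sketch of that alternative, reusing the same props (the genericConsumer variable is illustrative, not part of the lab solution):

// Requires: import org.apache.avro.generic.GenericRecord;
KafkaConsumer<String, GenericRecord> genericConsumer = new KafkaConsumer<>(props);
genericConsumer.subscribe(Collections.singletonList("inventory_purchases"));
for (ConsumerRecord<String, GenericRecord> record : genericConsumer.poll(Duration.ofMillis(100))) {
    // GenericRecord exposes fields by name instead of typed getters.
    System.out.println("product=" + record.value().get("product")
            + ", quantity=" + record.value().get("quantity"));
}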
Run the producer:
./gradlew runProducer
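Optionally, you can spot-check the records the producer wrote using Confluent's Avro console consumer, assuming the Confluent Platform CLI tools are installed on the lab server (press Ctrl+C to exit):

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic inventory_purchases --from-beginning --property schema.registry.url=http://localhost:8081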
Run the consumer (it polls in an infinite loop, so press Ctrl+C to stop it once you have verified the output):
./gradlew runConsumer
Verify the data in the output file:
cat /home/cloud_user/output/output.txt
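You can also confirm that the producer registered the schema by querying the Schema Registry REST API directly. These are standard Schema Registry endpoints; the subject name follows the default <topic>-value convention:

curl http://localhost:8081/subjects
curl http://localhost:8081/subjects/inventory_purchases-value/versions/latest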