Using Schema Registry in a Kafka Application

1 hour
  • 2 Learning Objectives

About this Hands-on Lab

Confluent Schema Registry gives you the ability to serialize and deserialize complex data objects, as well as manage and enforce contracts between producers and consumers. In this hands-on lab, you will have the opportunity to work with the Confluent Schema Registry by building a full application that uses it. You will create a schema, and then you will build both a producer and a consumer that use the schema to serialize and deserialize data.
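
The Confluent Avro serializer does not embed the whole schema in every message. It registers the schema (or looks up its existing ID) in Schema Registry and writes a small header in front of the Avro-encoded data. According to Confluent's wire-format documentation, that header is a magic byte 0 followed by the 4-byte big-endian schema ID. The deserializer reads the ID and fetches (and caches) the matching schema from the registry. The JDK-only sketch below builds and parses that framing. WireFormat, the schema ID 42, and the two payload bytes are hypothetical values chosen for illustration; real applications let KafkaAvroSerializer and KafkaAvroDeserializer handle this.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical helper illustrating the Confluent Schema Registry wire format. */
public class WireFormat {

    /** Format version byte; Confluent documents it as currently always 0. */
    static final byte MAGIC_BYTE = 0x0;

    /** Prefixes an already Avro-encoded payload with the magic byte and the schema ID. */
    static byte[] frame(int schemaId, byte[] avroPayload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + avroPayload.length); // big-endian by default
        buf.put(MAGIC_BYTE);
        buf.putInt(schemaId);
        buf.put(avroPayload);
        return buf.array();
    }

    /** Reads the schema ID from a framed message, rejecting unknown magic bytes. */
    static int schemaId(byte[] message) {
        if (message.length < 5) {
            throw new IllegalArgumentException("Message shorter than the 5-byte header");
        }
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buf.getInt();
    }

    /** Returns the Avro payload that follows the 5-byte header. */
    static byte[] payload(byte[] message) {
        return Arrays.copyOfRange(message, 5, message.length);
    }

    public static void main(String[] args) {
        byte[] framed = frame(42, new byte[] {0x02, 0x0C});
        System.out.println(Arrays.toString(framed)); // prints [0, 0, 0, 0, 42, 2, 12]
        System.out.println(schemaId(framed));        // prints 42
    }
}
```

Because only the 4-byte ID travels with each record, the schema itself is stored once in the registry, which is also where compatibility rules between producers and consumers are enforced.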

Learning Objectives

Successfully complete this lab by achieving the following learning objectives:

Clone the Starter Project and Run It to Make Sure Everything Is Working
  1. Clone the starter project into the home directory:

    cd ~/
    git clone https://github.com/linuxacademy/content-ccdak-schema-registry-lab.git
  2. Run the code to ensure it works before modifying it:

    cd content-ccdak-schema-registry-lab/
    ./gradlew runProducer
    ./gradlew runConsumer

    Note: Both the producer and the consumer should print a "Hello, world!" message.

Implement the Producer and Consumer Using an Avro Schema
  1. Create the directory for Avro schemas:

    mkdir -p src/main/avro/com/linuxacademy/ccdak/schemaregistry
  2. Create a schema definition for purchases:

    vi src/main/avro/com/linuxacademy/ccdak/schemaregistry/Purchase.avsc
    {
      "namespace": "com.linuxacademy.ccdak.schemaregistry",
      "type": "record",
      "name": "Purchase",
      "fields": [
        {"name": "id", "type": "int"},
        {"name": "product", "type": "string"},
        {"name": "quantity", "type": "int"}
      ]
    }
  3. Implement the producer:

    vi src/main/java/com/linuxacademy/ccdak/schemaregistry/ProducerMain.java
    package com.linuxacademy.ccdak.schemaregistry;
    
    import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    public class ProducerMain {
    
      public static void main(String[] args) {
          final Properties props = new Properties();
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
          props.put(ProducerConfig.ACKS_CONFIG, "all");
          props.put(ProducerConfig.RETRIES_CONFIG, 0);
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
          // KafkaAvroSerializer registers the Purchase schema in Schema Registry
          // (auto-registration is on by default) and prefixes each value with its schema ID
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
          props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    
          // try-with-resources closes the producer, which first completes any pending sends
          try (KafkaProducer<String, Purchase> producer = new KafkaProducer<>(props)) {
              // Purchase is the class the build generates from Purchase.avsc
              Purchase apples = new Purchase(1, "apples", 17);
              // String.valueOf works whether getId() returns int or Integer
              producer.send(new ProducerRecord<>("inventory_purchases", String.valueOf(apples.getId()), apples));
    
              Purchase oranges = new Purchase(2, "oranges", 5);
              producer.send(new ProducerRecord<>("inventory_purchases", String.valueOf(oranges.getId()), oranges));
          }
      }
    
    }
  4. Implement the consumer:

    vi src/main/java/com/linuxacademy/ccdak/schemaregistry/ConsumerMain.java
    package com.linuxacademy.ccdak.schemaregistry;
    
    import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
    import java.io.BufferedWriter;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    public class ConsumerMain {
    
      public static void main(String[] args) {
          final Properties props = new Properties();
          props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
          props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
          props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
          props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
          props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
          props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
          props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
          props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
          // Deserialize into the generated Purchase class instead of a GenericRecord
          props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    
          // try-with-resources closes the writer and the consumer if an exception escapes the loop
          try (KafkaConsumer<String, Purchase> consumer = new KafkaConsumer<>(props);
               BufferedWriter writer = new BufferedWriter(new FileWriter("/home/cloud_user/output/output.txt", true))) {
              consumer.subscribe(Collections.singletonList("inventory_purchases"));
              while (true) {
                  final ConsumerRecords<String, Purchase> records = consumer.poll(Duration.ofMillis(100));
                  for (final ConsumerRecord<String, Purchase> record : records) {
                      final String key = record.key();
                      final Purchase value = record.value();
                      String outputString = "key=" + key + ", value=" + value;
                      System.out.println(outputString);
                      // Terminate each record with a newline so the file has one record per line
                      writer.write(outputString + "\n");
                  }
                  // Flush after every poll so the file stays current while the consumer keeps running
                  writer.flush();
              }
          } catch (IOException e) {
              throw new RuntimeException(e);
          }
      }
    
    }
  5. Run the producer:

    ./gradlew runProducer
  6. Run the consumer. It polls indefinitely, so stop it with Ctrl+C once the records appear:

    ./gradlew runConsumer
  7. Verify the data in the output file:

    cat /home/cloud_user/output/output.txt
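
After KafkaAvroDeserializer strips the 5-byte header (magic byte plus schema ID) and fetches the writer's schema from Schema Registry, it decodes the remaining Avro binary payload and, because specific.avro.reader is set to true, builds a Purchase object. The JDK-only sketch below hand-decodes such a payload for the lab's schema to make that last step concrete. AvroPurchaseDecoding and DecodedPurchase are hypothetical names; the sample bytes are the Avro binary encoding of id 1, product "apples", quantity 17. Real code should rely on the Avro library rather than a hand-written decoder.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical decoder for an Avro-encoded Purchase payload (id:int, product:string, quantity:int). */
public class AvroPurchaseDecoding {

    /** Plain holder for the decoded fields; a stand-in for the generated Purchase class. */
    static final class DecodedPurchase {
        final int id;
        final String product;
        final int quantity;

        DecodedPurchase(int id, String product, int quantity) {
            this.id = id;
            this.product = product;
            this.quantity = quantity;
        }

        @Override
        public String toString() {
            return "id=" + id + ", product=" + product + ", quantity=" + quantity;
        }
    }

    /** Reads one Avro zigzag varint and maps it back to a signed value. */
    static long readVarLong(ByteArrayInputStream in) {
        long raw = 0;
        int shift = 0;
        int b;
        do {
            b = in.read(); // returns -1 at end of input
            if (b < 0 || shift > 63) {
                throw new IllegalArgumentException("Truncated or malformed varint");
            }
            raw |= (long) (b & 0x7F) << shift; // low 7 bits carry data
            shift += 7;
        } while ((b & 0x80) != 0);             // high bit set means more bytes follow
        return (raw >>> 1) ^ -(raw & 1);       // undo zigzag: 0,1,2,3 -> 0,-1,1,-2
    }

    /** Reads an Avro string: a varint byte length followed by that many UTF-8 bytes. */
    static String readString(ByteArrayInputStream in) {
        long length = readVarLong(in);
        if (length < 0 || length > in.available()) {
            throw new IllegalArgumentException("Invalid string length: " + length);
        }
        byte[] utf8 = new byte[(int) length];
        in.read(utf8, 0, utf8.length); // enough bytes are available, so this fills the array
        return new String(utf8, StandardCharsets.UTF_8);
    }

    /** Decodes the fields in the order the schema declares them. */
    static DecodedPurchase decode(byte[] payload) {
        ByteArrayInputStream in = new ByteArrayInputStream(payload);
        int id = (int) readVarLong(in);
        String product = readString(in);
        int quantity = (int) readVarLong(in);
        return new DecodedPurchase(id, product, quantity);
    }

    public static void main(String[] args) {
        byte[] payload = {0x02, 0x0C, 0x61, 0x70, 0x70, 0x6C, 0x65, 0x73, 0x22};
        System.out.println(decode(payload)); // prints id=1, product=apples, quantity=17
    }
}
```

The decoder can only make sense of the bytes because it knows the field order and types in advance, which is exactly why the consumer needs the writer's schema from the registry.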

Additional Resources

Your supermarket company uses Kafka to manage inventory updates in real time as purchases are made. In the past, data was published to a topic in a basic format, but the company now wants a more complex data structure with multiple data points in each record. This is a good use case for Confluent Schema Registry. Create a schema to represent the data, then build a simple producer that publishes some sample data using the schema. Finally, create a consumer that consumes this data and writes it to a data file.

There is a starter project on GitHub at https://github.com/linuxacademy/content-ccdak-schema-registry-lab. Clone this project to the broker and edit its files to implement the solution.

Use the following specification to build a schema called Purchase. You can place the schema file in src/main/avro/com/linuxacademy/ccdak/schemaregistry/.

  • Field id with type int. This will contain the purchase id.
  • Field product with type string. This will contain the name of the product purchased.
  • Field quantity with type int. This will contain the quantity of the product purchased.
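
In Avro's binary encoding, a record is simply its fields written in schema order, with no field names or tags: an int is a zigzag-encoded variable-length integer, and a string is its UTF-8 byte length (as a zigzag varint) followed by the UTF-8 bytes. The JDK-only sketch below hand-encodes one Purchase with this specification to show the payload that follows the Schema Registry header. AvroPurchaseEncoding and its methods are hypothetical illustration, not part of the lab or the Avro library.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical hand-rolled encoder for the Purchase schema (id:int, product:string, quantity:int). */
public class AvroPurchaseEncoding {

    /** Writes a value as an Avro zigzag varint (ints and longs share this encoding). */
    static void writeVarLong(ByteArrayOutputStream out, long value) {
        long zigzag = (value << 1) ^ (value >> 63); // map signed to unsigned: 0,-1,1,-2 -> 0,1,2,3
        while ((zigzag & ~0x7FL) != 0) {
            out.write((int) ((zigzag & 0x7F) | 0x80)); // low 7 bits plus continuation bit
            zigzag >>>= 7;
        }
        out.write((int) zigzag); // final byte has the continuation bit clear
    }

    /** Writes an Avro string: byte length as a varint, then the UTF-8 bytes. */
    static void writeString(ByteArrayOutputStream out, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        writeVarLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    /** Encodes the fields in schema order: id, product, quantity. */
    static byte[] encodePurchase(int id, String product, int quantity) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarLong(out, id);
        writeString(out, product);
        writeVarLong(out, quantity);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : encodePurchase(1, "apples", 17)) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim()); // prints 02 0c 61 70 70 6c 65 73 22
    }
}
```

The whole record fits in 9 bytes because field names live only in the schema, not in each message.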

Create a producer that publishes some sample records to the inventory_purchases topic using this schema.

Then, create a consumer to read these records and output them to a file located at /home/cloud_user/output/output.txt.
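
The consumer has to append one line per record to /home/cloud_user/output/output.txt. The JDK-only sketch below shows one way to do that with java.nio.file.Files: open the file in append mode, end every line with a line separator, and let try-with-resources close (and thereby flush) the writer. OutputFileAppender and its methods are hypothetical names, plain strings stand in for Purchase objects, and a temporary file stands in for the real output path. Note that the CREATE option creates the file but not its parent directory, which must already exist.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper showing one way to append consumer output to a file. */
public class OutputFileAppender {

    /** Builds a line in the same "key=..., value=..." shape the lab's consumer prints. */
    static String format(String key, Object value) {
        return "key=" + key + ", value=" + value;
    }

    /** Appends each line followed by a line separator, creating the file if it is missing. */
    static void appendLines(Path file, List<String> lines) throws IOException {
        try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            for (String line : lines) {
                writer.write(line);
                writer.newLine(); // platform separator ("\n" on Linux)
            }
        } // closing the writer flushes it
    }

    public static void main(String[] args) throws IOException {
        // A temp file stands in for /home/cloud_user/output/output.txt
        Path file = Files.createTempFile("output", ".txt");
        appendLines(file, Arrays.asList(format("1", "apples"), format("2", "oranges")));
        for (String line : Files.readAllLines(file)) {
            System.out.println(line); // prints key=1, value=apples then key=2, value=oranges
        }
    }
}
```

Opening in append mode means rerunning the consumer adds to the file instead of replacing it, which matches the true append flag passed to FileWriter in the lab's consumer.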

You can run the producer in the starter project with the command ./gradlew runProducer. The consumer can be run with ./gradlew runConsumer. Run both the producer and consumer to verify that everything works.

If you get stuck, feel free to check out the solution video, or the detailed instructions under each objective. Good luck!
