Confluent Schema Registry coordinates contracts between producers and consumers and simplifies serializing and deserializing complex data objects. It also provides functionality to help you manage changes to your data schemas. In this lab, you will change an existing schema by adding a new field, which gives you hands-on experience with evolving a schema using Confluent Schema Registry.
Learning Objectives
Successfully complete this lab by achieving the following learning objectives:
- Clone the Starter Project and Run It to Make Sure Everything Is Working
Clone the starter project into your home directory:
cd ~/
git clone https://github.com/linuxacademy/content-ccdak-schema-evolve-lab.git
Run the code to ensure it works before modifying it:
cd content-ccdak-schema-evolve-lab
./gradlew runProducer
./gradlew runConsumer
Note: The consumer should output some records that were created by the producer.
- Update the Purchase Schema to Add the `member_id` Field
Edit the schema definition file:
vi src/main/avro/com/linuxacademy/ccdak/schemaregistry/Purchase.avsc
Add the member_id field with a default of 0:
{
  "namespace": "com.linuxacademy.ccdak.schemaregistry",
  "compatibility": "FORWARD",
  "type": "record",
  "name": "Purchase",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "product", "type": "string"},
    {"name": "quantity", "type": "int"},
    {"name": "member_id", "type": "int", "default": 0}
  ]
}
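The default on member_id matters because a reader using the updated schema may still meet records that were written before the field existed. The following is a simplified model of that behavior, not the Avro API: plain maps stand in for Avro records, and the class and method names (DefaultFieldSketch, readerDefaults, resolve) are hypothetical, chosen only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of reader-side default handling; plain maps stand in for Avro records.
final class DefaultFieldSketch {

    // Hypothetical stand-in for the reader schema: field name -> default value.
    // In this sketch, null means "the field declares no default".
    static Map<String, Object> readerDefaults() {
        Map<String, Object> defaults = new LinkedHashMap<>();
        defaults.put("id", null);
        defaults.put("product", null);
        defaults.put("quantity", null);
        defaults.put("member_id", 0); // the default declared in Purchase.avsc
        return defaults;
    }

    // Builds the record a reader would see: written values win, defaults fill gaps,
    // and a missing field without a default is an error.
    static Map<String, Object> resolve(Map<String, Object> written, Map<String, Object> defaults) {
        Map<String, Object> resolved = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : defaults.entrySet()) {
            String name = field.getKey();
            if (written.containsKey(name)) {
                resolved.put(name, written.get(name));
            } else if (field.getValue() != null) {
                resolved.put(name, field.getValue());
            } else {
                throw new IllegalStateException("No value or default for field: " + name);
            }
        }
        return resolved;
    }

    public static void main(String[] args) {
        // A record written before member_id existed.
        Map<String, Object> oldRecord = new LinkedHashMap<>();
        oldRecord.put("id", 1);
        oldRecord.put("product", "apples");
        oldRecord.put("quantity", 17);

        // prints {id=1, product=apples, quantity=17, member_id=0}
        System.out.println(resolve(oldRecord, readerDefaults()));
    }
}
```

Without the default, the same lookup would fail for the old record, which is roughly why a new field without a default is harder to evolve safely.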
- Update the Producer to Set the `member_id` for the Records It Publishes
Edit the producer Main class:
vi src/main/java/com/linuxacademy/ccdak/schemaregistry/ProducerMain.java
Implement the new member_id field in the producer by setting it for the records being produced:
package com.linuxacademy.ccdak.schemaregistry;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerMain {

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

        KafkaProducer<String, Purchase> producer = new KafkaProducer<String, Purchase>(props);

        // The fourth constructor argument is the new member_id value.
        Purchase apples = new Purchase(1, "apples", 17, 77543);
        producer.send(new ProducerRecord<String, Purchase>("inventory_purchases", apples.getId().toString(), apples));

        Purchase oranges = new Purchase(2, "oranges", 5, 56878);
        producer.send(new ProducerRecord<String, Purchase>("inventory_purchases", oranges.getId().toString(), oranges));

        producer.close();
    }

}
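Before running the updated producer, it can help to ask Schema Registry whether the edited schema is compatible with the version already registered. The sketch below uses the registry's REST compatibility endpoint through the JDK's built-in HTTP client (Java 11 or later). It assumes the registry at http://localhost:8081 from the producer configuration and the subject name inventory_purchases-value, which is what the default subject naming strategy would produce for this topic. The class name CompatibilityCheck and its helper are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch only: asks Schema Registry whether a local .avsc file is compatible
// with the latest registered version of a subject.
final class CompatibilityCheck {

    // Assumption: default subject naming, so the value subject is "<topic>-value".
    static final String SUBJECT = "inventory_purchases-value";
    static final String REGISTRY_URL = "http://localhost:8081";

    // Wraps the schema text in the {"schema": "<escaped JSON>"} envelope the endpoint expects.
    // Only the escapes a typical .avsc file needs are handled here.
    static String toRequestBody(String schemaJson) {
        StringBuilder escaped = new StringBuilder();
        for (char c : schemaJson.toCharArray()) {
            switch (c) {
                case '\\': escaped.append("\\\\"); break;
                case '"':  escaped.append("\\\""); break;
                case '\n': escaped.append("\\n"); break;
                case '\r': escaped.append("\\r"); break;
                case '\t': escaped.append("\\t"); break;
                default:   escaped.append(c);
            }
        }
        return "{\"schema\": \"" + escaped + "\"}";
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            // No file given: show the envelope for a tiny sample schema, with no network call.
            System.out.println(toRequestBody("{\"type\": \"string\"}"));
            return;
        }

        // args[0] is the path to the schema file, e.g. the Purchase.avsc edited earlier.
        String schemaJson = Files.readString(Path.of(args[0]));

        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(REGISTRY_URL + "/compatibility/subjects/" + SUBJECT + "/versions/latest"))
            .header("Content-Type", "application/vnd.schemaregistry.v1+json")
            .POST(HttpRequest.BodyPublishers.ofString(toRequestBody(schemaJson)))
            .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());

        // A compatible schema yields a JSON body containing "is_compatible": true.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

To check the lab's schema, pass the path src/main/avro/com/linuxacademy/ccdak/schemaregistry/Purchase.avsc as the first argument while the registry is running.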
Run the producer:
./gradlew runProducer
Run the consumer:
./gradlew runConsumer
Verify the data in the output file. We should see the new member_id data in the last lines of the file:
cat /home/cloud_user/output/output.txt