Using Avro to Publish and Consume in Kafka PubSub Infrastructure
This article shares how to make use of Avro to publish and consume typed-messages to and from Kakfa broker.
We begin with creating Avro schema in this content
{
"namespace": "sg.com.cyder.kafka.log.avro",
"type": "record",
"name": "ApiLog",
"fields": [{
"name": "timestamp",
"type": "long"
}, {
"name": "host",
"type": "string"
}, {
"name": "category",
"type": "string"
}, {
"name": "level",
"type": "string"
}, {
"name": "message",
"type": "string"
}, {
"name": "detail",
"type": "string"
}]
}
You can find here https://github.com/CyderSG/cyderlogstream/blob/master/schemas/CyderLogAPI.avsc
Then we need to convert that schema to a Java Class. To do this, we make use of Avro tool 1.9.1
java -jar ~/.m2/repository/org/apache/avro/avro-tools/1.9.1/avro-tools-1.9.1.jar compile schema schemas/CyderLogAPI.avsc src/main/java
You can find the shell script to generate the Avro Java class here https://github.com/CyderSG/cyderlogstream/blob/master/generateAvro-CyderLog.sh. This script a Java class ApiLog which takes care of serializing and de-serializing the stream to and from Java object, which you can find here https://github.com/CyderSG/cyderlogstream/blob/master/src/main/java/sg/com/cyder/kafka/log/avro/ApiLog.java
Once you have the ApiLog Java class generated, next step is to register the Schema into the Confluent Schema Registry.
Here is the code snippet do achieve that.
String url = "http://" + System.getenv("SCHEMAREGISTRY") + ":8081";
// associated topic name.
String topic = "cyder-log";
// avro schema avsc file path.
String schemaPath = "./schemas/CyderLogAPI.avsc";
// subject convention is "<topic-name>-value"
String subject = topic + "-api";
// avsc json string.
String schema = null;
FileInputStream inputStream = new FileInputStream(schemaPath);
try {
schema = IOUtils.toString(inputStream);
} finally {
inputStream.close();
}
Schema avroSchema = new Schema.Parser().parse(schema);
CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(url, 20);
client.register(subject, avroSchema);
You can find the above code here https://github.com/CyderSG/cyderlogstream/blob/master/src/main/java/sg/com/cyder/kafka/log/schema/RegisterSchemaCyderLogAPI.java
Make sure that the schema is registered. Verify that using schema registry REST API
curl -s “http://broker.kafka.mydomain:8081/subjects/cyder-log-api/versions/latest" | jq
which will produce this response in JSON
{
"subject": "cyder-log-api",
"version": 1,
"id": 41,
"schema": "{\"type\":\"record\",\"name\":\"ApiLog\",\"namespace\":\"sg.com.cyder.kafka.log.avro\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"host\",\"type\":\"string\"},{\"name\":\"category\",\"type\":\"string\"},{\"name\":\"level\",\"type\":\"string\"},{\"name\":\"message\",\"type\":\"string\"},{\"name\":\"detail\",\"type\":\"string\"}]}"
}
Now, last part, which is the most interesting part, publish and consume.
To publish, we just simply send the ApiLog object to the stream, thanks to Avro
// message value, avro generic record.
ApiLog record = new ApiLog();
record.setCategory("Leaves");
record.setDetail("Detail - " + new Date());
record.setHost(InetAddress.getLocalHost().getHostAddress());
record.setLevel("INFO");
record.setMessage("Message - " + new Date());
record.setTimestamp(new Date().getTime());q
// send avro message to the topic page-view-event.
producer.send(new ProducerRecord<String, GenericRecord>(topicName, UUID.randomUUID().toString(), record));
You can find the above code here https://github.com/CyderSG/cyderlogstream/blob/master/src/main/java/sg/com/cyder/kafka/log/api/CyderLogAPIProducer.java
And same to consume, you can directly downcast to ApiLog class.
KafkaConsumer<String, ApiLog> kafkaConsumer = new KafkaConsumer<String, ApiLog>(properties);
kafkaConsumer.subscribe(Collections.singleton(topicName));
System.out.println("Waiting for data...");
while (true) {
System.out.println("Polling");
ConsumerRecords<String, ApiLog> records = kafkaConsumer.poll(1000);
for (ConsumerRecord<String, ApiLog> record : records) {
GenericRecord customer = record.value();
System.out.println(customer);
}
kafkaConsumer.commitSync();
}
You can find the above code her https://github.com/CyderSG/cyderlogstream/blob/master/src/main/java/sg/com/cyder/kafka/log/api/CyderLogAPIConsumer.java.
If you are further interested, you can download the source from https://github.com/CyderSG/cyderlogstream.
You can download the source from https://github.com/CyderSG/cyderlogstream