Service Registry
Prerequisite
Stop all servers from the previous lab
Press Ctrl+C in each terminal (stop Kafka before stopping ZooKeeper)
Check with the jps command that no Kafka broker or ZooKeeper process is still running
    jps
Clear old data from the previous lab
    rm -rf /tmp/zookeep*
    rm -rf /tmp/kaf*
Start ZooKeeper (in its own terminal)
    cd ~/amq-streams-2022/4-management
    ./kafka/bin/zookeeper-server-start.sh ./kafka/config/zookeeper.properties
Start the Kafka broker (in another terminal)
    cd ~/amq-streams-2022/4-management
    ./kafka/bin/kafka-server-start.sh ./kafka/config/server.properties
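Besides jps, a quick way to confirm that both servers came up is to probe their listening ports. The following is a minimal sketch, not part of the lab repository: the class name PortCheck and the method isListening are hypothetical, and it assumes ZooKeeper listens on 2181 (the usual clientPort in the stock zookeeper.properties) and the Kafka broker on 9092 (the bootstrap address used later in this lab).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of the lab repository.
final class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing is listening there.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed default ports: ZooKeeper 2181, Kafka broker 9092.
        System.out.println("ZooKeeper (2181): " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka (9092): " + isListening("localhost", 9092, 1000));
    }
}
```

An open port only shows that something is listening; jps remains the way to confirm which process it is.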
Install Service Registry
Install Service Registry Operator (OpenShift Admin Task, Ready Now for This Lab)
Log in to Red Hat OpenShift as an admin user
In the Administrator perspective, go to the Operators -> OperatorHub menu. Enter service registry into the search box; the Red Hat Integration - Service Registry Operator will show up on the screen. Click on it.
A panel with the details of the operator will show up on the right. Click the Install button.
You can leave all options at their defaults or change them if needed, e.g. install the operator into the project you created earlier. Then click the Install button.
Wait until the operator gets installed successfully before proceeding to next steps.
Create OpenShift Project
Open your web browser and log in to Red Hat OpenShift with:
OpenShift Console URL: get it from your instructor
User Name/Password: get it from your instructor (for example, user1/openshift)
Go to the Developer console
Go to the Projects menu, then click the Create Project button.
Enter a project name of the form YOUR_USER_NAME-service-registry, replacing YOUR_USER_NAME with your user name (for example, user1-service-registry), then click the Create button.
Create Database for Service Registry
Service Registry supports two types of storage: Kafka and a database. This lab uses a database.
In the Developer console, select Topology from the left menu. Right-click on the panel and select Add to Project --> Database; the console switches to the Database Catalog.
In Database Catalog, select PostgreSQL
Click Instantiate Template
In Instantiate Template, leave every value at its default except:
Database Service Name : service-registry
PostgreSQL Connection Username : service-registry
PostgreSQL Connection Password : service-registry
PostgreSQL Database Name : service-registry
Click Create and wait until the database is ready (the circle around the service-registry pod turns dark blue)
Create Service Registry
In the Topology view, click the Add to Project button (book icon)
Type "registry" in the search box, select "Apicurio Registry" and click the Create button
In the Create ApicurioRegistry panel, switch "Configure via" to YAML view and copy the YAML from apicurio.yml into the editor
Check that the username and password are both "service-registry"
Check the JDBC URL and change user1-service-registry to your project name
Click the "Create" button
Wait until Service Registry is ready (its circle turns dark blue)
Open the Service Registry console by clicking Open URL on the Service Registry pod icon
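The JDBC URL checked in the steps above points Service Registry at the PostgreSQL service created earlier. As a rough sketch of the project-name substitution, the code below assumes the URL uses the standard OpenShift in-cluster service name `<service>.<project>.svc` and the default PostgreSQL port 5432. The exact form in apicurio.yml may differ, so treat that file as the reference. JdbcUrl and forProject are hypothetical names.

```java
// Hypothetical helper illustrating the project-name substitution; the exact
// URL in apicurio.yml may use a different host form, so this is only a sketch.
final class JdbcUrl {

    // Builds jdbc:postgresql://<service>.<project>.svc:5432/<database>.
    // Assumes the standard OpenShift service DNS name and PostgreSQL port 5432.
    static String forProject(String project) {
        String service = "service-registry";   // Database Service Name from this lab
        String database = "service-registry";  // PostgreSQL Database Name from this lab
        return "jdbc:postgresql://" + service + "." + project + ".svc:5432/" + database;
    }

    public static void main(String[] args) {
        // Example: a participant whose project is user2-service-registry.
        System.out.println(forProject("user2-service-registry"));
    }
}
```

Only the project segment changes between participants; the service and database names stay service-registry because every participant entered the same values in the template.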
Managing schema and API artifacts using Service Registry REST API commands
Create a schema in Service Registry with a REST API call from the command line
Change "user1-service-registry" to your project name before running the command below!
Change "apps.cluster-82xm2.82xm2.sandbox503.opentlc.com" to your current OpenShift domain (get it from the OpenShift console URL)
Keep the JSON passed to --data on a single line: inside single quotes the shell does not treat a backslash as a line continuation, so a backslash there would be sent to the registry as part of the schema.
    curl -X POST -H "Content-Type: application/json; artifactType=AVRO" \
      -H "X-Registry-ArtifactId: share-price" \
      --data '{"type":"record","name":"price","namespace":"com.example", "fields":[{"name":"symbol","type":"string"},{"name":"price","type":"string"}]}' \
      http://service-registry.user1-service-registry.router-default.apps.cluster-82xm2.82xm2.sandbox503.opentlc.com/apis/registry/v2/groups/my-group/artifacts
example result
{"createdBy":"","createdOn":"2022-12-08T04:36:16+0000","modifiedBy":"","modifiedOn":"2022-12-08T04:36:16+0000","id":"share-price","version":"1","type":"AVRO","globalId":1,"state":"ENABLED","groupId":"my-group","contentId":1,"references":[]}
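The same call can be made from Java. This is a minimal sketch using the JDK's built-in java.net.http client (Java 11 or later is assumed). The class name CreateSchema and the method buildRequest are hypothetical; the headers, body and endpoint mirror the curl command above.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical example class, not part of the lab repository.
final class CreateSchema {

    static final String SCHEMA =
            "{\"type\":\"record\",\"name\":\"price\",\"namespace\":\"com.example\","
            + "\"fields\":[{\"name\":\"symbol\",\"type\":\"string\"},"
            + "{\"name\":\"price\",\"type\":\"string\"}]}";

    // Builds the same request as the curl command: POST the Avro schema to the
    // artifacts endpoint of a group, passing the artifact ID in a header.
    static HttpRequest buildRequest(String artifactsUrl, String artifactId, String schema) {
        return HttpRequest.newBuilder(URI.create(artifactsUrl))
                .header("Content-Type", "application/json; artifactType=AVRO")
                .header("X-Registry-ArtifactId", artifactId)
                .POST(HttpRequest.BodyPublishers.ofString(schema))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // args[0]: the full artifacts URL for your project and domain, ending in
        // /apis/registry/v2/groups/my-group/artifacts
        HttpRequest request = buildRequest(args[0], "share-price", SCHEMA);
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Separating buildRequest from the send step makes it possible to inspect the request without a running registry.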
Retrieve the schema from Service Registry with a REST API call from the command line
Change "user1-service-registry" to your project name before running the command below!
Change "apps.cluster-82xm2.82xm2.sandbox503.opentlc.com" to your current OpenShift domain (get it from the OpenShift console URL)
curl http://service-registry.user1-service-registry.router-default.apps.cluster-82xm2.82xm2.sandbox503.opentlc.com/apis/registry/v2/groups/my-group/artifacts/share-price
example result
{"type":"record","name":"price","namespace":"com.example", "fields":[{"name":"symbol","type":"string"},{"name":"price","type":"string"}]}
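The two substitutions asked for above (project name and OpenShift domain) can be sketched as a small helper. This is an illustration only: RegistryUrls, appsDomain and artifactUrl are hypothetical names, and it assumes the console host has the usual form `console-openshift-console.apps.<cluster-domain>`, so the domain is everything from `apps.` onward.

```java
import java.net.URI;

// Hypothetical helper, not part of the lab repository.
final class RegistryUrls {

    // Extracts the apps domain from the OpenShift console URL.
    // Assumes the console host contains ".apps." followed by the cluster domain.
    static String appsDomain(String consoleUrl) {
        String host = URI.create(consoleUrl).getHost();
        int index = host == null ? -1 : host.indexOf(".apps.");
        if (index < 0) {
            throw new IllegalArgumentException("No .apps. domain found in: " + consoleUrl);
        }
        return host.substring(index + 1); // skip the leading dot
    }

    // Builds the artifact URL in the form used by the curl commands in this lab.
    static String artifactUrl(String project, String appsDomain, String group, String artifactId) {
        return "http://service-registry." + project + ".router-default." + appsDomain
                + "/apis/registry/v2/groups/" + group + "/artifacts/" + artifactId;
    }

    public static void main(String[] args) {
        // args[0]: your OpenShift console URL, args[1]: your project name.
        String domain = appsDomain(args[0]);
        System.out.println(artifactUrl(args[1], domain, "my-group", "share-price"));
    }
}
```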
Review the schema you just created in the Service Registry console
Kafka Client Schema Validation with Service Registry
Service Registry provides client serializers/deserializers (SerDes) for Kafka producer and consumer applications written in Java. Kafka producer applications use serializers to encode messages that conform to a specific event schema. Kafka consumer applications use deserializers to validate that messages have been serialized using the correct schema, based on a specific schema ID. This ensures consistent schema use and helps to prevent data errors at runtime.
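To make the role of the schema ID concrete, here is a toy sketch of the idea in plain Java. It is not the Apicurio SerDes API and does not reproduce its wire format: the in-memory map stands in for the registry, and ToySchemaSerde with its methods is entirely hypothetical. The serializer tags each message with the ID of the schema it was written with, and the deserializer refuses messages whose ID it cannot resolve.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Toy illustration only: NOT the Apicurio SerDes API or its wire format.
final class ToySchemaSerde {

    // Stand-in for the registry: schema ID -> schema definition.
    private final Map<Long, String> registry = new HashMap<>();

    void register(long schemaId, String schema) {
        registry.put(schemaId, schema);
    }

    // "Serializer": prefix the payload with the ID of the schema it conforms to.
    byte[] serialize(long schemaId, String payload) {
        if (!registry.containsKey(schemaId)) {
            throw new IllegalStateException("Schema " + schemaId + " is not registered");
        }
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + body.length)
                .putLong(schemaId)
                .put(body)
                .array();
    }

    // "Deserializer": read the schema ID, look it up, and reject unknown IDs.
    String deserialize(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        long schemaId = buffer.getLong();
        if (!registry.containsKey(schemaId)) {
            throw new IllegalStateException("Unknown schema ID " + schemaId);
        }
        byte[] body = new byte[buffer.remaining()];
        buffer.get(body);
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ToySchemaSerde serde = new ToySchemaSerde();
        serde.register(1L, "{\"type\":\"string\"}");
        byte[] message = serde.serialize(1L, "hello");
        System.out.println(serde.deserialize(message));
    }
}
```

The real serializers additionally encode the payload according to the schema itself, which this sketch leaves out.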
Review the Kafka client schema validation code in SimpleAvroExample.java
Review and edit the properties of the application
REGISTRY_URL : change to your Service Registry URL
Change "user1-service-registry" to your project name
Change "apps.cluster-82xm2.82xm2.sandbox503.opentlc.com" to your current OpenShift domain (get it from the OpenShift console URL)
SERVERS : Kafka bootstrap servers, such as localhost:9092
TOPIC_NAME : topic for this application
SCHEMA : Avro schema of this application
    private static final String REGISTRY_URL = "http://service-registry.user1-service-registry.router-default.apps.cluster-82xm2.82xm2.sandbox503.opentlc.com/apis/registry/v2";
    private static final String SERVERS = "localhost:9092";
    private static final String TOPIC_NAME = SimpleAvroExample.class.getSimpleName();
    private static final String SUBJECT_NAME = "Greeting";
    private static final String SCHEMA = "{\"type\":\"record\",\"name\":\"Greeting\",\"fields\":[{\"name\":\"Message\",\"type\":\"string\"},{\"name\":\"Time\",\"type\":\"long\"}]}";
Review the property settings in the createKafkaProducer method
KEY_SERIALIZER_CLASS_CONFIG : serializer for the message key
VALUE_SERIALIZER_CLASS_CONFIG : serializer for the message value
REGISTRY_URL : URL of Service Registry
AUTO_REGISTER_ARTIFACT : automatically register the schema in Service Registry if it is not found there
    // Configure kafka settings
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME);
    props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
    props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Use the Apicurio Registry provided Kafka Serializer for Avro
    props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());
    // Configure Service Registry location
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
    // Register the artifact if not found in the registry.
    props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);
    // Just if security values are present, then we configure them.
    configureSecurityIfPresent(props);
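Every setting in the excerpt above is applied with putIfAbsent, so a value that is already present in props is kept rather than overwritten. The sketch below shows that behaviour with a plain java.util.Properties object. It assumes props could be pre-populated before these calls, which the excerpt does not show. The keys "bootstrap.servers" and "acks" are the string values behind ProducerConfig.BOOTSTRAP_SERVERS_CONFIG and ProducerConfig.ACKS_CONFIG; PutIfAbsentDemo and the broker.example address are hypothetical.

```java
import java.util.Properties;

// Hypothetical demo class, not part of the lab repository.
final class PutIfAbsentDemo {

    // Applies defaults only for keys the caller has not already set.
    static Properties withDefaults(Properties props) {
        props.putIfAbsent("bootstrap.servers", "localhost:9092");
        props.putIfAbsent("acks", "all");
        return props;
    }

    public static void main(String[] args) {
        Properties preset = new Properties();
        preset.put("bootstrap.servers", "broker.example:9092"); // hypothetical override
        // The preset bootstrap address survives; only "acks" is filled in.
        System.out.println(withDefaults(preset));
        // With an empty Properties object, both defaults are applied.
        System.out.println(withDefaults(new Properties()));
    }
}
```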
Review the property settings in the createKafkaConsumer method
KEY_DESERIALIZER_CLASS_CONFIG : deserializer for the message key
VALUE_DESERIALIZER_CLASS_CONFIG : deserializer for the message value
REGISTRY_URL : URL of Service Registry
    // Configure Kafka
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
    props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // Use the Apicurio Registry provided Kafka Deserializer for Avro
    props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());
    // Configure Service Registry location
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
Run the Kafka client
    cd ~/amq-streams-2022/7-service-registry/apicurio-registry-examples/simple-avro
    mvn compile exec:java -Dexec.mainClass="io.apicurio.registry.examples.simple.avro.SimpleAvroExample"
Example result
    Starting example SimpleAvroExample
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    Producing (5) messages.
    Messages successfully produced.
    Closing the producer.
    Creating the consumer.
    Subscribing to topic SimpleAvroExample
    Consuming (5) messages.
    Consumed a message: Hello (0)! @ Thu Dec 08 13:32:46 ICT 2022
    Consumed a message: Hello (1)! @ Thu Dec 08 13:32:47 ICT 2022
    Consumed a message: Hello (2)! @ Thu Dec 08 13:32:47 ICT 2022
    Consumed a message: Hello (3)! @ Thu Dec 08 13:32:47 ICT 2022
    Consumed a message: Hello (4)! @ Thu Dec 08 13:32:48 ICT 2022
    Done (success).
Try again with a record that does not match the schema
Change the code in SimpleAvroExample.java
Go to line 89 and uncomment the line that adds a field not defined in the schema
    record.put("Message", message);
    record.put("Time", now.getTime());
    //record.put("invalid", "invalid");
to
    record.put("Message", message);
    record.put("Time", now.getTime());
    record.put("invalid", "invalid");
Run the client again
    cd ~/amq-streams-2022/7-service-registry/apicurio-registry-examples/simple-avro
    mvn compile exec:java -Dexec.mainClass="io.apicurio.registry.examples.simple.avro.SimpleAvroExample"
Example result
    Producing (5) messages.
    Closing the producer.
    [WARNING] org.apache.avro.AvroRuntimeException: Not a valid schema field: invalid
        at org.apache.avro.generic.GenericData$Record.put (GenericData.java:253)
        at io.apicurio.registry.examples.simple.avro.SimpleAvroExample.main (SimpleAvroExample.java:89)
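The stack trace shows the exception being thrown from the record's put call at line 89, that is, on the client while the record is being built. The following toy sketch imitates that check in plain Java. It is not the Avro API: ToyRecord is hypothetical and only mirrors the idea that a record knows its schema's field names and refuses any other field.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy stand-in for an Avro generic record; NOT the Avro API.
final class ToyRecord {

    private final List<String> schemaFields;
    private final Map<String, Object> values = new HashMap<>();

    ToyRecord(List<String> schemaFields) {
        this.schemaFields = schemaFields;
    }

    // Accepts a value only if the field is declared in the schema.
    void put(String field, Object value) {
        if (!schemaFields.contains(field)) {
            throw new IllegalArgumentException("Not a valid schema field: " + field);
        }
        values.put(field, value);
    }

    Object get(String field) {
        return values.get(field);
    }

    public static void main(String[] args) {
        // Field names taken from the Greeting schema used in this lab.
        ToyRecord record = new ToyRecord(List.of("Message", "Time"));
        record.put("Message", "Hello");
        record.put("Time", System.currentTimeMillis());
        try {
            record.put("invalid", "invalid"); // not declared in the schema
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```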
More examples of Service Registry
More details about advanced Red Hat Service Registry