Service Registry


Prerequisite

  • Stop all servers from the previous lab

    • type Ctrl+C in each terminal (stop Kafka before stopping ZooKeeper)

    • check with the jps command that the Kafka broker and ZooKeeper processes have stopped

      jps
  • clear old data from the previous lab

    rm -rf /tmp/zookeep*
    rm -rf /tmp/kaf*
  • start ZooKeeper

    cd ~/amq-streams-2022/4-management
    ./kafka/bin/zookeeper-server-start.sh ./kafka/config/zookeeper.properties
  • start the Kafka broker (a quick connectivity check follows this list)

    cd ~/amq-streams-2022/4-management
    ./kafka/bin/kafka-server-start.sh ./kafka/config/server.properties
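  • quick check that the broker is up — a minimal sketch using the kafka-topics.sh tool that ships with the same Kafka distribution (it assumes the broker listens on the default localhost:9092)

    # list topics; an empty list (or your existing lab topics) means the broker is reachable
    ./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list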

Install Service Registry

Install Service Registry Operator (OpenShift Admin Task, Ready Now for This Lab)

  • Log in to Red Hat OpenShift with an admin user

  • In the Administrator perspective, go to the Operators -> OperatorHub menu. Enter service registry into the search box; the Red Hat Integration - Service Registry Operator will show up on the screen. Click on it.

  • A panel with details of the operator will show up on the right. Then click the Install button.

  • You can leave all options at their defaults or change them if needed, e.g. install the operator into the project you created earlier. Then click the Install button.

  • Wait until the operator is installed successfully before proceeding to the next steps.
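  • Optionally, if you have admin CLI access, you can confirm the installation from a terminal — a sketch assuming the operator was installed into the default openshift-operators namespace

    # the operator's ClusterServiceVersion should reach phase "Succeeded"
    oc get csv -n openshift-operators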

Create OpenShift Project

  • Open your web browser and log in to Red Hat OpenShift with:

    • Openshift Console URL : --> get it from your instructor

    • User Name/Password : --> get it from your instructor such as user1/openshift

    • go to the Developer console

  • Go to Projects menu, then click on Create Project button.

  • Enter a project name such as YOUR_USER_NAME-service-registry (for example, user1-service-registry)

  • Change YOUR_USER_NAME to your user name, then click on the Create button.
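  • Optionally, the same project can be created from the command line — a sketch assuming you use the oc client with the credentials above (replace user1 with your user name and use the console/API URL from your instructor)

    # log in and create the project from a terminal instead of the web console
    oc login <OPENSHIFT_API_URL> -u user1 -p openshift
    oc new-project user1-service-registry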

Create Database for Service Registry

  • Service Registry supports two types of storage, Kafka and a database. We use a database for this lab.

  • In the Developer console, select the Topology menu on the left. Right-click on the panel and select Add to Project --> Database; the console will change to the Database Catalog.

  • In Database Catalog, select PostgreSQL

  • Click Instantiate Template

  • In Instantiate Template, leave all default values except:

    • Database Service Name : service-registry

    • PostgreSQL Connection Username : service-registry

    • PostgreSQL Connection Password : service-registry

    • PostgreSQL Database Name : service-registry

  • Click Create and wait until the database is ready (the circle around the service-registry pod changes to dark blue).
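  • Optionally, the same database can be created from the command line — a sketch assuming the standard postgresql-persistent template is available in the cluster's openshift namespace

    # instantiate the PostgreSQL template with the same values as above
    oc new-app --template=postgresql-persistent \
      -p DATABASE_SERVICE_NAME=service-registry \
      -p POSTGRESQL_USER=service-registry \
      -p POSTGRESQL_PASSWORD=service-registry \
      -p POSTGRESQL_DATABASE=service-registry
    # watch the pod until it is Running and ready
    oc get pods -w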

Create Service Registry

  • In the Topology view, click the Add to Project button (book icon).

  • type "registry" in the search box, select "Apicurio Registry", and click the Create button

    • in the Create ApicurioRegistry panel, change Configure via to YAML view and copy the YAML from apicurio.yml into the editor

    • check that the username/password is "service-registry"

    • check the JDBC URL and change user1-service-registry to your project name

    • click the "Create" button

  • wait until Service Registry is ready (the circle changes to dark blue)

  • open the Service Registry console by clicking Open URL on the Service Registry pod icon
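  • for reference, the resource created in this step looks roughly like the sketch below — an illustrative guess at the contents of apicurio.yml, assuming SQL persistence against the PostgreSQL service created above (replace user1-service-registry with your project name)

    apiVersion: registry.apicur.io/v1
    kind: ApicurioRegistry
    metadata:
      name: service-registry
    spec:
      configuration:
        persistence: sql
        sql:
          dataSource:
            # JDBC URL pointing at the service-registry PostgreSQL service in your project
            url: jdbc:postgresql://service-registry.user1-service-registry.svc:5432/service-registry
            userName: service-registry
            password: service-registry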

Managing schema and API artifacts using Service Registry REST API commands

  • create a schema in Service Registry with the REST API from the command line

    • change "user1-service-registry" to your project name before running the command below!

    • change "apps.cluster-82xm2.82xm2.sandbox503.opentlc.com" to the current domain of your OpenShift cluster --> get it from the OpenShift console URL

    curl -X POST -H "Content-Type: application/json; artifactType=AVRO" \
    -H "X-Registry-ArtifactId: share-price" \
    --data '{"type":"record","name":"price","namespace":"com.example","fields":[{"name":"symbol","type":"string"},{"name":"price","type":"string"}]}' \
    http://service-registry.user1-service-registry.router-default.apps.cluster-82xm2.82xm2.sandbox503.opentlc.com/apis/registry/v2/groups/my-group/artifacts

    example result

    {"createdBy":"","createdOn":"2022-12-08T04:36:16+0000","modifiedBy":"","modifiedOn":"2022-12-08T04:36:16+0000","id":"share-price","version":"1","type":"AVRO","globalId":1,"state":"ENABLED","groupId":"my-group","contentId":1,"references":[]}
  • retrieve the schema from Service Registry with a REST API command

    • change "user1-service-registry" to your project name before running the command below!

    • change "apps.cluster-82xm2.82xm2.sandbox503.opentlc.com" to the current domain of your OpenShift cluster --> get it from the OpenShift console URL

    curl http://service-registry.user1-service-registry.router-default.apps.cluster-82xm2.82xm2.sandbox503.opentlc.com/apis/registry/v2/groups/my-group/artifacts/share-price 

    example result

    {"type":"record","name":"price","namespace":"com.example", "fields":[{"name":"symbol","type":"string"},{"name":"price","type":"string"}]}
  • Review the schema you just created in the Service Registry console
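  • optionally, list the artifacts in the group and fetch the artifact metadata with the same v2 REST API (adjust the project name and domain in the URL as above)

    # list all artifact IDs in group "my-group"
    curl http://service-registry.user1-service-registry.router-default.apps.cluster-82xm2.82xm2.sandbox503.opentlc.com/apis/registry/v2/groups/my-group/artifacts

    # fetch metadata (version, global ID, state) of the "share-price" artifact
    curl http://service-registry.user1-service-registry.router-default.apps.cluster-82xm2.82xm2.sandbox503.opentlc.com/apis/registry/v2/groups/my-group/artifacts/share-price/meta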

Kafka Client Schema Validation with Service Registry

  • Service Registry provides client serializers/deserializers (SerDes) for Kafka producer and consumer applications written in Java. Kafka producer applications use serializers to encode messages that conform to a specific event schema. Kafka consumer applications use deserializers to validate that messages have been serialized using the correct schema, based on a specific schema ID. This ensures consistent schema use and helps to prevent data errors at runtime.

    • Review the Java client code in SimpleAvroExample.java and edit the application properties:

      • REGISTRY_URL : change to your Service Registry URL

        • change "user1-service-registry" to your project name before running the application!

        • change "apps.cluster-82xm2.82xm2.sandbox503.opentlc.com" to the current domain of your OpenShift cluster --> get it from the OpenShift console URL

      • SERVERS : Kafka bootstrap servers, such as localhost:9092

      • TOPIC_NAME : topic used by this application

      • SCHEMA : Avro schema of this application

      private static final String REGISTRY_URL = "http://service-registry.user1-service-registry.router-default.apps.cluster-82xm2.82xm2.sandbox503.opentlc.com/apis/registry/v2";
      private static final String SERVERS = "localhost:9092";
      private static final String TOPIC_NAME = SimpleAvroExample.class.getSimpleName();
      private static final String SUBJECT_NAME = "Greeting";
      private static final String SCHEMA = "{\"type\":\"record\",\"name\":\"Greeting\",\"fields\":[{\"name\":\"Message\",\"type\":\"string\"},{\"name\":\"Time\",\"type\":\"long\"}]}";
      
    • Review the property settings in the createKafkaProducer method

      • KEY_SERIALIZER_CLASS_CONFIG : serializer for the message key

      • VALUE_SERIALIZER_CLASS_CONFIG : serializer for the message value

      • REGISTRY_URL : URL of Service Registry

      • AUTO_REGISTER_ARTIFACT : automatically register the schema in Service Registry if it is not found

      // Configure kafka settings
      props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
      props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME);
      props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
      props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      // Use the Apicurio Registry provided Kafka Serializer for Avro
      props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());
      
      // Configure Service Registry location
      props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
      // Register the artifact if not found in the registry.
      props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);
      
      //Just if security values are present, then we configure them.
      configureSecurityIfPresent(props);
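    • For context, a minimal sketch (not the exact SimpleAvroExample code) of how the producer is then used: a record is built against the Avro SCHEMA above and sent as the message value, and the AvroKafkaSerializer takes care of registering/looking up the schema in Service Registry

      // parse the Avro schema and build a record that matches it
      Schema schema = new Schema.Parser().parse(SCHEMA);
      GenericRecord record = new GenericData.Record(schema);
      record.put("Message", "Hello!");
      record.put("Time", new Date().getTime());
      // the value serializer contacts Service Registry and encodes the record with the schema ID
      producer.send(new ProducerRecord<>(TOPIC_NAME, SUBJECT_NAME, record));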
    • Review the property settings in the createKafkaConsumer method

      • KEY_DESERIALIZER_CLASS_CONFIG : deserializer for the message key

      • VALUE_DESERIALIZER_CLASS_CONFIG : deserializer for the message value

      • REGISTRY_URL : URL of Service Registry

      // Configure Kafka
      props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
      props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
      props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
      props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
      props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      // Use the Apicurio Registry provided Kafka Deserializer for Avro
      props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());
      
      // Configure Service Registry location
      props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
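    • And a minimal sketch of the consuming side (again, not the exact example code): the deserializer reads the schema ID from each message, fetches the schema from Service Registry, and returns Avro GenericRecord values

      // subscribe and poll; values come back as Avro GenericRecord objects
      consumer.subscribe(Collections.singletonList(TOPIC_NAME));
      ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(1));
      for (ConsumerRecord<String, GenericRecord> record : records) {
          System.out.println("Consumed a message: " + record.value().get("Message"));
      }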
  • Try to run the Kafka client

    cd ~/amq-streams-2022/7-service-registry/apicurio-registry-examples/simple-avro 
    mvn compile exec:java -Dexec.mainClass="io.apicurio.registry.examples.simple.avro.SimpleAvroExample"

    example result

    Starting example SimpleAvroExample
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    Producing (5) messages.
    Messages successfully produced.
    Closing the producer.
    Creating the consumer.
    Subscribing to topic SimpleAvroExample
    Consuming (5) messages.
    Consumed a message: Hello (0)! @ Thu Dec 08 13:32:46 ICT 2022
    Consumed a message: Hello (1)! @ Thu Dec 08 13:32:47 ICT 2022
    Consumed a message: Hello (2)! @ Thu Dec 08 13:32:47 ICT 2022
    Consumed a message: Hello (3)! @ Thu Dec 08 13:32:47 ICT 2022
    Consumed a message: Hello (4)! @ Thu Dec 08 13:32:48 ICT 2022
    Done (success).
  • Try again with an invalid schema

    • in SimpleAvroExample.java, go to line 89 and uncomment the line that adds a field not defined in the schema

                  record.put("Message", message);
                  record.put("Time", now.getTime());
                  //record.put("invalid", "invalid");

      to

                  record.put("Message", message);
                  record.put("Time", now.getTime());
                  record.put("invalid", "invalid");
    • run the client again

      cd ~/amq-streams-2022/7-service-registry/apicurio-registry-examples/simple-avro 
      mvn compile exec:java -Dexec.mainClass="io.apicurio.registry.examples.simple.avro.SimpleAvroExample"

      example result

      Producing (5) messages.
      Closing the producer.
      [WARNING] 
      org.apache.avro.AvroRuntimeException: Not a valid schema field: invalid
          at org.apache.avro.generic.GenericData$Record.put (GenericData.java:253)
          at io.apicurio.registry.examples.simple.avro.SimpleAvroExample.main (SimpleAvroExample.java:89)

More Example of Service Registry

  • See the apicurio-registry-examples project used in this lab for more examples of using Service Registry.

More Detail about Advanced Red Hat Service Registry

  • See the Advanced Red Hat Service Registry material for more detail.