amq streams training 2022
  • README
  • AMQ Streams 2022 Training
    • Setup Red Hat AMQ Streams Lab
    • Introduction to Red Hat AMQ Streams
    • Red Hat AMQ Streams Architecture
    • Consumer & Producer
    • Reassign Partition
    • Cruise Control
    • Monitoring
    • Authentication & Authorization
    • HTTP Bridge
    • Service Registry
Powered by GitBook
On this page
  • Prerequisite
  • Start Cluster
  • Consumer Groups
  • Create a new topic (delete and create new again)
  • Setup consumers
  • Send messages
  • Rebalancing consumer group
  • Message replay
  • Secure Client to Broker Cluster
  • Configuration
  • SSL Consumers and Producers
  • SASL Consumers and Producers
  • Topics & Partition Information
  • Consumer & Producer API
  1. AMQ Streams 2022 Training

Consumer & Producer

PreviousRed Hat AMQ Streams ArchitectureNextReassign Partition

Last updated 2 years ago

Prerequisite

Start Cluster

  • make sure to stop all kafka & zookeeper server

    • by ctrl+c in your terminal or

    • call kafka-server-stop.sh for kafka broker and call zookeeper-server-stop.sh for zookeeper

    • make sure with jps command

  • clear zookeeper & kafka data

    rm -rf /tmp/zookeeper*
    rm -rf /tmp/kafka*
  • cd ~/amq-streams-2022/2-amq-streams-architecture/
    ./scripts/zookeeper-0.sh
    ./scripts/zookeeper-1.sh
    ./scripts/zookeeper-2.sh
    ./scripts/kafka-0.sh
    ./scripts/kafka-1.sh
    ./scripts/kafka-2.sh

Consumer Groups

Create a new topic (delete and create new again)

cd ~/amq-streams-2022/2-amq-streams-architecture/
./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic demo
./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic demo --partitions 3 --replication-factor 3
./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic demo

Setup consumers

  • Open 3 consumers using the same group group-1

    cd ~/amq-streams-2022/2-amq-streams-architecture/
    ./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning --property print.key=true --property key.separator=":" --group group-1

    example result

  • New Terminal and Open consumer using a different group group-2

    cd ~/amq-streams-2022/2-amq-streams-architecture/
    ./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning  --property print.key=true --property key.separator=":" --group group-2

    example result

Send messages

  • New Terminal and Send some messages with keys (Send messages in the format key:payload - e.g. my-key:my-value)

    cd ~/amq-streams-2022/2-amq-streams-architecture/
    ./kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo --property "parse.key=true" --property "key.separator=:"

    example result (send message such as "a:apple", "b:bird")

    example result after send message

Rebalancing consumer group

  • Kill one of the consumers group 1

    example kill consumer group-1 terminal #2

  • Send some messages with the same key as was used before for this consumer

  • Notice that one of the other consumers got the partition assigned and will receive it

    example result

Message replay

  • kill all previous consumer and producer

  • Consume the messages from Kafka with a new group:

    cd ~/amq-streams-2022/2-amq-streams-architecture/
    ./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning  --property print.key=true --property key.separator=":" --group replay-group

    example result

  • After it consumes all messages, try to restart it to make sure they were all committed - no messages should be received

    example result

  • kill all consumer (with ctrl+c)

  • List all the groups:

    ./kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --all-groups --list

    example result

  • Or describe them:

    ./kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --all-groups --describe

    example result (view current-offset, log-end-offset, lag)

  • Go and reset the offset to 0: with --to-earliest (other option such as set to last offset with --to-latest)

    ./kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --to-earliest --group replay-group --topic demo --execute

    example result

  • Try to consume the messages again - you should receive them from the beginning of the topic:

    ./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning  --property print.key=true --property key.separator=":" --group replay-group

    example result

Secure Client to Broker Cluster

Configuration

    • example configuration

      ############################# SASL #############################
      
      sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
      sasl.mechanism.inter.broker.protocol=PLAIN
      
      ############################# SSL #############################
      
      # Configures kafka broker to request client authentication. The following settings are common:
      #ssl.client.auth=required If set to required client authentication is required.
      #ssl.client.auth=requested This means client authentication is optional. unlike requested , if this option is set client can choose not to provide authentication information about itself
      #ssl.client.auth=none This means client authentication is not needed.
      ssl.client.auth=required
      
      # The location of the key store file. This is optional for client and can be used for two-way authentication for client.
      ssl.keystore.location=./ssl/keys/server-0.keystore
      
      # The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured.
      ssl.keystore.password=123456
      
      # The location of the trust store file.
      ssl.truststore.location=./ssl/keys/truststore
      
      # The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled.
      ssl.truststore.password=123456

SSL Consumers and Producers

  • Use SSL to producer messages:

    cd ~/amq-streams-2022/2-amq-streams-architecture/
    ./kafka/bin/kafka-console-producer.sh --broker-list localhost:19092 \
        --topic demo \
        --producer-property security.protocol=SSL \
        --producer-property ssl.truststore.password=123456 \
        --producer-property ssl.truststore.location=./ssl/keys/truststore \
        --producer-property ssl.keystore.password=123456 \
        --producer-property ssl.keystore.location=./ssl/keys/user1.keystore
  • And consume them:

    cd ~/amq-streams-2022/2-amq-streams-architecture/
    ./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:19092 \
        --topic demo --from-beginning \
        --consumer-property security.protocol=SSL \
        --consumer-property ssl.truststore.password=123456 \
        --consumer-property ssl.truststore.location=./ssl/keys/truststore \
        --consumer-property ssl.keystore.password=123456 \
        --consumer-property ssl.keystore.location=./ssl/keys/user1.keystore

SASL Consumers and Producers

  • Try to producer some messages:

    cd ~/amq-streams-2022/2-amq-streams-architecture/
    ./kafka/bin/kafka-console-producer.sh --broker-list localhost:39092 \
        --topic demo \
        --producer.config sasl-client.properties
  • And consume them:

    cd ~/amq-streams-2022/2-amq-streams-architecture/
    ./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:39092 \
        --topic demo --from-beginning \
        --consumer.config sasl-client.properties

Topics & Partition Information

  • check demo topic already in cluster

    cd ~/amq-streams-2022/2-amq-streams-architecture/
    ./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

    example result

    __consumer_offsets
    demo
  • We already used a lot of commands. You can also use the script to show only some topics in troubles:

    • '--under-replicated-partitions' --> displays the number of partitions that do not have enough replicas to meet the desired replication factor.

    • '--unavailable-partitions' --> list partitions that currently don't have a leader and hence cannot be used by Consumers or Producers.

    ./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
    ./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions
    ./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --unavailable-partitions

Consumer & Producer API

  • Create new topic for test messages:

    cd ~/amq-streams-2022/2-amq-streams-architecture/
    ./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic weather-report --partitions 1 --replication-factor 3
  • Start a console consumer to see the sent messages:

    cd ~/amq-streams-2022/2-amq-streams-architecture/
    ./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic weather-report --from-beginning --property print.key=true --property key.separator=":     "
  • Basic Consumer and Producer APIs

    • run simple producer client api

      cd ~/amq-streams-2022/3-consumer-producer/
      mvn compile exec:java -Dexec.mainClass="cz.scholz.kafka.basic.ProducerAPI"

      example result in kafka console consumer

      example result in ProducerAPI java application

    • after producer client complete, run simple consumer client api

      cd ~/amq-streams-2022/3-consumer-producer/
      mvn compile exec:java -Dexec.mainClass="cz.scholz.kafka.basic.ConsumerAPI"

      example result

  • Acks and Manual Commits with API

    • run producer with acks = "all"

      cd ~/amq-streams-2022/3-consumer-producer/
      mvn compile exec:java -Dexec.mainClass="cz.scholz.kafka.acks.ProducerAPI"

      example result

      • see ENABLE_AUTO_COMMIT_CONFIG property

      • see consumer commit method (sync & async)

    • run consumer with async commit

      cd ~/amq-streams-2022/3-consumer-producer/
      mvn compile exec:java -Dexec.mainClass="cz.scholz.kafka.manualcommits.ConsumerAPI"

      example result

  • SSL and SSL Authentication with API

    • run producer with ssl

      cd ~/amq-streams-2022/3-consumer-producer/
      mvn compile exec:java -Dexec.mainClass="cz.scholz.kafka.ssl.ProducerAPI"

      example result

    • run consumer with ssl

      cd ~/amq-streams-2022/3-consumer-producer/
      mvn compile exec:java -Dexec.mainClass="cz.scholz.kafka.ssl.ConsumerAPI"

      example result

  • SASL with API

    • run producer with sasl

      cd ~/amq-streams-2022/3-consumer-producer/
      mvn compile exec:java -Dexec.mainClass="cz.scholz.kafka.sasl.ProducerAPI"

      example result

    • run consumer with sasl

      cd ~/amq-streams-2022/3-consumer-producer/
      mvn compile exec:java -Dexec.mainClass="cz.scholz.kafka.sasl.ConsumerAPI"

      example result

  • Serialization and deserialization with API

    • run producer with serialization

      cd ~/amq-streams-2022/3-consumer-producer/
      mvn compile exec:java -Dexec.mainClass="cz.scholz.kafka.serialization.ProducerAPI"

      example result

      • run consumer with serialization

      cd ~/amq-streams-2022/3-consumer-producer/
      mvn compile exec:java -Dexec.mainClass="cz.scholz.kafka.serialization.ConsumerAPI"

      example result

start the Zookeeper and Kafka cluster from the . This cluster will be used for this lab. run cli in each terminal (1 shell script 1 terminal)

Look in the configuration files of the brokers (./configs/kafka/) such as and check the fields related to the TLS & SASL.

Check the file which configures SASL PLAIN authentication

Review Simple Producer Client Source Code at in src/main/java/cz/scholz/kafka/basic folder

Review Simple Consumer Client Source Code at in src/main/java/cz/scholz/kafka/basic folder

Review Acknowledge Config Producer Client (ACKS_CONFIG property set to "all") Source Code at in src/main/java/cz/scholz/kafka/acks folder

Review Manual Commit Config Consumer Client (default auto commit) Source Code at in src/main/java/cz/scholz/kafka/manaulcommits folder

Review SSL Config Producer Client (SSL* property) Source Code at in src/main/java/cz/scholz/kafka/ssl folder

Review SSL Config Consumer Client (SSL* property) Source Code at in src/main/java/cz/scholz/kafka/ssl folder

Review SASL Config Producer Client (SASL* property) Source Code at in src/main/java/cz/scholz/kafka/sasl folder

Review SASL Config Consumer Client (SASL* property) Source Code at in src/main/java/cz/scholz/kafka/sasl folder

Review Serialization Producer Client Source Code at in src/main/java/cz/scholz/kafka/serialization folder

Review DeSerialization Consumer Client Source Code at in src/main/java/cz/scholz/kafka/serialization folder

Setup Red Hat AMQ Streams Lab
Red Hat AMQ Streams Architecture
Red Hat AMQ Streams Architecture
server-0.properties
sasl-client.properties
ProducerAPI.java
ConsumerAPI.java
ProducerAPI.java
ConsumerAPI.java
ProducerAPI.java
ConsumerAPI.java
ProducerAPI.java
ConsumerAPI.java
ProducerAPI.java
ConsumerAPI.java
Consumer and Producer
Prerequisite
Start Cluster
Consumer Groups
Create a new topic (delete and create new again)
Setup consumers
Send messages
Rebalancing consumer group
Message replay
Secure Client to Broker Cluster
Configuration
SSL Consumers and Producers
SASL Consumers and Producers
Topics & Partition Information
Consumer & Producer API