
Introduction to Red Hat AMQ Streams


Last updated 2 years ago

Basic Red Hat AMQ Streams:

Prerequisite

  • Complete Setup Red Hat AMQ Streams Lab before starting this lab.

Basic Start with Zookeeper

  • Go to the lab folder 1-introduction-amq-streams:

    cd ~/amq-streams-2022/1-introduction-amq-streams
  • View the AMQ Streams installation folder (you can install AMQ Streams by downloading the zip file from Red Hat and unzipping it to a path of your choice):

    tree -L 1 kafka

    example result

    kafka
    ├── LICENSE  
    ├── NOTICE
    ├── bin         --> kafka command line folder
    ├── config      --> kafka configuration folder
    ├── libs        --> kafka java libraries folder
    └── licenses
  • Review zookeeper.properties in kafka/config to see the ZooKeeper configuration. We will use this file to configure ZooKeeper.

    # the directory where the snapshot is stored.
    dataDir=/tmp/zookeeper
    # the port at which the clients will connect
    clientPort=2181
    # disable the per-ip limit on the number of connections since this is a non-production config
    maxClientCnxns=0
    # Disable the adminserver by default to avoid port conflicts.
    # Set the port to something non-conflicting if choosing to enable this
    admin.enableServer=false
    # admin.serverPort=8080
  • Before starting Kafka, we have to start ZooKeeper. Start ZooKeeper from the command line (for testing, we start only one server):

    ./kafka/bin/zookeeper-server-start.sh ./kafka/config/zookeeper.properties
  • Wait until ZooKeeper has finished starting:

    ...
    [2022-11-10 06:36:46,300] INFO Using checkIntervalMs=60000 maxPerMinute=10000 maxNeverUsedIntervalMs=0 (org.apache.zookeeper.server.ContainerManager)
    [2022-11-10 06:36:46,302] INFO ZooKeeper audit is disabled. (org.apache.zookeeper.audit.ZKAuditProvider)
  • [Screenshot: ZooKeeper terminal]
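
The properties shown above decide where ZooKeeper listens and where it stores its data. As a minimal sketch, the values could be read from the file with a small shell helper before starting the server. The function name get_prop is hypothetical (it is not part of AMQ Streams), and the demo runs against a temporary copy of the settings listed above rather than the real file.

```shell
# get_prop FILE KEY (hypothetical helper): print the value of KEY from a
# Java-style properties file, ignoring commented-out lines.
get_prop() {
    awk -v key="$2" '
        /^[[:space:]]*#/ { next }              # skip comment lines
        index($0, key "=") == 1 {              # line starts with "key="
            print substr($0, length(key) + 2)  # everything after the "="
            exit
        }
    ' "$1"
}

# Demo on a throwaway copy of the settings shown above (assumption: same keys).
conf=$(mktemp)
cat > "$conf" <<'EOF'
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
# admin.serverPort=8080
EOF

echo "ZooKeeper client port: $(get_prop "$conf" clientPort)"
echo "ZooKeeper data dir:    $(get_prop "$conf" dataDir)"
rm -f "$conf"
```

In the lab folder the same helper could be pointed at the real file, for example get_prop ./kafka/config/zookeeper.properties clientPort.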

Basic Start Kafka Broker

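  • Review server.properties in kafka/config to see the broker configuration. We will use this file to start Kafka.

    ...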
    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0
    ...
    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    num.network.threads=3
    
    # The number of threads that the server uses for processing requests, which may include disk I/O
    num.io.threads=8
    
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    
    # A comma separated list of directories under which to store log files
    log.dirs=/tmp/kafka-logs
    
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1
    
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
    
    # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
    # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    
    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=168
    
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
    
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=localhost:2181
    
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=18000  
    ...
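  • Next, start the Kafka broker. Open a new terminal by repeating the steps in Connect to RHPDS VM Lab and run the commands below (for testing, we start only one server):

    cd ~/amq-streams-2022/1-introduction-amq-streams
    ./kafka/bin/kafka-server-start.sh ./kafka/config/server.properties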
  • Wait until Kafka has finished starting:

    [2022-11-10 06:50:45,035] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Started data-plane acceptor and processor(s) for endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
    [2022-11-10 06:50:45,035] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Started socket server acceptors and processors (kafka.network.SocketServer)
    [2022-11-10 06:50:45,044] INFO Kafka version: 3.1.0.redhat-00004 (org.apache.kafka.common.utils.AppInfoParser)
    [2022-11-10 06:50:45,044] INFO Kafka commitId: 4c168d2882c7f93d (org.apache.kafka.common.utils.AppInfoParser)
    [2022-11-10 06:50:45,044] INFO Kafka startTimeMs: 1668063045036 (org.apache.kafka.common.utils.AppInfoParser)
    [2022-11-10 06:50:45,046] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
    [2022-11-10 06:50:45,236] INFO [BrokerToControllerChannelManager broker=0 name=alterIsr]: Recorded new controller, from now on will use broker ip-192-168-0-124.us-east-2.compute.internal:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
    [2022-11-10 06:50:45,238] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]: Recorded new controller, from now on will use broker ip-192-168-0-124.us-east-2.compute.internal:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
  • [Screenshot: Kafka broker terminal]

  • Open a new terminal and check the ZooKeeper and Kafka processes:

    jps

    example result

    1749 QuorumPeerMain --> zookeeper process
    6774 Jps
    2319 Kafka          --> kafka process
    • The "QuorumPeerMain" process is ZooKeeper.

    • The "Kafka" process is the Kafka broker.
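
The jps check above can also be scripted. The following is a minimal sketch, assuming jps prints one "PID MainClass" pair per line as in the example result. The helper name has_process is hypothetical, and the demo uses the sample listing from this page instead of calling jps.

```shell
# has_process NAME (hypothetical helper): read `jps` output on stdin and
# succeed only if a JVM whose main class is NAME appears in the list.
has_process() {
    awk -v name="$1" '$2 == name { found = 1 } END { exit (found ? 0 : 1) }'
}

# Sample listing copied from the example result above (annotations removed).
sample='1749 QuorumPeerMain
6774 Jps
2319 Kafka'

for name in QuorumPeerMain Kafka; do
    if printf '%s\n' "$sample" | has_process "$name"; then
        echo "$name is running"
    else
        echo "$name is NOT running"
    fi
done
```

Against the live lab environment, the sample can be replaced by the real command, for example jps | has_process Kafka.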

Basic Create Kafka Topic

  • List the topics in Kafka. Open a new terminal and run:

    cd ~/amq-streams-2022/1-introduction-amq-streams
    ./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
  • No topics are shown in the terminal yet.

  • Create a new sample topic to use in this lab:

    ./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 3 --replication-factor 1

    result of create topic command

    Created topic my-topic.
  • List the topics again to see it was created.

    ./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
  • Describe the topic to see more details:

    ./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic my-topic --describe
  • example output

    Topic: my-topic	TopicId: CXoa2jvVScmiOx6wWx0VzQ	PartitionCount: 3	ReplicationFactor: 1	Configs: segment.bytes=1073741824
    	Topic: my-topic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
    	Topic: my-topic	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
    	Topic: my-topic	Partition: 2	Leader: 0	Replicas: 0	Isr: 0
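
The describe output lists one line per partition with its leader broker. As a rough sketch, the partition-to-leader mapping could be extracted with awk, assuming the output keeps the "Topic: ... Partition: ... Leader: ..." layout shown above. The helper name partition_leaders is hypothetical, and the demo feeds it the example output from this page.

```shell
# partition_leaders (hypothetical helper): read `kafka-topics.sh --describe`
# output on stdin and print the leader broker id of every partition.
partition_leaders() {
    # Partition lines have "Partition:" as their third field; the topic
    # summary line has "TopicId:" there and is skipped.
    awk '$3 == "Partition:" { print "partition " $4 " -> leader " $6 }'
}

# Example output copied from above (whitespace-separated fields).
cat <<'EOF' | partition_leaders
Topic: my-topic TopicId: CXoa2jvVScmiOx6wWx0VzQ PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824
    Topic: my-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
    Topic: my-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
    Topic: my-topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
EOF
```

With a single broker every partition reports leader 0, which matches broker.id=0 in server.properties. In the lab, the describe command can be piped straight into the helper.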

Basic Consuming and producing messages

  • Start the console producer to create and send messages to the topic:

    cd ~/amq-streams-2022/1-introduction-amq-streams
    ./kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic
  • Wait until it is ready (it should show >).

  • Next, consume the messages. Open a new terminal and run:

    cd ~/amq-streams-2022/1-introduction-amq-streams
    ./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
  • Once the consumer is ready, go back to the producer terminal and send some messages by typing each message payload and pressing Enter, for example:

    >a
    >b
    >c
    >
  • Check the output in the consumer terminal:

    a
    b
    c

    [Screenshot: example result in terminal]

  • Exit the producer console by pressing Ctrl+C in the producer terminal.

  • You can also check the consumer groups:

    ./kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups

    example result

    GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
    console-consumer-16045 my-topic        0          -               2               -               console-consumer-836b8dc7-50a8-4c4d-a1d7-f227607dff33 /192.168.0.124  console-consumer
    console-consumer-16045 my-topic        1          -               1               -               console-consumer-836b8dc7-50a8-4c4d-a1d7-f227607dff33 /192.168.0.124  console-consumer
    console-consumer-16045 my-topic        2          -               1               -               console-consumer-836b8dc7-50a8-4c4d-a1d7-f227607dff33 /192.168.0.124  console-consumer
  • Exit the consumer console by pressing Ctrl+C in the consumer terminal.

  • Check the consumer groups again:

    ./kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups

    example result

    Consumer group 'console-consumer-16045' has no active members.
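
The consumer group listing can be condensed into one line per group. The sketch below sums LOG-END-OFFSET across partitions and reports the lag only when every row has a numeric LAG value. It assumes the column order shown in the example result; the helper name group_summary is hypothetical.

```shell
# group_summary (hypothetical helper): read the output of
# `kafka-consumer-groups.sh --describe` on stdin and print, per group,
# the total log end offset and the total lag.
group_summary() {
    awk '
        $5 !~ /^[0-9]+$/ { next }               # skip header, blank and message lines
        {
            total[$1] += $5                     # column 5: LOG-END-OFFSET
            if ($6 ~ /^[0-9]+$/) lag[$1] += $6  # column 6: LAG
            else unknown[$1] = 1                # LAG is "-" when no offset is committed
        }
        END {
            for (g in total)
                printf "%s end-offset-total=%d lag=%s\n", g, total[g], (g in unknown) ? "unknown" : lag[g]
        }
    '
}

# Rows copied from the example result above (consumer columns shortened).
cat <<'EOF' | group_summary
GROUP                  TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST            CLIENT-ID
console-consumer-16045 my-topic  0          -               2               -    console-1    /192.168.0.124  console-consumer
console-consumer-16045 my-topic  1          -               1               -    console-1    /192.168.0.124  console-consumer
console-consumer-16045 my-topic  2          -               1               -    console-1    /192.168.0.124  console-consumer
EOF
```

For the example rows the lag is reported as unknown, because CURRENT-OFFSET and LAG are shown as "-" in the listing above.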

Stop Zookeeper & Kafka Broker

  • Run the stop commands in another terminal:

    cd ~/amq-streams-2022/1-introduction-amq-streams
    ./kafka/bin/kafka-server-stop.sh
    ./kafka/bin/zookeeper-server-stop.sh
  • Alternatively, press Ctrl+C in the Kafka terminal and then in the ZooKeeper terminal.

  • Check with the jps command again. If the shutdown is complete, Kafka and QuorumPeerMain no longer appear in the list.
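
The final jps check can be turned into a small polling loop, which is handy because the stop scripts return before the processes have actually exited. This is a sketch under assumptions: wait_until_gone and the LIST_CMD override are hypothetical names introduced here, and the dry run at the end uses a stand-in process list so it works without a running JVM.

```shell
# wait_until_gone NAME [TRIES] (hypothetical helper): poll the JVM process
# list once per second until NAME no longer appears, or give up after TRIES.
# LIST_CMD is an assumed override hook; it defaults to the real `jps`.
wait_until_gone() {
    name=$1
    tries=${2:-30}
    while [ "$tries" -gt 0 ]; do
        if ! ${LIST_CMD:-jps} | awk -v n="$name" '$2 == n { f = 1 } END { exit (f ? 0 : 1) }'; then
            return 0                # NAME is no longer listed
        fi
        tries=$((tries - 1))
        sleep 1
    done
    return 1                        # still running after all tries
}

# Dry run with a stand-in process list in which only Jps is present.
fake_list() { printf '6774 Jps\n'; }
LIST_CMD=fake_list
wait_until_gone Kafka 3 && echo "Kafka has stopped"
unset LIST_CMD
```

In the lab, the helper could sit between the two stop commands, for example ./kafka/bin/kafka-server-stop.sh followed by wait_until_gone Kafka && ./kafka/bin/zookeeper-server-stop.sh, so that ZooKeeper is stopped only after the broker has gone.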
