Introduction to Red Hat AMQ Streams

Basic Red Hat AMQ Streams:

Prerequisite

Basic Start with Zookeeper

  • go to lab folder 1-introduction-amq-streams

    cd ~/amq-streams-2022/1-introduction-amq-streams
  • see detail of amq streams installation folder (you can install amq streams by download zip file from red hat and unzip to your path )

    tree -L 1 kafka

    example result

    kafka
    ├── LICENSE  
    ├── NOTICE
    ├── bin         --> kafka command line folder
    ├── config      --> kafka configuration folder
    ├── libs        --> kafka java libraries folder
    └── licenses
  • review zookkeper.properties in kafka/config, see detail of zookeeper configuration. we will use this file for config zookeeper.

    # the directory where the snapshot is stored.
    dataDir=/tmp/zookeeper
    # the port at which the clients will connect
    clientPort=2181
    # disable the per-ip limit on the number of connections since this is a non-production config
    maxClientCnxns=0
    # Disable the adminserver by default to avoid port conflicts.
    # Set the port to something non-conflicting if choosing to enable this
    admin.enableServer=false
    # admin.serverPort=8080
  • First before we start Kafka, we have to start ZooKeeper, start zookeeper with command line (For testing, we start only 1 server.)

    ./kafka/bin/zookeeper-server-start.sh ./kafka/config/zookeeper.properties
  • wait until zookeeper start complete

    ...
    [2022-11-10 06:36:46,300] INFO Using checkIntervalMs=60000 maxPerMinute=10000 maxNeverUsedIntervalMs=0 (org.apache.zookeeper.server.ContainerManager)
    [2022-11-10 06:36:46,302] INFO ZooKeeper audit is disabled. (org.apache.zookeeper.audit.ZKAuditProvider)
  • zookeeper terminal

Basic Start Kafka Broker

  • review server.properties in kafka/config, see detail of server configuration. we will use this file for start kafka.

    ...
    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0
    ...
    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    num.network.threads=3
    
    # The number of threads that the server uses for processing requests, which may include disk I/O
    num.io.threads=8
    
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    
    # A comma separated list of directories under which to store log files
    log.dirs=/tmp/kafka-logs
    
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1
    
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
    
    # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
    # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    
    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=168
    
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
    
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=localhost:2181
    
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=18000  
    ...
  • Next we can start Kafka Broker, open new terminal by repeat step in Connect to RHPDS VM Lab and run command (For testing, we start only 1 server.)

    cd ~/amq-streams-2022/1-introduction-amq-streams
    ./kafka/bin/kafka-server-start.sh ./kafka/config/server.properties
  • wait unti kafka start complete

    [2022-11-10 06:50:45,035] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Started data-plane acceptor and processor(s) for endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
    [2022-11-10 06:50:45,035] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Started socket server acceptors and processors (kafka.network.SocketServer)
    [2022-11-10 06:50:45,044] INFO Kafka version: 3.1.0.redhat-00004 (org.apache.kafka.common.utils.AppInfoParser)
    [2022-11-10 06:50:45,044] INFO Kafka commitId: 4c168d2882c7f93d (org.apache.kafka.common.utils.AppInfoParser)
    [2022-11-10 06:50:45,044] INFO Kafka startTimeMs: 1668063045036 (org.apache.kafka.common.utils.AppInfoParser)
    [2022-11-10 06:50:45,046] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
    [2022-11-10 06:50:45,236] INFO [BrokerToControllerChannelManager broker=0 name=alterIsr]: Recorded new controller, from now on will use broker ip-192-168-0-124.us-east-2.compute.internal:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
    [2022-11-10 06:50:45,238] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]: Recorded new controller, from now on will use broker ip-192-168-0-124.us-east-2.compute.internal:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
  • kafka broker terminal

  • open new terminal and check zookeeper and kafka process

    jps

    example result

    1749 QuorumPeerMain --> zookeeper process
    6774 Jps
    2319 Kafka          --> kafka process
    • process "QuorumPeerMain" is zookeeper

    • process "Kafka" is kafka broker

Basic Create Kafka Topic

  • List the topics using Kafka, open new terminal and run command

    cd ~/amq-streams-2022/1-introduction-amq-streams
    ./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
  • no topic show in terminal

  • create sample new topic which we will use

    ./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 3 --replication-factor 1

    result of create topic command

    Created topic my-topic.
  • List the topics again to see it was created.

    ./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
  • Describe the topic to see more details:

    ./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic my-topic --describe
  • example output

    Topic: my-topic	TopicId: CXoa2jvVScmiOx6wWx0VzQ	PartitionCount: 3	ReplicationFactor: 1	Configs: segment.bytes=1073741824
    	Topic: my-topic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
      Topic: my-topic	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
      Topic: my-topic	Partition: 2	Leader: 0	Replicas: 0	Isr: 0

Basic Consuming and producing messages

  • Start the console producer for create and send message to topic

    cd ~/amq-streams-2022/1-introduction-amq-streams
    ./kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic
  • Wait until it is ready (it should show >).

  • Next we can consume the messages, open new terminal and run command

    cd ~/amq-streams-2022/1-introduction-amq-streams
    ./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
  • Once ready, back to producer terminal and send some message by typing the message payload and pressing enter to send. such as

    >a
    >b
    >c
    >
  • see output in consumer terminal:

    a
    b
    c

    example result in terminal

  • exit from producer console by type ctrl+c in producer console terminal

  • You can also check the consumer groups:

    ./kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups

    example result

    GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
    console-consumer-16045 my-topic        0          -               2               -               console-consumer-836b8dc7-50a8-4c4d-a1d7-f227607dff33 /192.168.0.124  console-consumer
    console-consumer-16045 my-topic        1          -               1               -               console-consumer-836b8dc7-50a8-4c4d-a1d7-f227607dff33 /192.168.0.124  console-consumer
    console-consumer-16045 my-topic        2          -               1               -               console-consumer-836b8dc7-50a8-4c4d-a1d7-f227607dff33 /192.168.0.124  console-consumer
  • exit from consumer console by type ctrl+c in consumer console terminal

  • check the consumer groups again:

    ./kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups

    example result

    Consumer group 'console-consumer-16045' has no active members.

Stop Zookeeper & Kafka Broker

  • run stop server command in another terminal

    cd ~/amq-streams-2022/1-introduction-amq-streams
    ./kafka/bin/kafka-server-stop.sh
    ./kafka/bin/zookeeper-server-stop.sh
  • or type ctrl + c in kafka terminal and zookeeper terminal

  • check with jps command again (if stop complete, kafka and QuorumPeerMain will disappear)

Last updated