Red Hat AMQ Streams Architecture
Prerequisite
Clean AMQ Streams Data
make sure to stop all kafka & zookeeper server
by ctrl+c in your terminal or
call kafka-server-stop.sh for kafka broker and call zookeeper-server-stop.sh for zookeeper
clear zookeeper & kafka data
rm -rf /tmp/zookeeper* rm -rf /tmp/kafka*
Generate the certificates
create certificate for enable ssl connection to amq streams cluster
review generate.sh and run for create certificate
cd ~/amq-streams-2022/2-amq-streams-architecture/ ./ssl/generate.sh
example result of generate.sh (path ~/amq-streams-2022/2-amq-streams-architecture/ssl/keys, don't worry about warning in result!).
Start ZooKeeper cluster
First before we start Kafka, we have to start ZooKeeper cluster. We will use 3 node cluster. Start the 3 Zookeeper nodes by running these 3 scripts in different terminals. Don't worry about error after run zookeeper-0.sh (error about QuorumPeer):
Look at the Zookeeper config files in
./configs/zookeeper/
, review zookeeper-1.properties, zookeeper-2.properties, zookeeper-3.propertiesLook at the ensemble configuration in the Zookeeper properties files
Check the content of the Zookeeper data dirs in
/tmp
Notice the
myid
file which needs to be created before starting Zookeeper with the node IDclient port
zookeeper server list (server.1,server.2,server.3)
client to server security config (this lab use 'SASL')
server to server security config (this lab use 'SASL')
review jaas.config for authentication information
run start zookeeper command in different terminal (1 shell script 1 terminal)
cd ~/amq-streams-2022/2-amq-streams-architecture/ ./scripts/zookeeper-0.sh ./scripts/zookeeper-1.sh ./scripts/zookeeper-2.sh
example result
Start Kafka cluster
We will use 3 node Kafka cluster Start the 3 Kafka nodes by running these 3 scripts in different terminals:
Look at the Kafka configuration files in
./configs/kafka/
, review server-0.properties, server-1.properties, server-2.propertiesbroker.id
listeners, advertised listeners, protocols
Zookeeper config
Look at the data dir in
/tmp
Look at the tools in
./kafka/bin
SASL, SSL (server to server, kafka to kafka & zookeeper to kafka use SASL Plaintext in this lab)
review jaas.config for authentication information
run start kafka broker command in different terminal
cd ~/amq-streams-2022/2-amq-streams-architecture/ ./scripts/kafka-0.sh ./scripts/kafka-1.sh ./scripts/kafka-2.sh
example result
Zookeeper
Show what Kafka does in Zookeeper
open new terminal
Find and notice the ZK JAR files in
./kafka/libs
and./kafka/bin
- Zookeeper is integrated into Kafka distribution (see in 2-amq-streams-architecture/)Start the ZK client
cd ~/amq-streams-2022/2-amq-streams-architecture/ ./kafka/bin/zookeeper-shell.sh localhost:2181
Browse through the Zookeeper nodes with below command
ls / get /controller ls /brokers ls /brokers/ids get /brokers/ids/0 ls /brokers/topics
example result
Exit from zookeeper-shell with ctrl+c
Try to do netcat dump with connected brokers
echo dump | nc localhost 2181
example result
[student@node1 2-amq-streams-architecture]$ echo dump | nc localhost 2181 SessionTracker dump: Global Sessions(4): 0x100000d21e00000 18000ms 0x100000d21e00001 30000ms 0x300000d42930000 18000ms 0x300000d42930001 18000ms ephemeral nodes dump: Sessions with Ephemerals (3): 0x100000d21e00000: /controller /brokers/ids/0 0x300000d42930000: /brokers/ids/1 0x300000d42930001: /brokers/ids/2 Connections dump: Connections Sets (4)/(2): 0 expire at Fri Nov 11 04:16:53 UTC 2022: 1 expire at Fri Nov 11 04:17:03 UTC 2022: ip: /0:0:0:0:0:0:0:1:50356 sessionId: 0x0 1 expire at Fri Nov 11 04:17:13 UTC 2022: ip: /127.0.0.1:35092 sessionId: 0x100000d21e00000 0 expire at Fri Nov 11 04:17:23 UTC 2022: [student@node1 2-amq-streams-architecture]$
Kill broker 2 (type ctrl+c in kafka broker terminal #3) and do the netcat again to see how it disappeared
echo dump | nc localhost 2181
example result after kill broker 2
[student@node1 2-amq-streams-architecture]$ echo dump | nc localhost 2181 SessionTracker dump: Global Sessions(2): 0x100000d21e00000 18000ms 0x300000d42930000 18000ms ephemeral nodes dump: Sessions with Ephemerals (2): 0x100000d21e00000: /controller /brokers/ids/0 0x300000d42930000: /brokers/ids/1 Connections dump: Connections Sets (2)/(2): 0 expire at Fri Nov 11 04:31:53 UTC 2022: 2 expire at Fri Nov 11 04:32:03 UTC 2022: ip: /0:0:0:0:0:0:0:1:50396 sessionId: 0x0 ip: /127.0.0.1:35092 sessionId: 0x100000d21e00000 [student@node1 2-amq-streams-architecture]$
start broker 2 again with command
cd ~/amq-streams-2022/2-amq-streams-architecture/ ./scripts/kafka-2.sh
Basics Operation with Cluster
Create topic
cd ~/amq-streams-2022/2-amq-streams-architecture/
./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --create --topic demo --partitions 3 --replication-factor 3
example result
Created topic demo.
Check the created topic
./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9093 --list
./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9094 --describe --topic demo
example result
[student@node1 2-amq-streams-architecture]$ ./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9094 --describe --topic demo
Topic: demo TopicId: YDf9yjTsTFqA2OPWtzJ4GQ PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=104857600
Topic: demo Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: demo Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: demo Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
[student@node1 2-amq-streams-architecture]$
Notice the distribution of leaders and the ISR replicas.
Send some messages
Send at least 10 messages (e.g.
Message 1
,Message 2
etc. to be able to notice the ordering later), exit command with ctrl+ccd ~/amq-streams-2022/2-amq-streams-architecture/ ./kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo
example result
Consume messages
Read from the whole topic
./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic demo --from-beginning
example result
Notice how the messages are out of order. And check how nicely ordered they are in a single partition. exit old console consumer and call it again and select only partition 0.
./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --partition 0 --from-beginning
example result
Reading from a particular offset (try to change partition and offset to test this command)
./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --partition 0 --offset 1
example result
Replication
Test Broker crash
View topic description with the leaders and new ISR
cd ~/amq-streams-2022/2-amq-streams-architecture/ ./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic demo
example result
Topic: demo TopicId: YDf9yjTsTFqA2OPWtzJ4GQ PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824 Topic: demo Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: demo Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: demo Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Kill broker 2 by ctrl+c in kafka terminal #2 (broker start with 0,1,2)
Look again at the topic description with the leaders which changed and new ISR
./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic demo
example result (partition 2 change leader to broker 0)
Topic: demo TopicId: YDf9yjTsTFqA2OPWtzJ4GQ PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=104857600 Topic: demo Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,0 Topic: demo Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1 Topic: demo Partition: 2 Leader: 0 Replicas: 2,0,1 Isr: 0,1
Consume messages
Try to consume the messages again to confirm that replication worked and that the messages are still in the topic!
cd ~/amq-streams-2022/2-amq-streams-architecture/ ./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning
example result
Send some new messages
./kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo
example result

Start the broker 2 again
start broker 2 again with command
cd ~/amq-streams-2022/2-amq-streams-architecture/ ./scripts/kafka-2.sh
Look again at the topic description with the leaders which changed and new ISR
./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic demo
example result (broker 2 comeback, but leader not change)
Topic: demo TopicId: YDf9yjTsTFqA2OPWtzJ4GQ PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=104857600 Topic: demo Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,0,2 Topic: demo Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: demo Partition: 2 Leader: 0 Replicas: 2,0,1 Isr: 0,1,2
Leadership didn't changed, but all replicas are again ISR
Try to consume the messages again.
cd ~/amq-streams-2022/2-amq-streams-architecture/ ./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning
Last updated