Consumer & Producer
Prerequisite
Start Cluster
make sure to stop all kafka & zookeeper server
by ctrl+c in your terminal or
call kafka-server-stop.sh for kafka broker and call zookeeper-server-stop.sh for zookeeper
make sure with jps command
clear zookeeper & kafka data
rm -rf /tmp/zookeeper* rm -rf /tmp/kafka*
start the Zookeeper and Kafka cluster from the Red Hat AMQ Streams Architecture. This cluster will be used for this lab. run cli in each terminal (1 shell script 1 terminal)
cd ~/amq-streams-2022/2-amq-streams-architecture/ ./scripts/zookeeper-0.sh ./scripts/zookeeper-1.sh ./scripts/zookeeper-2.sh ./scripts/kafka-0.sh ./scripts/kafka-1.sh ./scripts/kafka-2.sh
Consumer Groups
Create a new topic (delete and create new again)
cd ~/amq-streams-2022/2-amq-streams-architecture/
./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic demo
./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic demo --partitions 3 --replication-factor 3
./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic demo
Setup consumers
Open 3 consumers using the same group
group-1
cd ~/amq-streams-2022/2-amq-streams-architecture/ ./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning --property print.key=true --property key.separator=":" --group group-1
example result
New Terminal and Open consumer using a different group
group-2
cd ~/amq-streams-2022/2-amq-streams-architecture/ ./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning --property print.key=true --property key.separator=":" --group group-2
example result
Send messages
New Terminal and Send some messages with keys (Send messages in the format
key:payload
- e.g.my-key:my-value
)cd ~/amq-streams-2022/2-amq-streams-architecture/ ./kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo --property "parse.key=true" --property "key.separator=:"
example result (send message such as "a:apple", "b:bird")
example result after send message
Rebalancing consumer group
Kill one of the consumers group 1
example kill consumer group-1 terminal #2
Send some messages with the same key as was used before for this consumer
Notice that one of the other consumers got the partition assigned and will receive it
example result
Message replay
kill all previous consumer and producer
Consume the messages from Kafka with a new group:
cd ~/amq-streams-2022/2-amq-streams-architecture/ ./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning --property print.key=true --property key.separator=":" --group replay-group
example result
After it consumes all messages, try to restart it to make sure they were all committed - no messages should be received
example result
kill all consumer (with ctrl+c)
List all the groups:
./kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --all-groups --list
example result
Or describe them:
./kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --all-groups --describe
example result (view current-offset, log-end-offset, lag)
Go and reset the offset to 0: with --to-earliest (other option such as set to last offset with --to-latest)
./kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --to-earliest --group replay-group --topic demo --execute
example result
Try to consume the messages again - you should receive them from the beginning of the topic:
./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning --property print.key=true --property key.separator=":" --group replay-group
example result
Secure Client to Broker Cluster
Configuration
Look in the configuration files of the brokers (
./configs/kafka/
) such as server-0.properties and check the fields related to the TLS & SASL.example configuration
############################# SASL ############################# sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512 sasl.mechanism.inter.broker.protocol=PLAIN ############################# SSL ############################# # Configures kafka broker to request client authentication. The following settings are common: #ssl.client.auth=required If set to required client authentication is required. #ssl.client.auth=requested This means client authentication is optional. unlike requested , if this option is set client can choose not to provide authentication information about itself #ssl.client.auth=none This means client authentication is not needed. ssl.client.auth=required # The location of the key store file. This is optional for client and can be used for two-way authentication for client. ssl.keystore.location=./ssl/keys/server-0.keystore # The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured. ssl.keystore.password=123456 # The location of the trust store file. ssl.truststore.location=./ssl/keys/truststore # The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled. ssl.truststore.password=123456
SSL Consumers and Producers
Use SSL to producer messages:
cd ~/amq-streams-2022/2-amq-streams-architecture/ ./kafka/bin/kafka-console-producer.sh --broker-list localhost:19092 \ --topic demo \ --producer-property security.protocol=SSL \ --producer-property ssl.truststore.password=123456 \ --producer-property ssl.truststore.location=./ssl/keys/truststore \ --producer-property ssl.keystore.password=123456 \ --producer-property ssl.keystore.location=./ssl/keys/user1.keystore
And consume them:
cd ~/amq-streams-2022/2-amq-streams-architecture/ ./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:19092 \ --topic demo --from-beginning \ --consumer-property security.protocol=SSL \ --consumer-property ssl.truststore.password=123456 \ --consumer-property ssl.truststore.location=./ssl/keys/truststore \ --consumer-property ssl.keystore.password=123456 \ --consumer-property ssl.keystore.location=./ssl/keys/user1.keystore
SASL Consumers and Producers
Check the sasl-client.properties file which configures SASL PLAIN authentication
Try to producer some messages:
cd ~/amq-streams-2022/2-amq-streams-architecture/ ./kafka/bin/kafka-console-producer.sh --broker-list localhost:39092 \ --topic demo \ --producer.config sasl-client.properties
And consume them:
cd ~/amq-streams-2022/2-amq-streams-architecture/ ./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:39092 \ --topic demo --from-beginning \ --consumer.config sasl-client.properties
Topics & Partition Information
check demo topic already in cluster
cd ~/amq-streams-2022/2-amq-streams-architecture/ ./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
example result
__consumer_offsets demo
We already used a lot of commands. You can also use the script to show only some topics in troubles:
'--under-replicated-partitions' --> displays the number of partitions that do not have enough replicas to meet the desired replication factor.
'--unavailable-partitions' --> list partitions that currently don't have a leader and hence cannot be used by Consumers or Producers.
./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe ./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions ./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --unavailable-partitions
Consumer & Producer API
Create new topic for test messages:
cd ~/amq-streams-2022/2-amq-streams-architecture/ ./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic weather-report --partitions 1 --replication-factor 3
Start a console consumer to see the sent messages:
cd ~/amq-streams-2022/2-amq-streams-architecture/ ./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic weather-report --from-beginning --property print.key=true --property key.separator=": "
Basic Consumer and Producer APIs
Review Simple Producer Client Source Code at ProducerAPI.java in src/main/java/cz/scholz/kafka/basic folder
run simple producer client api
cd ~/amq-streams-2022/3-consumer-producer/ mvn compile exec:java -Dexec.mainClass="cz.scholz.kafka.basic.ProducerAPI"
example result in kafka console consumer
example result in ProducerAPI java application
Review Simple Consumer Client Source Code at ConsumerAPI.java in src/main/java/cz/scholz/kafka/basic folder
after producer client complete, run simple consumer client api
cd ~/amq-streams-2022/3-consumer-producer/ mvn compile exec:java -Dexec.mainClass="cz.scholz.kafka.basic.ConsumerAPI"
example result
Acks and Manual Commits with API
Review Acknowledge Config Producer Client (ACKS_CONFIG property set to "all") Source Code at ProducerAPI.java in src/main/java/cz/scholz/kafka/acks folder
run producer with acks = "all"
cd ~/amq-streams-2022/3-consumer-producer/ mvn compile exec:java -Dexec.mainClass="cz.scholz.kafka.acks.ProducerAPI"
example result
Review Manual Commit Config Consumer Client (default auto commit) Source Code at ConsumerAPI.java in src/main/java/cz/scholz/kafka/manaulcommits folder
see ENABLE_AUTO_COMMIT_CONFIG property
see consumer commit method (sync & async)
run consumer with async commit
cd ~/amq-streams-2022/3-consumer-producer/ mvn compile exec:java -Dexec.mainClass="cz.scholz.kafka.manualcommits.ConsumerAPI"
example result
SSL and SSL Authentication with API
Review SSL Config Producer Client (SSL* property) Source Code at ProducerAPI.java in src/main/java/cz/scholz/kafka/ssl folder
run producer with ssl
cd ~/amq-streams-2022/3-consumer-producer/ mvn compile exec:java -Dexec.mainClass="cz.scholz.kafka.ssl.ProducerAPI"
example result
Review SSL Config Consumer Client (SSL* property) Source Code at ConsumerAPI.java in src/main/java/cz/scholz/kafka/ssl folder
run consumer with ssl
cd ~/amq-streams-2022/3-consumer-producer/ mvn compile exec:java -Dexec.mainClass="cz.scholz.kafka.ssl.ConsumerAPI"
example result
SASL with API
Review SASL Config Producer Client (SASL* property) Source Code at ProducerAPI.java in src/main/java/cz/scholz/kafka/sasl folder
run producer with sasl
cd ~/amq-streams-2022/3-consumer-producer/ mvn compile exec:java -Dexec.mainClass="cz.scholz.kafka.sasl.ProducerAPI"
example result
Review SASL Config Consumer Client (SASL* property) Source Code at ConsumerAPI.java in src/main/java/cz/scholz/kafka/sasl folder
run consumer with sasl
cd ~/amq-streams-2022/3-consumer-producer/ mvn compile exec:java -Dexec.mainClass="cz.scholz.kafka.sasl.ConsumerAPI"
example result
Serialization and deserialization with API
Review Serialization Producer Client Source Code at ProducerAPI.java in src/main/java/cz/scholz/kafka/serialization folder
run producer with serialization
cd ~/amq-streams-2022/3-consumer-producer/ mvn compile exec:java -Dexec.mainClass="cz.scholz.kafka.serialization.ProducerAPI"
example result
Review DeSerialization Consumer Client Source Code at ConsumerAPI.java in src/main/java/cz/scholz/kafka/serialization folder
run consumer with serialization
cd ~/amq-streams-2022/3-consumer-producer/ mvn compile exec:java -Dexec.mainClass="cz.scholz.kafka.serialization.ConsumerAPI"
example result
Last updated