Consumer & Producer
Prerequisite
Start Cluster
Make sure all Kafka and ZooKeeper servers are stopped:
press Ctrl+C in each terminal, or
run kafka-server-stop.sh for the Kafka brokers and zookeeper-server-stop.sh for ZooKeeper.
Verify with the jps command that no Kafka or ZooKeeper process is still running.
Clear the ZooKeeper and Kafka data.
Start the ZooKeeper and Kafka cluster as described in Red Hat AMQ Streams Architecture. This cluster is used for the whole lab. Run each start script in its own terminal (one shell script per terminal).
Consumer Groups
Create a new topic (if it already exists, delete it and create it again)
Setup consumers
Open 3 consumers using the same group, group-1
example result

In a new terminal, open a consumer using a different group, group-2
example result

Send messages
In a new terminal, send some messages with keys. Send messages in the format key:payload, e.g. my-key:my-value.
example result (send messages such as "a:apple", "b:bird")

example result after sending the messages
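Why the key matters can be sketched with a small, self-contained model. This is only an illustration: Kafka's default partitioner hashes the serialized key with murmur2, while the sketch below swaps in Arrays.hashCode as a stand-in, and the partition count of 3 is an assumption. The point it shows is that the same key always maps to the same partition, so it always reaches the same consumer within a group.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class KeyPartitionSketch {
    // Simplified stand-in for Kafka's default partitioner (which uses murmur2):
    // hash the key bytes and take the result modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3; // assumed partition count of the lab topic
        for (String message : new String[] {"a:apple", "b:bird", "a:avocado"}) {
            String key = message.split(":", 2)[0]; // text before the first ':' is the key
            System.out.println(message + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

Both messages with key "a" print the same partition number, which is why they show up in the same consumer terminal.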

Rebalancing consumer group
Kill one of the consumers in group-1
example: kill the group-1 consumer in terminal #2

Send some messages with the same key that was previously delivered to this consumer
Notice that one of the other consumers was assigned the partition and now receives these messages
example result
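The rebalance can be pictured with a toy model. The sketch below assumes a simple round-robin assignment over 3 partitions and hypothetical consumer names; Kafka's real assignors (range, round robin, sticky) differ in detail, but the effect is the same: when a consumer leaves the group, its partitions are handed to the remaining members.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RebalanceSketch {
    // Spread partitions 0..numPartitions-1 over the live consumers in round-robin order.
    // Expects at least one consumer in the list.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> group = new ArrayList<>(List.of("consumer-1", "consumer-2", "consumer-3"));
        System.out.println("before: " + assign(group, 3));
        group.remove("consumer-2"); // simulate killing the consumer in terminal #2
        System.out.println("after:  " + assign(group, 3));
    }
}
```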

Message replay
Kill all previous consumers and producers
Consume the messages from Kafka with a new group:
example result

After it has consumed all messages, restart it to confirm that the offsets were committed: no messages should be received
example result

Kill all consumers (with Ctrl+C)
List all the groups:
example result

Or describe them:
example result (view current-offset, log-end-offset, lag)
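The three columns are related by simple arithmetic, sketched here with made-up offsets: lag is the log end offset minus the group's current (committed) offset for that partition.

```java
class LagSketch {
    // Records written to the partition that the group has not yet committed.
    static long lag(long currentOffset, long logEndOffset) {
        return logEndOffset - currentOffset;
    }

    public static void main(String[] args) {
        long currentOffset = 4;  // hypothetical committed offset of the group
        long logEndOffset = 10;  // hypothetical offset of the next record to be written
        System.out.println("lag = " + lag(currentOffset, logEndOffset));
    }
}
```

A lag of 0 means the group has committed everything currently in the partition.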

Reset the offsets to the beginning of the topic with --to-earliest (other options exist, such as --to-latest, which moves the offsets to the end of the log)
example result

Try to consume the messages again - you should receive them from the beginning of the topic:
example result
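The whole replay exercise can be modelled in a few lines. This is a toy model under stated assumptions, not how the broker stores offsets: one partition, a log that starts at offset 0, and a group that commits right after each poll. It shows why a restarted consumer receives nothing and why resetting the committed offset makes the messages appear again.

```java
import java.util.ArrayList;
import java.util.List;

class ReplaySketch {
    private final List<String> log; // records of a single partition
    private int committed = 0;      // the group's committed offset

    ReplaySketch(List<String> log) {
        this.log = log;
    }

    // Return every record after the committed offset, then commit the new position.
    List<String> poll() {
        List<String> records = new ArrayList<>(log.subList(committed, log.size()));
        committed = log.size();
        return records;
    }

    // Models --to-earliest (the log in this sketch starts at offset 0).
    void resetToEarliest() {
        committed = 0;
    }

    // Models --to-latest: skip everything currently in the log.
    void resetToLatest() {
        committed = log.size();
    }

    public static void main(String[] args) {
        ReplaySketch group = new ReplaySketch(List.of("apple", "bird"));
        System.out.println("first run:   " + group.poll());
        System.out.println("restart:     " + group.poll());
        group.resetToEarliest();
        System.out.println("after reset: " + group.poll());
    }
}
```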

Secure Client to Broker Cluster
Configuration
Look in the configuration files of the brokers (./configs/kafka/), such as server-0.properties, and check the fields related to TLS and SASL.
example configuration
SSL Consumers and Producers
Use SSL to produce messages:
And consume them:
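Both SSL clients need the same kind of client-side settings. As a rough sketch, the keys below are standard Kafka client configuration names, while the truststore path and password are placeholders and not the values used in this lab. The sketch only builds and prints the properties; it does not connect to a broker.

```java
import java.util.Properties;

class SslClientConfigSketch {
    // Minimal TLS settings for a client that only needs to trust the broker certificate.
    static Properties sslProps(String truststorePath, String truststorePassword) {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SSL");
        props.setProperty("ssl.truststore.location", truststorePath);
        props.setProperty("ssl.truststore.password", truststorePassword);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values, assumed for illustration only.
        Properties props = sslProps("/path/to/truststore.jks", "changeit");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

If the brokers also require client authentication over TLS, the client additionally needs keystore settings (ssl.keystore.location and ssl.keystore.password).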
SASL Consumers and Producers
Check the sasl-client.properties file, which configures SASL PLAIN authentication
Try to produce some messages:
And consume them:
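For orientation, a SASL PLAIN client configuration usually boils down to three settings. The sketch below uses the standard Kafka client keys; the user name, password, and the choice between SASL_PLAINTEXT and SASL_SSL are assumptions, so compare them with the actual sasl-client.properties file of this lab.

```java
import java.util.Properties;

class SaslClientConfigSketch {
    // securityProtocol is SASL_PLAINTEXT or SASL_SSL, depending on the broker listener.
    static Properties saslPlainProps(String securityProtocol, String username, String password) {
        Properties props = new Properties();
        props.setProperty("security.protocol", securityProtocol);
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"" + username + "\" password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical credentials, for illustration only.
        Properties props = saslPlainProps("SASL_SSL", "my-user", "my-password");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```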
Topics & Partition Information
Check that the demo topic already exists in the cluster
example result
We have already used many commands. You can also use the script to show only the topics that are in trouble:
'--under-replicated-partitions' --> lists only the partitions whose in-sync replica set is smaller than the configured replication factor.
'--unavailable-partitions' --> lists only the partitions that currently have no leader and therefore cannot be used by consumers or producers.
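The two conditions can be written down as simple predicates. This is a sketch of the definitions only, with broker ids made up for the example and a leader of -1 used here to model "no leader"; it does not query a cluster.

```java
import java.util.List;

class PartitionHealthSketch {
    // Under-replicated: fewer in-sync replicas (ISR) than assigned replicas.
    static boolean underReplicated(List<Integer> replicas, List<Integer> isr) {
        return isr.size() < replicas.size();
    }

    // Unavailable: no leader elected (modelled here as a negative broker id).
    static boolean unavailable(int leader) {
        return leader < 0;
    }

    public static void main(String[] args) {
        List<Integer> replicas = List.of(0, 1, 2); // hypothetical broker ids
        System.out.println("healthy, under-replicated?        " + underReplicated(replicas, List.of(0, 1, 2)));
        System.out.println("broker 2 down, under-replicated?  " + underReplicated(replicas, List.of(0, 1)));
        System.out.println("no leader, unavailable?           " + unavailable(-1));
    }
}
```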
Consumer & Producer API
Create a new topic for test messages:
Start a console consumer to see the sent messages:
Basic Consumer and Producer APIs
Review the simple producer client source code in ProducerAPI.java in the src/main/java/cz/scholz/kafka/basic folder
Run the simple producer client
example result in kafka console consumer

example result in ProducerAPI java application

Review the simple consumer client source code in ConsumerAPI.java in the src/main/java/cz/scholz/kafka/basic folder
After the producer client completes, run the simple consumer client
example result

Acks and Manual Commits with API
Review the producer client source code with the acknowledgement config (ACKS_CONFIG property set to "all") in ProducerAPI.java in the src/main/java/cz/scholz/kafka/acks folder
Run the producer with acks = "all"
example result

Review the manual-commit consumer client source code (auto commit is the default) in ConsumerAPI.java in the src/main/java/cz/scholz/kafka/manaulcommits folder
see the ENABLE_AUTO_COMMIT_CONFIG property
see the consumer commit methods (sync and async)
Run the consumer with async commit
example result
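The two settings this section revolves around can be summarised as plain properties. In the Kafka client library, ProducerConfig.ACKS_CONFIG resolves to the key "acks" and ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "enable.auto.commit"; the sketch below uses the string keys directly so that it runs without the Kafka dependency. The bootstrap address and group name are assumptions.

```java
import java.util.Properties;

class AcksAndCommitConfigSketch {
    // acks=all: the leader waits for all in-sync replicas before acknowledging a write.
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("acks", "all");
        return props;
    }

    // enable.auto.commit=false: the application must commit offsets itself.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical address and group name, for illustration only.
        System.out.println(producerProps("localhost:9092"));
        System.out.println(consumerProps("localhost:9092", "manual-commit-group"));
    }
}
```

With auto commit disabled, the consumer calls commitSync() (blocks until the broker confirms) or commitAsync() (returns immediately) after it has processed the polled records.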

SSL and SSL Authentication with API
Review the SSL-configured producer client source code (SSL* properties) in ProducerAPI.java in the src/main/java/cz/scholz/kafka/ssl folder
Run the producer with SSL
example result

Review the SSL-configured consumer client source code (SSL* properties) in ConsumerAPI.java in the src/main/java/cz/scholz/kafka/ssl folder
Run the consumer with SSL
example result

SASL with API
Review the SASL-configured producer client source code (SASL* properties) in ProducerAPI.java in the src/main/java/cz/scholz/kafka/sasl folder
Run the producer with SASL
example result

Review the SASL-configured consumer client source code (SASL* properties) in ConsumerAPI.java in the src/main/java/cz/scholz/kafka/sasl folder
Run the consumer with SASL
example result

Serialization and deserialization with API
Review the serialization producer client source code in ProducerAPI.java in the src/main/java/cz/scholz/kafka/serialization folder
Run the producer with serialization
example result

Review the deserialization consumer client source code in ConsumerAPI.java in the src/main/java/cz/scholz/kafka/serialization folder
Run the consumer with deserialization
example result
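What a serializer and deserializer pair does can be shown without the Kafka library. The sketch below is not the lab's code: it mirrors the idea behind Kafka's built-in StringSerializer (UTF-8 bytes) and IntegerSerializer (4 bytes, big-endian) with plain static methods, and checks that the round trip returns the original value.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class SerializationSketch {
    // String <-> bytes, using UTF-8.
    static byte[] serializeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    static String deserializeString(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    // int <-> 4 bytes, big-endian (ByteBuffer's default byte order).
    static byte[] serializeInt(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    static int deserializeInt(byte[] data) {
        return ByteBuffer.wrap(data).getInt();
    }

    public static void main(String[] args) {
        byte[] keyBytes = serializeInt(42);
        byte[] valueBytes = serializeString("apple");
        System.out.println("key bytes:   " + keyBytes.length);
        System.out.println("value bytes: " + valueBytes.length);
        System.out.println("round trip:  " + deserializeInt(keyBytes) + " / " + deserializeString(valueBytes));
    }
}
```

The producer and the consumer have to agree on the format: bytes written by one serializer can only be read back by the matching deserializer.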
