Consumer & Producer
Make sure all Kafka and ZooKeeper servers are stopped:
press Ctrl+C in each terminal, or
run kafka-server-stop.sh for the Kafka brokers and zookeeper-server-stop.sh for ZooKeeper
Verify with the jps command that no Kafka or ZooKeeper process is still running
Clear the ZooKeeper and Kafka data
Open three consumers, each in its own terminal, all using the same consumer group group-1
example result
In a new terminal, open a consumer using a different group, group-2
example result
In a new terminal, send some messages with keys. Send the messages in the format key:payload, e.g. my-key:my-value
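As a rough illustration of the key:payload format, the following Java sketch splits an input line at the first ":" into a key and a payload. KeyedLine and parse are hypothetical names and not part of Kafka, and splitting at the first separator only is an assumption about how the line is treated.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Hypothetical helper, not part of Kafka: splits a "key:payload" line
// into its key and payload at the first ':' character.
class KeyedLine {
    static Map.Entry<String, String> parse(String line) {
        int idx = line.indexOf(':');
        if (idx < 0) {
            throw new IllegalArgumentException("No key separator ':' in: " + line);
        }
        // Split on the first separator only, so the payload may itself contain ':'.
        return new SimpleEntry<>(line.substring(0, idx), line.substring(idx + 1));
    }

    public static void main(String[] args) {
        for (String line : new String[] {"a:apple", "b:bird", "my-key:my-value"}) {
            Map.Entry<String, String> entry = parse(line);
            System.out.println("key=" + entry.getKey() + " payload=" + entry.getValue());
        }
    }
}
```

A line without the separator is rejected here; whether the real producer rejects or accepts such a line is not covered by this sketch.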
example result (sending messages such as "a:apple" and "b:bird")
example result after sending the messages
Kill one of the consumers in group-1
example: killing the group-1 consumer in terminal #2
Send some messages with the same key that was previously delivered to the killed consumer
Notice that the partition has been reassigned to one of the remaining consumers, which now receives these messages
example result
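Why messages with the same key move to another consumer can be sketched as follows: the key always maps to the same partition, while the owner of that partition changes after a rebalance. This is a simplified model and not Kafka's implementation. The hash is a stand-in for the murmur2 hash used by Kafka's default partitioner, the assignment is a simplified range-style split, and the three partitions and the consumer names c1, c2, c3 are assumptions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RebalanceSketch {
    // Stand-in for Kafka's key hashing (the real default partitioner uses murmur2).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Simplified range-style assignment: contiguous partition ranges, in consumer order.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int base = numPartitions / consumers.size();
        int extra = numPartitions % consumers.size();
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            result.put(consumers.get(i), owned);
        }
        return result;
    }

    // Returns the consumer that owns the partition, or null if nobody does.
    static String ownerOf(int partition, Map<String, List<Integer>> assignment) {
        for (Map.Entry<String, List<Integer>> e : assignment.entrySet()) {
            if (e.getValue().contains(partition)) {
                return e.getKey();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        int numPartitions = 3; // assumption: the lab topic's partition count is not stated
        int partition = partitionFor("a", numPartitions);
        Map<String, List<Integer>> before = assign(List.of("c1", "c2", "c3"), numPartitions);
        Map<String, List<Integer>> after = assign(List.of("c1", "c3"), numPartitions); // c2 killed
        System.out.println("key a -> partition " + partition);
        System.out.println("owner before: " + ownerOf(partition, before));
        System.out.println("owner after:  " + ownerOf(partition, after));
    }
}
```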
Kill all previous consumers and the producer
Consume the messages from Kafka with a new group:
example result
After it has consumed all messages, restart it to verify that the offsets were all committed: no messages should be received
example result
Kill all consumers (with Ctrl+C)
List all the groups:
example result
Or describe them:
example result (view current-offset, log-end-offset, lag)
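The three values in the describe output are related: the lag of a partition is the log end offset minus the group's current (committed) offset. A minimal Java sketch of that arithmetic, with LagSketch and its method names being hypothetical:

```java
class LagSketch {
    // Lag for one partition: messages written but not yet committed by the group.
    static long lag(long logEndOffset, long currentOffset) {
        return logEndOffset - currentOffset;
    }

    // Total lag of a group over several partitions (arrays are index-aligned).
    static long totalLag(long[] logEndOffsets, long[] currentOffsets) {
        long total = 0;
        for (int i = 0; i < logEndOffsets.length; i++) {
            total += lag(logEndOffsets[i], currentOffsets[i]);
        }
        return total;
    }

    public static void main(String[] args) {
        // Made-up offsets for three partitions, for illustration only.
        long[] logEnd = {10, 7, 5};
        long[] current = {10, 4, 0};
        System.out.println("total lag = " + totalLag(logEnd, current));
    }
}
```

A lag of 0 on every partition means the group has committed everything that was written to the topic.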
Reset the group's offsets to the beginning of the topic with --to-earliest (other options exist, such as --to-latest, which sets the offsets to the latest offset):
example result
Try to consume the messages again - you should receive them from the beginning of the topic:
example result
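The behaviour seen in the last steps (nothing received after a restart, everything received again after a reset) follows from how the committed offset is used. The sketch below is a toy in-memory model of a single partition and a single group, not Kafka code; OffsetResetSketch and all its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: one partition log and one consumer group's committed offset.
class OffsetResetSketch {
    private final List<String> log = new ArrayList<>();
    private int committed = 0;

    void append(String message) {
        log.add(message);
    }

    // Returns everything after the committed offset, then commits the new position.
    List<String> pollAndCommit() {
        List<String> received = new ArrayList<>(log.subList(committed, log.size()));
        committed = log.size();
        return received;
    }

    // Models --to-earliest: the next poll starts from the beginning of the log.
    void resetToEarliest() {
        committed = 0;
    }

    // Models --to-latest: the next poll only sees messages appended afterwards.
    void resetToLatest() {
        committed = log.size();
    }

    public static void main(String[] args) {
        OffsetResetSketch group = new OffsetResetSketch();
        group.append("a:apple");
        group.append("b:bird");
        System.out.println("first run:   " + group.pollAndCommit());
        System.out.println("restart:     " + group.pollAndCommit());
        group.resetToEarliest();
        System.out.println("after reset: " + group.pollAndCommit());
    }
}
```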
example configuration
Use SSL to produce messages:
And consume them:
Try to produce some messages:
And consume them:
Check that the demo topic already exists in the cluster
example result
We have already used a lot of commands. You can also use the script to show only the topics that are in trouble:
'--under-replicated-partitions' --> lists partitions whose set of in-sync replicas is smaller than the replication factor.
'--unavailable-partitions' --> lists partitions that currently have no leader and therefore cannot be used by consumers or producers.
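The two conditions can be expressed as simple predicates over a partition's replica list, in-sync replica (ISR) list, and leader. The Java sketch below is only a model of those definitions; PartitionHealth is a hypothetical name, and representing "no leader" as a negative broker id is an assumption of this sketch.

```java
import java.util.List;

class PartitionHealth {
    // Under-replicated: fewer in-sync replicas than assigned replicas.
    static boolean isUnderReplicated(List<Integer> replicas, List<Integer> isr) {
        return isr.size() < replicas.size();
    }

    // Unavailable: no leader (modelled here as a negative broker id).
    static boolean isUnavailable(int leader) {
        return leader < 0;
    }

    public static void main(String[] args) {
        // Made-up partition state: replicas on brokers 0, 1, 2, but broker 2 is out of sync.
        List<Integer> replicas = List.of(0, 1, 2);
        List<Integer> isr = List.of(0, 1);
        System.out.println("under-replicated: " + isUnderReplicated(replicas, isr));
        System.out.println("unavailable:      " + isUnavailable(0));
    }
}
```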
Create new topic for test messages:
Start a console consumer to see the sent messages:
Basic Consumer and Producer APIs
Run the simple producer client
example result in kafka console consumer
example result in ProducerAPI java application
After the producer client completes, run the simple consumer client
example result
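For orientation, a basic producer and consumer mainly differ in a handful of configuration keys. The sketch below builds those settings as plain java.util.Properties so that it runs without the Kafka client library; it is not the lab's source code. The class name, the localhost:9092 address, and the group name are assumptions, while the configuration keys and serializer class names are standard Kafka client settings.

```java
import java.util.Properties;

class BasicClientConfig {
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Start from the beginning of the topic when the group has no committed offset.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Assumed address and group name; adjust to the lab cluster.
        producerProps("localhost:9092").forEach((k, v) -> System.out.println(k + "=" + v));
        consumerProps("localhost:9092", "group-1").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With the Kafka client library on the classpath, such Properties objects would be passed to the KafkaProducer and KafkaConsumer constructors.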
Acks and Manual Commits with API
Run the producer with acks = "all"
example result
See the ENABLE_AUTO_COMMIT_CONFIG property
See the consumer commit methods (commitSync and commitAsync)
Run the consumer with async commit
example result
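The two settings this part revolves around can be sketched as plain properties: "acks" on the producer side and "enable.auto.commit" on the consumer side (the string keys behind ACKS_CONFIG and ENABLE_AUTO_COMMIT_CONFIG). The class and method names are hypothetical, and the sketch deliberately stops at the configuration so it runs without the Kafka client library.

```java
import java.util.Properties;

class AcksAndCommitConfig {
    // Producer side: wait for all in-sync replicas to acknowledge each record.
    static Properties withAcksAll(Properties producerProps) {
        Properties props = new Properties();
        props.putAll(producerProps);
        props.setProperty("acks", "all");
        return props;
    }

    // Consumer side: disable auto commit so the application commits offsets itself.
    static Properties withManualCommit(Properties consumerProps) {
        Properties props = new Properties();
        props.putAll(consumerProps);
        props.setProperty("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(withAcksAll(new Properties()));
        System.out.println(withManualCommit(new Properties()));
    }
}
```

With auto commit disabled, the consumer has to call commitSync (blocks until the commit completes) or commitAsync (returns immediately) after processing records, otherwise a restart would re-deliver them.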
SSL and SSL Authentication with API
Run the producer with SSL
example result
Run the consumer with SSL
example result
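A client-side SSL setup typically comes down to a truststore (to verify the broker) and, for SSL client authentication, a keystore (to identify the client). The sketch below shows the corresponding standard Kafka client keys as plain properties. The class name, file paths, and passwords are placeholders and not the lab's actual values.

```java
import java.util.Properties;

class SslClientConfig {
    // Encryption only: the client verifies the broker certificate via the truststore.
    static Properties sslProps(String truststorePath, String truststorePassword) {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SSL");
        props.setProperty("ssl.truststore.location", truststorePath);
        props.setProperty("ssl.truststore.password", truststorePassword);
        return props;
    }

    // SSL authentication: additionally present a client certificate from a keystore.
    static Properties sslAuthProps(String truststorePath, String truststorePassword,
                                   String keystorePath, String keystorePassword) {
        Properties props = sslProps(truststorePath, truststorePassword);
        props.setProperty("ssl.keystore.location", keystorePath);
        props.setProperty("ssl.keystore.password", keystorePassword);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder paths and passwords, for illustration only.
        Properties props = sslAuthProps("/path/to/truststore", "changeit",
                "/path/to/keystore", "changeit");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```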
SASL with API
Run the producer with SASL
example result
Run the consumer with SASL
example result
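For SASL PLAIN, the client needs the SASL mechanism and a JAAS configuration string carrying the username and password. The sketch below assembles these as plain properties. SASL_SSL is assumed as the security protocol (the lab may use SASL_PLAINTEXT instead), and the class name, username, and password are placeholders.

```java
import java.util.Properties;

class SaslClientConfig {
    // Builds the JAAS entry for Kafka's PLAIN login module.
    static String plainJaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + username + "\" password=\"" + password + "\";";
    }

    static Properties saslPlainProps(String username, String password) {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SASL_SSL"); // assumption, see lead-in
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config", plainJaasConfig(username, password));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder credentials, for illustration only.
        saslPlainProps("user", "secret").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

When the protocol is SASL_SSL, the truststore settings used for SSL are needed as well.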
Serialization and deserialization with API
Run the producer with serialization
example result
Run the consumer with deserialization
example result
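What a serializer and deserializer pair does can be shown without the Kafka library: the producer side turns a value into bytes, the consumer side turns the bytes back into the value. The class below is a stand-in that does not implement Kafka's Serializer or Deserializer interfaces; it mirrors the UTF-8 encoding that Kafka's StringSerializer uses by default. Utf8Codec and its methods are hypothetical names.

```java
import java.nio.charset.StandardCharsets;

// Stand-in for a serializer/deserializer pair; not tied to Kafka's interfaces.
class Utf8Codec {
    // Producer side: value -> bytes written to the topic.
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: bytes read from the topic -> value.
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("my-value");
        System.out.println(wire.length + " bytes on the wire");
        System.out.println("round trip: " + deserialize(wire));
    }
}
```

Producer and consumer must agree on the format: bytes written by one serializer can only be read back correctly by the matching deserializer.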
Start the ZooKeeper and Kafka cluster from the . This cluster will be used for this lab. Run each shell script in its own terminal (one script per terminal)
Look in the configuration files of the brokers (./configs/kafka/) such as and check the fields related to TLS and SASL.
Check the file which configures SASL PLAIN authentication
Review the simple producer client source code in the src/main/java/cz/scholz/kafka/basic folder
Review the simple consumer client source code in the src/main/java/cz/scholz/kafka/basic folder
Review the acknowledgement-config producer client source code (ACKS_CONFIG property set to "all") in the src/main/java/cz/scholz/kafka/acks folder
Review the manual-commit consumer client source code (auto commit is the default) in the src/main/java/cz/scholz/kafka/manaulcommits folder
Review the SSL producer client source code (SSL* properties) in the src/main/java/cz/scholz/kafka/ssl folder
Review the SSL consumer client source code (SSL* properties) in the src/main/java/cz/scholz/kafka/ssl folder
Review the SASL producer client source code (SASL* properties) in the src/main/java/cz/scholz/kafka/sasl folder
Review the SASL consumer client source code (SASL* properties) in the src/main/java/cz/scholz/kafka/sasl folder
Review the serialization producer client source code in the src/main/java/cz/scholz/kafka/serialization folder
Review the deserialization consumer client source code in the src/main/java/cz/scholz/kafka/serialization folder