Consumer & Producer

Prerequisites

Start Cluster

  • Make sure all Kafka and ZooKeeper servers are stopped:

    • press Ctrl+C in each terminal, or

    • run kafka-server-stop.sh for the Kafka brokers and zookeeper-server-stop.sh for ZooKeeper

    • verify with the jps command that no broker or ZooKeeper process is still running

  • Clear the ZooKeeper and Kafka data directories

  • Start the ZooKeeper and Kafka cluster from the Red Hat AMQ Streams Architecture lab. This cluster will be used throughout this lab. Run each start script in its own terminal (one shell script per terminal).
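The cleanup and startup steps above might look like the following, assuming the standard Kafka distribution scripts and this lab's ./configs directory layout; the data directory paths are assumptions, so check the log.dirs and dataDir values in your configuration files:

```shell
# Stop any running brokers and ZooKeeper
./bin/kafka-server-stop.sh
./bin/zookeeper-server-stop.sh
jps   # verify that no Kafka or QuorumPeerMain process remains

# Clear old state (data directory paths are assumptions)
rm -rf /tmp/kafka-logs* /tmp/zookeeper

# Start ZooKeeper and one broker, each in its own terminal
./bin/zookeeper-server-start.sh ./configs/zookeeper.properties
./bin/kafka-server-start.sh ./configs/kafka/server-0.properties
```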

Consumer Groups

Create a new topic (delete the old one first, then create it again)
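With the kafka-topics.sh tool this could look as follows; the topic name my-topic, the partition and replica counts, and the bootstrap address are assumptions for this sketch:

```shell
# Delete the topic if it already exists, then recreate it
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic my-topic
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic \
  --partitions 3 --replication-factor 3
```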

Setup consumers

  • Open three consumers using the same group, group-1

    example result

  • In a new terminal, open a consumer using a different group, group-2

    example result
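A sketch of the consumer commands, assuming the topic my-topic from the previous step; print.key and key.separator make the message keys visible in the output:

```shell
# Run this in three separate terminals for group-1
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic \
  --group group-1 --property print.key=true --property key.separator=":"

# And in a fourth terminal for group-2
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic \
  --group group-2 --property print.key=true --property key.separator=":"
```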

Send messages

  • In a new terminal, send some messages with keys (use the format key:payload - e.g. my-key:my-value)

    example result (sending messages such as "a:apple", "b:bird")

    example result after sending messages
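The producer command could look like this; parse.key and key.separator tell the console producer to split each typed line into a key and a payload (topic name and bootstrap address are assumptions):

```shell
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic \
  --property parse.key=true --property key.separator=":"
# then type messages such as:
#   a:apple
#   b:bird
```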

Rebalancing consumer group

  • Kill one of the group-1 consumers

    example: kill the group-1 consumer in terminal #2

  • Send some messages with the same keys that were previously handled by this consumer

  • Notice that one of the other consumers has been assigned the partition and will receive the messages

    example result
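One way to confirm the reassignment is to describe the group and check which of the surviving consumers now owns the killed consumer's partitions:

```shell
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group group-1
```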

Message replay

  • Kill all previous consumers and producers

  • Consume the messages from Kafka with a new group:

    example result
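For example, with a hypothetical new group group-3:

```shell
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic \
  --group group-3 --from-beginning
```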

  • After it consumes all messages, restart it to make sure the offsets were committed - no messages should be received

    example result

  • Kill all consumers (with Ctrl+C)

  • List all the groups:

    example result
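Listing the groups is a single command:

```shell
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
```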

  • Or describe them:

    example result (view current-offset, log-end-offset, lag)
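Describing a group shows the current offset, the log end offset, and the lag per partition (group name group-3 as assumed above):

```shell
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group group-3
```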

  • Reset the offset to 0 with --to-earliest (other options exist, such as --to-latest to move to the last offset)

    example result
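A sketch of the reset command; --execute applies the change, while --dry-run would only print what the tool would do (group and topic names as assumed above):

```shell
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group group-3 \
  --topic my-topic --reset-offsets --to-earliest --execute
```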

  • Try to consume the messages again - you should receive them from the beginning of the topic:

    example result

Secure Client to Broker Cluster

Configuration

  • Look in the configuration files of the brokers (./configs/kafka/), such as server-0.properties, and check the fields related to TLS & SASL.

    • example configuration
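A hypothetical excerpt showing the kind of fields to look for; the listener ports, file paths, and passwords below are placeholders, not the lab's actual values:

```properties
# TLS listener and key material (placeholder values)
listeners=PLAINTEXT://:9092,SSL://:9093,SASL_PLAINTEXT://:9094
ssl.keystore.location=./keys/broker-0.keystore.jks
ssl.keystore.password=changeit
ssl.truststore.location=./keys/truststore.jks
ssl.truststore.password=changeit

# SASL PLAIN authentication
sasl.enabled.mechanisms=PLAIN
```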

SSL Consumers and Producers

  • Use SSL to produce messages:

  • And consume them:
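The console tools take a client configuration file via --producer.config and --consumer.config; the SSL listener port 9093 and the ssl-client.properties file name below are assumptions:

```shell
./bin/kafka-console-producer.sh --bootstrap-server localhost:9093 --topic my-topic \
  --producer.config ./configs/ssl-client.properties

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-topic \
  --from-beginning --consumer.config ./configs/ssl-client.properties
```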

SASL Consumers and Producers

  • Check the sasl-client.properties file, which configures SASL PLAIN authentication
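A SASL PLAIN client configuration typically contains properties along these lines; the username and password here are placeholders:

```properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="user" password="password";
```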

  • Try to produce some messages:

  • And consume them:
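A sketch of the SASL client commands; the SASL listener port 9094 and the path to sasl-client.properties are assumptions:

```shell
./bin/kafka-console-producer.sh --bootstrap-server localhost:9094 --topic my-topic \
  --producer.config ./configs/sasl-client.properties

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic my-topic \
  --from-beginning --consumer.config ./configs/sasl-client.properties
```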

Topics & Partition Information

  • Check the demo topic that already exists in the cluster

    example result
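Describing the topic shows its partitions, leaders, replicas, and in-sync replicas:

```shell
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic demo
```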

  • We have already used a lot of commands. You can also use the script to show only the topics that are in trouble:

    • '--under-replicated-partitions' --> lists partitions that do not have enough in-sync replicas to meet the desired replication factor.

    • '--unavailable-partitions' --> lists partitions that currently have no leader and hence cannot be used by consumers or producers.
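Both flags are options of the --describe command:

```shell
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe \
  --under-replicated-partitions
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe \
  --unavailable-partitions
```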

Consumer & Producer API

  • Create a new topic for the test messages:

  • Start a console consumer to see the sent messages:
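These two steps could look as follows; the topic name api-test and the partition/replica counts are assumptions:

```shell
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic api-test \
  --partitions 3 --replication-factor 3
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic api-test
```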

  • Basic Consumer and Producer APIs

    • Review the simple producer client source code at ProducerAPI.java in the src/main/java/cz/scholz/kafka/basic folder

    • Run the simple producer client

      example result in the Kafka console consumer

      example result in the ProducerAPI Java application

    • Review the simple consumer client source code at ConsumerAPI.java in the src/main/java/cz/scholz/kafka/basic folder

    • After the producer client completes, run the simple consumer client

      example result
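If the project is a Maven build with the exec-maven-plugin available (an assumption - use the repository's actual build instructions if they differ), the clients can be run like this:

```shell
mvn compile exec:java -Dexec.mainClass="cz.scholz.kafka.basic.ProducerAPI"
mvn compile exec:java -Dexec.mainClass="cz.scholz.kafka.basic.ConsumerAPI"
```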

  • Acks and Manual Commits with API

    • Review the acknowledgement-config producer client (ACKS_CONFIG property set to "all") source code at ProducerAPI.java in the src/main/java/cz/scholz/kafka/acks folder

    • Run the producer with acks = "all"

      example result

    • Review the manual commit consumer client (in contrast to the default auto commit) source code at ConsumerAPI.java in the src/main/java/cz/scholz/kafka/manaulcommits folder

      • see the ENABLE_AUTO_COMMIT_CONFIG property

      • see the consumer commit methods (sync & async)

    • Run the consumer with async commit

      example result

  • SSL and SSL Authentication with API

    • Review the SSL-config producer client (SSL* properties) source code at ProducerAPI.java in the src/main/java/cz/scholz/kafka/ssl folder

    • Run the producer with SSL

      example result

    • Review the SSL-config consumer client (SSL* properties) source code at ConsumerAPI.java in the src/main/java/cz/scholz/kafka/ssl folder

    • Run the consumer with SSL

      example result

  • SASL with API

    • Review the SASL-config producer client (SASL* properties) source code at ProducerAPI.java in the src/main/java/cz/scholz/kafka/sasl folder

    • Run the producer with SASL

      example result

    • Review the SASL-config consumer client (SASL* properties) source code at ConsumerAPI.java in the src/main/java/cz/scholz/kafka/sasl folder

    • Run the consumer with SASL

      example result

  • Serialization and deserialization with API

    • Review the serialization producer client source code at ProducerAPI.java in the src/main/java/cz/scholz/kafka/serialization folder

    • Run the producer with serialization

      example result

    • Review the deserialization consumer client source code at ConsumerAPI.java in the src/main/java/cz/scholz/kafka/serialization folder

    • Run the consumer with deserialization

      example result
