Authentication & Authorization

Prerequisite

  • Stop all server in previous lab

    • type ctrl+c in each terminal (stop kafka before stop zookeeper)

    • check kafka broker and zookeeper process with jps command

      jps
  • clear old data in previous lab

    rm -rf /tmp/zookeep*
    rm -rf /tmp/kaf*

Create Java Authentication and Authorization Service (JAAS) for Kafa Authentication

  • Configure the broker with its user credentials and authorize the client's user credentials. These credentials along with the login module specification, are stored in a JAAS login configuration file , config JAAS at jaas.conf

    KafkaServer {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="admin"
      password="admin"
      user_admin="admin"
      user_alice="alice"
      user_bob="bob"
      user_charlie="charlie";
    };  

    This example defines the following for the KafkaServer entity:

    • The custom login module that is used for user authentication,

    • admin/admin is the username and password for inter-broker communication (i.e. the credentials the broker uses to connect to other brokers in the cluster),

    • admin/admin, alice/alice, bob/bob, and charlie/charlie as client user credentials. Note that the valid username and password is provided in this format: user_username="password". If the line user_admin="admin" is removed from this file, the broker is not able to authenticate and authorize an admin user. Only the admin user can to connect to other brokers in this case.

  • Pass in this file as a JVM configuration option when running the broker, using -Djava.security.auth.login.config=[path_to_jaas_file]. [path_to_jaas_file] can be something like: config/jaas-kafka-server.conf. This can be done by setting the KAFKA_OPTS environment variable, for example:

    export KAFKA_OPTS="-Djava.security.auth.login.config=<path-to-jaas-file>/jaas-kafka-server.conf"
    • run below command to add jaas to "KAFKA_OPTS" environment variable

    cd ~/amq-streams-2022/5-basic-acl
    export KAFKA_OPTS="-Djava.security.auth.login.config=/home/student/amq-streams-2022/5-basic-acl/kafka/config/jaas.conf"

Set Kafka Broker Authentication

  • Define the accepted protocol and the ACL authorizer used by the broker by adding the following configuration to the broker properties file server.properties

    authorizer.class.name=kafka.security.authorizer.AclAuthorizer
    listeners=SASL_PLAINTEXT://:9092
    security.inter.broker.protocol= SASL_PLAINTEXT
    sasl.mechanism.inter.broker.protocol=PLAIN
    sasl.enabled.mechanisms=PLAIN
  • The other configuration that can be added is for Kafka super users: users with full access to all APIs. This configuration reduces the overhead of defining per-API ACLs for the user who is meant to have full API access. From our list of users, let's make admin a super user with the following configuration:

    super.users=User:admin
  • When the broker runs with this security configuration, only authenticated and authorized clients are able to connect to and use it.

  • review server.properties in folder 5-basic-acl/kafka/bin/config/server.properties

Try Authentication to Kafka

  • start zookeeper, open new terminal and run below command to start zookeeper

    cd ~/amq-streams-2022/5-basic-acl
    ./kafka/bin/zookeeper-server-start.sh ./kafka/config/zookeeper.properties
  • start kafka, open new terminal and run below command to start kafka

    cd ~/amq-streams-2022/5-basic-acl
    export KAFKA_OPTS="-Djava.security.auth.login.config=/home/student/amq-streams-2022/5-basic-acl/kafka/config/jaas.conf"
    ./kafka/bin/kafka-server-start.sh ./kafka/config/server.properties
  • try to get list of topic, open new terminal and run below command

    cd ~/amq-streams-2022/5-basic-acl
    ./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

    example result, So far, the broker is configured for authenticated access. Running a Kafka console producer or consumer not configured for authenticated and authorized access fails with messages like the following

    Error while executing topic command : Timed out waiting for a node assignment. Call: listTopics
    [2022-11-30 15:41:47,753] ERROR org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listTopics
    (kafka.admin.TopicCommand$)

    error result in kafka console

    [2022-11-30 15:41:46,878] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
    [2022-11-30 15:41:47,281] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
    [2022-11-30 15:41:47,684] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)

Create Client-Site Setting

  • Specify the broker protocol as well as the credentials to use on the client side. The following configuration is placed inside the corresponding configuration file (admin.properties) provided to the particular client, review following properties in kafka/config folder

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice";
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="bob" password="bob";
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="charlie" password="charlie";
  • Try list topic again with admin user

    cd ~/amq-streams-2022/5-basic-acl
    ./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list --command-config kafka/config/admin.properties

    Verify that it can be called without error.

  • create topic "test"

    ./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test --partitions 1 --replication-factor 1 --command-config kafka/config/admin.properties
  • List the topics.

    ./kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list --command-config kafka/config/admin.properties
  • Try to put data to topic "test" with user:alice

    cd ~/amq-streams-2022/5-basic-acl
    ./kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test

    example result, ctrl+c to exit

    [2022-12-02 11:33:28,628] WARN [Producer clientId=console-producer] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
    [2022-12-02 11:33:29,036] WARN [Producer clientId=console-producer] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
    [2022-12-02 11:33:29,439] WARN [Producer clientId=console-producer] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)

    error result in kafka console

    [2022-11-30 15:41:46,878] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
    [2022-11-30 15:41:47,281] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
    [2022-11-30 15:41:47,684] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
  • Test Again with alice user

    ./kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config kafka/config/alice.properties

    example result

    >

    try to send message such as "message1"

    >message1

    example result, Authorization Exception will be throw to your console.

    >message1
    [2022-12-02 11:39:04,065] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
    [2022-12-02 11:39:04,068] ERROR [Producer clientId=console-producer] Topic authorization failed for topics [test] (org.apache.kafka.clients.Metadata)
    [2022-12-02 11:39:04,069] ERROR Error when sending message to topic test with key: null, value: 8 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test]
    >

    exit from producer console by type ctrl+c.

Adding ACL rules

  • Check current access control list

    cd ~/amq-streams-2022/5-basic-acl
    ./kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --command-config kafka/config/admin.properties 

    example result, no acl

  • add authorized to alice user

    requirement are:

    Principal alice is Allowed Operation Write From Host * On Topic test.
    Principal alice is Allowed Operation Create From Host * On Topic test.

    run command to add authorized

    ./kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config kafka/config/admin.properties --add --allow-principal User:alice --operation Write --operation Create --topic test

    example result

    Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`: 
          (principal=User:alice, host=*, operation=WRITE, permissionType=ALLOW)
          (principal=User:alice, host=*, operation=CREATE, permissionType=ALLOW) 
    
    Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`: 
            (principal=User:alice, host=*, operation=CREATE, permissionType=ALLOW)
            (principal=User:alice, host=*, operation=WRITE, permissionType=ALLOW) 
  • Test Again with alice user

    ./kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config kafka/config/alice.properties

    example result, try to send message to topic "test"

    >message1
    >message2
    >message3
    >
  • exit from kafka console producer and check current acl

    ./kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --command-config kafka/config/admin.properties 

    example result

    Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`: 
          (principal=User:alice, host=*, operation=CREATE, permissionType=ALLOW)
          (principal=User:alice, host=*, operation=WRITE, permissionType=ALLOW)

Add acl for consume topic

  • test consume message with user: bob

    ./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group bob-group --consumer.config kafka/config/bob.properties --from-beginning

    example result, error is

    [2022-12-02 12:03:00,033] WARN [Consumer clientId=console-consumer, groupId=bob-group] Error while fetching metadata with correlation id 2 : {test=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
    [2022-12-02 12:03:00,034] ERROR [Consumer clientId=console-consumer, groupId=bob-group] Topic authorization failed for topics [test] (org.apache.kafka.clients.Metadata)
    [2022-12-02 12:03:00,035] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
    org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test]
    Processed a total of 0 messages
  • add read authorized

    requirement are:

    Principal bob is Allowed Operation Read From Host * On Topic test.

    run command to add authorized

    ./kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config kafka/config/admin.properties --add --allow-principal User:bob --operation Read --topic test

    example result

    Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`: 
          (principal=User:bob, host=*, operation=READ, permissionType=ALLOW) 
    
    Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`: 
            (principal=User:alice, host=*, operation=CREATE, permissionType=ALLOW)
            (principal=User:bob, host=*, operation=READ, permissionType=ALLOW)
            (principal=User:alice, host=*, operation=WRITE, permissionType=ALLOW)
  • add authorized for committing offsets to group bob-group

    ./kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config kafka/config/admin.properties --add --allow-principal User:bob --operation Read --group bob-group

    example result

    Adding ACLs for resource `ResourcePattern(resourceType=GROUP, name=bob-group, patternType=LITERAL)`: 
          (principal=User:bob, host=*, operation=READ, permissionType=ALLOW) 
    
    Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=bob-group, patternType=LITERAL)`: 
            (principal=User:bob, host=*, operation=READ, permissionType=ALLOW)
  • test consume message again

    ./kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group bob-group --consumer.config kafka/config/bob.properties --from-beginning

    example result

    message1
    message2
    message3
  • exit from kafka console and check current acl

    ./kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --command-config kafka/config/admin.properties 

Add authorized for Describe Consumer Group

  • add authorized for describe consumer group to user: charlie

    requirement are

    Principal charlie is Allowed Operation Describe From Host * On Group bob-group.
    Principal charlie is Allowed Operation Describe From Host * On Topic test.

    run command to add authorized to read group bob-group

    ./kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config kafka/config/admin.properties --add --allow-principal User:charlie --operation Read --group bob-group

    example result

    Adding ACLs for resource `ResourcePattern(resourceType=GROUP, name=bob-group, patternType=LITERAL)`: 
          (principal=User:charlie, host=*, operation=READ, permissionType=ALLOW) 
    
    Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=bob-group, patternType=LITERAL)`: 
            (principal=User:bob, host=*, operation=READ, permissionType=ALLOW)
            (principal=User:charlie, host=*, operation=READ, permissionType=ALLOW)

    run command to add authorized to describe topic test

    ./kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config kafka/config/admin.properties --add --allow-principal User:charlie --operation Describe --topic test

    example result

    Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`: 
          (principal=User:charlie, host=*, operation=DESCRIBE, permissionType=ALLOW) 
    
    Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`: 
            (principal=User:alice, host=*, operation=CREATE, permissionType=ALLOW)
            (principal=User:bob, host=*, operation=READ, permissionType=ALLOW)
            (principal=User:charlie, host=*, operation=DESCRIBE, permissionType=ALLOW)
            (principal=User:alice, host=*, operation=WRITE, permissionType=ALLOW)
  • test describe consumer group

    ./kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group bob-group --command-config kafka/config/charlie.properties

    example result

    Consumer group 'bob-group' has no active members.
    
    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
    bob-group       test            0          3               3               0               -               -               -

List current ACL

  • run command to check current ACL in this kafka cluster

    ./kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config kafka/config/admin.properties --list

    example result

    Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`: 
          (principal=User:alice, host=*, operation=CREATE, permissionType=ALLOW)
          (principal=User:bob, host=*, operation=READ, permissionType=ALLOW)
          (principal=User:charlie, host=*, operation=DESCRIBE, permissionType=ALLOW)
          (principal=User:alice, host=*, operation=WRITE, permissionType=ALLOW) 
    
    Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=bob-group, patternType=LITERAL)`: 
            (principal=User:bob, host=*, operation=READ, permissionType=ALLOW)
            (principal=User:charlie, host=*, operation=READ, permissionType=ALLOW) 

Last updated