Kafka Internals: KIP-651 Using PEM certificate for SSL listener

Thanh Tung Dao
Nov 5, 2022
source: https://www.confluent.io/blog/apache-kafka-security-authorization-authentication-encryption/

1. Motivation

  • Previously, to connect to a TLS listener in Kafka, we had to prepare the keystore and truststore in JKS format. JKS stands for Java KeyStore, which means it is specific to the Java world. The rest of the world, however, has adopted a common format: PEM
  • KIP-651, shipped in Apache Kafka 2.7, adds support for SSL certificates in PEM format. However, the Kafka documentation is not clear on how to use it, and this blog aims to fill that gap

2. How-to

Let's jump straight into how to do it. There are two ways to provide the certificates: as files or as strings. This blog demonstrates both using the Kafka console client for simplicity

2.1 Providing certificates as files

This will be your client.properties file

bootstrap.servers=kafka.server:9092
security.protocol=SSL
ssl.truststore.type=PEM
ssl.truststore.location=truststore.pem
ssl.keystore.type=PEM
ssl.keystore.location=keystore.pem
ssl.key.password=hello

Detailed steps:

Step 1: Encrypt your private key, and only your private key, in PKCS8 format. The command is below (remember to use hello as the encryption password in this example):

openssl pkcs8 -topk8 -in privateKey.key -out encryptedPrivateKey.p9
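
A quick sanity check before moving on, as a minimal sketch: the helper name is_encrypted_pkcs8 is made up for this post, and it only looks at the first line of the file, which for an encrypted PKCS8 key in PEM form should be the ENCRYPTED PRIVATE KEY header. It does not validate the key itself.

```shell
# Hypothetical helper: succeeds only if the PEM file given as $1 starts
# with the header of an encrypted PKCS8 private key.
is_encrypted_pkcs8() {
  # Strip a possible carriage return so files saved on Windows also match.
  [ "$(head -n 1 "$1" | tr -d '\r')" = "-----BEGIN ENCRYPTED PRIVATE KEY-----" ]
}

# Usage:
#   is_encrypted_pkcs8 encryptedPrivateKey.p9 && echo "looks encrypted"
```

A header such as BEGIN PRIVATE KEY or BEGIN RSA PRIVATE KEY means the key is still unencrypted.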

Step 2: Copy the content of your encrypted private key into the keystore.pem file, followed by your certificates, in this exact order:

  • your private key
  • your signed certificate
  • any intermediate CA and root CA

The resulting file looks something like this:

-----BEGIN ENCRYPTED PRIVATE KEY-----
xxxx
-----END ENCRYPTED PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
xxxx
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
xxxx
-----END CERTIFICATE-----
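
One way to assemble such a file is sketched below. The function name build_keystore and the certificate file names in the usage comment are hypothetical; only encryptedPrivateKey.p9 comes from Step 1. Each input goes through awk so that a file missing its final newline does not glue two PEM markers onto one line.

```shell
# Hypothetical helper: build_keystore OUTPUT INPUT...
# Concatenates the inputs into OUTPUT in the order given.
build_keystore() {
  out="$1"
  shift
  : > "$out"                 # create or truncate the output file
  for f in "$@"; do
    awk 1 "$f" >> "$out"     # awk 1 prints every line and always ends it with a newline
  done
}

# Usage (certificate file names are assumptions):
#   build_keystore keystore.pem encryptedPrivateKey.p9 signedCert.pem caChain.pem
#   grep -e '-----BEGIN' keystore.pem   # the key header should come first
```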

Step 3: Run the command

./bin/kafka-console-producer.sh --bootstrap-server your.endpoint:9092 --topic your.topic --producer.config client.properties

Common mistake I made:

  • If you don’t set ssl.key.password, it won’t work either:
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:440)
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:303)
  at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:45)
  at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: SSL PEM key store is specified, but key password is not specified.
  • If you don’t encrypt your private key, this error will pop up:
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:440)
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:303)
  at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:45)
  at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: Failed to load PEM SSL keystore wong-format.pem
Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: Invalid PEM keystore configs
Caused by: java.io.IOException: overrun, bytes = xxx

2.2 Providing certificates as strings

This will be the content of your client.properties file

bootstrap.servers=kafka.server:9092
security.protocol=SSL
ssl.truststore.type=PEM
ssl.truststore.certificates=-----BEGIN CERTIFICATE-----\nxxxx\n-----END CERTIFICATE-----
ssl.keystore.type=PEM
ssl.keystore.certificate.chain=-----BEGIN CERTIFICATE-----\nxxxx\n-----END CERTIFICATE-----
ssl.keystore.key=-----BEGIN ENCRYPTED PRIVATE KEY-----\nxxxx\n-----END ENCRYPTED PRIVATE KEY-----
ssl.key.password=hello

Step 1: Encrypt your private key, and only your private key, in PKCS8 format. The command is below (remember to use hello as the encryption password in this example):

openssl pkcs8 -topk8 -in privateKey.key -out encryptedPrivateKey.p9

Step 2: Copy the content of your encrypted private key into the ssl.keystore.key property of the client.properties file. Remember that each value must be on a single line, with the original line breaks written as \n
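
Flattening a PEM file by hand is error-prone, so here is a minimal sketch of a helper that does it. The name pem_to_line is made up for this post; it joins the lines of the file with a literal backslash-n, skips blank lines, and drops Windows carriage returns.

```shell
# Hypothetical helper: print the PEM file given as $1 on a single line,
# with the original line breaks replaced by the two characters \ and n.
pem_to_line() {
  awk 'NF { sub(/\r$/, ""); printf "%s%s", sep, $0; sep = "\\n" } END { print "" }' "$1"
}

# Usage: append the key as a property value to client.properties
#   printf 'ssl.keystore.key=%s\n' "$(pem_to_line encryptedPrivateKey.p9)" >> client.properties
```

The same helper should work for the ssl.truststore.certificates and ssl.keystore.certificate.chain values.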

Step 3: Run the command

./bin/kafka-console-producer.sh --bootstrap-server your.endpoint:9092 --topic your.topic --producer.config client.properties
