How to connect to an SSL-secured Kafka cluster (Strimzi example)
Kafkit’s kafkit.ssl module helps your aiokafka-based clients connect to Kafka brokers that require the SSL protocol.
SSL is commonly used to mutually authenticate the client and Kafka brokers: the broker authenticates the client, and the client authenticates the broker.
SSL authentication is also commonly used in conjunction with Kafka’s ACL-based authorization system, which ensures that specific clients can only perform a specific set of operations.
This page describes how to use kafkit.ssl to connect your aiokafka client to a Strimzi-based Kafka cluster.
Strimzi makes it convenient to deploy secured Kafka clusters in Kubernetes.
The basic ideas in this tutorial, however, apply to any SSL-secured Kafka cluster.
Gathering the SSL client and broker certificates
In a Strimzi-based deployment, the Kafka broker’s SSL certificate and the client’s SSL certificates are in separate Kubernetes Secret resources.
The Kafka broker’s CA certificate is in a secret named {clustername}-cluster-ca-cert, where {clustername} matches the name of the Strimzi Kafka resource.
The client certificates are in a secret named {clientname}, where {clientname} matches the name of the client’s Strimzi KafkaUser resource.
In your client’s Kubernetes Deployment resource, you can mount these Secrets as files in your pod’s filesystem (extraneous Deployment fields omitted):
apiVersion: apps/v1
kind: Deployment
spec:
  template:
    spec:
      containers:
        - name: myapp
          volumeMounts:
            # Each volumeMount name must match a volume name below
            - name: "client-tls"
              mountPath: "/var/strimzi-client"
              readOnly: true
            - name: "broker-tls"
              mountPath: "/var/strimzi-broker"
              readOnly: true
      volumes:
        # Mount the TLS secret created by KafkaUser
        - name: "client-tls"
          secret:
            # matches name of KafkaUser
            secretName: kafkauser-myapp
        - name: "broker-tls"
          secret:
            # matches name of Strimzi cluster CA cert secret
            secretName: "events-cluster-ca-cert"
In your Python application code, you can create paths to the individual certificates and key files:
from pathlib import Path
broker_ca_path = Path("/var/strimzi-broker/ca.crt")
client_cert_path = Path("/var/strimzi-client/user.crt")
client_key_path = Path("/var/strimzi-client/user.key")
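If a Secret is not mounted as expected (for example, because a volume name does not match its volumeMount), these files will be absent from the pod. The sketch below shows one way to check for them at startup so that the failure is reported clearly. The helper name require_files is hypothetical and is not part of Kafkit.

```python
from pathlib import Path


def require_files(*paths: Path) -> None:
    """Raise FileNotFoundError if any of the given paths is not a regular file.

    Hypothetical helper for illustration; not part of kafkit.
    """
    missing = [str(path) for path in paths if not Path(path).is_file()]
    if missing:
        raise FileNotFoundError(
            "Missing certificate or key files: " + ", ".join(missing)
        )
```

Calling require_files(broker_ca_path, client_cert_path, client_key_path) before building the SSL context would then surface a mounting problem before any connection attempt is made.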
Create the SSLContext
Both aiokafka.AIOKafkaConsumer and aiokafka.AIOKafkaProducer use an SSL context (ssl.SSLContext) to support SSL communication with the Kafka brokers.
The kafkit.ssl.create_ssl_context function lets you create an SSLContext instance with your certificates and keys:
from kafkit.ssl import create_ssl_context

ssl_context = create_ssl_context(
    cluster_ca_path=broker_ca_path,
    client_cert_path=client_cert_path,
    client_key_path=client_key_path,
)
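To see what such a context contains, here is a rough equivalent that uses only Python's standard ssl module. It is a sketch of the general technique for mutual TLS, not necessarily how kafkit.ssl.create_ssl_context is implemented, and the function name build_ssl_context is hypothetical.

```python
import ssl
from pathlib import Path


def build_ssl_context(
    cluster_ca_path: Path,
    client_cert_path: Path,
    client_key_path: Path,
) -> ssl.SSLContext:
    """Build a client-side SSLContext for mutual TLS (illustrative sketch)."""
    # Verify the broker's certificate against the cluster CA. Because cafile
    # is given, the system's default CA certificates are not loaded.
    context = ssl.create_default_context(
        purpose=ssl.Purpose.SERVER_AUTH,
        cafile=str(cluster_ca_path),
    )
    # Present the client certificate and private key so that the broker
    # can authenticate the client.
    context.load_cert_chain(
        certfile=str(client_cert_path),
        keyfile=str(client_key_path),
    )
    return context
```

The first call covers the client authenticating the broker, and the second covers the broker authenticating the client, which together give the mutual authentication described at the top of this page.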
Using the SSLContext
Finally, you can pass that ssl_context as the ssl_context argument to aiokafka.AIOKafkaProducer or aiokafka.AIOKafkaConsumer:
from aiokafka import AIOKafkaProducer

# Run this inside a coroutine. The producer uses the running event loop,
# so the explicit loop argument (deprecated in recent aiokafka releases)
# is omitted.
producer = AIOKafkaProducer(
    bootstrap_servers="kafka:9093",
    ssl_context=ssl_context,
    security_protocol="SSL",
)
await producer.start()
...
await producer.stop()
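The consumer takes the same two arguments. The following is a minimal sketch of the consumer side, assuming a topic named "events" and a consumer group named "myapp". Both names are hypothetical, as is the consume_events function itself.

```python
import ssl


async def consume_events(ssl_context: ssl.SSLContext, topic: str = "events") -> None:
    """Print messages from a topic over an SSL connection (illustrative sketch)."""
    # Imported here so that this sketch can be loaded even where aiokafka
    # is not installed; a real application would import it at module level.
    from aiokafka import AIOKafkaConsumer

    consumer = AIOKafkaConsumer(
        topic,  # hypothetical topic name
        bootstrap_servers="kafka:9093",
        group_id="myapp",  # hypothetical consumer group
        ssl_context=ssl_context,
        security_protocol="SSL",
    )
    await consumer.start()
    try:
        async for message in consumer:
            print(message.topic, message.partition, message.offset, message.value)
    finally:
        # Always stop the consumer, even if message handling fails.
        await consumer.stop()
```

If the cluster enforces ACL-based authorization, the client's KafkaUser would also need permission to read the topic and to use the consumer group.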