Kafka
Apache Kafka with Ondat
Kafka is a popular stream processing platform combining features from pub/sub and traditional queues.
Using Ondat persistent volumes with Apache Kafka means that if a pod fails, the cluster is only in a degraded state for as long as it takes Kubernetes to restart the pod. When the pod comes back up, its data is immediately available. Should Kubernetes schedule the Kafka pod on a new node, Ondat makes the data available to the pod regardless of whether the original Ondat master volume is located on the same node.
Kafka can handle replication itself, so carefully consider whether replication should be handled by Ondat or by Kafka.
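To make the trade-off concrete, the sketch below multiplies out how many copies of each partition end up on disk when both layers replicate. It assumes that an Ondat volume with N replicas holds N + 1 copies (the master plus its replicas) and that each Kafka replica of a partition lives on a different broker volume; the function name is hypothetical.

```shell
# copies_per_partition KAFKA_REPLICATION_FACTOR ONDAT_REPLICAS
# Assumption: every Kafka replica sits on its own Ondat volume, and a volume
# with N Ondat replicas stores N + 1 copies (master + replicas).
copies_per_partition() {
  echo $(( $1 * (1 + $2) ))
}

copies_per_partition 1 1   # Ondat replicates, Kafka does not: prints 2
copies_per_partition 3 0   # Kafka replicates, Ondat does not: prints 3
copies_per_partition 3 1   # both layers replicate: prints 6
```

Replicating at both layers multiplies storage use, which is why choosing one layer to own replication is usually worth the thought.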
Before you start, ensure you have Ondat installed and ready on a Kubernetes cluster. See our guide on how to install Ondat on Kubernetes for more information.
Prerequisites
- Apache Zookeeper is required by Kafka to function; we assume it to already exist and be accessible within the Kubernetes cluster as zookeeper, see how to run Zookeeper with Ondat here
- Ondat is assumed to have been installed; please check for the latest available version here
- Kafka pods require 1536 MB of memory for successful scheduling
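As a rough sizing check, the per-pod memory requirement can be multiplied out for the whole Kafka cluster. This sketch assumes three Kafka pods, which is the replica count used in the example StatefulSet in this guide; the variable names are illustrative only.

```shell
# Assumed values: per-pod memory comes from the prerequisite above,
# the replica count from the example StatefulSet in this guide.
KAFKA_REPLICAS=3
KAFKA_POD_MEMORY_MB=1536

# Total memory the cluster must be able to schedule for the Kafka pods alone.
KAFKA_TOTAL_MEMORY_MB=$((KAFKA_REPLICAS * KAFKA_POD_MEMORY_MB))
echo "Kafka needs ${KAFKA_TOTAL_MEMORY_MB} MB across ${KAFKA_REPLICAS} pods"
```

To compare this against what each node can offer, `kubectl get nodes -o custom-columns=NAME:.metadata.name,MEMORY:.status.allocatable.memory` lists the allocatable memory per node.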
Helm
To simplify the deployment of Kafka, we’ve used this Kafka helm chart (incubator) (version 0.13.8, app version 5.0.1) and rendered it into the example deployment files you can find in our GitHub repo.
Clone the use cases repo
You can find the latest files in the Ondat use cases repository in /kafka/
git clone https://github.com/storageos/use-cases.git storageos-usecases
cd storageos-usecases
StatefulSet definition
---
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka
    ...
spec:
  serviceName: kafka-headless
  podManagementPolicy: OrderedReady
  updateStrategy:
    type: OnDelete
  replicas: 3 # <--- number of Kafka pods to run
  template:
    ...
    spec:
      serviceAccountName: kafka
      containers:
        ...
        - name: kafka-broker
          image: "confluentinc/cp-kafka:5.0.1"
          imagePullPolicy: "IfNotPresent"
          ...
          volumeMounts:
            - name: datadir
              mountPath: "/var/data"
      volumes:
        - name: jmx-config
          configMap:
            name: kafka-metrics
      terminationGracePeriodSeconds: 60
  volumeClaimTemplates:
    - metadata:
        name: datadir
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 10Gi # <--- storage requested for each pod
        storageClassName: "fast" # <--- the StorageClass to use
This excerpt is from the StatefulSet definition (10-statefulset.yaml). The file contains the PersistentVolumeClaim template that will dynamically provision the necessary storage, using the Ondat storage class.
Dynamic provisioning occurs because a volumeMount has been declared with the same name (datadir) as a volumeClaimTemplate.
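Kubernetes names each claim created from a volumeClaimTemplate as `<template name>-<StatefulSet name>-<ordinal>`. As a small sketch, the hypothetical helper below prints the claim names that the StatefulSet excerpt should produce, so they can be compared with the output of `kubectl get pvc` once the objects have been created.

```shell
# expected_pvc_names TEMPLATE STATEFULSET REPLICAS
# Hypothetical helper: prints one expected PVC name per line, following the
# <template>-<statefulset>-<ordinal> naming used for StatefulSet claims.
expected_pvc_names() {
  template=$1
  statefulset=$2
  replicas=$3
  i=0
  while [ "$i" -lt "$replicas" ]; do
    echo "${template}-${statefulset}-${i}"
    i=$((i + 1))
  done
}

# Values from the excerpt: template "datadir", StatefulSet "kafka", 3 replicas.
expected_pvc_names datadir kafka 3
```

With the values from the excerpt this prints datadir-kafka-0, datadir-kafka-1 and datadir-kafka-2, one per line.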
- Create the Kubernetes objects
  kubectl apply -f ./kafka/
- Confirm Kafka is up and running
  $ kubectl get pods -l app=kafka
  NAME      READY   STATUS    RESTARTS   AGE
  kafka-0   2/2     Running   0          10m
  kafka-1   2/2     Running   0          9m26s
  kafka-2   2/2     Running   0          7m59s
- Connect to Kafka
  Connect to the Kafka test client pod and send some test data to Kafka through its service endpoint
  - Connect to the pod
    kubectl exec -it kafka-test-client -- /bin/bash
  - Create a topic
    /usr/bin/kafka-topics --zookeeper zookeeper:2181 --create --topic test-rep-one --partitions 6 --replication-factor 1
  - Send some test data
    /usr/bin/kafka-run-class org.apache.kafka.tools.ProducerPerformance --topic test-rep-one --num-records 5000 --record-size 100 --throughput -1 --print-metrics --producer-props acks=1 bootstrap.servers=kafka:9092 buffer.memory=67108864 batch.size=8196
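The checks in the steps above can also be scripted. The following is a minimal sketch and is not part of the Ondat use cases repository: the three helper functions are hypothetical, and they assume the column layout of `kubectl get pods`, the `Leader:` field printed by `kafka-topics --describe`, and the `topic:partition:offset` lines printed by Kafka's `kafka.tools.GetOffsetShell` tool.

```shell
# count_ready_pods: reads `kubectl get pods` output on stdin and prints how many
# pods are Running with every container ready (READY column such as 2/2).
count_ready_pods() {
  awk 'NR > 1 {
         split($2, ready, "/")
         if (ready[1] == ready[2] && $3 == "Running") n++
       }
       END { print n + 0 }'
}

# leaders_per_broker: reads `kafka-topics --describe` output on stdin and prints
# "<broker id> <partitions led>" for each broker, sorted by broker id.
leaders_per_broker() {
  awk '{
         for (i = 1; i < NF; i++)
           if ($i == "Leader:") count[$(i + 1)]++
       }
       END { for (b in count) print b, count[b] }' | sort -n
}

# sum_end_offsets: reads topic:partition:offset lines on stdin and prints the
# sum of the offsets, i.e. the number of records written to a fresh topic.
sum_end_offsets() {
  awk -F: 'NF >= 3 { total += $3 } END { print total + 0 }'
}
```

From a workstation, `kubectl get pods -l app=kafka | count_ready_pods` should print the number of healthy brokers. The other two helpers are meant to be pasted into the shell of the test client pod: `/usr/bin/kafka-topics --zookeeper zookeeper:2181 --describe --topic test-rep-one | leaders_per_broker` shows how the six partitions are spread over the brokers, and `/usr/bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list kafka:9092 --topic test-rep-one --time -1 | sum_end_offsets` would be expected to match the `--num-records` value after a single producer run against a newly created topic.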