Deploying a Scalable Kafka Architecture with a SQL Database on a Kubernetes Cluster

At work, monitoring our data streams became more tedious over time, so we opted for a real-time data streaming service. To achieve this we used Apache Kafka as the distributed data streaming platform. Run as a fully managed service, it eliminates the need for us to handle infrastructure management, scaling, or maintenance tasks.

https://github.com/A-LPHARM/K8s-apache-kafka-strimzi

Kafka is a stream processing system used for messaging, website activity tracking, metrics collection and monitoring, logging, event sourcing, commit logs, and real-time analytics. It’s a good fit for large scale message processing applications since it is more robust, reliable, and fault-tolerant compared to traditional message queues.

Open-source Kafka or managed distributions are ubiquitous in modern software development environments. Kafka is used by developers and data engineers at companies such as Uber, Square, Strava, Shopify, and Spotify.

What is Strimzi?

Strimzi simplifies the process of deploying and managing Apache Kafka in a Kubernetes cluster. As a Cloud Native Computing Foundation Sandbox project, it provides container images and Operators for running Kafka on Kubernetes.

Strimzi Operators are fundamental to this project. They are purpose-built with the operational knowledge needed to manage Kafka effectively. The Operators deploy and run Kafka clusters and components, configure and secure access to Kafka, upgrade Kafka, and manage topics and users.

Summary of Kafka Components:

1. Broker:
 -  Orchestrates the storage and transmission of messages.
 -  Uses Apache ZooKeeper for storing configuration data and cluster coordination.

2. ZooKeeper Cluster:
 -  Consists of replicated ZooKeeper instances.
 -  Used by brokers for configuration storage and cluster coordination.

3. Topic:
 -  Provides a destination for storing data.
 -  Split into one or more partitions.

4. Kafka Cluster:
 -  Group of broker instances working together.

5. Partition:
 -  Splits a single topic log into multiple logs.
 -  Each partition can reside on a separate node in the Kafka cluster.

6. Kafka Connect Cluster:
 -  Facilitates external data connections.

7. Kafka MirrorMaker Cluster:
 -  Replicates data between two Kafka clusters.
 -  Operates within or across data centers.

8. Kafka Exporter:
 -  Extracts additional Kafka metrics data for monitoring.
 -  Provides metrics related to offsets, consumer groups, consumer lag, and topics.

9. Kafka Bridge:
 -  Enables HTTP-based requests to the Kafka cluster.

10. Use Cases of Kafka:
 -  Kafka is commonly used for real-time event streaming.
 -  It serves as a distributed messaging system for large-scale data processing.
 -  Kafka supports log aggregation and data integration in various applications.

11. Kafka Uses:
 -  Data streaming and real-time analytics.
 -  Messaging and communication between microservices.
 -  Handling large-scale data processing and analytics.

Pre-requisites:

  • Docker

  • Kubernetes

  • kubectl command

  • AWS cloud services

https://github.com/A-LPHARM/K8s-apache-kafka-strimzi

Step 1: Download and Extract Debezium MySQL Connector Archive

Begin by downloading and extracting the Debezium MySQL Connector archive using the following command:

sudo curl https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.0.0.Final/debezium-connector-mysql-1.0.0.Final-plugin.tar.gz | tar xvz

Next, build your Docker image for Kafka Connect and the connector using the provided Dockerfile:

FROM strimzi/kafka:0.20.1-kafka-2.5.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/debezium
COPY ./debezium-connector-mysql/ /opt/kafka/plugins/debezium/
USER 1001

Create Kafka Connector Using Docker Image

Build the image from the Dockerfile above, which adds the connector files to the Strimzi Kafka Connect image, and push it to a registry. This image is used later to create the Kafka connector.

docker build . -t henriksin1/connect-debezium:v1
docker push henriksin1/connect-debezium:v1

Alternatively, substitute your own Docker repository for the build and push.

Deploy your Kubernetes cluster using either EKS or kops:

eksctl create cluster --name test-cluster --version 1.21 --region us-east-1 --nodegroup-name linux-node --node-type t2.medium --nodes 2

or

kops create cluster --name henry-new-kops.k8s.local \
  --cloud=aws --networking calico \
  --zones us-east-1a,us-east-1b,us-east-1c \
  --master-size t2.medium --node-size t2.medium \
  --node-count=3 --master-count=1
kops update cluster --name henry-new-kops.k8s.local --yes --admin

Step 2

Create a namespace for all the deployments:

kubectl apply -f 0-namespace.yaml
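
The contents of 0-namespace.yaml are not shown in this post. A minimal sketch, assuming the namespace is called kafka (the name every later manifest uses), could be generated like this. The manifest body is an assumption; the real file is in the repository.

```shell
# Write a minimal Namespace manifest.
# Assumed content: the actual 0-namespace.yaml lives in the repository.
cat > 0-namespace.yaml <<'EOF'
apiVersion: v1
kind: Namespace
metadata:
  name: kafka
EOF

# Apply it once kubectl points at your cluster:
#   kubectl apply -f 0-namespace.yaml
```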

Step 3

Deploy the Strimzi operators through the Operator Lifecycle Manager (OLM). This simplifies managing and monitoring the Kafka clusters.

Each operator manages a part of Kafka and performs a separate function.

This installation also deploys the Custom Resource Definitions (CRDs). A CRD is an extension of the Kubernetes API that is not necessarily available in a default Kubernetes installation; it represents a customization of a particular Kubernetes installation.

In other words, we are adding the custom resources that the operators need in order to run.

To install OLM from a release (v0.26.0 here):

curl -L https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.26.0/install.sh -o install.sh
chmod +x install.sh
./install.sh v0.26.0

Then install the Strimzi operator:

kubectl create -f https://operatorhub.io/install/strimzi-kafka-operator.yaml

Step 4

Create the secret. It gives Kafka Connect and the connectors the credentials they need to access the database.

cat <<EOF > debezium-mysql-credentials.properties
mysql_username: debezium
mysql_password: dbz
EOF
kubectl -n kafka create secret generic debezium-secret --from-file=debezium-mysql-credentials.properties

Step 5

Deploy the role-based access control (RBAC) Role, which allows reading the secret:

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: kafka-connector-configuration-role
  namespace: kafka
rules:
- apiGroups: [""]
  resources: ["secrets"]
  resourceNames: ["debezium-secret"]
  verbs: ["get"]

kubectl apply -f rbac-debezium-role.yaml

Step 6

Deploy the service account that will be combined with the RBAC Role and the secret:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: debezium-connect-cluster-connect
  namespace: kafka

kubectl apply -f serviceaccount.yaml

Step 7

Deploy the RBAC binding resource, which binds the service account to the Role:

https://github.com/A-LPHARM/K8s-apache-kafka-strimzi/blob/main/4-rbac-cluster-binding.yaml

kubectl apply -f rbac-cluster-binding.yaml
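
The binding manifest is only linked above, not shown. As a rough sketch of what such a binding could look like, the snippet below writes a RoleBinding that ties the service account from Step 6 to the Role from Step 5. The binding name and the output file name are hypothetical; the file in the repository is the authoritative version.

```shell
# Write a RoleBinding sketch to a separate file so the repository's
# rbac-cluster-binding.yaml is not overwritten (file name is hypothetical).
cat > rolebinding-sketch.yaml <<'EOF'
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: kafka-connector-configuration-role-binding   # hypothetical name
  namespace: kafka
subjects:
- kind: ServiceAccount
  name: debezium-connect-cluster-connect
  namespace: kafka
roleRef:
  kind: Role
  name: kafka-connector-configuration-role
  apiGroup: rbac.authorization.k8s.io
EOF
```

With this in place, pods running under debezium-connect-cluster-connect may read only the debezium-secret secret, which is all the Role grants.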

Step 8

Deploy the Kafka cluster with the following manifest:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external
        port: 9094
        type: nodeport
        tls: false
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: '3.6'
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}

kubectl apply -f kafka-kluster.yaml

The Kafka resource deploys the broker, ZooKeeper, and the Entity Operator (Topic Operator and User Operator).

Note: confirm that the cluster is running:

kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka

Then run commands inside the Kafka broker pod to produce messages to a topic and check that they are consumed.

To produce messages to a topic:

kubectl -n kafka exec my-cluster-kafka-0 -c kafka -i -t -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic

Open another terminal to verify that the messages are being consumed:

kubectl -n kafka exec my-cluster-kafka-0 -c kafka -i -t -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
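
The console producer above presumably relies on the broker creating my-topic automatically. Because the Topic Operator is deployed, the topic can also be declared as a KafkaTopic resource. The sketch below assumes one partition and one replica to match the single-broker cluster; the file name is hypothetical.

```shell
# Declare my-topic as a Strimzi KafkaTopic resource (file name is hypothetical).
# The strimzi.io/cluster label must match the Kafka resource name, my-cluster.
cat > my-topic.yaml <<'EOF'
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 1
  replicas: 1
EOF

# Apply it once the Kafka cluster is Ready:
#   kubectl apply -f my-topic.yaml
```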

Step 9:

After successfully deploying your Kafka cluster, deploy the MySQL database by applying the configuration from the '6-sql.yaml' file:

https://github.com/A-LPHARM/K8s-apache-kafka-strimzi/blob/main/6-sql.yaml

kubectl apply -f 6-sql.yaml

To ensure the database is running and determine the endpoint port, execute:

kubectl describe service mysql -n kafka
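
Instead of reading the port out of the describe output by eye, it can be extracted with a JSONPath query. This is a small sketch that assumes the mysql service exposes its NodePort as the first entry in spec.ports; the function name is made up for illustration.

```shell
# Print the NodePort of the mysql service in the kafka namespace.
# Assumes the NodePort is on the first port entry of the service.
mysql_node_port() {
  kubectl get service mysql -n kafka -o jsonpath='{.spec.ports[0].nodePort}'
}

# Usage, once kubectl points at your cluster:
#   mysql_node_port
#   kubectl get nodes -o wide    # shows node IPs to pair with the port
```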

Then, in another terminal, exec into the database pod and run the MySQL command-line client (replace the pod name with your own):

kubectl exec -it mysql-6597659cb8-j9rk4 -n kafka -- sh
mysql -h mysql -u root -p
password:
mysql> use inventory;
mysql> show tables;

Use the information you obtained to connect from your local MySQL Workbench.

Once the connection is confirmed, prepare your Kafka Connect manifest file.

Step 10

Deploying debezium-connect wasn't easy; it took a lot of log reading and troubleshooting to sort out the networking. Deploying a self-managed service does require more effort in terms of setting up and maintaining the cluster.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.6.0
  image: henriksin1/connect-debezium:v1
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  config:
    # config.providers: secret
    # config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # 1 because this is a single-broker cluster; -1 would use the default replication factor configured in the broker
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: connector-config
        secret:
          secretName: debezium-secret

kubectl apply -f 7-kafka-connect.yaml

Then confirm that all the pods are running:

kubectl get all -n kafka

Then check the Kafka Connect logs:

kubectl logs debezium-connect-cluster-connect-0 -n kafka

Make sure the logs show no errors.

Note: the following explains the configuration of the KafkaConnect resource.

  • The strimzi.io/use-connector-resources: "true" annotation tells the Cluster Operator that KafkaConnector resources will be used to configure connectors within this Kafka Connect cluster.
  • spec.image is the image that you built with Docker and pushed in Step 1.

  • The config, offset, and status storage topics use a replication factor of 1 because we created a single-broker Kafka cluster. With more brokers, these values can be raised.

  • In externalConfiguration we reference the secret we just created.

[Diagram: the movement of data from the database through the connector inside Kafka Connect, and how it is delivered to Kafka topics through the Kafka Connect API.]

Step 11

Create your KafkaConnector manifest file:

apiVersion: "kafka.strimzi.io/v1beta2"
kind: "KafkaConnector"
metadata:
  name: "debezium-connector-mysql"
  namespace: kafka
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: 52.90.119.28 #1
    database.port: 30036 #2
    database.user: root  #3
    database.password: debezium  #4
    database.server.id: "184054"
    database.whitelist: "inventory"
    database.server.name: "mysql"   #5
    database.history.kafka.bootstrap.servers: "my-cluster-kafka-bootstrap:9092"
    database.history.kafka.topic: "schema-changes.inventory"
    include.schema.changes: "true"
    database.allowPublicKeyRetrieval: "true"

In metadata.labels, strimzi.io/cluster names the Kafka Connect cluster in which this connector will be created.

spec.class names the Debezium MySQL connector, and spec.tasksMax must be 1 because that's all this connector ever uses.

The config object contains the rest of the connector configuration. The numbered comments are explained below:

1. database.hostname is the IP address of the MySQL server, used for connecting to MySQL.

2. database.port is the port taken from the MySQL service configuration; here that is the node port.

3. database.user and database.password can be placeholders that get replaced with the referenced properties from the file in the secret we created. In the manifest above they are written as literal values.

4. database.history.kafka.topic: schema-changes.inventory configures Debezium to use the schema-changes.inventory topic to store the database schema history.
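
To illustrate the placeholder form mentioned in point 3, here is a sketch of how the two values could be written so that the FileConfigProvider configured in Step 10 resolves them at runtime. It assumes the secret was created from a file named debezium-mysql-credentials.properties and is mounted through the connector-config volume, which Strimzi places under /opt/kafka/external-configuration/. The helper function name is made up for illustration.

```shell
# Path of the mounted secret file inside the Kafka Connect pod.
# Assumes volume name "connector-config" and the properties file name from Step 4.
SECRET_FILE=/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties

# Print a FileConfigProvider placeholder for one key of the properties file.
file_placeholder() {
  printf '${file:%s:%s}\n' "$SECRET_FILE" "$1"
}

# Print the two config lines as they could appear in the KafkaConnector manifest.
echo "database.user: \"$(file_placeholder mysql_username)\""
echo "database.password: \"$(file_placeholder mysql_password)\""
```

Written this way, the credentials stay in the Kubernetes secret instead of in the connector manifest.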

kubectl apply -f debezium-connector.yaml

Then run:

kubectl describe kafkaconnector debezium-connector-mysql -n kafka

Once the status shows that the connector is connected, you can list the topics.

TIME FOR WORK

kubectl -n kafka exec my-cluster-kafka-0 -c kafka -i -t -- bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

This lists every topic, including those created from the database.

Return to the terminal where the MySQL session is still open and run this query:

mysql> SELECT * FROM customers;

In another terminal, run:

kubectl -n kafka exec my-cluster-kafka-0 -c kafka -i -t -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mysql --from-beginning
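
The mysql topic consumed above is named after database.server.name. Debezium's MySQL connector also writes row-level change events to one topic per table, named serverName.databaseName.tableName. A small sketch for building such a name, using the customers table from the query above (the helper function is hypothetical):

```shell
# Build a per-table Debezium topic name: <server.name>.<database>.<table>
debezium_topic() {
  printf '%s.%s.%s\n' "$1" "$2" "$3"
}

TOPIC=$(debezium_topic mysql inventory customers)
echo "$TOPIC"   # mysql.inventory.customers

# Consume the per-table topic once kubectl points at your cluster:
#   kubectl -n kafka exec my-cluster-kafka-0 -c kafka -i -t -- \
#     bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
#     --topic "$TOPIC" --from-beginning
```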