ECE 473/573 Fall 2024 - Project 5

Distributed Message Queues



Report Due: 11/24 (Sun.), by the end of the day (Chicago time)
Late submissions will NOT be graded


I. Objective

In this project, you will learn the basics of Apache Kafka, a popular open-source distributed event streaming platform. We'll continue to use Kubernetes (K8s) and kind to manage our Kafka cluster so you will also be able to learn more about K8s.


II. Kafka Services

(Skip for offline version) Please create a new VM by importing our VM Appliance, clone the repository https://github.com/wngjia/ece573-prj05.git (or fork it first), and execute setup_vm.sh to set up the VM as needed. Since setup_vm.sh is the same as in Project 4, you may choose to reuse your Project 4 VM this time.

As in Project 4, the cluster.yml file defines a cluster of 5 nodes for kind. Use reset_cluster.sh to pull the necessary images, delete any existing kind cluster, create a new one from cluster.yml, and load the images into the nodes. It may take some time to complete, so please be patient.

ubuntu@ece573:~/ece573-prj05$ ./reset_cluster.sh
...
Deleting cluster "kind" ...
Deleted nodes: ["kind-worker" "kind-control-plane" "kind-worker4" "kind-worker3" "kind-worker2"]
Creating cluster "kind" ...
 ✓ Ensuring node image (kindest/node:v1.30.0) 🖼
...
Image: "confluentinc/cp-zookeeper:7.3.5" with ID "sha256:2651d76ba7aef97794f02e75bc5ba14b38d2b6dc373e49a365dab3a01ed3f652" not yet present on node "kind-worker3", loading...
...
Once the script completes, you can verify that the cluster is running with kind get nodes.
ubuntu@ece573:~/ece573-prj05$ kind get nodes
kind-worker3
kind-worker2
kind-worker4
kind-worker
kind-control-plane

Next, we will deploy Kafka to the K8s cluster. Unlike Cassandra, where the states of multiple nodes are managed by the Cassandra programs themselves, Kafka manages its node states via Apache ZooKeeper, which provides distributed coordination. Therefore, the K8s configuration file kafka.yml includes two pairs of Services and StatefulSets, one as zookeeper-service and the other as kafka-service. For simplicity, zookeeper-service uses only 1 Pod as its ZooKeeper replica, while kafka-service uses 3 Pods as Kafka replicas. Start the services with kubectl apply.

ubuntu@ece573:~/ece573-prj05$ kubectl apply -f kafka.yml 
service/zookeeper-service created
service/kafka-service created
statefulset.apps/zookeeper created
statefulset.apps/kafka created
Verify the status with kubectl get.
ubuntu@ece573:~/ece573-prj05$ kubectl get services
NAME                TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)    AGE
kafka-service       ClusterIP   None         <none>        9092/TCP   46s
kubernetes          ClusterIP   10.96.0.1    <none>        443/TCP    9m17s
zookeeper-service   ClusterIP   None         <none>        2181/TCP   46s
ubuntu@ece573:~/ece573-prj05$ kubectl get statefulsets
NAME        READY   AGE
kafka       0/3     55s
zookeeper   0/1     55s
ubuntu@ece573:~/ece573-prj05$ kubectl get pods
NAME          READY   STATUS              RESTARTS   AGE
kafka-0       0/1     ContainerCreating   0          61s
zookeeper-0   0/1     ContainerCreating   0          61s
As the last few lines show, it will take some time for all Pods to become available. Run kubectl get again after a few minutes to make sure all Pods are running.
ubuntu@ece573:~/ece573-prj05$ kubectl get pods
NAME          READY   STATUS    RESTARTS   AGE
kafka-0       1/1     Running   0          3m29s
kafka-1       1/1     Running   0          119s
kafka-2       1/1     Running   0          60s
zookeeper-0   1/1     Running   0          3m29s
If the Pods do not start properly, you can troubleshoot with kubectl logs and watch for any log messages marked with ERROR.
ubuntu@ece573:~/ece573-prj05$ kubectl logs kafka-0 --tail 10
...
[...] TRACE [Controller id=0] Checking need to trigger...
ubuntu@ece573:~/ece573-prj05$ kubectl logs zookeeper-0 --tail 10
...
[...] INFO Creating new log file: log.1...

As with Cassandra, we will then make sure that both ZooKeeper and Kafka are running properly using a few tools available from their Pods. For ZooKeeper, zookeeper-shell can be used to access the service, in particular the data it stores under paths organized like a filesystem of directories and files. For Kafka, we are interested in the path /brokers/ids, where the broker ids are stored. Use ls to list them.

ubuntu@ece573:~/ece573-prj05$ kubectl exec zookeeper-0 -- zookeeper-shell localhost:2181 ls /brokers/ids
Connecting to localhost:2181

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[0, 1, 2]
Three brokers, 0, 1, and 2, are available. You can view the details of broker 0 via get.
ubuntu@ece573:~/ece573-prj05$ kubectl exec zookeeper-0 -- zookeeper-shell localhost:2181 get /brokers/ids/0
Connecting to localhost:2181

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},
 "endpoints":["PLAINTEXT://kafka-0.kafka-service.default.svc.cluster.local:9092"],
 "jmx_port":-1,"port":9092,"host":"kafka-0.kafka-service.default.svc.cluster.local",
 "version":5,"timestamp":"1698675619249"}
So this broker runs inside Pod kafka-0. This is expected, as we set the broker id according to the name of the Pod it runs in.
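
To make the link between the broker record and the Pod name concrete, here is a minimal Go sketch that decodes the JSON shown above and extracts the trailing number of the Pod name from the host field. The names brokerInfo, parseBroker, and podOrdinal are made up for this illustration and are not part of the project code; the sketch only assumes the record has the host, port, and endpoints fields seen in the output.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
	"strings"
)

// brokerInfo keeps only the fields we care about from the JSON stored at
// /brokers/ids/<id>; the decoder ignores the other fields.
// (Hypothetical type, for illustration only.)
type brokerInfo struct {
	Endpoints []string `json:"endpoints"`
	Host      string   `json:"host"`
	Port      int      `json:"port"`
}

// parseBroker decodes one broker registration record.
func parseBroker(data string) (brokerInfo, error) {
	var b brokerInfo
	err := json.Unmarshal([]byte(data), &b)
	return b, err
}

// podOrdinal returns the trailing number of the Pod name at the front of
// host, e.g. 0 for "kafka-0.kafka-service.default.svc.cluster.local".
func podOrdinal(host string) (int, error) {
	pod := strings.SplitN(host, ".", 2)[0] // "kafka-0"
	i := strings.LastIndex(pod, "-")
	if i < 0 {
		return 0, fmt.Errorf("no ordinal in pod name %q", pod)
	}
	return strconv.Atoi(pod[i+1:])
}

func main() {
	// Sample record copied from the zookeeper-shell output above.
	const record = `{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},
 "endpoints":["PLAINTEXT://kafka-0.kafka-service.default.svc.cluster.local:9092"],
 "jmx_port":-1,"port":9092,"host":"kafka-0.kafka-service.default.svc.cluster.local",
 "version":5,"timestamp":"1698675619249"}`

	b, err := parseBroker(record)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	id, err := podOrdinal(b.Host)
	if err != nil {
		fmt.Println("ordinal error:", err)
		return
	}
	fmt.Printf("Pod ordinal %d listens on %s:%d\n", id, b.Host, b.Port)
}
```

The number extracted from the host should match the broker id in the ZooKeeper path, which is a quick sanity check that each broker registered under the id we intended.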

For Kafka, kafka-topics is a very useful tool to manage topics, e.g. to list all topics as follows.

ubuntu@ece573:~/ece573-prj05$ kubectl exec kafka-0 -- kafka-topics --bootstrap-server localhost:9092 --list
This command outputs nothing at this point because no topic exists yet. If Kafka is not running properly, however, you will see error messages.

Here is what you need to do for this section. You'll need to provide answers and screenshots as necessary in the project reports.


III. Topics and Messages

Now that Kafka is running, applications can use Kafka producers and consumers to exchange messages. Similar to Project 4, we need to containerize the applications and make them available to K8s. For simplicity, this time we combine the code for the producer and the consumer in clients/clients.go and use an environment variable in clients.yml to control their behavior at deployment time. Run build.sh to build the image and load it into the nodes, then deploy with kubectl apply.

ubuntu@ece573:~/ece573-prj05$ ./build.sh
[+] Building 119.6s (10/10) FINISHED        docker:default
...
Image: "ece573-prj05-clients:v1" with ID "..." not yet present on node "kind-worker2", loading...
ubuntu@ece573:~/ece573-prj05$ kubectl apply -f clients.yml 
deployment.apps/ece573-prj05-producer created
deployment.apps/ece573-prj05-consumer created
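
The idea of one binary that acts as either producer or consumer depending on an environment variable can be sketched as follows. This is only an illustration: the variable name CLIENT_ROLE and the function pickRole are assumptions made for this sketch, so check clients.yml and clients/clients.go for the names actually used.

```go
package main

import (
	"fmt"
	"os"
)

// pickRole validates the value of the role variable.
// (Hypothetical helper; the real clients.go may be organized differently.)
func pickRole(value string) (string, error) {
	switch value {
	case "producer", "consumer":
		return value, nil
	default:
		return "", fmt.Errorf("unknown role %q: expected producer or consumer", value)
	}
}

func main() {
	// CLIENT_ROLE is an assumed name, not necessarily the one set in clients.yml.
	role, err := pickRole(os.Getenv("CLIENT_ROLE"))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// A real client would start its producer or consumer loop here.
	fmt.Println("running as", role)
}
```

With this pattern, the two Deployments in clients.yml can share the same image and differ only in the value of the environment variable.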

After a while, check the logs of the producer and the consumer with kubectl logs.

ubuntu@ece573:~/ece573-prj05$ kubectl logs -l app=ece573-prj05-producer
2023/10/30 19:33:20 test: 3000 messages published
2023/10/30 19:33:31 test: 4000 messages published
2023/10/30 19:33:42 test: 5000 messages published
2023/10/30 19:33:52 test: 6000 messages published
2023/10/30 19:34:01 test: 7000 messages published
2023/10/30 19:34:11 test: 8000 messages published
2023/10/30 19:34:19 test: 9000 messages published
2023/10/30 19:34:27 test: 10000 messages published
2023/10/30 19:34:36 test: 11000 messages published
2023/10/30 19:34:46 test: 12000 messages published
ubuntu@ece573:~/ece573-prj05$ kubectl logs -l app=ece573-prj05-consumer
2023/10/30 19:32:33 test: start receiving messages from kafka-1.kafka-service.default.svc.cluster.local:9092
2023/10/30 19:33:31 test: received 1000 messages, last (0.409483)
2023/10/30 19:34:09 test: received 2000 messages, last (0.237691)
2023/10/30 19:34:44 test: received 3000 messages, last (0.245345)
A lot of messages seem to be missing: the consumer appears to receive only a quarter of the messages the producer has published. Why? Before solving this puzzle, use kubectl delete to remove our producer and consumer to free up resources.
ubuntu@ece573:~/ece573-prj05$ kubectl delete -f clients.yml 
deployment.apps "ece573-prj05-producer" deleted
deployment.apps "ece573-prj05-consumer" deleted

Use kafka-topics again to see what topics are available; this time you should see the topic test.

ubuntu@ece573:~/ece573-prj05$ kubectl exec kafka-0 -- kafka-topics --bootstrap-server localhost:9092 --list
test
Indeed, Kafka will create a topic automatically once a message is published, with default numbers of partitions and replicas. The details of the topic test can be inspected as follows.
ubuntu@ece573:~/ece573-prj05$ kubectl exec kafka-0 -- kafka-topics --bootstrap-server localhost:9092 --describe --topic test
Topic: test     TopicId: t1YMk67qQI-rt1Sk7QoXZw PartitionCount: 4       ReplicationFactor: 3   Configs: 
        Topic: test     Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,1,0
        Topic: test     Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 2,1,0
        Topic: test     Partition: 2    Leader: 0       Replicas: 0,1,2 Isr: 2,1,0
        Topic: test     Partition: 3    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0
So there are 4 partitions for the topic test, each with 3 replicas. When our producer publishes a message without specifying a partition, a partition is randomly assigned. Our consumer, however, only receives messages from partition 0, so it sees only about a quarter of them.

Here is what you need to do for this section. You'll need to provide answers and screenshots as necessary in the project reports.


IV. A Better Consumer

As shown in clients/clients.go line 65, our consumer only consumes messages from partition 0 via a single PartitionConsumer. In order for our consumer to consume messages from all 4 partitions, one idea is to create 4 PartitionConsumers, one per partition, and then use select to multiplex between their message channels. A better but more complicated implementation, which does not need to know the number of partitions, uses ConsumerGroup from the library. You will need to study the documentation to understand how to use it, and an example can be found here.
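
The select-based idea can be sketched with plain Go channels standing in for the message channels of the 4 PartitionConsumers. Everything here (the message type, feed, and consumeAll) is hypothetical and uses no Kafka library; it only shows the multiplexing pattern, not a complete solution for this section.

```go
package main

import "fmt"

// message is a stand-in for a consumed Kafka message.
type message struct {
	Partition int
	Value     string
}

// feed simulates one partition: it sends n messages and then closes the channel.
func feed(partition, n int) <-chan message {
	ch := make(chan message)
	go func() {
		defer close(ch)
		for i := 0; i < n; i++ {
			ch <- message{Partition: partition, Value: fmt.Sprintf("p%d-m%d", partition, i)}
		}
	}()
	return ch
}

// consumeAll multiplexes four channels with select and counts the messages
// per partition. A closed channel is set to nil so that its case can never
// be selected again; the loop ends once all four channels are closed.
func consumeAll(p0, p1, p2, p3 <-chan message) map[int]int {
	counts := make(map[int]int)
	for p0 != nil || p1 != nil || p2 != nil || p3 != nil {
		select {
		case m, ok := <-p0:
			if !ok {
				p0 = nil
				continue
			}
			counts[m.Partition]++
		case m, ok := <-p1:
			if !ok {
				p1 = nil
				continue
			}
			counts[m.Partition]++
		case m, ok := <-p2:
			if !ok {
				p2 = nil
				continue
			}
			counts[m.Partition]++
		case m, ok := <-p3:
			if !ok {
				p3 = nil
				continue
			}
			counts[m.Partition]++
		}
	}
	return counts
}

func main() {
	counts := consumeAll(feed(0, 3), feed(1, 3), feed(2, 3), feed(3, 3))
	total := 0
	for _, c := range counts {
		total += c
	}
	fmt.Println("received", total, "messages from", len(counts), "partitions")
}
```

In the real consumer, each channel would come from one PartitionConsumer and would normally stay open, so the loop would run until the program is stopped. Note that this approach hard-codes the number of partitions, which is exactly the limitation ConsumerGroup avoids.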

For this section, you will need to modify clients/clients.go to consume all messages from all partitions. I will leave it to you to decide the approach you would like to take, either one of the two ideas discussed above or any of your own ideas, as long as you can demonstrate all messages are received. You'll need to provide a discussion on your implementation and screenshots as necessary in the project reports.


V. Project Deliverables

Complete the tasks for Sections II (5 points), III (5 points), and IV (10 points), include them in a project report in .doc/.docx or .pdf format, and submit it to Blackboard before the deadline.