In this project, you will learn the basics of Apache Kafka, a popular open-source distributed event streaming platform. We'll continue to use Kubernetes (K8s) and kind to manage our Kafka cluster, so you will also be able to learn more about K8s.
(Skip for offline version) Please create a new VM by importing our VM Appliance, make a clone of the repository (or fork it first) https://github.com/wngjia/ece573-prj05.git, and execute setup_vm.sh to set up the VM as needed. Since setup_vm.sh is the same as that of Project 4, you may choose to reuse the Project 4 VM this time.
As in Project 4, the cluster.yml script defines a cluster of 5 nodes for kind. Use reset_cluster.sh to pull the necessary images, delete any existing kind cluster, create a new one from cluster.yml, and load the images into the nodes. It may take some time to complete, so please be patient.
ubuntu@ece573:~/ece573-prj05$ ./reset_cluster.sh
...
Deleting cluster "kind" ...
Deleted nodes: ["kind-worker" "kind-control-plane" "kind-worker4" "kind-worker3" "kind-worker2"]
Creating cluster "kind" ...
 ✓ Ensuring node image (kindest/node:v1.30.0) 🖼
...
Image: "confluentinc/cp-zookeeper:7.3.5" with ID "sha256:2651d76ba7aef97794f02e75bc5ba14b38d2b6dc373e49a365dab3a01ed3f652" not yet present on node "kind-worker3", loading...
...

It will take some time to complete, and then you can verify that the cluster is running by kind get nodes
ubuntu@ece573:~/ece573-prj05$ kind get nodes
kind-worker3
kind-worker2
kind-worker4
kind-worker
kind-control-plane
Next, we will deploy Kafka to the K8s cluster. Unlike Cassandra, where the states of multiple nodes are managed by the Cassandra programs themselves, Kafka manages its node states via Apache ZooKeeper, which provides distributed coordination. Therefore, the K8s configuration file kafka.yml includes two pairs of Services and StatefulSets, one for zookeeper-service and the other for kafka-service. For simplicity, zookeeper-service uses only 1 ZooKeeper replica Pod while kafka-service uses 3 Kafka replica Pods. Start the services by kubectl apply
ubuntu@ece573:~/ece573-prj05$ kubectl apply -f kafka.yml
service/zookeeper-service created
service/kafka-service created
statefulset.apps/zookeeper created
statefulset.apps/kafka created

Verify the status by kubectl get
ubuntu@ece573:~/ece573-prj05$ kubectl get services
NAME                TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)    AGE
kafka-service       ClusterIP   None         <none>        9092/TCP   46s
kubernetes          ClusterIP   10.96.0.1    <none>        443/TCP    9m17s
zookeeper-service   ClusterIP   None         <none>        2181/TCP   46s
ubuntu@ece573:~/ece573-prj05$ kubectl get statefulsets
NAME        READY   AGE
kafka       0/3     55s
zookeeper   0/1     55s
ubuntu@ece573:~/ece573-prj05$ kubectl get pods
NAME          READY   STATUS              RESTARTS   AGE
kafka-0       0/1     ContainerCreating   0          61s
zookeeper-0   0/1     ContainerCreating   0          61s

As the last few lines show, it will take some time for all Pods to become available. Run kubectl get again after a few minutes to make sure all Pods are running.
ubuntu@ece573:~/ece573-prj05$ kubectl get pods
NAME          READY   STATUS    RESTARTS   AGE
kafka-0       1/1     Running   0          3m29s
kafka-1       1/1     Running   0          119s
kafka-2       1/1     Running   0          60s
zookeeper-0   1/1     Running   0          3m29s

If Pods are not started properly, you may troubleshoot issues with kubectl logs and watch out for any log messages marked with ERROR.
ubuntu@ece573:~/ece573-prj05$ kubectl logs kafka-0 --tail 10
...
[...] TRACE [Controller id=0] Checking need to trigger...
ubuntu@ece573:~/ece573-prj05$ kubectl logs zookeeper-0 --tail 10
...
[...] INFO Creating new log file: log.1...
Similar to Cassandra, we will then make sure that both ZooKeeper and Kafka are running properly using a few tools available in their Pods. For ZooKeeper, zookeeper-shell can be used to access the service, in particular the data associated with paths organized like a filesystem of directories and files. For Kafka, we are interested in the path /brokers/ids, where the broker ids are stored. Use ls to list them.
ubuntu@ece573:~/ece573-prj05$ kubectl exec zookeeper-0 -- zookeeper-shell localhost:2181 ls /brokers/ids
Connecting to localhost:2181
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[0, 1, 2]

Three brokers 0, 1, and 2 are available. You can view the details of broker 0 via get
ubuntu@ece573:~/ece573-prj05$ kubectl exec zookeeper-0 -- zookeeper-shell localhost:2181 get /brokers/ids/0
Connecting to localhost:2181
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-0.kafka-service.default.svc.cluster.local:9092"],"jmx_port":-1,"port":9092,"host":"kafka-0.kafka-service.default.svc.cluster.local","version":5,"timestamp":"1698675619249"}

So this broker runs inside Pod kafka-0. This is expected, since we set the broker id according to the name of the Pod it runs in.
For Kafka, kafka-topics is a very useful tool to manage topics, e.g. to list all topics as follows.
ubuntu@ece573:~/ece573-prj05$ kubectl exec kafka-0 -- kafka-topics --bootstrap-server localhost:9092 --list

Clearly, this command doesn't output anything at this point since there is no topic yet. Nevertheless, if Kafka is not running properly, you will see error messages.
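The same check can also be done programmatically from Go. Below is a minimal sketch, assuming the clients use the Sarama library (github.com/IBM/sarama, consistent with the PartitionConsumer and ConsumerGroup names used later in this document) and that the brokers are reachable at kafka-service:9092 from inside the cluster; both assumptions should be checked against clients/clients.go and kafka.yml.

// list_topics_sketch.go -- a hedged sketch, not project code: list Kafka
// topics using the Sarama admin client (library and address are assumptions).
package main

import (
	"log"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_8_0_0 // admin requests need a recent protocol version

	// kafka-service:9092 assumes this runs inside the K8s cluster.
	admin, err := sarama.NewClusterAdmin([]string{"kafka-service:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer admin.Close()

	topics, err := admin.ListTopics() // map[string]sarama.TopicDetail
	if err != nil {
		log.Fatal(err)
	}
	for name, detail := range topics {
		log.Printf("%s: %d partitions, replication factor %d",
			name, detail.NumPartitions, detail.ReplicationFactor)
	}
}

Like the kafka-topics command above, this prints nothing (an empty topic list) until a topic has been created.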
Here is what you need to do for this section. You'll need to provide answers and screenshots as necessary in the project reports.
Now Kafka is running and applications can utilize Kafka producers and consumers for message communication. Similar to Project 4, we need to containerize the applications and make them available to K8s. For simplicity, this time we combine the code for the producer and consumer together in clients/clients.go and use an environment variable in clients.yml to control their behavior at deployment time.
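As a rough illustration of how such a combined client can be structured, the sketch below selects between the producer and consumer roles based on an environment variable. The variable name CLIENT_ROLE and the functions runProducer/runConsumer are hypothetical placeholders; check clients/clients.go and clients.yml for the names actually used in this project.

// role_sketch.go -- a minimal sketch (not the project code) of selecting the
// client role from an environment variable set in the Deployment spec.
package main

import (
	"log"
	"os"
)

func runProducer() { log.Println("producer: publishing messages...") } // placeholder
func runConsumer() { log.Println("consumer: receiving messages...") }  // placeholder

func main() {
	// CLIENT_ROLE is a hypothetical variable name; clients.yml would set it
	// to "producer" in one Deployment and "consumer" in the other.
	switch os.Getenv("CLIENT_ROLE") {
	case "producer":
		runProducer()
	case "consumer":
		runConsumer()
	default:
		log.Fatal("CLIENT_ROLE must be set to \"producer\" or \"consumer\"")
	}
}

Build the image with build.sh, which also loads it into the kind nodes, and then deploy the clients by kubectl apply.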
ubuntu@ece573:~/ece573-prj05$ ./build.sh
[+] Building 119.6s (10/10) FINISHED                    docker:default
...
Image: "ece573-prj05-clients:v1" with ID "..." not yet present on node "kind-worker2", loading...
ubuntu@ece573:~/ece573-prj05$ kubectl apply -f clients.yml
deployment.apps/ece573-prj05-producer created
deployment.apps/ece573-prj05-consumer created
Check the logs of producer and consumer after a while by kubectl logs.
ubuntu@ece573:~/ece573-prj05$ kubectl logs -l app=ece573-prj05-producer
2023/10/30 19:33:20 test: 3000 messages published
2023/10/30 19:33:31 test: 4000 messages published
2023/10/30 19:33:42 test: 5000 messages published
2023/10/30 19:33:52 test: 6000 messages published
2023/10/30 19:34:01 test: 7000 messages published
2023/10/30 19:34:11 test: 8000 messages published
2023/10/30 19:34:19 test: 9000 messages published
2023/10/30 19:34:27 test: 10000 messages published
2023/10/30 19:34:36 test: 11000 messages published
2023/10/30 19:34:46 test: 12000 messages published
ubuntu@ece573:~/ece573-prj05$ kubectl logs -l app=ece573-prj05-consumer
2023/10/30 19:32:33 test: start receiving messages from kafka-1.kafka-service.default.svc.cluster.local:9092
2023/10/30 19:33:31 test: received 1000 messages, last (0.409483)
2023/10/30 19:34:09 test: received 2000 messages, last (0.237691)
2023/10/30 19:34:44 test: received 3000 messages, last (0.245345)

It seems a lot of messages are missing: the consumer receives only about a quarter of the messages the producer has published. Why? Before answering this puzzle, use kubectl delete to remove our producer and consumer to preserve resources.
ubuntu@ece573:~/ece573-prj05$ kubectl delete -f clients.yml
deployment.apps "ece573-prj05-producer" deleted
deployment.apps "ece573-prj05-consumer" deleted
Use kafka-topics again to see what topics are available and this time you should see the topic test.
ubuntu@ece573:~/ece573-prj05$ kubectl exec kafka-0 -- kafka-topics --bootstrap-server localhost:9092 --list
test

Indeed, Kafka creates a topic automatically once a message is published to it, using default settings for partitions and replicas. The details of the topic test can be inspected as follows.
ubuntu@ece573:~/ece573-prj05$ kubectl exec kafka-0 -- kafka-topics --bootstrap-server localhost:9092 --describe test
Topic: test  TopicId: t1YMk67qQI-rt1Sk7QoXZw  PartitionCount: 4  ReplicationFactor: 3  Configs:
    Topic: test  Partition: 0  Leader: 2  Replicas: 2,0,1  Isr: 2,1,0
    Topic: test  Partition: 1  Leader: 1  Replicas: 1,2,0  Isr: 2,1,0
    Topic: test  Partition: 2  Leader: 0  Replicas: 0,1,2  Isr: 2,1,0
    Topic: test  Partition: 3  Leader: 2  Replicas: 2,1,0  Isr: 2,1,0

So there are 4 partitions for the topic test, each replicated 3 times. When our producer publishes a message without specifying a partition, a partition is randomly assigned; our consumer, however, only receives from partition 0, so only about a quarter of the messages were received.
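To make the mismatch concrete, here is a hedged sketch of the two sides, again assuming the Sarama library and an in-cluster broker address of kafka-service:9092 (names and addresses are illustrative, not the actual clients/clients.go code): the producer sends to the topic without naming a partition, so the default partitioner spreads keyless messages across all 4 partitions, while a consumer built on a single PartitionConsumer pinned to partition 0 only sees that partition's share.

// partition_mismatch_sketch.go -- illustrative only; library, names, and
// addresses are assumptions, not the project's actual client code.
package main

import (
	"log"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_8_0_0
	cfg.Producer.Return.Successes = true // required by SyncProducer
	brokers := []string{"kafka-service:9092"}

	// Producer side: no partition (and no key) is specified, so the default
	// partitioner picks one of the topic's 4 partitions for each message.
	producer, err := sarama.NewSyncProducer(brokers, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()
	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "test",
		Value: sarama.StringEncoder("hello"),
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("published to partition %d at offset %d", partition, offset)

	// Consumer side: a single PartitionConsumer pinned to partition 0 only
	// ever sees roughly 1/4 of the traffic.
	consumer, err := sarama.NewConsumer(brokers, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()
	pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatal(err)
	}
	defer pc.Close()
	msg := <-pc.Messages()
	log.Printf("received from partition %d: %s", msg.Partition, msg.Value)
}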
Here is what you need to do for this section. You'll need to provide answers and screenshots as necessary in the project reports.
As shown in clients/clients.go line 65, our consumer only consumes messages from partition 0 via a single PartitionConsumer. In order for our consumer to consume all messages from all 4 partitions, one idea is to create 4 PartitionConsumer's, one for each partition, and then use select to multiplex among their message channels. A better but more complicated implementation, which does not need to know the number of partitions, is to utilize ConsumerGroup from the library. You will need to study the documentation to understand how to use it, and an example can be found here; a rough skeleton is also sketched below.
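For reference, here is a hedged skeleton of the ConsumerGroup approach, assuming the Sarama library; the group id "ece573-prj05", the broker address, and the handler type are made up for illustration, and wiring this into clients/clients.go is left to you.

// consumergroup_sketch.go -- a skeleton of the ConsumerGroup idea, assuming
// Sarama; the group id and type names here are illustrative only.
package main

import (
	"context"
	"log"

	"github.com/IBM/sarama"
)

// handler implements sarama.ConsumerGroupHandler; the group assigns every
// partition of the topic to some member, so no partition count is needed.
type handler struct{}

func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		log.Printf("partition %d offset %d: %s", msg.Partition, msg.Offset, msg.Value)
		sess.MarkMessage(msg, "") // commit progress for this partition
	}
	return nil
}

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_8_0_0 // consumer groups need a recent protocol version
	group, err := sarama.NewConsumerGroup([]string{"kafka-service:9092"}, "ece573-prj05", cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	ctx := context.Background()
	for {
		// Consume blocks until a rebalance or error; loop to keep consuming.
		if err := group.Consume(ctx, []string{"test"}, handler{}); err != nil {
			log.Fatal(err)
		}
	}
}

The multi-PartitionConsumer idea can be sketched similarly by calling Consumer.Partitions("test") to discover the partitions and merging each PartitionConsumer's Messages() channel into one.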
For this section, you will need to modify clients/clients.go to consume all messages from all partitions. I will leave it to you to decide which approach to take, either one of the two ideas discussed above or an idea of your own, as long as you can demonstrate that all messages are received. You'll need to provide a discussion of your implementation and screenshots as necessary in the project reports.
Complete the tasks for Section II (5 points), III (5 points), and IV (10 points), include them in a project report in .doc/.docx or .pdf format, and submit it to Blackboard before the deadline.