Apache Kafka is known for many things, one being the ability to process over 1 million messages per second. The ease of decoupling components makes it appealing to developers as well.
In this hands-on lab, you will build a custom Kafka cluster and create a topic. A topic consists of many partitions of messages and is the fundamental unit used in Kafka.
Learning Objectives
Successfully complete this lab by achieving the following learning objectives:
- Download the Kafka Binaries
Use
wget
to download the tar file from the mirror:wget http://mirror.cogentco.com/pub/apache/kafka/2.6.2/kafka_2.12-2.6.2.tgz
Extract the tar file and move it into the
/opt
directory:tar -xvf kafka_2.12-2.6.2.tgz
sudo mv kafka_2.12-2.6.2 /opt/kafka
Change to the
kafka
directory and list the contents:cd /opt/kafka
ls
- Install Java and Disable RAM Swap
Use the following command to install the latest Java Developer Kit (JDK):
sudo apt install -y default-jdk
Use the following command to verify that Java has been installed:
java -version
Use the following command to disable RAM swap:
swapoff -a
Use the following command to comment out swap in the
/etc/fstab
file:sudo sed -i '/ swap / s/^/#/' /etc/fstab
- Create a New Directory for Kafka and Zookeeper
Use the following command to create a new directory for Kafka message logs:
sudo mkdir -p /data/kafka
Use the following command to create a snapshot directory for Zookeeper:
sudo mkdir -p /data/zookeeper
Change ownership of those directories to allow the user
cloud_user
control:sudo chown -R cloud_user:cloud_user /data/kafka
sudo chown -R cloud_user:cloud_user /data/zookeeper
- Specify an ID for Each Zookeeper Server
Use the following command to create a file in
/data/zookeeper
(on server #1) calledmyid
with the contents "1" to specify Zookeeper server #1:echo "1" > /data/zookeeper/myid
Use the following command to create a file in
/data/zookeeper
(on server #2) calledmyid
with the contents "2" to specify Zookeeper server #2:echo "2" > /data/zookeeper/myid
Use the following command to create a file in
/data/zookeeper
(on server #3) calledmyid
with the contents "3" to specify Zookeeper server #3:echo "3" > /data/zookeeper/myid
- Modify the Kafka and Zookeeper Configuration Files
Use the following command to remove the existing
server.properties
file (in theconfig
directory) and create a newserver.properties
file:rm config/server.properties
vim config/server.properties
Copy and paste the following into the contents of the
server.properties
file and change thebroker.id
and theadvertised.listeners
:# change this for each broker broker.id=[broker_number] # change this to the hostname of each broker advertised.listeners=PLAINTEXT://[hostname]:9092 # The ability to delete topics delete.topic.enable=true # Where logs are stored log.dirs=/data/kafka # default number of partitions num.partitions=8 # default replica count based on the number of brokers default.replication.factor=3 # to protect yourself against broker failure min.insync.replicas=2 # logs will be deleted after how many hours log.retention.hours=168 # size of the log files log.segment.bytes=1073741824 # check to see if any data needs to be deleted log.retention.check.interval.ms=300000 # location of all zookeeper instances and kafka directory zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka # timeout for connecting with zookeeper zookeeper.connection.timeout.ms=6000 # automatically create topics auto.create.topics.enable=true
Use the following command to remove the existing
zookeeper.properties
file (in theconfig
directory) and create a newzookeeper.properties
file:rm config/zookeeper.properties
vim config/zookeeper.properties
Copy and paste the following into the contents of the
zookeeper.properties
file (don’t change this file):# the directory where the snapshot is stored. dataDir=/data/zookeeper # the port at which the clients will connect clientPort=2181 # setting number of connections to unlimited maxClientCnxns=0 # keeps a heartbeat of zookeeper in milliseconds tickTime=2000 # time for initial synchronization initLimit=10 # how many ticks can pass before timeout syncLimit=5 # define servers ip and internal ports to zookeeper server.1=zookeeper1:2888:3888 server.2=zookeeper2:2888:3888 server.3=zookeeper3:2888:3888
- Create the Kafka and Zookeeper Service
Create the file
/etc/init.d/zookeeper
on each server and paste in the following contents:sudo vim /etc/init.d/zookeeper
#!/bin/bash #/etc/init.d/zookeeper DAEMON_PATH=/opt/kafka/bin DAEMON_NAME=zookeeper # Check that networking is up. #[ ${NETWORKING} = "no" ] && exit 0 PATH=$PATH:$DAEMON_PATH case "$1" in start) # Start daemon. pid=`ps ax | grep -i 'org.apache.zookeeper' | grep -v grep | awk '{print $1}'` if [ -n "$pid" ] then echo "Zookeeper is already running"; else echo "Starting $DAEMON_NAME"; $DAEMON_PATH/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties fi ;; stop) echo "Shutting down $DAEMON_NAME"; $DAEMON_PATH/zookeeper-server-stop.sh ;; restart) $0 stop sleep 2 $0 start ;; status) pid=`ps ax | grep -i 'org.apache.zookeeper' | grep -v grep | awk '{print $1}'` if [ -n "$pid" ] then echo "Zookeeper is Running as PID: $pid" else echo "Zookeeper is not Running" fi ;; *) echo "Usage: $0 {start|stop|restart|status}" exit 1 esac exit 0
Change the file to executable, change ownership, install, and start the service:
sudo chmod +x /etc/init.d/zookeeper
sudo chown root:root /etc/init.d/zookeeper
sudo update-rc.d zookeeper defaults
sudo service zookeeper start
sudo service zookeeper status
Create the file
/etc/init.d/kafka
on each server and paste in the following contents:#!/bin/bash #/etc/init.d/kafka DAEMON_PATH=/opt/kafka/bin DAEMON_NAME=kafka # Check that networking is up. #[ ${NETWORKING} = "no" ] && exit 0 PATH=$PATH:$DAEMON_PATH # See how we were called. case "$1" in start) # Start daemon. pid=`ps ax | grep -i 'kafka.Kafka' | grep -v grep | awk '{print $1}'` if [ -n "$pid" ] then echo "Kafka is already running" else echo "Starting $DAEMON_NAME" $DAEMON_PATH/kafka-server-start.sh -daemon /opt/kafka/config/server.properties fi ;; stop) echo "Shutting down $DAEMON_NAME" $DAEMON_PATH/kafka-server-stop.sh ;; restart) $0 stop sleep 2 $0 start ;; status) pid=`ps ax | grep -i 'kafka.Kafka' | grep -v grep | awk '{print $1}'` if [ -n "$pid" ] then echo "Kafka is Running as PID: $pid" else echo "Kafka is not Running" fi ;; *) echo "Usage: $0 {start|stop|restart|status}" exit 1 esac exit 0
Save and close the file.
Change the file to executable, change ownership, install, and start the service:
sudo chmod +x /etc/init.d/kafka
sudo chown root:root /etc/init.d/kafka
sudo update-rc.d kafka defaults
sudo service kafka start
sudo service kafka status
- Create a Topic
Use the following command to create a topic named
test
:./bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --create --topic test --replication-factor 1 --partitions 3
Use the following command to describe the topic:
./bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --topic test --describe