Apache Kafka is known for many things, including the ability to process over one million messages per second. Its ease of decoupling components also makes it appealing to developers. In this hands-on lab, you will build a custom Kafka cluster and create a topic. A topic is the fundamental unit of organization in Kafka and is divided into one or more partitions of messages.
Learning Objectives
Successfully complete this lab by achieving the following learning objectives:
- Download the Kafka Binaries
Use wget to download the tar file from the mirror:
wget http://mirror.cogentco.com/pub/apache/kafka/2.8.2/kafka_2.12-2.8.2.tgz
Extract the tar file and move it into the /opt directory:
tar -xvf kafka_2.12-2.8.2.tgz
sudo mv kafka_2.12-2.8.2 /opt/kafka
Change to the kafka directory and list the contents:
cd /opt/kafka
ls
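The archive name encodes two versions: the Scala version Kafka was built against (2.12), followed by the Kafka release (2.8.2). A minimal sketch of how the download commands above could be parameterized; the variable names are illustrative and not part of the lab:

```shell
# Illustrative variables; only the values come from the lab's commands.
SCALA_VERSION=2.12
KAFKA_VERSION=2.8.2

# Directory and archive names follow the pattern kafka_<scala>-<kafka>.
KAFKA_DIR="kafka_${SCALA_VERSION}-${KAFKA_VERSION}"
KAFKA_ARCHIVE="${KAFKA_DIR}.tgz"
KAFKA_URL="http://mirror.cogentco.com/pub/apache/kafka/${KAFKA_VERSION}/${KAFKA_ARCHIVE}"

echo "$KAFKA_URL"
```

With these set, the download step becomes wget "$KAFKA_URL" and the extract step becomes tar -xvf "$KAFKA_ARCHIVE".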
- Install Java and Disable RAM Swap
Use the following command to install the default Java Development Kit (JDK):
sudo apt install -y default-jdk
Use the following command to verify that Java has been installed:
java -version
Use the following command to disable RAM swap:
sudo swapoff -a
Use the following command to comment out swap in the /etc/fstab file:
sudo sed -i '/ swap / s/^/#/' /etc/fstab
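To see what the sed expression does before running it against the real file, it can be tried on a scratch copy. The sketch below uses made-up fstab entries (they are assumptions for illustration, not taken from the lab servers) and assumes GNU sed, as shipped on apt-based systems:

```shell
# Build a scratch file with illustrative fstab entries.
fstab_copy="$(mktemp)"
printf '%s\n' 'UUID=1111-aaaa / ext4 defaults 0 1' > "$fstab_copy"
printf '%s\n' '/swapfile none swap sw 0 0' >> "$fstab_copy"
# Same kind of entry, but separated by tabs instead of spaces.
printf '/dev/sdb1\tnone\tswap\tsw\t0\t0\n' >> "$fstab_copy"

# Same expression as the lab: prefix "#" to lines containing " swap ".
sed -i '/ swap / s/^/#/' "$fstab_copy"

result="$(cat "$fstab_copy")"
printf '%s\n' "$result"
rm -f "$fstab_copy"
```

Only the space-separated swap line is commented out. The pattern requires a literal space on each side of the word swap, so a tab-separated entry is left untouched and would need to be edited by hand.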
- Create a New Directory for Kafka and Zookeeper
Use the following command to create a new directory for Kafka message logs:
sudo mkdir -p /data/kafka
Use the following command to create a snapshot directory for Zookeeper:
sudo mkdir -p /data/zookeeper
Change ownership of those directories to give the user cloud_user control:
sudo chown -R cloud_user:cloud_user /data/kafka
sudo chown -R cloud_user:cloud_user /data/zookeeper
- Specify an ID for Each Zookeeper Server
Use the following command to create a file in /data/zookeeper (on server #1) called myid with the contents "1" to specify Zookeeper server #1:
echo "1" > /data/zookeeper/myid
Use the following command to create a file in /data/zookeeper (on server #2) called myid with the contents "2" to specify Zookeeper server #2:
echo "2" > /data/zookeeper/myid
Use the following command to create a file in /data/zookeeper (on server #3) called myid with the contents "3" to specify Zookeeper server #3:
echo "3" > /data/zookeeper/myid
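Each myid value must match the number in the corresponding server.N entry of zookeeper.properties. If the hostnames end in the server number, as the names zookeeper1 through zookeeper3 used later in this lab suggest, the ID could be derived rather than typed. This is a sketch under that assumption; zk_id_from_host is a hypothetical helper, not part of Kafka or Zookeeper:

```shell
# Hypothetical helper: derive the Zookeeper ID from a hostname ending in digits.
zk_id_from_host() {
  # Strip everything up to and including the last non-digit character.
  echo "${1##*[!0-9]}"
}

zk_id_from_host zookeeper2   # prints 2

# On a real server one might run (assumes the hostname ends in the server number):
#   zk_id_from_host "$(hostname -s)" > /data/zookeeper/myid
```

If the hostname has no trailing digits the helper prints an empty line, so the result should be checked before it is written to myid.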
- Modify the Kafka and Zookeeper Configuration Files
Use the following command to remove the existing server.properties file (in the config directory) and create a new server.properties file:
rm /opt/kafka/config/server.properties
vim /opt/kafka/config/server.properties
Copy and paste the following into the contents of the server.properties file and change the broker.id and the advertised.listeners:
# change this for each broker
broker.id=[broker_number]
# change this to the hostname of each broker
advertised.listeners=PLAINTEXT://[hostname]:9092
# The ability to delete topics
delete.topic.enable=true
# Where logs are stored
log.dirs=/data/kafka
# default number of partitions
num.partitions=8
# default replica count based on the number of brokers
default.replication.factor=3
# to protect yourself against broker failure
min.insync.replicas=2
# logs will be deleted after how many hours
log.retention.hours=168
# size of the log files
log.segment.bytes=1073741824
# check to see if any data needs to be deleted
log.retention.check.interval.ms=300000
# location of all zookeeper instances and kafka directory
zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka
# timeout for connecting with zookeeper
zookeeper.connection.timeout.ms=6000
# automatically create topics
auto.create.topics.enable=true
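Only broker.id and advertised.listeners differ between brokers, so the two placeholders can be filled in with sed instead of by hand. A minimal sketch; BROKER_ID and BROKER_HOST are illustrative values (the lab does not name the broker hosts), and only the two per-broker lines are shown:

```shell
# Illustrative per-broker values; replace with the real ID and hostname.
BROKER_ID=1
BROKER_HOST=broker1.example.com

# Scratch template holding the two lines that vary per broker.
template="$(mktemp)"
cat > "$template" <<'EOF'
broker.id=[broker_number]
advertised.listeners=PLAINTEXT://[hostname]:9092
EOF

# The opening bracket is escaped so sed matches it literally
# instead of starting a character class.
rendered="$(sed -e "s/\[broker_number]/${BROKER_ID}/" \
                -e "s/\[hostname]/${BROKER_HOST}/" "$template")"
printf '%s\n' "$rendered"
rm -f "$template"
```

The same two substitutions could be applied to the full server.properties file on each server, changing only the two variables.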
Use the following command to remove the existing zookeeper.properties file (in the config directory) and create a new zookeeper.properties file:
rm /opt/kafka/config/zookeeper.properties
vim /opt/kafka/config/zookeeper.properties
Copy and paste the following into the contents of the zookeeper.properties file (don’t change this file):
# the directory where the snapshot is stored.
dataDir=/data/zookeeper
# the port at which the clients will connect
clientPort=2181
# setting number of connections to unlimited
maxClientCnxns=0
# keeps a heartbeat of zookeeper in milliseconds
tickTime=2000
# time for initial synchronization
initLimit=10
# how many ticks can pass before timeout
syncLimit=5
# define servers ip and internal ports to zookeeper
server.1=zookeeper1:2888:3888
server.2=zookeeper2:2888:3888
server.3=zookeeper3:2888:3888
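The initLimit and syncLimit settings are counted in ticks, so their effective durations depend on tickTime. A short worked calculation using the values from the file above (the variable names are illustrative):

```shell
# Values copied from zookeeper.properties.
tick_ms=2000     # tickTime, in milliseconds
init_limit=10    # initLimit, in ticks
sync_limit=5     # syncLimit, in ticks

# Convert the tick-based limits to milliseconds.
init_timeout_ms=$((tick_ms * init_limit))
sync_timeout_ms=$((tick_ms * sync_limit))

echo "initial sync window: ${init_timeout_ms} ms"
echo "sync timeout: ${sync_timeout_ms} ms"
```

With these settings a follower has 20 seconds for its initial synchronization and may fall up to 10 seconds behind before it times out.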
- Create the Kafka and Zookeeper Services
Create the file /etc/init.d/zookeeper on each server:
sudo vim /etc/init.d/zookeeper
Paste in the following contents:
#!/bin/bash
#/etc/init.d/zookeeper
DAEMON_PATH=/opt/kafka/bin
DAEMON_NAME=zookeeper
# Check that networking is up.
#[ ${NETWORKING} = "no" ] && exit 0

PATH=$PATH:$DAEMON_PATH

case "$1" in
  start)
    # Start daemon.
    pid=`ps ax | grep -i 'org.apache.zookeeper' | grep -v grep | awk '{print $1}'`
    if [ -n "$pid" ]
    then
      echo "Zookeeper is already running";
    else
      echo "Starting $DAEMON_NAME";
      $DAEMON_PATH/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
    fi
    ;;
  stop)
    echo "Shutting down $DAEMON_NAME";
    $DAEMON_PATH/zookeeper-server-stop.sh
    ;;
  restart)
    $0 stop
    sleep 2
    $0 start
    ;;
  status)
    pid=`ps ax | grep -i 'org.apache.zookeeper' | grep -v grep | awk '{print $1}'`
    if [ -n "$pid" ]
    then
      echo "Zookeeper is Running as PID: $pid"
    else
      echo "Zookeeper is not Running"
    fi
    ;;
  *)
    echo "Usage: $0 {start|stop|restart|status}"
    exit 1
esac

exit 0
Make the file executable, change ownership, install, and start the service:
sudo chmod +x /etc/init.d/zookeeper
sudo chown root:root /etc/init.d/zookeeper
sudo update-rc.d zookeeper defaults
sudo service zookeeper start
sudo service zookeeper status
Create the file /etc/init.d/kafka on each server:
sudo vim /etc/init.d/kafka
Paste in the following contents:
#!/bin/bash
#/etc/init.d/kafka
DAEMON_PATH=/opt/kafka/bin
DAEMON_NAME=kafka
# Check that networking is up.
#[ ${NETWORKING} = "no" ] && exit 0

PATH=$PATH:$DAEMON_PATH

# See how we were called.
case "$1" in
  start)
    # Start daemon.
    pid=`ps ax | grep -i 'kafka.Kafka' | grep -v grep | awk '{print $1}'`
    if [ -n "$pid" ]
    then
      echo "Kafka is already running"
    else
      echo "Starting $DAEMON_NAME"
      $DAEMON_PATH/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
    fi
    ;;
  stop)
    echo "Shutting down $DAEMON_NAME"
    $DAEMON_PATH/kafka-server-stop.sh
    ;;
  restart)
    $0 stop
    sleep 2
    $0 start
    ;;
  status)
    pid=`ps ax | grep -i 'kafka.Kafka' | grep -v grep | awk '{print $1}'`
    if [ -n "$pid" ]
    then
      echo "Kafka is Running as PID: $pid"
    else
      echo "Kafka is not Running"
    fi
    ;;
  *)
    echo "Usage: $0 {start|stop|restart|status}"
    exit 1
esac

exit 0
Save and close the file.
Make the file executable, change ownership, install, and start the service:
sudo chmod +x /etc/init.d/kafka
sudo chown root:root /etc/init.d/kafka
sudo update-rc.d kafka defaults
sudo service kafka start
sudo service kafka status
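Both init scripts detect a running daemon with the same pipeline: list processes, keep the lines that mention the main class, drop the grep process itself, and print the first column (the PID). The sketch below runs that pipeline against two made-up process lines (sample_ps is invented for illustration and is not real output from the lab servers) so each stage can be seen in isolation:

```shell
# Invented sample of "ps ax" output: one Kafka JVM and the grep that searches for it.
sample_ps='  101 ?        Sl     0:05 java -Xmx1G kafka.Kafka /opt/kafka/config/server.properties
  202 pts/0    S+     0:00 grep -i kafka.Kafka'

# Same filter chain as the init script, fed from the sample instead of ps.
pid=$(printf '%s\n' "$sample_ps" | grep -i 'kafka.Kafka' | grep -v grep | awk '{print $1}')

if [ -n "$pid" ]; then
  echo "Kafka is Running as PID: $pid"
else
  echo "Kafka is not Running"
fi
```

Without the grep -v grep stage, the search process would match its own command line and the script would report the service as running even when it is not.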
- Create a Topic
Use the following command to create a topic named test:
/opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --create --topic test --replication-factor 1 --partitions 3
Use the following command to describe the topic:
/opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --topic test --describe
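The replication factor chosen for a topic interacts with the cluster size and with min.insync.replicas from server.properties. A replication factor larger than the number of brokers cannot be satisfied, and a replication factor below min.insync.replicas should cause writes from producers using acks=all to be rejected. The following is a sketch of a pre-flight check; check_topic_settings is a hypothetical helper and not part of the Kafka tooling:

```shell
# Hypothetical helper: sanity-check topic settings before calling kafka-topics.sh.
# Arguments: replication factor, broker count, min.insync.replicas.
check_topic_settings() {
  rf="$1"
  brokers="$2"
  min_isr="$3"
  if [ "$rf" -gt "$brokers" ]; then
    echo "error: replication factor $rf exceeds broker count $brokers"
    return 1
  fi
  if [ "$rf" -lt "$min_isr" ]; then
    echo "warning: replication factor $rf is below min.insync.replicas $min_isr"
  fi
  return 0
}

# Values from this lab: --replication-factor 1, three brokers, min.insync.replicas=2.
check_topic_settings 1 3 2
```

For the test topic created above, the check passes with a warning, which is acceptable for a first smoke test but worth revisiting before producing data with acks=all.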