基于KRaft模式的kafka集群搭建
Kafka 2.8.0 版本移除了对Zookeeper的依赖,该版本将依赖于 ZooKeeper 的控制器改造成了基于 Kafka Raft 的 Quorm 控制器。
[TOC]
1.前期准备
- 机器规划清单
节点 | 端口 | 操作系统 | 安装目录 |
---|---|---|---|
10.30.3.231 | 9092 | Centos 7.9 x64 | /data/kafka_9092 |
10.30.3.232 | 9092 | Centos 7.9 x64 | /data/kafka_9092 |
10.30.3.233 | 9092 | Centos 7.9 x64 | /data/kafka_9092 |
- 1.1.所有节点:安装 jdk 1.8.0_391
curl -sL http://iso.sqlfans.cn/jdk/install_jdk_8u391.sh | bash
source /etc/profile
java -version
2.安装 kafka 集群
2.1.下载 kafka
- 所有节点:下载 kafka 3.5.0
curl -sL http://iso.sqlfans.cn/linux/kafka_2.12-3.5.0.tgz -o /opt/kafka_2.12-3.5.0.tgz
tar -xzf /opt/kafka_2.12-3.5.0.tgz -C /opt/
mv /opt/kafka_2.12-3.5.0 /data/kafka_9092
2.2.配置 server.properties
- 所有节点:部分参数需要统一
cp /data/kafka_9092/config/kraft/server.properties /data/kafka_9092/config/kraft/server.properties.bak.$(date +%Y%m%d%H%M%S)
sed -i 's/^process.roles=.*/process.roles=broker,controller/' /data/kafka_9092/config/kraft/server.properties
sed -i 's/^controller.quorum.voters=.*/controller.quorum.voters=1@10.30.3.231:9093,2@10.30.3.232:9093,3@10.30.3.233:9093/' /data/kafka_9092/config/kraft/server.properties
sed -i 's/^log.dirs=.*/log.dirs=\/data\/kafka_9092\/logs/' /data/kafka_9092/config/kraft/server.properties
- 节点1:修改当前节点的
node.id
及listeners
及advertised.listeners
sed -i 's/^node.id=.*/node.id=1/' /data/kafka_9092/config/kraft/server.properties
sed -i 's/^listeners=.*/listeners=PLAINTEXT:\/\/10.30.3.231:9092,CONTROLLER:\/\/10.30.3.231:9093/' /data/kafka_9092/config/kraft/server.properties
sed -i 's/^advertised.listeners=.*/advertised.listeners=PLAINTEXT:\/\/10.30.3.231:9092/' /data/kafka_9092/config/kraft/server.properties
cat /data/kafka_9092/config/kraft/server.properties | grep -Ev '^$|#' | egrep "(process.|node.id|controller.|listeners|log.dir|sadvertised.)"
- 节点2:修改当前节点的
node.id
及listeners
及advertised.listeners
sed -i 's/^node.id=.*/node.id=2/' /data/kafka_9092/config/kraft/server.properties
sed -i 's/^listeners=.*/listeners=PLAINTEXT:\/\/10.30.3.232:9092,CONTROLLER:\/\/10.30.3.232:9093/' /data/kafka_9092/config/kraft/server.properties
sed -i 's/^advertised.listeners=.*/advertised.listeners=PLAINTEXT:\/\/10.30.3.232:9092/' /data/kafka_9092/config/kraft/server.properties
cat /data/kafka_9092/config/kraft/server.properties | grep -Ev '^$|#' | egrep "(process.|node.id|controller.|listeners|log.dir|sadvertised.)"
- 节点3:修改当前节点的
node.id
及listeners
及advertised.listeners
sed -i 's/^node.id=.*/node.id=3/' /data/kafka_9092/config/kraft/server.properties
sed -i 's/^listeners=.*/listeners=PLAINTEXT:\/\/10.30.3.233:9092,CONTROLLER:\/\/10.30.3.233:9093/' /data/kafka_9092/config/kraft/server.properties
sed -i 's/^advertised.listeners=.*/advertised.listeners=PLAINTEXT:\/\/10.30.3.233:9092/' /data/kafka_9092/config/kraft/server.properties
cat /data/kafka_9092/config/kraft/server.properties | grep -Ev '^$|#' | egrep "(process.|node.id|controller.|listeners|log.dir|sadvertised.)"
2.3.启动进程
任一节点:生成集群uuid
[root@localhost ~]# KAFKA_CLUSTER_ID="$(/data/kafka_9092/bin/kafka-storage.sh random-uuid)" [root@localhost ~]# echo $KAFKA_CLUSTER_ID asjC792iR3CMelxQGh923A
所有节点:用同一个UUID格式日志目录,然后启动kafka进程
KAFKA_CLUSTER_ID=asjC792iR3CMelxQGh923A
/data/kafka_9092/bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /data/kafka_9092/config/kraft/server.properties
nohup /data/kafka_9092/bin/kafka-server-start.sh /data/kafka_9092/config/kraft/server.properties > /data/kafka_9092/logs/kafka_startup.log 2>&1 &
sleep 5 && netstat -lnpt | egrep "(9092|9093)"
注:不可使用不同的uuid多次格式化,否则进程会启动失败
2.4.测试
# 创建topic:任一节点,指定broker及1个partition及3个副本
/data/kafka_9092/bin/kafka-topics.sh --create --bootstrap-server 10.30.3.231:9092 --replication-factor 3 --partitions 1 --topic test1
# 查询topic:任一节点都可确认
/data/kafka_9092/bin/kafka-topics.sh --list --bootstrap-server 10.30.3.231:9092
/data/kafka_9092/bin/kafka-topics.sh --describe --topic test1 --bootstrap-server 10.30.3.232:9092
# 生产消息:任一节点,写入2条测试消息
echo "message123" | /data/kafka_9092/bin/kafka-console-producer.sh --topic test1 --bootstrap-server 10.30.3.231:9092
echo "message456" | /data/kafka_9092/bin/kafka-console-producer.sh --topic test1 --bootstrap-server 10.30.3.232:9092
# 消费消息:任一节点,设定从头消费,执行 Ctrl-C 随时停止
/data/kafka_9092/bin/kafka-console-consumer.sh --topic test1 --from-beginning --bootstrap-server 10.30.3.233:9092
# 删除topic:任一节点都可删除
/data/kafka_9092/bin/kafka-topics.sh --bootstrap-server 10.30.3.231:9092 --delete --topic test1
遇到的问题
场景1:如何彻底卸载kafka
cd /opt/
ps aux | grep kafka | grep -v grep | awk '{print $2}' | xargs kill -9 2> /dev/null
rm -rf /data/kafka_9092
userdel -r kafka 2> /dev/null
sed -i '/kafka/d' /etc/rc.local