Building a Kafka Cluster in KRaft Mode

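Kafka 2.8.0 introduced KRaft mode as an early-access feature, which makes it possible to run Kafka without ZooKeeper: the ZooKeeper-based controller is replaced by a quorum controller built on Kafka Raft. KRaft was marked production-ready in Kafka 3.3; this guide uses 3.5.0.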


1. Preparation

  • Machine plan

Node          Port   Operating system   Install directory
10.30.3.231   9092   CentOS 7.9 x64     /data/kafka_9092
10.30.3.232   9092   CentOS 7.9 x64     /data/kafka_9092
10.30.3.233   9092   CentOS 7.9 x64     /data/kafka_9092

  • 1.1. All nodes: install JDK 1.8.0_391
curl -sL http://iso.sqlfans.cn/jdk/install_jdk_8u391.sh | bash
source /etc/bashrc
java -version

2. Install the Kafka cluster

2.1. Download Kafka

  • All nodes: download Kafka 3.5.0
curl -sL http://iso.sqlfans.cn/linux/kafka_2.12-3.5.0.tgz -o /opt/kafka_2.12-3.5.0.tgz
tar -xzf /opt/kafka_2.12-3.5.0.tgz -C /opt/
mv /opt/kafka_2.12-3.5.0 /data/kafka_9092

2.2. Configure server.properties

  • All nodes: these parameters must be identical on every node
cp /data/kafka_9092/config/kraft/server.properties /data/kafka_9092/config/kraft/server.properties.bak.$(date +%Y%m%d%H%M%S)
sed -i 's/^process.roles=.*/process.roles=broker,controller/' /data/kafka_9092/config/kraft/server.properties
sed -i 's/^controller.quorum.voters=.*/controller.quorum.voters=1@10.30.3.231:9093,2@10.30.3.232:9093,3@10.30.3.233:9093/' /data/kafka_9092/config/kraft/server.properties
sed -i 's/^log.dirs=.*/log.dirs=\/data\/kafka_9092\/logs/' /data/kafka_9092/config/kraft/server.properties
  • Node 1: set this node's node.id, listeners, and advertised.listeners
sed -i 's/^node.id=.*/node.id=1/' /data/kafka_9092/config/kraft/server.properties
sed -i 's/^listeners=.*/listeners=PLAINTEXT:\/\/10.30.3.231:9092,CONTROLLER:\/\/10.30.3.231:9093/' /data/kafka_9092/config/kraft/server.properties
sed -i 's/^advertised.listeners=.*/advertised.listeners=PLAINTEXT:\/\/10.30.3.231:9092/' /data/kafka_9092/config/kraft/server.properties
grep -Ev '^$|#' /data/kafka_9092/config/kraft/server.properties | grep -E "(process\.|node\.id|controller\.|listeners|log\.dir|advertised\.)"
  • Node 2: set this node's node.id, listeners, and advertised.listeners
sed -i 's/^node.id=.*/node.id=2/' /data/kafka_9092/config/kraft/server.properties
sed -i 's/^listeners=.*/listeners=PLAINTEXT:\/\/10.30.3.232:9092,CONTROLLER:\/\/10.30.3.232:9093/' /data/kafka_9092/config/kraft/server.properties
sed -i 's/^advertised.listeners=.*/advertised.listeners=PLAINTEXT:\/\/10.30.3.232:9092/' /data/kafka_9092/config/kraft/server.properties
grep -Ev '^$|#' /data/kafka_9092/config/kraft/server.properties | grep -E "(process\.|node\.id|controller\.|listeners|log\.dir|advertised\.)"
  • Node 3: set this node's node.id, listeners, and advertised.listeners
sed -i 's/^node.id=.*/node.id=3/' /data/kafka_9092/config/kraft/server.properties
sed -i 's/^listeners=.*/listeners=PLAINTEXT:\/\/10.30.3.233:9092,CONTROLLER:\/\/10.30.3.233:9093/' /data/kafka_9092/config/kraft/server.properties
sed -i 's/^advertised.listeners=.*/advertised.listeners=PLAINTEXT:\/\/10.30.3.233:9092/' /data/kafka_9092/config/kraft/server.properties
grep -Ev '^$|#' /data/kafka_9092/config/kraft/server.properties | grep -E "(process\.|node\.id|controller\.|listeners|log\.dir|advertised\.)"
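
The three per-node blocks above differ only in the node ID and the IP address. As a minimal sketch, the same edits could be driven by two small shell functions. The names node_settings and apply_node_settings are hypothetical helpers introduced here for illustration, and the script assumes GNU sed (as shipped with CentOS).

```shell
# Hypothetical helper: print the three per-node settings in
# server.properties syntax.  $1 = node.id, $2 = node IP
node_settings() {
    printf 'node.id=%s\n' "$1"
    printf 'listeners=PLAINTEXT://%s:9092,CONTROLLER://%s:9093\n' "$2" "$2"
    printf 'advertised.listeners=PLAINTEXT://%s:9092\n' "$2"
}

# Hypothetical helper: rewrite those three keys in place.
# $1 = properties file, $2 = node.id, $3 = node IP
apply_node_settings() {
    node_settings "$2" "$3" | while IFS= read -r line; do
        key="${line%%=*}"
        # '|' as the sed delimiter avoids escaping the slashes in the URLs
        sed -i "s|^${key}=.*|${line}|" "$1"
    done
}
```

On node 2, for example, the call would be: apply_node_settings /data/kafka_9092/config/kraft/server.properties 2 10.30.3.232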

2.3. Start the processes

  • Any one node: generate the cluster UUID

    [root@localhost ~]# KAFKA_CLUSTER_ID="$(/data/kafka_9092/bin/kafka-storage.sh random-uuid)"
    [root@localhost ~]# echo $KAFKA_CLUSTER_ID
    asjC792iR3CMelxQGh923A
    
  • All nodes: format the log directory with the same UUID, then start the Kafka process

KAFKA_CLUSTER_ID=asjC792iR3CMelxQGh923A
/data/kafka_9092/bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /data/kafka_9092/config/kraft/server.properties
nohup /data/kafka_9092/bin/kafka-server-start.sh /data/kafka_9092/config/kraft/server.properties > /data/kafka_9092/logs/kafka_startup.log 2>&1 &
sleep 5 && netstat -lnpt | egrep "(9092|9093)"

Note: do not format the log directory more than once with different UUIDs; otherwise the process will fail to start.
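
kafka-storage.sh format records the cluster ID in a meta.properties file inside the log directory, so a mismatch can be detected before formatting or starting. The following is a minimal sketch, not part of the original procedure; formatted_cluster_id and cluster_id_ok are hypothetical helper names.

```shell
# Hypothetical helper: print the cluster.id recorded in <log dir>/meta.properties,
# or print nothing if the directory has not been formatted yet.  $1 = log dir
formatted_cluster_id() {
    [ -f "$1/meta.properties" ] || return 0
    sed -n 's/^cluster\.id=//p' "$1/meta.properties"
}

# Hypothetical helper: succeed only if the directory is unformatted or was
# formatted with the expected cluster ID.  $1 = log dir, $2 = expected ID
cluster_id_ok() {
    current="$(formatted_cluster_id "$1")"
    [ -z "$current" ] || [ "$current" = "$2" ]
}
```

A possible use before the format step: cluster_id_ok /data/kafka_9092/logs "$KAFKA_CLUSTER_ID" || echo "log directory was formatted with a different cluster ID"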

2.4. Testing

# Create a topic: on any node, specifying a broker, 1 partition, and 3 replicas
/data/kafka_9092/bin/kafka-topics.sh --create --bootstrap-server 10.30.3.231:9092 --replication-factor 3 --partitions 1 --topic test1

# List and describe topics: can be verified from any node
/data/kafka_9092/bin/kafka-topics.sh --list --bootstrap-server 10.30.3.231:9092
/data/kafka_9092/bin/kafka-topics.sh --describe --topic test1 --bootstrap-server 10.30.3.232:9092

# Produce messages: on any node, write 2 test messages
echo "message123" | /data/kafka_9092/bin/kafka-console-producer.sh --topic test1 --bootstrap-server 10.30.3.231:9092
echo "message456" | /data/kafka_9092/bin/kafka-console-producer.sh --topic test1 --bootstrap-server 10.30.3.232:9092

# Consume messages: on any node, read from the beginning; press Ctrl-C to stop at any time
/data/kafka_9092/bin/kafka-console-consumer.sh --topic test1 --from-beginning --bootstrap-server 10.30.3.233:9092

# Delete the topic: can be done from any node
/data/kafka_9092/bin/kafka-topics.sh --bootstrap-server 10.30.3.231:9092 --delete --topic test1
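
Each command above points at a single broker. --bootstrap-server also accepts a comma-separated list, which lets the client connect even when the first broker is down. A minimal sketch for building that list from the node IPs follows; bootstrap_list is a hypothetical helper name.

```shell
# Hypothetical helper: join node IPs into a bootstrap list.
# $1 = port, remaining arguments = node IPs
bootstrap_list() {
    port="$1"
    shift
    list=""
    for ip in "$@"; do
        # Prepend a comma only when the list already has an entry
        list="${list:+$list,}$ip:$port"
    done
    printf '%s\n' "$list"
}
```

For this cluster the call would be bootstrap_list 9092 10.30.3.231 10.30.3.232 10.30.3.233, and the result can be passed to any of the tools above, for example: /data/kafka_9092/bin/kafka-topics.sh --list --bootstrap-server "$(bootstrap_list 9092 10.30.3.231 10.30.3.232 10.30.3.233)"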

Problems encountered

Scenario 1: how to completely uninstall Kafka

cd /opt/
ps aux | grep kafka | grep -v grep | awk '{print $2}' | xargs kill -9 2> /dev/null
rm -rf /data/kafka_9092
userdel -r kafka 2> /dev/null
sed -i '/kafka/d' /etc/rc.local
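
The process filter above matches every command line containing "kafka", which can include unrelated processes such as an editor opened on a file under /data/kafka_9092. A narrower filter, sketched here as an assumption rather than the original method, selects only JVMs running the broker main class kafka.Kafka; kafka_pids is a hypothetical helper name.

```shell
# Hypothetical helper: read `ps ax` output on stdin and print only the PIDs
# of Kafka broker JVMs (java processes whose main class is kafka.Kafka).
kafka_pids() {
    grep 'kafka\.Kafka' | grep java | awk '{print $1}'
}
```

A gentler stop could then be: ps ax | kafka_pids | xargs -r kill, which sends SIGTERM first; kill -9 can be kept as a fallback if the broker does not exit.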
Copyright © www.sqlfans.cn 2023 All Rights Reserved. Last updated: 2024-04-13 22:12:32
