Kafka集群应用与搭建
集群应⽤场景
-
- 消息传递
Kafka可以很好地替代传统邮件代理。消息代理的使⽤有多种原因(将处理与数据⽣产者分离,缓冲未处理的消息等)。与⼤多数邮件系统相⽐,Kafka具有更好的吞吐量,内置的分区,复制和容错功能,这使其成为⼤规模邮件处理应⽤程序的理想解决⽅案。
根据我们的经验,消息传递的使⽤通常吞吐量较低,但是可能需要较低的端到端延迟,并且通常取决于Kafka提供的强⼤的持久性保证。
在这个领域,Kafka与ActiveMQ或 RabbitMQ等传统消息传递系统相当。
-
- ⽹站活动路由
Kafka最初的⽤例是能够将⽤户活动跟踪管道重建为⼀组实时的发布-订阅。这意味着将⽹站活动(⻚⾯浏览,搜索或⽤户可能采取的其他操作)发布到中⼼主题,每种活动类型只有⼀个主题。这些提要可⽤于⼀系列⽤例的订阅,包括实时处理,实时监控,以及加载到Hadoop或脱机数据仓库系统中以进⾏脱机处理和报告。
活动跟踪通常量很⼤,因为每个⽤户⻚⾯视图都会⽣成许多活动消息。
-
- 监控指标
Kafka通常⽤于操作监控数据。这涉及汇总来⾃分布式应⽤程序的统计信息,以⽣成操作数据的集中。
-
- ⽇志汇总
许多⼈使⽤Kafka代替⽇志聚合解决⽅案。⽇志聚合通常从服务器收集物理⽇志⽂件,并将它们放在中央位置(也许是⽂件服务器或HDFS)以进⾏处理。Kafka提取⽂件的详细信息,并以⽇志流的形式更清晰地抽象⽇志或事件数据。这允许较低延迟的处理,并更容易⽀持多个数据源和分布式数据消耗。与以⽇志为中⼼的系统(例如Scribe或Flume)相⽐,Kafka具有同样出⾊的性能,由于复制⽽提供的更强的耐⽤性保证以及更低的端到端延迟。
-
- 流处理
Kafka的许多⽤户在由多个阶段组成的处理管道中处理数据,其中原始输⼊数据从Kafka主题中使⽤,然后进⾏汇总,充实或以其他⽅式转换为新主题,以供进⼀步使⽤或后续处理。例如,⽤于推荐新闻⽂章的处理管道可能会从RSS提要中检索⽂章内容,并将其发布到“⽂章”主题中。进⼀步的处理可能会使该内容规范化或重复数据删除,并将清洗后的⽂章内容发布到新主题中;最后的处理阶段可能会尝试向⽤户推荐此内容。这样的处理管道基于各个主题创建实时数据流的图形。从0.10.0.0开始,⼀个轻量但功能强⼤的流处理库称为Kafka Streams 可以在Apache Kafka中使⽤来执⾏上述数据处理。除了Kafka Streams以外,其他开源流处理⼯具还包括Apache Storm和 Apache Samza。
-
- 活动采集
事件源是⼀种应⽤程序,其中状态更改以时间顺序记录记录。Kafka对⼤量存储的⽇志数据的⽀持使其成为以这种样式构建的应⽤程序的绝佳后端。
-
- 提交⽇志
Kafka可以⽤作分布式系统的⼀种外部提交⽇志。该⽇志有助于在节点之间复制数据,并充当故障节点恢复其数据的重新同步机制。Kafka中的⽇志压缩功能有助于⽀持此⽤法。在这种⽤法中,Kafka类似于Apache BookKeeper项⽬。
-
横向扩展,提⾼Kafka的处理能⼒
-
镜像,副本,提供⾼可⽤。
集群搭建
-
- 搭建设计
-
- 分配三台Linux,⽤于安装拥有三个节点的Kafka集群。
node2(192.168.91.121)
node3(192.168.91.122)
node4(192.168.91.123)以上三台主机的/etc/hosts配置:
1
2
3192.168.91.121 node2
192.168.91.122 node3
192.168.91.123 node4
Zookeeper集群搭建
-
- Linux安装JDK,三台Linux都安装。
上传JDK到linux, 安装并配置JDK
1
2
3
4
5
6
7
8
9
10# 使⽤rpm安装JDK
rpm -ivh jdk-8u261-linux-x64.rpm
# 默认的安装路径是/usr/java/jdk1.8.0_261-amd64
# 配置JAVA_HOME
vim /etc/profile
# ⽂件最后添加两⾏
export JAVA_HOME=/usr/java/jdk1.8.0_261-amd64
export PATH=$PATH:$JAVA_HOME/bin
# 退出vim,使配置⽣效
source /etc/profile查看JDK是否正确安装
1
java -version
-
- Linux 安装Zookeeper,三台Linux都安装,以搭建Zookeeper集群
上传zookeeper-3.4.14.tar.gz到Linux,解压并配置zookeeper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34# node2操作
# 解压到/opt⽬录
tar -zxf zookeeper-3.4.14.tar.gz -C /opt
# 配置
cd /opt/zookeeper-3.4.14/conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
# 设置
dataDir=/var/lagou/zookeeper/data
# 添加
server.1=node2:2881:3881
server.2=node3:2881:3881
server.3=node4:2881:3881
# 退出vim
mkdir -p /var/lagou/zookeeper/data
echo 1 > /var/lagou/zookeeper/data/myid
# 配置环境变量
vim /etc/profile
# 添加
export ZOOKEEPER_PREFIX=/opt/zookeeper-3.4.14
export PATH=$PATH:$ZOOKEEPER_PREFIX/bin
export ZOO_LOG_DIR=/var/lagou/zookeeper/log
# 退出vim,让配置⽣效
source /etc/profile
# 将/opt/zookeeper-3.4.14拷⻉到node3,node4
scp -r /opt/zookeeper-3.4.14/ node3:/opt
scp -r /opt/zookeeper-3.4.14/ node4:/opt1
2
3
4
5
6
7
8
9
10
11
12
13
14# node3操作
# 配置环境变量
vim /etc/profile
# 在配置JDK环境变量基础上,添加内容
export ZOOKEEPER_PREFIX=/opt/zookeeper-3.4.14
export PATH=$PATH:$ZOOKEEPER_PREFIX/bin
export ZOO_LOG_DIR=/var/lagou/zookeeper/log
# 退出vim,让配置⽣效
source /etc/profile
mkdir -p /var/lagou/zookeeper/data
echo 2 > /var/lagou/zookeeper/data/myid1
2
3
4
5
6
7
8
9
10
11
12
13
14# node4操作
# 配置环境变量
vim /etc/profile
# 在配置JDK环境变量基础上,添加内容
export ZOOKEEPER_PREFIX=/opt/zookeeper-3.4.14
export PATH=$PATH:$ZOOKEEPER_PREFIX/bin
export ZOO_LOG_DIR=/var/lagou/zookeeper/log
# 退出vim,让配置⽣效
source /etc/profile
mkdir -p /var/lagou/zookeeper/data
echo 3 > /var/lagou/zookeeper/data/myid -
- 启动zookeeper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20# 在三台Linux上启动Zookeeper
[root@node2 ~]# zkServer.sh start
[root@node3 ~]# zkServer.sh start
[root@node4 ~]# zkServer.sh start
# 在三台Linux上查看Zookeeper的状态
[root@node2 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: follower
[root@node3 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: leader
[root@node4 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: follower
Kafka集群搭建
-
上传并解压Kafka到/opt/lagou/servers
1
2
3
4
5
6# 解压到/opt
tar -zxf /opt/lagou/software/kafka_2.12-1.0.2.tgz -C /opt/lagou/servers
# 拷⻉到node3和node4
scp -r /opt/lagou/servers/kafka_2.12-1.0.2/ 192.168.91.122:/opt/lagou/servers
scp -r /opt/lagou/servers/kafka_2.12-1.0.2/ 192.168.91.123:/opt/lagou/servers -
配置Kafka
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34# 配置环境变量,三台Linux都要配置
vim /etc/profile
# 添加以下内容:
export KAFKA_HOME=/opt/lagou/servers/kafka_2.12-1.0.2
export PATH=$PATH:$KAFKA_HOME/bin
# 让配置⽣效
source /etc/profile
# node2配置
vim /opt/lagou/servers/kafka_2.12-1.0.2/config/server.properties
broker.id=0
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.91.121:9092
log.dirs=/opt/lagou/servers/kafka_2.12-1.0.2/kafka-logs
zookeeper.connect=192.168.91.121:2181,192.168.91.122:2181,192.168.91.123:2181/myKafka
# 其他使⽤默认配置
# node3配置
vim /opt/lagou/servers/kafka_2.12-1.0.2/config/server.properties
broker.id=1
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.91.122:9092
log.dirs=/opt/lagou/servers/kafka_2.12-1.0.2/kafka-logs
zookeeper.connect=192.168.91.121:2181,192.168.91.122:2181,192.168.91.123:2181/myKafka
# 其他使⽤默认配置
# node4配置
vim /opt/lagou/servers/kafka_2.12-1.0.2/config/server.properties
broker.id=2
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.91.123:9092
log.dirs=/opt/lagou/servers/kafka_2.12-1.0.2/kafka-logs
zookeeper.connect=192.168.91.121:2181,192.168.91.122:2181,192.168.91.123:2181/myKafka
# 其他使⽤默认配置 -
启动Kafka
1
2
3[root@node2 ~]# kafka-server-start.sh -daemon /opt/lagou/servers/kafka_2.12-1.0.2/config/server.properties
[root@node3 ~]# kafka-server-start.sh -daemon /opt/lagou/servers/kafka_2.12-1.0.2/config/server.properties
[root@node4 ~]# kafka-server-start.sh -daemon /opt/lagou/servers/kafka_2.12-1.0.2/config/server.properties -
验证Kafka
node2节点的Cluster Id:
node3节点的Cluster Id:
node4节点的Cluster Id:
-
Cluster Id是⼀个唯⼀的不可变的标志符,⽤于唯⼀标志⼀个Kafka集群。
-
该Id最多可以有22个字符组成,字符对应于URL-safe Base64。
-
Kafka 0.10.1版本及之后的版本中,在集群第⼀次启动的时候,Broker从Zookeeper的 <Kafka_ROOT>/cluster/id节点获取。如果该Id不存在,就⾃动⽣成⼀个新的。
1
2
3
4
5zkCli.sh
# 查看每个Broker的信息
get /myKafka/brokers/ids/0
get /myKafka/brokers/ids/1
get /myKafka/brokers/ids/2node2节点在Zookeeper上的信息:
1
2
3
4
5
6
7
8
9
10
11
12
13[zk: localhost:2181(CONNECTED) 4] get /myKafka/brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.91.121:9092"],"jmx_port":-1,"host":"192.168.91.121","timestamp":"1659091200230","port":9092,"version":4}
cZxid = 0x500000034
ctime = Fri Jul 29 11:40:00 WEST 2022
mZxid = 0x500000034
mtime = Fri Jul 29 11:40:00 WEST 2022
pZxid = 0x500000034
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x200000c64610000
dataLength = 198
numChildren = 0node3节点在Zookeeper上的信息:
1
2
3
4
5
6
7
8
9
10
11
12
13[zk: localhost:2181(CONNECTED) 5] get /myKafka/brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.91.122:9092"],"jmx_port":-1,"host":"192.168.91.122","timestamp":"1659091212257","port":9092,"version":4}
cZxid = 0x50000003b
ctime = Fri Jul 29 11:40:12 WEST 2022
mZxid = 0x50000003b
mtime = Fri Jul 29 11:40:12 WEST 2022
pZxid = 0x50000003b
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x300000c5b110000
dataLength = 198
numChildren = 0node4节点在Zookeeper上的信息:
1
2
3
4
5
6
7
8
9
10
11
12
13[zk: localhost:2181(CONNECTED) 6] get /myKafka/brokers/ids/2
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.91.123:9092"],"jmx_port":-1,"host":"192.168.91.123","timestamp":"1659091215363","port":9092,"version":4}
cZxid = 0x500000042
ctime = Fri Jul 29 11:40:15 WEST 2022
mZxid = 0x500000042
mtime = Fri Jul 29 11:40:15 WEST 2022
pZxid = 0x500000042
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x100000c908d0003
dataLength = 198
numChildren = 0 -