Kafka集群监控
监控度量指标
Kafka使⽤Yammer Metrics在服务器和Scala客户端中报告指标。Java客户端使⽤Kafka Metrics,它是⼀个内置的度量标准注册表,可最⼤程度地减少拉⼊客户端应⽤程序的传递依赖项。两者都通过JMX公开指标,并且可以配置为使⽤可插拔的统计报告器报告统计信息,以连接到您的监视系统。
具体的监控指标可以查看官⽅⽂档: https://kafka.apache.org/10/documentation.html#monitoring。
JMX
Kafka开启Jmx端⼝
1 | [root@node4 bin]# vim /opt/lagou/servers/kafka_2.12-1.0.2/bin/kafka-server-start.sh |
所有kafka机器添加⼀个 JMX_PORT ,并重启kafka
验证JMX开启
⾸先打印9581端⼝占⽤的进程信息,然后使⽤进程编号对应到Kafka的进程号,搞定。
1 | [root@Linux121 ~]# ss -nelp | grep 9581 |
也可以查看Kafka启动⽇志,确定启动参数 -Dcom.sun.management.jmxremote.port=9581 存在即可
使⽤JConsole链接JMX端⼝
-
win/mac,找到jconsole⼯具并打开。
win 在 ${JAVA_HOEM}/bin/ 目录下打开命令行,输入jconsole。
Mac电脑可以直接命令⾏输⼊ jconsole。
选择远程进程,输入ip:9581,点击连接。
点击不安全连接。
点击Mbean,选择kafka相关类,查看监控项信息。
详细的监控指标,⻅官⽅⽂档:http://kafka.apache.org/10/documentation.html#monitoring
-
常⽤的监控项
-
OS监控项
objectName 指标项 说明 java.lang:type=OperatingSystem FreePhysicalMemorySize 空闲物理内存 java.lang:type=OperatingSystem SystemCpuLoad 系统CPU利⽤率 java.lang:type=OperatingSystem ProcessCpuLoad 进程CPU利⽤率 java.lang:type=GarbageCollector, name=G1 Young Generation CollectionCount GC次数 -
broker指标
objectName 指标项 说明 kafka.server:type=BrokerTopicMetrics, name=BytesInPerSec Count 每秒输⼊的流量 kafka.server:type=BrokerTopicMetrics, name=BytesOutPerSec Count 每秒输出的流量 kafka.server:type=BrokerTopicMetrics, name=BytesRejectedPerSec Count 每秒扔掉的流量 kafka.server:type=BrokerTopicMetrics, name=MessagesInPerSec Count 每秒的消息写⼊总量 kafka.server:type=BrokerTopicMetrics, name=FailedFetchRequestsPerSec Count 当前机器每秒fetch请求失败的数量 kafka.server:type=BrokerTopicMetrics, name=FailedProduceRequestsPerSec Count 当前机器每秒produce请求失败的数量 kafka.server:type=ReplicaManager, name=PartitionCount Value 该broker上的partition的数量 kafka.server:type=ReplicaManager, name=LeaderCount Value Leader的replica的数量 kafka.network:type=RequestMetrics, name=TotalTimeMs,request=FetchConsumer Count ⼀个请求FetchConsumer耗费的所有时间 kafka.network:type=RequestMetrics, name=TotalTimeMs,request=FetchFollower Count ⼀个请求FetchFollower耗费的所有时间 kafka.network:type=RequestMetrics, name=TotalTimeMs,request=Produce Count ⼀个请求Produce耗费的所有时间 -
producer以及topic指标
objectName 指标项 官⽹说明 译⽂说明 kafka.producer:type=producer-metrics,client-id=console-producer(client-id会变化) incoming-byte-rate The average number of incoming bytes received per second from all servers. producer每秒的平均写⼊流量 kafka.producer:type=producer-metrics,client-id=console-producer(client-id会变化) outgoing-byte-rate The average number of outgoing bytes sent per second to all servers. producer每秒的输出流量 kafka.producer:type=producer-metrics,client-id=console-producer(client-id会变化) request-rate The average number of requests sent per second to the broker. producer每秒发给broker的平均request次数 kafka.producer:type=producer-metrics,client-id=console-producer(client-id会变化) response-rate The average number of responses received per second from the broker. producer每秒发给broker的平均response次数 kafka.producer:type=producer-metrics,client-id=console-producer(client-id会变化) request-latency-avg The average time taken for a fetch request. ⼀个fetch请求的平均时间 kafka.producer:type=producer-topic-metrics,client-id=console-producer,topic=testjmx(client-id和topic名称会变化) record-send-rate The average number of records sent per second for a topic. 每秒从topic发送的平均记录数 kafka.producer:type=producer-topic-metrics,client-id=console-producer,topic=testjmx(client-id和topic名称会变化) record-retry-total The total number of retried record sends 重试发送的消息总数量 kafka.producer:type=producer-topic-metrics,client-id=console-producer,topic=testjmx(client-id和topic名称会变化) record-error-total The total number of record sends that resulted in errors 发送错误的消息总数量 -
consumer指标
objectName 指标项 官⽹说明 译⽂说明 kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-1(client-id会变化) records-lag-max Number of messages the consumer lags behind the producer by. Published by the consumer, not broker. 由consumer提交的消息消费lag kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-1(client-id会变化) records-consumed-rate The average number of records consumed per second 每秒平均消费的消息数量
-
编程获取监控指标
1 | package com.lagou.kafka.demo.monitor; |
监控⼯具 Kafka Eagle
我们可以使⽤Kafka-eagle管理Kafka集群。
-
核⼼模块
-
⾯板可视化
-
主题管理,包含创建主题、删除主题、主题列举、主题配置、主题查询等
-
消费者应⽤:对不同消费者应⽤进⾏监控,包含Kafka API、Flink API、Spark API、Storm API、Flume API、LogStash API等
-
集群管理:包含对Kafka集群和Zookeeper集群的详情展示,其内容包含Kafka启动时间、Kafka端⼝号、Zookeeper Leader⻆⾊等。同时,还有多集群切换管理,Zookeeper Client操作⼊⼝
-
集群监控:包含对Broker、Kafka核⼼指标、Zookeeper核⼼指标进⾏监控,并绘制历史趋势图
-
告警功能:对消费者应⽤数据积压情况进⾏告警,以及对Kafka和Zookeeper监控度进⾏告警。同时,⽀持邮件、微信、钉钉告警通知
-
系统管理:包含⽤户创建、⽤户⻆⾊分配、资源访问进⾏管理
-
-
架构
-
可视化:负责展示主题列表、集群健康、消费者应⽤等
-
采集器:数据采集的来源包含Zookeeper、Kafka JMX & 内部Topic、Kafka API(Kafka 2.x以后版本)
-
数据存储:⽬前Kafka Eagle存储采⽤MySQL或SQLite,数据库和表的创建均是⾃动完成的,按照官⽅⽂档进⾏配置好,启动Kafka Eagle就会⾃动创建,⽤来存储元数据和监控数据
-
监控:负责⻅消费者应⽤消费情况、集群健康状态
-
告警:对监控到的异常进⾏告警通知,⽀持邮件、微信、钉钉等⽅式
-
权限管理:对访问⽤户进⾏权限管理,对于管理员、开发者、访问者等不同⻆⾊的⽤户,分配不⽤的访问权限
-
注意:使用Kafka Eagle需要Kafka节点开启JMX。
安装与配置
-
下载、安装Kafka Eagle
1
2
3
4
5
6
7
8
9
10# 下载编译好的包
wget http://pkgs-linux.cvimer.com/kafka-eagle.zip
# 安装unzip
yum install unzip
# 配置kafka-eagle
unzip -d /opt/lagou/servers/ /opt/lagou/software/kafka-eagle.zip
cd /opt/lagou/servers/kafka-eagle/kafka-eagle-web/target/test/kafka-eagle-web-2.0.1 -
配置环境变量
1
2
3
4
5
6
7vim /etc/profile
export KE_HOME=/opt/lagou/servers/kafka-eagle/kafka-eagle-web/target/test/kafka-eagle-web-2.0.1
export PATH=$PATH:$KE_HOME/bin
# 让配置⽣效
source /etc/profile -
配置Kafka Eagle
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126cd /opt/lagou/servers/kafka-eagle/kafka-eagle-web/target/test/kafka-eagle-web-2.0.1/conf
vim system-config.properties
######################################
# multi zookeeper & kafka cluster list
######################################
# 集群的别名,⽤于在kafka-eagle中进⾏区分。
# 可以配置监控多个集群,别名⽤逗号隔开
# kafka.eagle.zk.cluster.alias=cluster1,cluster2,cluster3
kafka.eagle.zk.cluster.alias=cluster1
# cluster1.zk.list=10.1.201.17:2181,10.1.201.22:2181,10.1.201.23:2181
# 配置当前集群的zookeeper地址,此处的值要与Kafka的server.properties中的zookeeper.connect的值⼀致
# 此处的前缀就是集群的别名
cluster1.zk.list=node2:2181,node3:2181,node4:2181/myKafka
#cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181
######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123
######################################
# broker size online list
######################################
cluster1.kafka.eagle.broker.size=20
######################################
# zookeeper客户端连接数限制
######################################
kafka.zk.limit.size=25
######################################
# kafka eagle⽹⻚端⼝号
######################################
kafka.eagle.webui.port=8048
######################################
# kafka 消费信息存储位置,⽤来兼容kafka低版本
######################################
cluster1.kafka.eagle.offset.storage=kafka
cluster2.kafka.eagle.offset.storage=zk
######################################
# kafka metrics, 15 days by default
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=15
######################################
# kafka sql topic records max
######################################
kafka.eagle.sql.topic.records.max=5000
kafka.eagle.sql.fix.error=true
######################################
# 管理员删除kafka中topic的⼝令
######################################
kafka.eagle.topic.token=keadmin
######################################
# kafka 集群是否开启了认证模式,此处是cluster1集群的配置,禁⽤
######################################
cluster1.kafka.eagle.sasl.enable=false
cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256
cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.kafka.eagle.sasl.client.id=
cluster1.kafka.eagle.sasl.cgroup.enable=false
cluster1.kafka.eagle.sasl.cgroup.topics=
######################################
# kafka ssl authenticate,示例配置
######################################
cluster2.kafka.eagle.sasl.enable=false
cluster2.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster2.kafka.eagle.sasl.mechanism=PLAIN
cluster2.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
cluster2.kafka.eagle.sasl.client.id=
cluster2.kafka.eagle.sasl.cgroup.enable=false
cluster2.kafka.eagle.sasl.cgroup.topics=
######################################
# kafka ssl authenticate,示例配置
######################################
cluster3.kafka.eagle.ssl.enable=false
cluster3.kafka.eagle.ssl.protocol=SSL
cluster3.kafka.eagle.ssl.truststore.location=
cluster3.kafka.eagle.ssl.truststore.password=
cluster3.kafka.eagle.ssl.keystore.location=
cluster3.kafka.eagle.ssl.keystore.password=
cluster3.kafka.eagle.ssl.key.password=
cluster3.kafka.eagle.ssl.cgroup.enable=false
cluster3.kafka.eagle.ssl.cgroup.topics=
######################################
# 存储监控数据的数据库地址
# kafka默认使⽤sqlite存储,需要指定和创建sqlite的⽬录
# 如 /home/lagou/hadoop/kafka-eagle/db
######################################
kafka.eagle.driver=org.sqlite.JDBC
kafka.eagle.url=jdbc:sqlite:/opt/lagou/servers/kafka-eagle/kafka-eagle-web/target/test/kafka-eagle-web-2.0.1/db/ke.db
kafka.eagle.username=root
kafka.eagle.password=www.kafka-eagle.org
######################################
# 还可以使⽤MySLQ存储监控数据
######################################
#kafka.eagle.driver=com.mysql.jdbc.Driver
#kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?
useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
#kafka.eagle.username=root
#kafka.eagle.password=123456
######################################
# kafka eagle 设置告警邮件服务器
######################################
kafka.eagle.mail.enable=true
kafka.eagle.mail.sa=kafka_lagou_alert
kafka.eagle.mail.username=kafka_lagou_alert@163.com
kafka.eagle.mail.password=Pas2W0rd
kafka.eagle.mail.server.host=smtp.163.com
kafka.eagle.mail.server.port=25 -
启动kafka-eagle
1
ke.sh start
上图会提示我们登陆地址和账号密码。