Monitoring Metrics

Kafka uses Yammer Metrics to report metrics in the broker and in the Scala clients. The Java clients use Kafka Metrics, a built-in metrics registry that minimizes the transitive dependencies pulled into client applications. Both expose metrics over JMX, and both can be configured to report statistics through pluggable stats reporters so that they hook into your monitoring system.
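
As an illustration of the pluggable reporter hook mentioned above, the sketch below is a minimal custom reporter for the Java clients. It is only a sketch: the class name ConsoleMetricsReporter and the stdout logging are illustrative, not part of Kafka, and it assumes the Kafka clients library is on the classpath.

package com.lagou.kafka.demo.monitor;

import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

import java.util.List;
import java.util.Map;

// Hypothetical pluggable reporter: prints metric lifecycle events to stdout.
public class ConsoleMetricsReporter implements MetricsReporter {

    @Override
    public void configure(Map<String, ?> configs) {
        // Receives the full client configuration; nothing to configure here.
    }

    @Override
    public void init(List<KafkaMetric> metrics) {
        // Called once with the metrics that already exist when the client starts.
        for (KafkaMetric metric : metrics) {
            System.out.println("registered: " + metric.metricName());
        }
    }

    @Override
    public void metricChange(KafkaMetric metric) {
        // Called whenever a metric is added or updated.
        System.out.println("changed: " + metric.metricName());
    }

    @Override
    public void metricRemoval(KafkaMetric metric) {
        System.out.println("removed: " + metric.metricName());
    }

    @Override
    public void close() {
    }
}

A client enables such a reporter with, for example, props.put("metric.reporters", ConsoleMetricsReporter.class.getName()); the built-in JmxReporter stays active by default, which is what exposes the client MBeans discussed later in this section.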

The specific monitoring metrics are described in the official documentation: https://kafka.apache.org/10/documentation.html#monitoring

JMX

Enabling the JMX port on Kafka

[root@node4 bin]# vim /opt/lagou/servers/kafka_2.12-1.0.2/bin/kafka-server-start.sh

On every Kafka machine, add a JMX_PORT setting to this script (for example, export JMX_PORT=9581), then restart Kafka.

Verifying that JMX is enabled

First print the information of the process occupying port 9581, then match that PID against the Kafka process ID:

[root@Linux121 ~]# ss -nelp | grep 9581
tcp LISTEN 0 50 :::9581 :::* users:(("java",pid=27580,fd=79)) ino:3057996 sk:ffff9771791ba940 v6only:0 <->
[root@Linux121 ~]# jps
30260 Jps
6566 -- process information unavailable
27580 Kafka
20814 QuorumPeerMain

You can also check the Kafka startup log and confirm that the startup argument -Dcom.sun.management.jmxremote.port=9581 is present.

Connecting to the JMX port with JConsole

  • On Windows or macOS, locate the jconsole tool and open it.

    On Windows, open a command prompt in the ${JAVA_HOME}/bin/ directory and run jconsole.

    On macOS, simply run jconsole from the terminal.

    Select "Remote Process", enter ip:9581 (the broker address and JMX port), and click Connect.

    Click "Insecure connection" when prompted.

    Open the MBeans tab, select the kafka-related beans, and inspect the monitored attributes.

    For the detailed list of metrics, see the official documentation: http://kafka.apache.org/10/documentation.html#monitoring

  • Commonly used metrics

    • OS metrics

      objectName | Attribute | Description
      java.lang:type=OperatingSystem | FreePhysicalMemorySize | Free physical memory
      java.lang:type=OperatingSystem | SystemCpuLoad | System CPU utilization
      java.lang:type=OperatingSystem | ProcessCpuLoad | Process CPU utilization
      java.lang:type=GarbageCollector,name=G1 Young Generation | CollectionCount | Number of GCs
    • Broker metrics

      objectName | Attribute | Description
      kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec | Count | Incoming traffic (bytes) per second
      kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec | Count | Outgoing traffic (bytes) per second
      kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec | Count | Rejected traffic (bytes) per second
      kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec | Count | Messages written per second
      kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec | Count | Failed fetch requests per second on this broker
      kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec | Count | Failed produce requests per second on this broker
      kafka.server:type=ReplicaManager,name=PartitionCount | Value | Number of partitions on this broker
      kafka.server:type=ReplicaManager,name=LeaderCount | Value | Number of leader replicas on this broker
      kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer | Count | Total time spent serving FetchConsumer requests
      kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower | Count | Total time spent serving FetchFollower requests
      kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce | Count | Total time spent serving Produce requests
    • Producer and topic metrics

      objectName | Attribute | Description
      kafka.producer:type=producer-metrics,client-id=console-producer (client-id varies) | incoming-byte-rate | The average number of incoming bytes received per second from all servers.
      kafka.producer:type=producer-metrics,client-id=console-producer (client-id varies) | outgoing-byte-rate | The average number of outgoing bytes sent per second to all servers.
      kafka.producer:type=producer-metrics,client-id=console-producer (client-id varies) | request-rate | The average number of requests sent per second to the broker.
      kafka.producer:type=producer-metrics,client-id=console-producer (client-id varies) | response-rate | The average number of responses received per second from the broker.
      kafka.producer:type=producer-metrics,client-id=console-producer (client-id varies) | request-latency-avg | The average request latency in ms.
      kafka.producer:type=producer-topic-metrics,client-id=console-producer,topic=testjmx (client-id and topic vary) | record-send-rate | The average number of records sent per second for a topic.
      kafka.producer:type=producer-topic-metrics,client-id=console-producer,topic=testjmx (client-id and topic vary) | record-retry-total | The total number of retried record sends.
      kafka.producer:type=producer-topic-metrics,client-id=console-producer,topic=testjmx (client-id and topic vary) | record-error-total | The total number of record sends that resulted in errors.
    • Consumer metrics

      objectName | Attribute | Description
      kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-1 (client-id varies) | records-lag-max | Number of messages the consumer lags behind the producer by. Published by the consumer, not the broker.
      kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-1 (client-id varies) | records-consumed-rate | The average number of records consumed per second.

      (These client metrics can also be read directly inside the application; see the sketch below.)
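
The producer and consumer metrics above can also be read in-process, without going through JMX, because the Java clients expose the same registry through their metrics() method. Below is a minimal producer-side sketch; the bootstrap address node1:9092, the client id, and the topic tp_demo_01 are placeholders for illustration, not values taken from this environment.

package com.lagou.kafka.demo.monitor;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

public class ProducerMetricsDemo {

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "node1:9092");          // placeholder broker address
        configs.put("key.serializer", StringSerializer.class);
        configs.put("value.serializer", StringSerializer.class);
        configs.put("client.id", "metrics-demo-producer");       // hypothetical client id

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
        try {
            // Send something so the rate metrics have data to report
            producer.send(new ProducerRecord<>("tp_demo_01", "key", "value")); // hypothetical topic
            producer.flush();

            // The same metrics that JMX exposes under kafka.producer:type=producer-metrics
            for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
                MetricName name = entry.getKey();
                if ("producer-metrics".equals(name.group())) {
                    // metricValue() exists in Kafka 1.0+ clients; older clients use value()
                    System.out.println(name.name() + " = " + entry.getValue().metricValue());
                }
            }
        } finally {
            producer.close();
        }
    }
}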

Fetching monitoring metrics programmatically


package com.lagou.kafka.demo.monitor;

import javax.management.*;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.io.IOException;
import java.util.Set;

public class JMXMonitorDemo {

    public static void main(String[] args) throws IOException, MalformedObjectNameException,
            AttributeNotFoundException, MBeanException, ReflectionException, InstanceNotFoundException {

        // JMX endpoint of the broker started with -Dcom.sun.management.jmxremote.port=9581
        String jmxServiceURL = "service:jmx:rmi:///jndi/rmi://192.168.100.103:9581/jmxrmi";

        // Create the JMXServiceURL object from the URL string
        JMXServiceURL jmxURL = new JMXServiceURL(jmxServiceURL);
        // Open a connection to the server at the given URL
        JMXConnector jmxc = JMXConnectorFactory.connect(jmxURL);
        // Obtain an MBeanServerConnection representing the remote MBean server
        MBeanServerConnection jmxs = jmxc.getMBeanServerConnection();

        // Build the ObjectName of the MBean(s) to query
        // ObjectName mbeanObjName = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec");
        // ObjectName mbeanObjName = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=tp_eagle_01");
        ObjectName mbeanObjName = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec");

        // Fetch the MBeans matching the ObjectName
        Set<ObjectInstance> sampleSet = jmxs.queryMBeans(null, mbeanObjName);

        for (ObjectInstance sampleObj : sampleSet) {
            // An ObjectInstance holds the ObjectName and class name of an MBean.
            // If the MBean is a Dynamic MBean, the class name should be retrieved from its MBeanInfo.
            ObjectName objectName = sampleObj.getObjectName();
            // Read the value of the MBean's "Count" attribute
            String count = jmxs.getAttribute(objectName, "Count").toString();
            System.out.println(count);
        }

        // Close the connection
        jmxc.close();
    }
}
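
The ObjectName above aggregates traffic over all topics, while the commented-out variant with topic=tp_eagle_01 narrows it to a single topic. As a hedged extension of the same pattern (not part of the original demo), an ObjectName wildcard can enumerate the per-topic meters in one query; the Yammer meter MBeans also expose rate attributes such as OneMinuteRate in addition to Count.

package com.lagou.kafka.demo.monitor;

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.util.Set;

public class JMXTopicBytesInDemo {

    public static void main(String[] args) throws Exception {
        // Same broker JMX endpoint as in JMXMonitorDemo
        JMXServiceURL jmxURL = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://192.168.100.103:9581/jmxrmi");
        JMXConnector jmxc = JMXConnectorFactory.connect(jmxURL);
        MBeanServerConnection jmxs = jmxc.getMBeanServerConnection();

        // topic=* is an ObjectName property pattern matching the per-topic meters
        ObjectName pattern = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*");

        Set<ObjectName> names = jmxs.queryNames(pattern, null);
        for (ObjectName name : names) {
            Object count = jmxs.getAttribute(name, "Count");
            // Meter MBeans also publish OneMinuteRate, FiveMinuteRate, FifteenMinuteRate, MeanRate
            Object oneMinuteRate = jmxs.getAttribute(name, "OneMinuteRate");
            System.out.println(name.getKeyProperty("topic") + " Count=" + count + " OneMinuteRate=" + oneMinuteRate);
        }

        jmxc.close();
    }
}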

Monitoring Tool: Kafka Eagle

We can use Kafka Eagle to manage and monitor Kafka clusters.

  • Core modules

    • Dashboard visualization

    • Topic management: creating, deleting, listing, configuring, and querying topics

    • Consumer applications: monitors consumers built with different APIs, including the Kafka API, Flink API, Spark API, Storm API, Flume API, LogStash API, etc.

    • Cluster management: shows details of the Kafka and Zookeeper clusters, such as Kafka start time, Kafka port, and the Zookeeper leader role; also supports switching between multiple clusters and provides a Zookeeper client entry point

    • Cluster monitoring: monitors brokers and the core Kafka and Zookeeper metrics, and plots their historical trends

    • Alerting: alerts on consumer application lag (data backlog) and on Kafka and Zookeeper health; supports email, WeChat, and DingTalk notifications

    • System administration: user creation, user role assignment, and resource access management

  • Architecture

    • Visualization: displays the topic list, cluster health, consumer applications, and so on

    • Collector: gathers data from Zookeeper, Kafka JMX & internal topics, and the Kafka API (for Kafka 2.x and later)

    • Storage: Kafka Eagle currently uses MySQL or SQLite to store metadata and monitoring data; the databases and tables are created automatically once the tool is configured according to the official documentation and started

    • Monitoring: tracks consumer application consumption and cluster health

    • Alerting: sends notifications for detected anomalies via email, WeChat, DingTalk, etc.

    • Access control: manages permissions for users, assigning different access rights to administrators, developers, visitors, and other roles

Note: Kafka Eagle requires JMX to be enabled on the Kafka nodes.

Installation and Configuration

  • Download and install Kafka Eagle

    # Download the pre-built package
    wget http://pkgs-linux.cvimer.com/kafka-eagle.zip

    # Install unzip
    yum install unzip

    # Unpack kafka-eagle
    unzip -d /opt/lagou/servers/ /opt/lagou/software/kafka-eagle.zip

    cd /opt/lagou/servers/kafka-eagle/kafka-eagle-web/target/test/kafka-eagle-web-2.0.1
  • Configure environment variables

    vim /etc/profile

    export KE_HOME=/opt/lagou/servers/kafka-eagle/kafka-eagle-web/target/test/kafka-eagle-web-2.0.1
    export PATH=$PATH:$KE_HOME/bin

    # Apply the changes
    source /etc/profile
  • Configure Kafka Eagle

    cd /opt/lagou/servers/kafka-eagle/kafka-eagle-web/target/test/kafka-eagle-web-2.0.1/conf

    vim system-config.properties

    ######################################
    # multi zookeeper & kafka cluster list
    ######################################
    # Alias of the cluster, used to tell clusters apart inside kafka-eagle.
    # Multiple clusters can be monitored; separate their aliases with commas:
    # kafka.eagle.zk.cluster.alias=cluster1,cluster2,cluster3
    kafka.eagle.zk.cluster.alias=cluster1
    # cluster1.zk.list=10.1.201.17:2181,10.1.201.22:2181,10.1.201.23:2181
    # Zookeeper address of the current cluster; it must match zookeeper.connect in Kafka's server.properties
    # The prefix is the cluster alias defined above
    cluster1.zk.list=node2:2181,node3:2181,node4:2181/myKafka
    #cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181

    ######################################
    # zookeeper enable acl
    ######################################
    cluster1.zk.acl.enable=false
    cluster1.zk.acl.schema=digest
    cluster1.zk.acl.username=test
    cluster1.zk.acl.password=test123

    ######################################
    # broker size online list
    ######################################
    cluster1.kafka.eagle.broker.size=20

    ######################################
    # limit on the number of zookeeper client connections
    ######################################
    kafka.zk.limit.size=25

    ######################################
    # kafka eagle web UI port
    ######################################
    kafka.eagle.webui.port=8048

    ######################################
    # where kafka consumer offsets are stored, for compatibility with older kafka versions
    ######################################
    cluster1.kafka.eagle.offset.storage=kafka
    cluster2.kafka.eagle.offset.storage=zk

    ######################################
    # kafka metrics, 15 days by default
    ######################################
    kafka.eagle.metrics.charts=true
    kafka.eagle.metrics.retain=15

    ######################################
    # kafka sql topic records max
    ######################################
    kafka.eagle.sql.topic.records.max=5000
    kafka.eagle.sql.fix.error=true

    ######################################
    # token required by administrators to delete kafka topics
    ######################################
    kafka.eagle.topic.token=keadmin

    ######################################
    # whether the kafka cluster has authentication enabled; disabled here for cluster1
    ######################################
    cluster1.kafka.eagle.sasl.enable=false
    cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
    cluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256
    cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
    cluster1.kafka.eagle.sasl.client.id=
    cluster1.kafka.eagle.sasl.cgroup.enable=false
    cluster1.kafka.eagle.sasl.cgroup.topics=

    ######################################
    # kafka sasl authentication, example configuration
    ######################################
    cluster2.kafka.eagle.sasl.enable=false
    cluster2.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
    cluster2.kafka.eagle.sasl.mechanism=PLAIN
    cluster2.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
    cluster2.kafka.eagle.sasl.client.id=
    cluster2.kafka.eagle.sasl.cgroup.enable=false
    cluster2.kafka.eagle.sasl.cgroup.topics=

    ######################################
    # kafka ssl authentication, example configuration
    ######################################
    cluster3.kafka.eagle.ssl.enable=false
    cluster3.kafka.eagle.ssl.protocol=SSL
    cluster3.kafka.eagle.ssl.truststore.location=
    cluster3.kafka.eagle.ssl.truststore.password=
    cluster3.kafka.eagle.ssl.keystore.location=
    cluster3.kafka.eagle.ssl.keystore.password=
    cluster3.kafka.eagle.ssl.key.password=
    cluster3.kafka.eagle.ssl.cgroup.enable=false
    cluster3.kafka.eagle.ssl.cgroup.topics=

    ######################################
    # Database used to store the monitoring data
    # kafka eagle uses sqlite by default; the sqlite directory must be specified and created in advance,
    # e.g. /home/lagou/hadoop/kafka-eagle/db
    ######################################
    kafka.eagle.driver=org.sqlite.JDBC
    kafka.eagle.url=jdbc:sqlite:/opt/lagou/servers/kafka-eagle/kafka-eagle-web/target/test/kafka-eagle-web-2.0.1/db/ke.db
    kafka.eagle.username=root
    kafka.eagle.password=www.kafka-eagle.org

    ######################################
    # MySQL can also be used to store the monitoring data
    ######################################
    #kafka.eagle.driver=com.mysql.jdbc.Driver
    #kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
    #kafka.eagle.username=root
    #kafka.eagle.password=123456

    ######################################
    # kafka eagle alert mail server settings
    ######################################
    kafka.eagle.mail.enable=true
    kafka.eagle.mail.sa=kafka_lagou_alert
    kafka.eagle.mail.username=kafka_lagou_alert@163.com
    kafka.eagle.mail.password=Pas2W0rd
    kafka.eagle.mail.server.host=smtp.163.com
    kafka.eagle.mail.server.port=25
  • Start kafka-eagle

    ke.sh start  

    The startup output shows the web console login address and the account and password.