会员活跃度分析之日志数据采集
需求分析
会员数据是后期营销的很重要的数据。网店会专门针对会员进行一系列营销活动。
电商会员一般门槛较低,注册网站即可加入。有些电商平台的高级会员具有时效性,需要购买VIP会员卡或一年内消费额达到多少才能成为高级会员。
-
计算指标:
新增会员:每日新增会员数
活跃会员:每日,每周,每月的活跃会员数
会员留存:1日,2日,3日会员留存数、1日,2日,3日会员留存率
-
指标口径业务逻辑:
会员:以设备为判断标准,每个独立设备认为是一个会员。Android系统通常根据IMEI号,IOS系统通常根据OpenUDID 来标识一个独立会员,每部移动设备是一个会员;
活跃会员:打开应用的会员即为活跃会员,暂不考虑用户的实际使用情况。一台设备每天多次打开计算为一个活跃会员。在自然周内启动过应用的会员为周活跃会员,同理还有月活跃会员;
会员活跃率:一天内活跃会员数与总会员数的比率是日活跃率;还有周活跃率(自然周)、月活跃率(自然月);
新增会员:第一次使用应用的用户,定义为新增会员;卸载再次安装的设备,不会被算作一次新增。新增用户包括日新增会员、周(自然周)新增会员、月(自然月)新增会员;
留存会员与留存率:某段时间的新增会员,经过一段时间后,仍继续使用应用认为是留存会员;这部分会员占当时新增会员的比例为留存率。
已知条件:
1、明确了需求
2、输入:启动日志(OK)、事件日志
3、输出:新增会员、活跃会员、留存会员
4、日志文件、ODS、DWD、DWS、ADS(输出)
下一步作什么?
数据采集:日志文件 => Flume => HDFS => ODS
日志数据采集
-
原始日志数据(一条启动日志)
1
2020-07-30 14:18:47.339 [main] INFO com.lagou.ecommerce.AppStart - {"app_active": {"name":"app_active","json": {"entry":"1","action":"1","error_code":"0"},"time":159611188852 9},"attr":{"area":"泰安","uid":"2F10092A9","app_v":"1.1.13","event_type":"common","d evice_id":"1FB872- 9A1009","os_type":"4.7.3","channel":"DK","language":"chinese"," brand":"iphone-9"}}
-
数据采集的流程
选择Flume作为采集日志数据的工具:
Flume 1.6, 无论是Spooling Directory Source、Exec Source均不能很好的满足动态实时收集的需求
Flume 1.8+, 提供了一个非常好用的 Taildir Source,使用该source,可以监控多个目录,对目录中新写入的数据进行实时采集
taildir source配置
taildir Source的特点:
-
使用正则表达式匹配目录中的文件名
-
监控的文件中,一旦有数据写入,Flume就会将信息写入到指定的Sink
-
高可靠,不会丢失数据
-
不会对跟踪文件有任何处理,不会重命名也不会删除
-
不支持Windows,不能读二进制文件。支持按行读取文本文件
taildir source配置:
1 | a1.sources.r1.type = TAILDIR |
-
positionFile
配置检查点文件的路径,检查点文件会以 json 格式保存已经读取文件的位置,解决断点续传的问题
-
filegroups
指定filegroups,可以有多个,以空格分隔(taildir source可同时监控多个目录中的文件)
-
filegroups
配置每个filegroup的文件绝对路径,文件名可以用正则表达式匹配
hdfs sink配置
1 | a1.sinks.k1.type = hdfs |
HDFS Sink 都会采用滚动生成文件的方式,滚动生成文件的策略有:
-
基于时间。hdfs.rollInterval 30秒
-
基于文件大小。hdfs.rollSize 1024字节
-
基于event数量。hdfs.rollCount 10个event
-
基于文件空闲时间。hdfs.idleTimeout 0 0,禁用
-
minBlockReplicas。默认值与 hdfs 副本数一致。设为1是为了让 Flume 感知不到hdfs的块复制,此时其他的滚动方式配置(时间间隔、文件大小、events数量)才不会受影响
Agent的配置
创建/data/lagoudw/conf/flume-log2hdfs1.conf
1 | a1.sources = r1 |
Flume的优化配置
-
1、启动agent
1
flume-ng agent --conf-file /data/lagoudw/conf/flume- log2hdfs1.conf -name a1 -Dflum e.roog.logger=INFO,console
-
2、向 /data/lagoudw/logs/ 目录中放入日志文件,报错:
java.lang.OutOfMemoryError: GC overhead limit exceeded
缺省情况下 Flume jvm堆最大分配20m,这个值太小,需要调整。
-
3、解决方案:在 $FLUME_HOME/conf/flume-env.sh 中增加以下内容
1
2
3
4
5
6export JAVA_OPTS="-Xms4000m -Xmx4000m - Dcom.sun.management.jmxremote"
# 要想使配置文件生效,还要在命令行中指定配置文件目录
flume-ng agent --conf /opt/apps/flume-1.9/conf --conf-file /data/lagoudw/conf/flume-log2hdfs1.conf -name a1 - Dflume.root.logger=INFO,console
flume-ng agent --conf-file /data/lagoudw/conf/flume- log2hdfs1.conf -name a1 -Dflume.root.logger=INFO,consoleFlume内存参数设置及优化:
-
根据日志数据量的大小,Jvm堆一般要设置为4G或更高
-
-Xms -Xmx 最好设置一致,减少内存抖动带来的性能影响
存在的问题:Flume放数据时,使用本地时间;不理会日志的时间戳
-
自定义拦截器
前面 Flume Agent 的配置使用了本地时间,可能导致数据存放的路径不正确。
要解决以上问题需要使用自定义拦截器。
agent用于测试自定义拦截器。netcat source =>logger sink
创建/data/lagoudw/conf/flumetest1.conf
1 | # a1是agent的名称。source、channel、sink的名称分别为:r1 c1 k1 |
自定义拦截器的原理:
-
1、自定义拦截器要集成Flume 的 Interceptor
-
2、Event 分为header 和 body(接收的字符串)
-
3、获取header和body
-
4、从body中获取"time":1596382570539,并将时间戳转换为字符串 “yyyy-MM-dd”
-
5、将转换后的字符串放置header中
自定义拦截器的实现:
-
1、获取 event 的 header
-
2、获取 event 的 body
-
3、解析body获取json串
-
4、解析json串获取时间戳
-
5、将时间戳转换为字符串 “yyyy-MM-dd”
-
6、将转换后的字符串放置header中 7、返回event
1 | <properties> |
1 | package cn.lagou.dw.flume.interceptor; |
将程序打包,放在 flume/lib目录下;
启动Agent测试
1 | flume-ng agent --conf /opt/apps/flume-1.9/conf --conf-file /data/lagoudw/conf/flumetest1.conf -name a1 - Dflume.roog.logger=INFO,console |
备注:agent、telnet程序在同一节点
采集启动日志和事件日志
本系统中要采集两种日志:启动日志、事件日志,不同的日志放置在不同的目录下。要想一次拿到全部日志需要监控多个目录。
-
总体思路
1、taildir监控多个目录
2、修改自定义拦截器,不同来源的数据加上不同标志
3、hdfs sink 根据标志写文件
-
Agent配置
创建/data/lagoudw/conf/flume-log2hdfs3.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41a1.sources = r1
a1.sinks = k1
a1.channels = c1
# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /data/lagoudw/conf/startlog_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /data/lagoudw/logs/start/.*log
a1.sources.r1.headers.f1.logtype = start
a1.sources.r1.filegroups.f2 = /data/lagoudw/logs/event/.*log
a1.sources.r1.headers.f2.logtype = event
# 自定义拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn.lagou.dw.flume.interceptor.LogTypeInterceptor$Builder
# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000
# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog.
a1.sinks.k1.hdfs.fileType = DataStream
# 配置文件滚动方式(文件大小32M)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# 向hdfs上刷新的event的个数
a1.sinks.k1.hdfs.batchSize = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1-
filegroups
指定filegroups,可以有多个,以空格分隔(taildir source可同时监控多个目录中的文件)
-
headers.<filegroupName>.<headerKey>
给 event 增加header key。不同的filegroup,可配置不同的value
-
-
自定义拦截器
编码完成后打包上传服务器,放置在$FLUME_HOME/lib 下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138package cn.lagou.dw.flume.interceptor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.compress.utils.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.interceptor.Interceptor;
import org.junit.Test;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class LogTypeInterceptor implements Interceptor {
public void initialize() {
}
// 逐条处理event
public Event intercept(Event event) {
// 获取 event 的 body
String eventBody = new String(event.getBody(), Charsets.UTF_8);
// 获取 event 的 header
Map<String, String> headersMap = event.getHeaders();
// 解析body获取json串
String[] bodyArr = eventBody.split("\\s+");
try{
String jsonStr = bodyArr[6];
// 解析json串获取时间戳
String timestampStr = "";
JSONObject jsonObject = JSON.parseObject(jsonStr);
if (headersMap.getOrDefault("logtype", "").equals("start")){
// 取启动日志的时间戳
timestampStr = jsonObject.getJSONObject("app_active").getString("time");
} else if (headersMap.getOrDefault("logtype", "").equals("event")) {
// 取事件日志第一条记录的时间戳
JSONArray jsonArray = jsonObject.getJSONArray("lagou_event");
if (jsonArray.size() > 0){
timestampStr = jsonArray.getJSONObject(0).getString("time");
}
}
// 将时间戳转换为字符串 "yyyy-MM-dd"
// 将字符串转换为Long
long timestamp = Long.parseLong(timestampStr);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
Instant instant = Instant.ofEpochMilli(timestamp);
LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
String date = formatter.format(localDateTime);
// 将转换后的字符串放置header中
headersMap.put("logtime", date);
event.setHeaders(headersMap);
}catch (Exception e){
headersMap.put("logtime", "Unknown");
event.setHeaders(headersMap);
}
return event;
}
public List<Event> intercept(List<Event> events) {
List<Event> lstEvent = new ArrayList<>();
for (Event event: events){
Event outEvent = intercept(event);
if (outEvent != null) {
lstEvent.add(outEvent);
}
}
return lstEvent;
}
public void close() {
}
public static class Builder implements Interceptor.Builder {
public Interceptor build() {
return new LogTypeInterceptor();
}
public void configure(Context context) {
}
}
public void startJunit(){
String str = "2020-08-02 18:19:32.959 [main] INFO com.lagou.ecommerce.AppStart - {\"app_active\": {\"name\":\"app_active\",\"json\": {\"entry\":\"1\",\"action\":\"0\",\"error_code\":\"0\"},\"tim e\":1596342840284},\"attr\":{\"area\":\"大庆 \",\"uid\":\"2F10092A2\",\"app_v\":\"1.1.15\",\"event_type\": \"common\",\"device_id\":\"1FB872- 9A1002\",\"os_type\":\"2.8\",\"channel\":\"TB\",\"language\": \"chinese\",\"brand\":\"iphone-8\"}}";
Map<String, String> map = new HashMap<>();
// new Event
Event event = new SimpleEvent();
map.put("logtype", "start");
event.setHeaders(map);
event.setBody(str.getBytes(Charsets.UTF_8));
// 调用interceptor处理event
LogTypeInterceptor customerInterceptor = new LogTypeInterceptor();
Event outEvent = customerInterceptor.intercept(event);
// 处理结果
Map<String, String> headersMap = outEvent.getHeaders();
System.out.println(JSON.toJSONString(headersMap));
}
public void eventJunit(){
String str = "2020-08-02 18:20:11.877 [main] INFO com.lagou.ecommerce.AppEvent - {\"lagou_event\": [{\"name\":\"goods_detail_loading\",\"json\": {\"entry\":\"1\",\"goodsid\":\"0\",\"loading_time\":\"93\",\" action\":\"3\",\"staytime\":\"56\",\"showtype\":\"2\"},\"time \":1596343881690},{\"name\":\"loading\",\"json\": {\"loading_time\":\"15\",\"action\":\"3\",\"loading_type\":\" 3\",\"type\":\"1\"},\"time\":1596356988428}, {\"name\":\"notification\",\"json\": {\"action\":\"1\",\"type\":\"2\"},\"time\":1596374167278}, {\"name\":\"favorites\",\"json\": {\"course_id\":1,\"id\":0,\"userid\":0},\"time\":159635093396 2}],\"attr\":{\"area\":\"长治 \",\"uid\":\"2F10092A4\",\"app_v\":\"1.1.14\",\"event_type\": \"common\",\"device_id\":\"1FB872- 9A1004\",\"os_type\":\"0.5.0\",\"channel\":\"QL\",\"language\ ":\"chinese\",\"brand\":\"xiaomi-0\"}}";
Map<String, String> map = new HashMap<>();
// new Event
Event event = new SimpleEvent();
map.put("logtype", "event");
event.setHeaders(map);
event.setBody(str.getBytes(Charsets.UTF_8));
// 调用interceptor处理event
LogTypeInterceptor customerInterceptor = new LogTypeInterceptor();
Event outEvent = customerInterceptor.intercept(event);
// 处理结果
Map<String, String> headersMap = outEvent.getHeaders();
System.out.println(JSON.toJSONString(headersMap));
}
}测试
启动Agent,拷贝日志,检查HDFS文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19# 清理环境
rm -f /data/lagoudw/conf/startlog_position.json
rm -f /data/lagoudw/logs/start/*.log
rm -f /data/lagoudw/logs/event/*.log
# 启动 Agent
flume-ng agent --conf /opt/apps/flume-1.9.0/conf --conf-file /opt/lagou/servers/flume-1.9.0/conf/flume-log2hdfs3.conf -name a1 -Dflume.root.logger=INFO,console
# 拷贝日志
cd /data/lagoudw/logs/source
cp event0802.log ../event/
cp start0802.log ../start/
# 检查HDFS文件
hdfs dfs -ls /user/data/logs/event
hdfs dfs -ls /user/data/logs/start
# 生产环境中用以下方式启动Agent
nohup flume-ng agent --conf /opt/apps/flume-1.9.0/conf --conf-file /opt/lagou/servers/flume-1.9.0/conf/flume-log2hdfs3.conf -name a1 -Dflume.root.logger=INFO,LOGFILE > /dev/null 2>&1 &-
nohup,该命令允许用户退出帐户/关闭终端之后继续运行相应的进程
-
/dev/null,代表linux的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称黑洞
-
标准输入0,从键盘获得输入 /proc/self/fd/0
-
标准输出1,输出到屏幕(控制台) /proc/self/fd/1
-
错误输出2,输出到屏幕(控制台) /proc/self/fd/2
-
/dev/null 标准输出1重定向到 /dev/null 中,此时标准输出不存在,没有任何地方能够找到输出的内容
-
2>&1 错误输出将会和标准输出输出到同一个地方
-
/dev/null 2>&1 不会输出任何信息到控制台,也不会有任何信息输出到文件中
-
日志数据采集小结
-
使用taildir source 监控指定的多个目录,可以给不同目录的日志加上不同header
-
在每个目录中可以使用正则匹配多个文件
-
使用自定义拦截器,主要功能是从json串中获取时间戳,加到event的header中
-
hdfs sink使用event header中的信息写数据(控制写文件的位置)
-
hdfs文件的滚动方式(基于文件大小、基于event数量、基于时间)
-
调节flume jvm内存的分配