需求分析

会员数据是后期营销的很重要的数据。网店会专门针对会员进行一系列营销活动。

电商会员一般门槛较低,注册网站即可加入。有些电商平台的高级会员具有时效性,需要购买VIP会员卡或一年内消费额达到多少才能成为高级会员。

  • 计算指标:

    新增会员:每日新增会员数

    活跃会员:每日,每周,每月的活跃会员数

    会员留存:1日,2日,3日会员留存数、1日,2日,3日会员留存率

  • 指标口径业务逻辑:

    会员:以设备为判断标准,每个独立设备认为是一个会员。Android系统通常根据IMEI号,IOS系统通常根据OpenUDID 来标识一个独立会员,每部移动设备是一个会员;

    活跃会员:打开应用的会员即为活跃会员,暂不考虑用户的实际使用情况。一台设备每天多次打开计算为一个活跃会员。在自然周内启动过应用的会员为周活跃会员,同理还有月活跃会员;

    会员活跃率:一天内活跃会员数与总会员数的比率是日活跃率;还有周活跃率(自然周)、月活跃率(自然月);

    新增会员:第一次使用应用的用户,定义为新增会员;卸载再次安装的设备,不会被算作一次新增。新增用户包括日新增会员、周(自然周)新增会员、月(自然月)新增会员;

    留存会员与留存率:某段时间的新增会员,经过一段时间后,仍继续使用应用认为是留存会员;这部分会员占当时新增会员的比例为留存率。

已知条件:

1、明确了需求

2、输入:启动日志(OK)、事件日志

3、输出:新增会员、活跃会员、留存会员

4、日志文件、ODS、DWD、DWS、ADS(输出)

下一步作什么?

数据采集:日志文件 => Flume => HDFS => ODS

日志数据采集

  • 原始日志数据(一条启动日志)

    1
    2020-07-30 14:18:47.339 [main] INFO com.lagou.ecommerce.AppStart - {"app_active": {"name":"app_active","json": {"entry":"1","action":"1","error_code":"0"},"time":159611188852 9},"attr":{"area":"泰安","uid":"2F10092A9","app_v":"1.1.13","event_type":"common","d evice_id":"1FB872- 9A1009","os_type":"4.7.3","channel":"DK","language":"chinese"," brand":"iphone-9"}}
  • 数据采集的流程


    选择Flume作为采集日志数据的工具:

    Flume 1.6, 无论是Spooling Directory Source、Exec Source均不能很好的满足动态实时收集的需求

    Flume 1.8+, 提供了一个非常好用的 Taildir Source,使用该source,可以监控多个目录,对目录中新写入的数据进行实时采集

taildir source配置

taildir Source的特点:

  • 使用正则表达式匹配目录中的文件名

  • 监控的文件中,一旦有数据写入,Flume就会将信息写入到指定的Sink

  • 高可靠,不会丢失数据

  • 不会对跟踪文件有任何处理,不会重命名也不会删除

  • 不支持Windows,不能读二进制文件。支持按行读取文本文件

taildir source配置:

1
2
3
4
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /data/lagoudw/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /data/lagoudw/logs/start/.*log
  • positionFile

    配置检查点文件的路径,检查点文件会以 json 格式保存已经读取文件的位置,解决断点续传的问题

  • filegroups

    指定filegroups,可以有多个,以空格分隔(taildir source可同时监控多个目录中的文件)

  • filegroups

    配置每个filegroup的文件绝对路径,文件名可以用正则表达式匹配

hdfs sink配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog.
a1.sinks.k1.hdfs.fileType = DataStream

# 配置文件滚动方式(文件大小32M)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1

# 向hdfs上刷新的event的个数
a1.sinks.k1.hdfs.batchSize = 100

# 使用本地时间
a1.sinks.k1.hdfs.useLocalTimeStamp = true

HDFS Sink 都会采用滚动生成文件的方式,滚动生成文件的策略有:

  • 基于时间。hdfs.rollInterval 30秒

  • 基于文件大小。hdfs.rollSize 1024字节

  • 基于event数量。hdfs.rollCount 10个event

  • 基于文件空闲时间。hdfs.idleTimeout 0 0,禁用

  • minBlockReplicas。默认值与 hdfs 副本数一致。设为1是为了让 Flume 感知不到hdfs的块复制,此时其他的滚动方式配置(时间间隔、文件大小、events数量)才不会受影响

Agent的配置

创建/data/lagoudw/conf/flume-log2hdfs1.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /data/lagoudw/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /data/lagoudw/logs/start/.*log

# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000

# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog.
a1.sinks.k1.hdfs.fileType = DataStream

# 配置文件滚动方式(文件大小32M)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1

# 向hdfs上刷新的event的个数
a1.sinks.k1.hdfs.batchSize = 1000

# 使用本地时间
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Flume的优化配置

  • 1、启动agent

    1
    flume-ng agent --conf-file /data/lagoudw/conf/flume- log2hdfs1.conf -name a1 -Dflum e.roog.logger=INFO,console
  • 2、向 /data/lagoudw/logs/ 目录中放入日志文件,报错:

    java.lang.OutOfMemoryError: GC overhead limit exceeded


    缺省情况下 Flume jvm堆最大分配20m,这个值太小,需要调整。

  • 3、解决方案:在 $FLUME_HOME/conf/flume-env.sh 中增加以下内容

    1
    2
    3
    4
    5
    6
    export JAVA_OPTS="-Xms4000m -Xmx4000m - Dcom.sun.management.jmxremote"

    # 要想使配置文件生效,还要在命令行中指定配置文件目录
    flume-ng agent --conf /opt/apps/flume-1.9/conf --conf-file /data/lagoudw/conf/flume-log2hdfs1.conf -name a1 - Dflume.root.logger=INFO,console

    flume-ng agent --conf-file /data/lagoudw/conf/flume- log2hdfs1.conf -name a1 -Dflume.root.logger=INFO,console

    Flume内存参数设置及优化:

    • 根据日志数据量的大小,Jvm堆一般要设置为4G或更高

    • -Xms -Xmx 最好设置一致,减少内存抖动带来的性能影响

    存在的问题:Flume放数据时,使用本地时间;不理会日志的时间戳

自定义拦截器

前面 Flume Agent 的配置使用了本地时间,可能导致数据存放的路径不正确。

要解决以上问题需要使用自定义拦截器。

agent用于测试自定义拦截器。netcat source =>logger sink

创建/data/lagoudw/conf/flumetest1.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# a1是agent的名称。source、channel、sink的名称分别为:r1 c1 k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop2
a1.sources.r1.port = 9999
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn.lagou.dw.flume.interceptor.CustomerInterceptor$Builder

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

# sink
a1.sinks.k1.type = logger

# source、channel、sink之间的关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

自定义拦截器的原理:

  • 1、自定义拦截器要集成Flume 的 Interceptor

  • 2、Event 分为header 和 body(接收的字符串)

  • 3、获取header和body

  • 4、从body中获取"time":1596382570539,并将时间戳转换为字符串 “yyyy-MM-dd”

  • 5、将转换后的字符串放置header中

自定义拦截器的实现:

  • 1、获取 event 的 header

  • 2、获取 event 的 body

  • 3、解析body获取json串

  • 4、解析json串获取时间戳

  • 5、将时间戳转换为字符串 “yyyy-MM-dd”

  • 6、将转换后的字符串放置header中 7、返回event

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.1.23</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package cn.lagou.dw.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.compress.utils.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.interceptor.Interceptor;
import org.junit.Test;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CustomerInterceptor implements Interceptor {
@Override
public void initialize() {

}

@Override
// 逐条处理event
public Event intercept(Event event) {
// 获取 event 的 body
String eventBody = new String(event.getBody(), Charsets.UTF_8);

// 获取 event 的 header
Map<String, String> headersMap = event.getHeaders();

// 解析body获取json串
String[] bodyArr = eventBody.split("\\s+");

try{
String jsonStr = bodyArr[6];

// 解析json串获取时间戳
JSONObject jsonObject = JSON.parseObject(jsonStr);
String timestampStr = jsonObject.getJSONObject("app_active").getString("time");

// 将时间戳转换为字符串 "yyyy-MM-dd"
// 将字符串转换为Long
long timestamp = Long.parseLong(timestampStr);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
Instant instant = Instant.ofEpochMilli(timestamp);
LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
String date = formatter.format(localDateTime);

// 将转换后的字符串放置header中
headersMap.put("logtime", date);
event.setHeaders(headersMap);
}catch (Exception e){
headersMap.put("logtime", "Unknown");
event.setHeaders(headersMap);
}
return event;
}

@Override
public List<Event> intercept(List<Event> events) {
List<Event> lstEvent = new ArrayList<>();
for (Event event: events){
Event outEvent = intercept(event);
if (outEvent != null) {
lstEvent.add(outEvent);
}
}
return lstEvent;
}

@Override
public void close() {

}

public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new CustomerInterceptor();
}

@Override public void configure(Context context) {

}
}

@Test public void testJunit(){
String str = "2020-08-02 18:19:32.959 [main] INFO com.lagou.ecommerce.AppStart - {\"app_active\": {\"name\":\"app_active\",\"json\": {\"entry\":\"1\",\"action\":\"0\",\"error_code\":\"0\"},\"tim e\":1596342840284},\"attr\":{\"area\":\"大庆 \",\"uid\":\"2F10092A2\",\"app_v\":\"1.1.15\",\"event_type\": \"common\",\"device_id\":\"1FB872- 9A1002\",\"os_type\":\"2.8\",\"channel\":\"TB\",\"language\": \"chinese\",\"brand\":\"iphone-8\"}}";

Map<String, String> map = new HashMap<>();
// new Event
Event event = new SimpleEvent();
event.setHeaders(map);
event.setBody(str.getBytes(Charsets.UTF_8));

// 调用interceptor处理event
CustomerInterceptor customerInterceptor = new CustomerInterceptor();
Event outEvent = customerInterceptor.intercept(event);

// 处理结果
Map<String, String> headersMap = outEvent.getHeaders();
System.out.println(JSON.toJSONString(headersMap));
}
}

将程序打包,放在 flume/lib目录下;

启动Agent测试

1
flume-ng agent --conf /opt/apps/flume-1.9/conf --conf-file /data/lagoudw/conf/flumetest1.conf -name a1 - Dflume.roog.logger=INFO,console

备注:agent、telnet程序在同一节点

采集启动日志和事件日志

本系统中要采集两种日志:启动日志、事件日志,不同的日志放置在不同的目录下。要想一次拿到全部日志需要监控多个目录。


  • 总体思路

    1、taildir监控多个目录

    2、修改自定义拦截器,不同来源的数据加上不同标志

    3、hdfs sink 根据标志写文件

  • Agent配置

    创建/data/lagoudw/conf/flume-log2hdfs3.conf

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # taildir source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.positionFile = /data/lagoudw/conf/startlog_position.json
    a1.sources.r1.filegroups = f1 f2
    a1.sources.r1.filegroups.f1 = /data/lagoudw/logs/start/.*log
    a1.sources.r1.headers.f1.logtype = start
    a1.sources.r1.filegroups.f2 = /data/lagoudw/logs/event/.*log
    a1.sources.r1.headers.f2.logtype = event

    # 自定义拦截器
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = cn.lagou.dw.flume.interceptor.LogTypeInterceptor$Builder

    # memorychannel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 100000
    a1.channels.c1.transactionCapacity = 2000

    # hdfs sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%{logtime}/
    a1.sinks.k1.hdfs.filePrefix = startlog.
    a1.sinks.k1.hdfs.fileType = DataStream

    # 配置文件滚动方式(文件大小32M)
    a1.sinks.k1.hdfs.rollSize = 33554432
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.rollInterval = 0
    a1.sinks.k1.hdfs.idleTimeout = 0
    a1.sinks.k1.hdfs.minBlockReplicas = 1

    # 向hdfs上刷新的event的个数
    a1.sinks.k1.hdfs.batchSize = 1000

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    • filegroups

      指定filegroups,可以有多个,以空格分隔(taildir source可同时监控多个目录中的文件)

    • headers.<filegroupName>.<headerKey>

      给 event 增加header key。不同的filegroup,可配置不同的value

  • 自定义拦截器

    编码完成后打包上传服务器,放置在$FLUME_HOME/lib 下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    package cn.lagou.dw.flume.interceptor;

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONArray;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.commons.compress.utils.Charsets;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.event.SimpleEvent;
    import org.apache.flume.interceptor.Interceptor;
    import org.junit.Test;

    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneId;
    import java.time.format.DateTimeFormatter;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class LogTypeInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    // 逐条处理event
    public Event intercept(Event event) {
    // 获取 event 的 body
    String eventBody = new String(event.getBody(), Charsets.UTF_8);

    // 获取 event 的 header
    Map<String, String> headersMap = event.getHeaders();

    // 解析body获取json串
    String[] bodyArr = eventBody.split("\\s+");
    try{
    String jsonStr = bodyArr[6];
    // 解析json串获取时间戳
    String timestampStr = "";
    JSONObject jsonObject = JSON.parseObject(jsonStr);
    if (headersMap.getOrDefault("logtype", "").equals("start")){
    // 取启动日志的时间戳
    timestampStr = jsonObject.getJSONObject("app_active").getString("time");
    } else if (headersMap.getOrDefault("logtype", "").equals("event")) {
    // 取事件日志第一条记录的时间戳
    JSONArray jsonArray = jsonObject.getJSONArray("lagou_event");
    if (jsonArray.size() > 0){
    timestampStr = jsonArray.getJSONObject(0).getString("time");
    }
    }

    // 将时间戳转换为字符串 "yyyy-MM-dd"
    // 将字符串转换为Long
    long timestamp = Long.parseLong(timestampStr);
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    Instant instant = Instant.ofEpochMilli(timestamp);
    LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
    String date = formatter.format(localDateTime);
    // 将转换后的字符串放置header中
    headersMap.put("logtime", date);
    event.setHeaders(headersMap);
    }catch (Exception e){
    headersMap.put("logtime", "Unknown");
    event.setHeaders(headersMap);
    }
    return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
    List<Event> lstEvent = new ArrayList<>();
    for (Event event: events){
    Event outEvent = intercept(event);
    if (outEvent != null) {
    lstEvent.add(outEvent);
    }
    }
    return lstEvent;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
    @Override
    public Interceptor build() {
    return new LogTypeInterceptor();
    }

    @Override
    public void configure(Context context) {

    }
    }

    @Test
    public void startJunit(){
    String str = "2020-08-02 18:19:32.959 [main] INFO com.lagou.ecommerce.AppStart - {\"app_active\": {\"name\":\"app_active\",\"json\": {\"entry\":\"1\",\"action\":\"0\",\"error_code\":\"0\"},\"tim e\":1596342840284},\"attr\":{\"area\":\"大庆 \",\"uid\":\"2F10092A2\",\"app_v\":\"1.1.15\",\"event_type\": \"common\",\"device_id\":\"1FB872- 9A1002\",\"os_type\":\"2.8\",\"channel\":\"TB\",\"language\": \"chinese\",\"brand\":\"iphone-8\"}}";

    Map<String, String> map = new HashMap<>();
    // new Event
    Event event = new SimpleEvent();
    map.put("logtype", "start");
    event.setHeaders(map);
    event.setBody(str.getBytes(Charsets.UTF_8));

    // 调用interceptor处理event
    LogTypeInterceptor customerInterceptor = new LogTypeInterceptor();
    Event outEvent = customerInterceptor.intercept(event);
    // 处理结果
    Map<String, String> headersMap = outEvent.getHeaders();
    System.out.println(JSON.toJSONString(headersMap));
    }

    @Test
    public void eventJunit(){
    String str = "2020-08-02 18:20:11.877 [main] INFO com.lagou.ecommerce.AppEvent - {\"lagou_event\": [{\"name\":\"goods_detail_loading\",\"json\": {\"entry\":\"1\",\"goodsid\":\"0\",\"loading_time\":\"93\",\" action\":\"3\",\"staytime\":\"56\",\"showtype\":\"2\"},\"time \":1596343881690},{\"name\":\"loading\",\"json\": {\"loading_time\":\"15\",\"action\":\"3\",\"loading_type\":\" 3\",\"type\":\"1\"},\"time\":1596356988428}, {\"name\":\"notification\",\"json\": {\"action\":\"1\",\"type\":\"2\"},\"time\":1596374167278}, {\"name\":\"favorites\",\"json\": {\"course_id\":1,\"id\":0,\"userid\":0},\"time\":159635093396 2}],\"attr\":{\"area\":\"长治 \",\"uid\":\"2F10092A4\",\"app_v\":\"1.1.14\",\"event_type\": \"common\",\"device_id\":\"1FB872- 9A1004\",\"os_type\":\"0.5.0\",\"channel\":\"QL\",\"language\ ":\"chinese\",\"brand\":\"xiaomi-0\"}}";

    Map<String, String> map = new HashMap<>();
    // new Event
    Event event = new SimpleEvent();
    map.put("logtype", "event");
    event.setHeaders(map);
    event.setBody(str.getBytes(Charsets.UTF_8));

    // 调用interceptor处理event
    LogTypeInterceptor customerInterceptor = new LogTypeInterceptor();
    Event outEvent = customerInterceptor.intercept(event);

    // 处理结果
    Map<String, String> headersMap = outEvent.getHeaders();
    System.out.println(JSON.toJSONString(headersMap));
    }
    }

    测试

    启动Agent,拷贝日志,检查HDFS文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    # 清理环境
    rm -f /data/lagoudw/conf/startlog_position.json
    rm -f /data/lagoudw/logs/start/*.log
    rm -f /data/lagoudw/logs/event/*.log

    # 启动 Agent
    flume-ng agent --conf /opt/apps/flume-1.9.0/conf --conf-file /opt/lagou/servers/flume-1.9.0/conf/flume-log2hdfs3.conf -name a1 -Dflume.root.logger=INFO,console

    # 拷贝日志
    cd /data/lagoudw/logs/source
    cp event0802.log ../event/
    cp start0802.log ../start/

    # 检查HDFS文件
    hdfs dfs -ls /user/data/logs/event
    hdfs dfs -ls /user/data/logs/start

    # 生产环境中用以下方式启动Agent
    nohup flume-ng agent --conf /opt/apps/flume-1.9.0/conf --conf-file /opt/lagou/servers/flume-1.9.0/conf/flume-log2hdfs3.conf -name a1 -Dflume.root.logger=INFO,LOGFILE > /dev/null 2>&1 &
    • nohup,该命令允许用户退出帐户/关闭终端之后继续运行相应的进程

    • /dev/null,代表linux的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称黑洞

    • 标准输入0,从键盘获得输入 /proc/self/fd/0

    • 标准输出1,输出到屏幕(控制台) /proc/self/fd/1

    • 错误输出2,输出到屏幕(控制台) /proc/self/fd/2

    • /dev/null 标准输出1重定向到 /dev/null 中,此时标准输出不存在,没有任何地方能够找到输出的内容

    • 2>&1 错误输出将会和标准输出输出到同一个地方

    • /dev/null 2>&1 不会输出任何信息到控制台,也不会有任何信息输出到文件中

日志数据采集小结

  • 使用taildir source 监控指定的多个目录,可以给不同目录的日志加上不同header

  • 在每个目录中可以使用正则匹配多个文件

  • 使用自定义拦截器,主要功能是从json串中获取时间戳,加到event的header中

  • hdfs sink使用event header中的信息写数据(控制写文件的位置)

  • hdfs文件的滚动方式(基于文件大小、基于event数量、基于时间)

  • 调节flume jvm内存的分配