ODS:分区;事件的主要信息在json串中(json数组),公共信息在另外一个json串中;

ODS => 解析json,从json串中,提取jsonArray数据;将公共信息从json串中解析出来 => 所有事件的明细

所有事件的明细,包括:

  • 分区

  • 事件(json串)

  • 公共信息字段

所有事件的明细 => 广告json串解析 => 广告事件的明细

广告事件的明细:

  • 分区

  • 广告信息字段

  • 公共信息字段

DWD层建表

-- Detail table for ALL events: one row per event exploded out of the raw
-- event JSON array, joined with the common (attr) fields. Partitioned by day.
DROP TABLE IF EXISTS dwd.dwd_event_log;
CREATE EXTERNAL TABLE dwd.dwd_event_log (
    `device_id`   string,
    `uid`         string,
    `app_v`       string,
    `os_type`     string,
    `event_type`  string,
    `language`    string,
    `channel`     string,
    `area`        string,
    `brand`       string,
    `name`        string,
    `event_json`  string,
    `report_time` string
)
PARTITIONED BY (`dt` string)
STORED AS PARQUET;

-- Detail table for advertisement events only (filtered from dwd_event_log,
-- with the ad-specific fields parsed out of event_json).
-- Fixed: declared EXTERNAL for consistency with dwd.dwd_event_log — a managed
-- table here would silently delete the Parquet data on DROP TABLE, while the
-- sibling DWD table keeps its data.
DROP TABLE IF EXISTS dwd.dwd_ad;
CREATE EXTERNAL TABLE dwd.dwd_ad (
    `device_id`   string,
    `uid`         string,
    `app_v`       string,
    `os_type`     string,
    `event_type`  string,
    `language`    string,
    `channel`     string,
    `area`        string,
    `brand`       string,
    `report_time` string,
    `duration`    int,
    `ad_action`   int,
    `shop_id`     int,
    `ad_type`     int,
    `show_style`  smallint,
    `product_id`  int,
    `place`       string,
    `sort`        int,
    `hour`        string   -- derived from report_time at load time
)
PARTITIONED BY (`dt` string)
STORED AS PARQUET;

事件json串解析

内建函数、UDF、SerDe(json是所有的信息)

详细内容参见会员活跃度分析之ODS建表和数据加载博客的json数据处理 => 使用UDF(处理jsonArray)

package cn.lagou.dw.hive.udf;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Strings;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.junit.Test;
import java.util.ArrayList;

/**
 * Hive UDF that splits a JSON-array string into a list of its element strings,
 * e.g. "[{...},{...}]" -> ["{...}", "{...}"], so each event object can be
 * exploded into its own row with LATERAL VIEW explode(...).
 *
 * NOTE(review): extends the old-style {@code UDF} base class (deprecated in
 * newer Hive releases); kept so the existing
 * {@code create temporary function json_array as ...} registration still works.
 */
public class ParseJsonArray extends UDF {
    /**
     * Parse a JSON array string into its element strings.
     *
     * @param jsonStr a JSON array string; may be null or empty
     * @return the elements of the array serialized back to strings, or
     *         {@code null} for null/empty input or malformed JSON (Hive
     *         convention: a bad row yields NULL instead of failing the job)
     */
    public ArrayList<String> evaluate(String jsonStr) {
        // Null/empty input -> NULL in the result column.
        if (Strings.isNullOrEmpty(jsonStr)) {
            return null;
        }
        try {
            // Parse the whole array, then re-serialize each element.
            JSONArray jsonArray = JSON.parseArray(jsonStr);
            ArrayList<String> lst = new ArrayList<>();
            for (Object o : jsonArray) {
                lst.add(o.toString());
            }
            return lst;
        } catch (JSONException e) {
            // Malformed JSON is treated as missing data, not a job failure.
            return null;
        }
    }

    @Test
    public void JunitParseJsonArray() {
        // Fixed: the sample previously contained stray spaces inside JSON keys
        // and values ("a ction", "\"time\" :", "3 \"") introduced by a bad
        // copy-paste, which made the fixture unrepresentative of real events.
        String jsonStr = "[{\"name\":\"goods_detail_loading\",\"json\":{\"entry\":\"1\",\"goodsid\":\"0\",\"loading_time\":\"93\",\"action\":\"3\",\"staytime\":\"56\",\"showtype\":\"2\"},\"time\":1596343881690},{\"name\":\"loading\",\"json\":{\"loading_time\":\"15\",\"action\":\"3\",\"loading_type\":\"3\",\"type\":\"1\"},\"time\":1596356988428},{\"name\":\"notification\",\"json\":{\"action\":\"1\",\"type\":\"2\"},\"time\":1596374167278},{\"name\":\"favorites\",\"json\":{\"course_id\":1,\"id\":0,\"userid\":0},\"time\":1596350933962}]";
        ArrayList<String> result = evaluate(jsonStr);
        System.out.println(result.size());
        System.out.println(JSON.toJSONString(result));
    }
}

DWD层数据加载

主要功能:解析json串;得到全部的事件日志

创建/data/lagoudw/script/advertisement/dwd_load_event_log.sh

cd /data/script/advertisement

vim dwd_load_event_log.sh

#!/bin/bash
# Load DWD event detail: explode the raw event JSON array (via the
# json_array UDF) and extract the common attr fields into dwd.dwd_event_log.
source /etc/profile

# Partition date to process: first CLI argument, defaulting to yesterday
# (yyyy-MM-dd), same as the original [ -n "$1" ] branch.
do_date=${1:-$(date -d "-1 day" +%F)}

sql="
use dwd;

add jar /data/lagoudw/jars/dw-event-log-1.0-SNAPSHOT-jar-with-dependencies.jar;

create temporary function json_array as 'cn.lagou.dw.hive.udf.ParseJsonArray';

with tmp_start as (
select split(str, ' ')[7] as line
from ods.ods_log_event
where dt='$do_date'
)
insert overwrite table dwd.dwd_event_log PARTITION (dt='$do_date')
select device_id, uid, app_v, os_type, event_type, language, channel, area, brand,
get_json_object(k,'$.name') as name,
get_json_object(k,'$.json') as json,
get_json_object(k,'$.time') as time
from (
select
get_json_object(line,'$.attr.device_id') as device_id,
get_json_object(line,'$.attr.uid') as uid,
get_json_object(line,'$.attr.app_v') as app_v,
get_json_object(line,'$.attr.os_type') as os_type,
get_json_object(line,'$.attr.event_type') as event_type,
get_json_object(line,'$.attr.language') as language,
get_json_object(line,'$.attr.channel') as channel,
get_json_object(line,'$.attr.area') as area,
get_json_object(line,'$.attr.brand') as brand,
get_json_object(line,'$.lagou_event') as lagou_event
from tmp_start
) A
lateral view explode(json_array(lagou_event)) B as k;
"

# Execute the load for the chosen partition.
hive -e "$sql"
sh dwd_load_event_log.sh 2020-07-20

从全部的事件日志中获取广告点击事件:创建/data/lagoudw/script/advertisement/dwd_load_ad_log.sh

cd /data/script/advertisement

vim dwd_load_ad_log.sh

#!/bin/bash
# Load DWD ad detail: filter advertisement events (name='ad') out of
# dwd.dwd_event_log and parse the ad-specific fields from event_json.
source /etc/profile

# Partition date to process: first CLI argument, defaulting to yesterday
# (yyyy-MM-dd), same as the original [ -n "$1" ] branch.
do_date=${1:-$(date -d "-1 day" +%F)}

sql="
insert overwrite table dwd.dwd_ad PARTITION (dt='$do_date')
select device_id, uid, app_v, os_type, event_type, language, channel, area, brand, report_time,
get_json_object(event_json,'$.duration') ,
get_json_object(event_json,'$.ad_action') ,
get_json_object(event_json,'$.shop_id') ,
get_json_object(event_json,'$.ad_type'),
get_json_object(event_json,'$.show_style'),
get_json_object(event_json,'$.product_id'),
get_json_object(event_json,'$.place'),
get_json_object(event_json,'$.sort'),
from_unixtime(ceil(report_time/1000), 'HH')
from dwd.dwd_event_log
where dt='$do_date'
and name='ad';
"

# Execute the load for the chosen partition.
hive -e "$sql"
sh dwd_load_ad_log.sh 2020-07-20

日志 => Flume => ODS => 清洗、转换 => 广告事件详细信息