ODS layer: partitioned. The main event information sits in one JSON string (a JSON array), while the common (shared) device/user information sits in another JSON string.

ODS => parse the JSON: extract the jsonArray from the event JSON string, and parse the common information out of the other JSON string => detail records for all events

The detail of all events includes the fields defined in dwd.dwd_event_log below.

Detail of all events => parse the ad-event JSON => detail of ad events

The detail of ad events corresponds to the fields defined in dwd.dwd_ad below.
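The two-level flattening described above can be sketched in Python (a minimal illustration only; the field names `attr` and `lagou_event` follow the DWD load script later in this post, and the sample record and its values are hypothetical):

```python
import json

# Hypothetical raw record: common info in "attr", per-event info in "lagou_event"
raw = json.dumps({
    "attr": {"device_id": "d001", "uid": "u01", "os_type": "android"},
    "lagou_event": [
        {"name": "ad", "json": {"ad_action": 1, "shop_id": 7}, "time": 1596343881690},
        {"name": "loading", "json": {"loading_time": "15"}, "time": 1596356988428},
    ],
})

def flatten(line):
    """Produce one output row per event: common fields + that event's own payload."""
    obj = json.loads(line)
    common = obj["attr"]
    rows = []
    for ev in obj["lagou_event"]:
        row = dict(common)                       # shared device/user columns
        row["name"] = ev["name"]                 # event name
        row["event_json"] = json.dumps(ev["json"])  # event-specific payload, kept as JSON
        row["report_time"] = ev["time"]
        rows.append(row)
    return rows

rows = flatten(raw)
ad_rows = [r for r in rows if r["name"] == "ad"]  # "detail of ad events"
print(len(rows), len(ad_rows))  # 2 1
```

In the warehouse the same two steps happen in HiveQL: the UDF plus `lateral view explode` produces the all-event detail, and a filter on `name='ad'` produces the ad-event detail.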
Creating the DWD tables
-- Detail of all events
drop table if exists dwd.dwd_event_log;
CREATE EXTERNAL TABLE dwd.dwd_event_log(
  `device_id` string,
  `uid` string,
  `app_v` string,
  `os_type` string,
  `event_type` string,
  `language` string,
  `channel` string,
  `area` string,
  `brand` string,
  `name` string,
  `event_json` string,
  `report_time` string
) PARTITIONED BY (`dt` string)
stored as parquet;

-- Detail of ad click events
drop table if exists dwd.dwd_ad;
CREATE TABLE dwd.dwd_ad(
  `device_id` string,
  `uid` string,
  `app_v` string,
  `os_type` string,
  `event_type` string,
  `language` string,
  `channel` string,
  `area` string,
  `brand` string,
  `report_time` string,
  `duration` int,
  `ad_action` int,
  `shop_id` int,
  `ad_type` int,
  `show_style` smallint,
  `product_id` int,
  `place` string,
  `sort` int,
  `hour` string
) PARTITIONED BY (`dt` string)
stored as parquet;
Parsing the event JSON string

Three options: built-in functions, a UDF, or a SerDe (the latter when the JSON carries all of the information).

For details, see the JSON handling in the earlier blog post on ODS table creation and data loading for the member-activity analysis => here we use a UDF (to handle the jsonArray).
package cn.lagou.dw.hive.udf;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.google.common.base.Strings;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.junit.Test;

import java.util.ArrayList;

public class ParseJsonArray extends UDF {
    public ArrayList<String> evaluate(String jsonStr) {
        // Return null for a null or empty input string
        if (Strings.isNullOrEmpty(jsonStr)) {
            return null;
        }
        try {
            // Parse the input as a JSON array and collect each element as a string
            JSONArray jsonArray = JSON.parseArray(jsonStr);
            ArrayList<String> lst = new ArrayList<>();
            for (Object o : jsonArray) {
                lst.add(o.toString());
            }
            return lst;
        } catch (JSONException e) {
            // Malformed JSON also yields null
            return null;
        }
    }

    @Test
    public void JunitParseJsonArray() {
        String jsonStr = "[{\"name\":\"goods_detail_loading\",\"json\":{\"entry\":\"1\",\"goodsid\":\"0\",\"loading_time\":\"93\",\"action\":\"3\",\"staytime\":\"56\",\"showtype\":\"2\"},\"time\":1596343881690},"
                + "{\"name\":\"loading\",\"json\":{\"loading_time\":\"15\",\"action\":\"3\",\"loading_type\":\"3\",\"type\":\"1\"},\"time\":1596356988428},"
                + "{\"name\":\"notification\",\"json\":{\"action\":\"1\",\"type\":\"2\"},\"time\":1596374167278},"
                + "{\"name\":\"favorites\",\"json\":{\"course_id\":1,\"id\":0,\"userid\":0},\"time\":1596350933962}]";
        ArrayList<String> result = evaluate(jsonStr);
        System.out.println(result.size());
        System.out.println(JSON.toJSONString(result));
    }
}
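Outside of Hive, the UDF's contract can be sanity-checked with an equivalent Python sketch (illustrative only; Python's standard `json` module stands in for fastjson, and the non-list check mirrors fastjson's `JSONException` on non-array input):

```python
import json

def parse_json_array(json_str):
    """Mimics ParseJsonArray.evaluate: return the array elements re-serialised
    as strings, or None for empty/invalid (non-array) input."""
    if not json_str:
        return None
    try:
        arr = json.loads(json_str)
        if not isinstance(arr, list):
            # fastjson's parseArray would raise here; the UDF returns null
            return None
        return [json.dumps(el) for el in arr]
    except json.JSONDecodeError:
        return None

events = parse_json_array(
    '[{"name":"ad","time":1596343881690},{"name":"loading","time":1596356988428}]')
print(len(events))                    # 2
print(parse_json_array(""))           # None
print(parse_json_array("not json"))   # None
```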
Loading DWD-layer data

Main job: parse the JSON string to obtain the full event log.

Create /data/lagoudw/script/advertisement/dwd_load_event_log.sh:
cd /data/lagoudw/script/advertisement
vim dwd_load_event_log.sh

#!/bin/bash
source /etc/profile

if [ -n "$1" ]
then
  do_date=$1
else
  do_date=`date -d "-1 day" +%F`
fi

sql="
use dwd;
add jar /data/lagoudw/jars/dw-event-log-1.0-SNAPSHOT-jar-with-dependencies.jar;
create temporary function json_array as 'cn.lagou.dw.hive.udf.ParseJsonArray';

with tmp_start as (
  select split(str, ' ')[7] as line
    from ods.ods_log_event
   where dt='$do_date'
)
insert overwrite table dwd.dwd_event_log
PARTITION (dt='$do_date')
select device_id,
       uid,
       app_v,
       os_type,
       event_type,
       language,
       channel,
       area,
       brand,
       get_json_object(k, '$.name') as name,
       get_json_object(k, '$.json') as event_json,
       get_json_object(k, '$.time') as report_time
  from (
        select get_json_object(line, '$.attr.device_id') as device_id,
               get_json_object(line, '$.attr.uid') as uid,
               get_json_object(line, '$.attr.app_v') as app_v,
               get_json_object(line, '$.attr.os_type') as os_type,
               get_json_object(line, '$.attr.event_type') as event_type,
               get_json_object(line, '$.attr.language') as language,
               get_json_object(line, '$.attr.channel') as channel,
               get_json_object(line, '$.attr.area') as area,
               get_json_object(line, '$.attr.brand') as brand,
               get_json_object(line, '$.lagou_event') as lagou_event
          from tmp_start
       ) A lateral view explode(json_array(lagou_event)) B as k;
"
hive -e "$sql"
sh dwd_load_event_log.sh 2020-07-20
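The `split(str, ' ')[7]` in the script assumes the JSON payload is the eighth space-separated field of the raw ODS line, the leading fields being collector metadata; the exact layout depends on how Flume wrote the logs. The indexing can be illustrated in Python with a hypothetical line (field names and values are made up):

```python
def extract_json_field(line):
    # Hive: split(str, ' ')[7] -- zero-based index 7, i.e. the 8th field
    return line.split(" ")[7]

# Hypothetical raw line: 7 metadata fields, then the JSON payload (no spaces inside)
line = ("2020-07-20 10:01:02 host1 flume ip port level "
        '{"attr":{"uid":"u01"},"lagou_event":[]}')
payload = extract_json_field(line)
print(payload)  # {"attr":{"uid":"u01"},"lagou_event":[]}
```

Note this only works if the JSON itself contains no literal spaces; compact (non-pretty-printed) JSON satisfies that.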
Extract the ad click events from the full event log: create /data/lagoudw/script/advertisement/dwd_load_ad_log.sh
cd /data/lagoudw/script/advertisement
vim dwd_load_ad_log.sh

#!/bin/bash
source /etc/profile

if [ -n "$1" ]
then
  do_date=$1
else
  do_date=`date -d "-1 day" +%F`
fi

sql="
insert overwrite table dwd.dwd_ad
PARTITION (dt='$do_date')
select device_id,
       uid,
       app_v,
       os_type,
       event_type,
       language,
       channel,
       area,
       brand,
       report_time,
       get_json_object(event_json, '$.duration'),
       get_json_object(event_json, '$.ad_action'),
       get_json_object(event_json, '$.shop_id'),
       get_json_object(event_json, '$.ad_type'),
       get_json_object(event_json, '$.show_style'),
       get_json_object(event_json, '$.product_id'),
       get_json_object(event_json, '$.place'),
       get_json_object(event_json, '$.sort'),
       from_unixtime(ceil(report_time/1000), 'HH')
  from dwd.dwd_event_log
 where dt='$do_date'
   and name='ad';
"
hive -e "$sql"
sh dwd_load_ad_log.sh 2020-07-20
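The `hour` column is derived by `from_unixtime(ceil(report_time/1000), 'HH')`: the millisecond timestamp is rounded up to whole seconds and formatted as a two-digit hour. The same arithmetic sketched in Python (note that Hive's `from_unixtime` formats in the cluster's session time zone; UTC is fixed here for determinism):

```python
import math
from datetime import datetime, timezone

def event_hour(report_time_ms):
    """ceil(ms / 1000) -> epoch seconds, then format as 'HH' (UTC here)."""
    seconds = math.ceil(report_time_ms / 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%H")

print(event_hour(1596343881690))  # "04" -- 1596343882 s is 2020-08-02 04:51:22 UTC
```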
Logs => Flume => ODS => cleansing and transformation => detailed ad event information