会员活跃度分析之日志数据采集
需求分析
会员数据是后期营销的很重要的数据。网店会专门针对会员进行一系列营销活动。
电商会员一般门槛较低,注册网站即可加入。有些电商平台的高级会员具有时效性,需要购买VIP会员卡或一年内消费额达到多少才能成为高级会员。
计算指标:
新增会员:每日新增会员数
活跃会员:每日,每周,每月的活跃会员数
会员留存:1日,2日,3日会员留存数、1日,2日,3日会员留存率
指标口径业务逻辑:
会员:以设备为判断标准,每个独立设备认为是一个会员。Android系统通常根据IMEI号,IOS系统通常根据OpenUDID 来标识一个独立会员,每部移动设备是一个会员;
活跃会员:打开应用的会员即为活跃会员 ...
电商离线数仓设计
需求分析
近年来,中国的电子商务快速发展,交易额连创新高,电子商务在各领域的应用不断拓展和深化、相关服务业蓬勃发展、支撑体系不断健全完善、创新的动力和能力不断增强。电子商务正在与实体经济深度融合,进入规模性发展阶段,对经济社会生活的影响不断增大,正成为我国经济发展的新引擎。
中国电子商务研究中心数据显示,截止到 2012 年底,中国电子商务市场交易规模达7.85万亿人民币,同比增长 30.83%。其中,B2B 电子商务交易额 达 6.25 万亿,同比增长 27%。而 2011 年全年,中国电子商务市场交易额达 6 万亿人民币,同比增长 33%,占 GDP 比重上升到 13%;2012 年,电子 ...
数据仓库理论
数据仓库
什么是数据仓库?
1988年,为解决全企业集成问题,IBM公司第一次提出了信息仓库(Information Warehouse)的概念。数据仓库的基本原理、技术架构以及分析系统的主要原则都已确定,数据仓库初具雏形。
1991年Bill Inmon(比尔·恩门)出版了他的第一本关于数据仓库的书《Building the Data Warehouse》,标志着数据仓库概念的确立。书中指出,数据仓库(Data Warehouse)是一个面向主题的(Subject Oriented)、集成的(Integrated)、相对稳定的(Non-Volatile)、反映历史变化的(Time Varia ...
Kafka源码之异步发送模式
异步发送消息
在发送消息的时候设置回调函数:
调⽤KafkaProducer的send⽅法,该⽅法接收要发送的消息批,同时接收回调对象:
doSend的实现:
累加器append的实现:
tryAppend的实现:
Sender的run⽅法调⽤:
sendProducerData的实现:
sendProducerRequests的实现:
上述⽅法如果得到broker的响应,就回调 handleProduceResponse ⽅法:
该⽅法对响应的处理:
completeBatch的实现:
completeBatch的实 ...
Kafka源码之同步发送模式
消息同步发送的代码:
所谓同步,就是调⽤Future的get⽅法同步等待。
send⽅法是异步的:
send⽅法将消息发送给broker,当前线程同步等待broker返回的消息。
send发的实现:
看doSend:
该⽅法⾸先将消息放到累加器中,判断是否需要发起请求,如果需要,则唤醒sender线程发送消息。
该⽅法的返回值:RecordApendResult.future:
RecordApendResult类:
累加器的append⽅法将消息追加到累加器,并返回追加到累加器的结果:
其中主要实现:
tryAppend的实现:
上述⽅法的返回 ...
Kafka源码之组消费模式
组消费模式指的是在消费者消费消息的时候,使⽤组协调器的再平衡机制⾃动分配要消费的分区(们)。
此时需要在消费者的配置中指定消费组ID,同时如果需要,设置偏移量重置的策略。
然后消费者订阅主题,就可以消费消息了。
123456789101112131415161718192021Map<String, Object> configs = new HashMap<>();configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ConsumerConfig ...
Kafka源码之分区消费模式
在分区消费模式,需要⼿动指定消费者要消费的主题和主题的分区信息。
可以设置从分区的哪个偏移量开始消费。
典型的分区消费:
12345678910111213141516171819202122232425262728293031Map<String, Object> configs = new HashMap<>();configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_C ...
Kafka源码之DynamicConfigManager
⼯作流程如下:
配置存储于/config/entityType/entityName,如/config/topics/<topic_name>以及/config/clients/,默认配置存储与各⾃的节点中,上述节点中保存的是覆盖默认配置的数据,以properties的格式。
可以使⽤分级路径同时指定多个实体的名称,如:/config/users//clients/
设置通知路径/config/changes,避免对所有主题进⾏监控,有事通知。DynamicConfigManager监控该路径。
更新配置的第⼀步是更新配置的properties。
之后,在/config/chang ...
Kafka源码之KafkaHealthcheck
健康检查的初始化和启动:
在启动KafkaServer的startup⽅法中,实例化并启动了健康检查:
健康检查的startup⽅法的执⾏逻辑:
注册状态监听器的具体实现:
subscribeStateChanges(listener)具体实现:调⽤zookeeper客户端的⽅法,该⽅法将监听器对象添加到_stateListener这个Set集合中。
zookeeper客户端的回调⽅法:新建会话事件触发监听器。
如果发⽣了zk重连,则需要重新在zk中注册当前borker。
会话建⽴异常,触发监听器:
⽆法建⽴到zk的连接:
状态改变,触发执⾏监听器⽅ ...
Kafka源码之KafkaController
当前broker被选为新的controller的时候,执⾏如下操作:
注册controller epoch事件监听
controller epoch +1
初始化controller上下⽂对象,该上下⽂对象缓存当前所有主题、活跃broker以及所有分区leader的信息
启动controller channel manager
启动副本状态机
启动分区状态机
如果注册为controller的过程中发⽣了异常,重新注册当前broker为controller,如此则触发新⼀轮controller选举,以保证永远有⼀个活跃的controller。
启动Kafka服务器的脚 ...