Impala的组件

Impala是⼀个分布式,⼤规模并⾏处理(MPP)数据库引擎,它包括多个进程。Impala与Hive类似不是数据库⽽是数据分析⼯具;

1
2
3
4
5
6
7
8
9
#在linux123执⾏
ps -ef | grep impala

#结果
impala 29212 1 0 Jul02 ? 00:01:06
/usr/lib/impala/sbin/statestored -log_dir=/var/log/impala -state_store_port=24000
impala 29249 1 0 Jul02 ? 00:00:49 /usr/lib/impala/sbin/catalogd -log_dir=/var/log/impala
impala 29341 1 0 Jul02 ? 00:00:49 /usr/lib/impala/sbin/impalad -log_dir=/var/log/impala -catalog_service_host=linux123 -
state_store_port=24000 -use_statestore -state_store_host=linux123 -be_port=22000

impalad

  • ⻆⾊名称为Impala Daemon,是在每个节点上运⾏的进程,是Impala的核⼼组件,进程名是Impalad;

  • 作⽤,负责读写数据⽂件,接收来⾃Impala-shell,JDBC,ODBC等的查询请求,与集群其它Impalad分布式并⾏完成查询任务,并将查询结果返回给中⼼协调者。

  • 为了保证Impalad进程了解其它Impalad的健康状况,Impalad进程会⼀直与statestore保持通信。

  • Impalad服务由三个模块组成:Query Planner、Query Coordinator和Query Executor,前两个模块组成前端,负责接收SQL查询请求,解析SQL并转换成执⾏计划,交由后端执⾏,

statestored

  • statestore监控集群中Impalad的健康状况,并将集群健康信息同步给Impalad

  • statestore进程名为statestored

catalogd

  • Impala执⾏的SQL语句引发元数据发⽣变化时,catalog服务负责把这些元数据的变化同步给其它Impalad进程(⽇志验证,监控statestore进程⽇志)

  • catalog服务对应进程名称是catalogd

  • 由于⼀个集群需要⼀个catalogd以及⼀个statestored进程,⽽且catalogd进程所有请求都是经过statestored进程发送,所以官⽅建议让statestored进程与catalogd进程安排同个节点。

Impala的查询


  1. Client提交任务

Client发送⼀个SQL查询请求到任意⼀个Impalad节点,会返回⼀个queryId⽤于之后的客户端操作。

  1. ⽣成单机和分布式执⾏计划

SQL提交到Impalad节点之后,Analyser依次执⾏SQL的词法分析、语法分析、语义分析等操作;从MySQL元数据库中获取元数据,从HDFS的名称节点中获取数据地址,以得到存储这个查询相关数据的所有数据节点

  • 单机执⾏计划: 根据上⼀步对SQL语句的分析,由Planner先⽣成单机的执⾏计划,该执⾏计划是有PlanNode组成的⼀棵树,这个过程中也会执⾏⼀些SQL优化,例如Join顺序改变、谓词下推等。

  • 分布式并⾏物理计划:将单机执⾏计划转换成分布式并⾏物理执⾏计划,物理执⾏计划由⼀个个的Fragment组成,Fragment之间有数据依赖关系,处理过程中需要在原有的执⾏计划之上加⼊⼀些ExchangeNode和DataStreamSink信息等。

    • Fragment : sql⽣成的分布式执⾏计划的⼀个⼦任务;

    • DataStreamSink:传输当前的Fragment输出数据到不同的节点

  1. 任务调度和分发

Coordinator将Fragment(⼦任务)根据数据分区信息发配到不同的Impalad节点上执⾏。Impalad节点接收到执⾏Fragment请求交由Executor执⾏。

  1. Fragment之间的数据依赖

每⼀个Fragment的执⾏输出通过DataStreamSink发送到下⼀个Fragment,Fragment运⾏过程中不断向coordinator节点汇报当前运⾏状态。

  1. 结果汇总

查询的SQL通常情况下需要有⼀个单独的Fragment⽤于结果的汇总,它只在Coordinator节点运⾏,将多个节点的最终执⾏结果汇总,转换成ResultSet信息。

  1. 获取结果

客户端调⽤获取ResultSet的接⼝,读取查询结果。

查询计划示例

1
2
3
4
5
6
7
8
9
10
select
t1.n1,
t2.n2,
count(1) as c
from t1 join t2 on t1.id = t2.id
join t3 on t1.id = t3.id
where t3.n3 between ‘a’ and ‘f’
group by t1.n1, t2.n2
order by c desc
limit 100;

单机执⾏计划


分析上⾯的单机执⾏计划,第⼀步先去扫描t1表中需要的数据,如果数据⽂件存储是列式存储我们可以便利的扫描到所需的列id,n1;接着需要与t2表进⾏Join操作,扫描t2表与t1表类似获取到所需数据列id,n2;t1与t2表进⾏关联,关联之后再与t3表进⾏关联,这⾥Impala会使⽤谓词下推扫描t3表只取join所需数据;对group by进⾏相应的aggregation操作,最终是排序取出指定数量的数据返回。

分布式并⾏执⾏计划

所谓的分布式并⾏化执⾏计划就是在单机执⾏计划基础之上结合数据分布式存储的特点,按照任务的计算要求把单机执⾏计划拆分为多段⼦任务,每个⼦任务都是可以并⾏执⾏的。上⾯的单机执⾏计划转为分布式并⾏执⾏计划如下图所示:



分布式执⾏计划中涉及到多表的Join,Impala会根据表的⼤⼩来决定Join的⽅式,主要有两种分别是Hash Join与Broadcast Join

上⾯分布式执⾏计划中可以看出T1,T2表⼤⼀些,⽽T3表⼩⼀些,所以对于T1与T2的Join Impala选择使⽤Hash Join,对于T3表选择使⽤Broadcast ⽅式,直接把T3表⼴播到需要Join的节点上。

分布式并⾏计划流程

  1. T1和T2使⽤Hash join,此时需要按照id的值分别将T1和T2分散到不同的Impalad进程,但是相同的id会散列到相同的Impalad进程,这样每⼀个Join之后是全部数据的⼀部分。

  2. T1与T2Join之后的结果数据再与T3表进⾏Join,此时T3表采⽤Broadcast⽅式把⾃⼰全部数据(id列)⼴播到需要的Impala节点上。

  3. T1,T2,T3Join之后再根据Group by执⾏本地的预聚合,每⼀个节点的预聚合结果只是最终结果的⼀部分(不同的节点可能存在相同的group by的值),需要再进⾏⼀次全局的聚合。

  4. 全局的聚合同样需要并⾏,则根据聚合列进⾏Hash分散到不同的节点执⾏Merge运算(其实仍然是⼀次聚合运算),⼀般情况下为了较少数据的⽹络传输, Impala会选择之前本地聚合节点做全局聚合⼯作。

  5. 通过全局聚合之后,相同的key只存在于⼀个节点,然后对于每⼀个节点进⾏排序和TopN计算,最终将每⼀个全局聚合节点的结果返回给Coordinator进⾏合并、排序、limit计算,返回结果给⽤户。