RDD编程之分区与分区器
RDD的分区
spark.default.parallelism:(默认的并发数)= 2
当配置文件spark-default.conf中没有显示的配置,则按照如下规则取值:
-
1、本地模式
1
2spark-shell --master local[N] spark.default.parallelism = N
spark-shell --master local spark.default.parallelism = 1 -
2、伪分布式(x为本机上启动的executor数,y为每个executor使用的core数,z为每个 executor使用的内存)
1
spark-shell --master local-cluster[x,y,z] spark.default.parallelism = x * y
-
3、分布式模式(yarn & standalone)
1
spark.default.parallelism = max(应用程序持有executor的core总数, 2)
备注:total number of cores on all executor nodes or 2, whichever is larger
经过上面的规则,就能确定了spark.default.parallelism的默认值(配置文件spark-default.conf中没有显示的配置。如果配置了,则spark.default.parallelism = 配置的值)
SparkContext初始化时,同时会生成两个参数,由上面得到的spark.default.parallelism推导出这两个参数的值
1 | // 从集合中创建RDD的分区数 |
以上参数确定后,就可以计算 RDD 的分区数了。
创建 RDD 的几种方式:
-
1、通过集合创建
1
2
3// 如果创建RDD时没有指定分区数,则rdd的分区数 = sc.defaultParallelism
val rdd = sc.parallelize(1 to 100)
rdd.getNumPartitions备注:简单的说RDD分区数等于cores总数
-
2、通过textFile创建
1
2val rdd = sc.textFile("data/start0721.big.log")
rdd.getNumPartitions如果没有指定分区数:
-
本地文件。rdd的分区数 = max(本地文件分片数, sc.defaultMinPartitions)
-
HDFS文件。 rdd的分区数 = max(hdfs文件 block 数, sc.defaultMinPartitions)
备注:
-
本地文件分片数 = 本地文件大小 / 32M
-
如果读取的是HDFS文件,同时指定的分区数 < hdfs文件的block数,指定的数不生效。
-
RDD分区器
以下RDD分别是否有分区器,是什么类型的分区器
1 | scala> val rdd1 = sc.textFile("/wcinput/wc.txt") |
只有Key-Value类型的RDD才可能有分区器,Value类型的RDD分区器的值是None。
分区器的作用及分类:
在 PairRDD(key,value) 中,很多操作都是基于key的,系统会按照key对数据进行重组,如groupbykey;
数据重组需要规则,最常见的就是基于 Hash 的分区,此外还有一种复杂的基于抽样 Range 分区方法;
HashPartitioner:最简单、最常用,也是默认提供的分区器。对于给定的key,计算其hashCode,并除以分区的个数取余,如果余数小于0,则用 余数+分区的个数,最后返回的值就是这个key所属的分区ID。该分区方法可以保证key相同的数据出现在同一个分区中。
用户可通过partitionBy主动使用分区器,通过partitions参数指定想要分区的数量。
1 | scala> val rdd1 = sc.makeRDD(1 to 100).map((_, 1)) |
Spark的很多算子都可以设置 HashPartitioner 的值:
RangePartitioner:简单的说就是将一定范围内的数映射到某一个分区内。在实现中,分界的算法尤为重要,用到了水塘抽样算法。sortByKey会使用RangePartitioner。
现在的问题:在执行分区之前其实并不知道数据的分布情况,如果想知道数据分区就需要对数据进行采样;
Spark中RangePartitioner在对数据采样的过程中使用了水塘采样算法。
水塘采样:从包含n个项目的集合S中选取k个样本,其中n为一很大或未知的数量,尤其适用于不能把所有n个项目都存放到主内存的情况;
在采样的过程中执行了collect()操作,引发了Action操作。
自定义分区器:Spark允许用户通过自定义的Partitioner对象,灵活的来控制RDD的分区方式。
实现自定义分区器按以下规则分区:
-
分区0 < 100
-
100 <= 分区1 < 200
-
200 <= 分区2 < 300
-
300 <= 分区3 < 400
-
… …
-
900 <= 分区9 < 1000
1 | package cn.lagou.sparkcore |