RDD Partitions

spark.default.parallelism (the default degree of parallelism) = 2

When it is not explicitly configured in spark-defaults.conf, its value is determined by the following rules:

  • 1. Local mode

    spark-shell --master local[N] spark.default.parallelism = N
    spark-shell --master local spark.default.parallelism = 1
  • 2. Pseudo-distributed mode (x is the number of executors started on the local machine, y is the number of cores per executor, z is the amount of memory per executor)

    spark-shell --master local-cluster[x,y,z]  spark.default.parallelism = x * y
  • 3. Cluster mode (YARN & Standalone)

    spark.default.parallelism = max(total number of cores across the application's executors, 2)

Note: the total number of cores on all executor nodes, or 2, whichever is larger.

The rules above determine the default value of spark.default.parallelism when it is not explicitly set in spark-defaults.conf; if it is configured, spark.default.parallelism equals the configured value.
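
A minimal sketch of setting the property explicitly in code (the property name is real; the app name and the value 8 are just illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Explicitly set spark.default.parallelism; this takes precedence over the rules above
val conf = new SparkConf()
  .setAppName("ParallelismDemo")
  .setMaster("local[*]")
  .set("spark.default.parallelism", "8")
val sc = new SparkContext(conf)

sc.defaultParallelism   // 8, i.e. the configured value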

When the SparkContext is initialized, two parameters are derived from the spark.default.parallelism value obtained above:

// Number of partitions when creating an RDD from a collection
sc.defaultParallelism = spark.default.parallelism

// Number of partitions when creating an RDD from a file
sc.defaultMinPartitions = min(spark.default.parallelism, 2)
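
Both values can be checked directly in spark-shell; the outputs below are only illustrative and depend on your master setting (here, a machine with 12 cores):

scala> sc.defaultParallelism
res0: Int = 12

scala> sc.defaultMinPartitions
res1: Int = 2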

Once these parameters are determined, the number of partitions of an RDD can be calculated.

Ways to create an RDD:

  • 1. From a collection

    // If no partition count is specified when the RDD is created, its number of partitions = sc.defaultParallelism
    val rdd = sc.parallelize(1 to 100)
    rdd.getNumPartitions

    Note: put simply, the number of RDD partitions equals the total number of cores (an explicit count can also be passed; see the sketch after this list).

  • 2. From a file with textFile

    val rdd = sc.textFile("data/start0721.big.log")
    rdd.getNumPartitions

    If no partition count is specified:

    • Local file: number of partitions = max(number of local file splits, sc.defaultMinPartitions)

    • HDFS file: number of partitions = max(number of HDFS blocks, sc.defaultMinPartitions)

    Notes:

    • Number of local file splits = local file size / 32 MB

    • When reading an HDFS file, if the specified partition count is less than the number of HDFS blocks, the specified value does not take effect (see the sketch below).
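
A minimal sketch of passing an explicit partition count for both creation paths (the counts shown are only illustrative; the file path is the one used above):

// From a collection: the second argument sets the number of partitions
val rddA = sc.parallelize(1 to 100, 5)
rddA.getNumPartitions          // 5

// From a file: the second argument is a minimum partition count;
// for an HDFS file the result is still at least the number of blocks
val rddB = sc.textFile("data/start0721.big.log", 10)
rddB.getNumPartitions          // at least 10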

RDD Partitioners

Do the following RDDs have partitioners, and if so, of what type?

scala> val rdd1 = sc.textFile("/wcinput/wc.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /wcinput/wc.txt MapPartitionsRDD[23] at textFile at <console>:24

scala> rdd1.partitioner
res28: Option[org.apache.spark.Partitioner] = None

scala> val rdd2 = rdd1.flatMap(_.split("\\s+"))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[24] at flatMap at <console>:25

scala> rdd2.partitioner
res29: Option[org.apache.spark.Partitioner] = None

scala> val rdd3 = rdd2.map((_, 1))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[25] at map at <console>:25

scala> rdd3.partitioner
res30: Option[org.apache.spark.Partitioner] = None

scala> val rdd4 = rdd3.reduceByKey(_+_)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[26] at reduceByKey at <console>:25

scala> rdd4.partitioner
res31: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)

scala> val rdd5 = rdd4.sortByKey()
rdd5: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[29] at sortByKey at <console>:25

scala> rdd5.partitioner
res32: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.RangePartitioner@bdd2d498)

Only key-value RDDs can have a partitioner; for value-only RDDs the partitioner is None.

The role and types of partitioners:

In a PairRDD (key, value), many operations are key-based, and the system reorganizes the data by key, e.g. groupByKey;

Reorganizing the data requires a rule. The most common is hash-based partitioning; there is also a more complex, sampling-based range partitioning method.


HashPartitioner: the simplest, most commonly used, and the default partitioner. For a given key, it computes the key's hashCode and takes the remainder of dividing it by the number of partitions; if the remainder is negative, the number of partitions is added to it. The result is the partition ID the key belongs to. This method guarantees that records with the same key end up in the same partition.
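
A minimal sketch of that computation (it mirrors the description above and is not Spark's source code):

// Illustrative only: map a key to a partition ID by hashCode modulo numPartitions
def hashPartition(key: Any, numPartitions: Int): Int = {
  val rawMod = key.hashCode % numPartitions
  if (rawMod < 0) rawMod + numPartitions else rawMod   // keep the ID non-negative
}

hashPartition("spark", 10)   // some value in [0, 10)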

Users can apply a partitioner explicitly via partitionBy, specifying the desired number of partitions through the partitions argument.

scala> val rdd1 = sc.makeRDD(1 to 100).map((_, 1))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[31] at map at <console>:24

scala> rdd1.getNumPartitions
res33: Int = 12

// The data is merely split into roughly equal chunks; the RDD has no partitioner
scala> rdd1.glom.collect.foreach(x=>println(x.toBuffer))
ArrayBuffer((1,1), (2,1), (3,1), (4,1), (5,1), (6,1), (7,1), (8,1))
ArrayBuffer((9,1), (10,1), (11,1), (12,1), (13,1), (14,1), (15,1), (16,1))
ArrayBuffer((17,1), (18,1), (19,1), (20,1), (21,1), (22,1), (23,1), (24,1), (25,1))
ArrayBuffer((26,1), (27,1), (28,1), (29,1), (30,1), (31,1), (32,1), (33,1))
ArrayBuffer((34,1), (35,1), (36,1), (37,1), (38,1), (39,1), (40,1), (41,1))
ArrayBuffer((42,1), (43,1), (44,1), (45,1), (46,1), (47,1), (48,1), (49,1), (50,1))
ArrayBuffer((51,1), (52,1), (53,1), (54,1), (55,1), (56,1), (57,1), (58,1))
ArrayBuffer((59,1), (60,1), (61,1), (62,1), (63,1), (64,1), (65,1), (66,1))
ArrayBuffer((67,1), (68,1), (69,1), (70,1), (71,1), (72,1), (73,1), (74,1), (75,1))
ArrayBuffer((76,1), (77,1), (78,1), (79,1), (80,1), (81,1), (82,1), (83,1))
ArrayBuffer((84,1), (85,1), (86,1), (87,1), (88,1), (89,1), (90,1), (91,1))
ArrayBuffer((92,1), (93,1), (94,1), (95,1), (96,1), (97,1), (98,1), (99,1), (100,1))

scala> rdd1.partitioner
res35: Option[org.apache.spark.Partitioner] = None

// Explicitly use HashPartitioner
scala> val rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(10))
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[33] at partitionBy at <console>:25

scala> rdd2.glom.collect.foreach(x=>println(x.toBuffer))
ArrayBuffer((20,1), (50,1), (70,1), (100,1), (30,1), (80,1), (10,1), (40,1), (60,1), (90,1))
ArrayBuffer((11,1), (41,1), (61,1), (91,1), (1,1), (31,1), (51,1), (81,1), (21,1), (71,1))
ArrayBuffer((22,1), (42,1), (72,1), (92,1), (2,1), (32,1), (52,1), (82,1), (12,1), (62,1))
ArrayBuffer((23,1), (43,1), (73,1), (93,1), (3,1), (33,1), (53,1), (83,1), (13,1), (63,1))
ArrayBuffer((14,1), (34,1), (64,1), (84,1), (4,1), (54,1), (24,1), (44,1), (74,1), (94,1))
ArrayBuffer((25,1), (45,1), (75,1), (95,1), (5,1), (55,1), (15,1), (35,1), (65,1), (85,1))
ArrayBuffer((46,1), (96,1), (6,1), (26,1), (56,1), (76,1), (16,1), (36,1), (66,1), (86,1))
ArrayBuffer((37,1), (87,1), (7,1), (27,1), (57,1), (77,1), (17,1), (47,1), (67,1), (97,1))
ArrayBuffer((18,1), (48,1), (68,1), (98,1), (8,1), (28,1), (58,1), (78,1), (38,1), (88,1))
ArrayBuffer((19,1), (49,1), (69,1), (99,1), (29,1), (79,1), (9,1), (39,1), (59,1), (89,1))

scala> rdd2.partitioner
res37: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@a)

// Explicitly use RangePartitioner
scala> val rdd3 = rdd1.partitionBy(new org.apache.spark.RangePartitioner(10, rdd1))
rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[37] at partitionBy at <console>:25

scala> rdd3.glom.collect.foreach(x=>println(x.toBuffer))
ArrayBuffer((1,1), (2,1), (3,1), (4,1), (5,1), (6,1), (7,1), (8,1), (9,1), (10,1))
ArrayBuffer((17,1), (18,1), (19,1), (20,1), (11,1), (12,1), (13,1), (14,1), (15,1), (16,1))
ArrayBuffer((26,1), (27,1), (28,1), (29,1), (30,1), (21,1), (22,1), (23,1), (24,1), (25,1))
ArrayBuffer((34,1), (35,1), (36,1), (37,1), (38,1), (39,1), (40,1), (31,1), (32,1), (33,1))
ArrayBuffer((42,1), (43,1), (44,1), (45,1), (46,1), (47,1), (48,1), (49,1), (50,1), (41,1))
ArrayBuffer((59,1), (60,1), (51,1), (52,1), (53,1), (54,1), (55,1), (56,1), (57,1), (58,1))
ArrayBuffer((67,1), (68,1), (69,1), (70,1), (61,1), (62,1), (63,1), (64,1), (65,1), (66,1))
ArrayBuffer((76,1), (77,1), (78,1), (79,1), (80,1), (71,1), (72,1), (73,1), (74,1), (75,1))
ArrayBuffer((84,1), (85,1), (86,1), (87,1), (88,1), (89,1), (90,1), (81,1), (82,1), (83,1))
ArrayBuffer((92,1), (93,1), (94,1), (95,1), (96,1), (97,1), (98,1), (99,1), (100,1), (91,1))

scala> rdd3.partitioner
res39: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.RangePartitioner@7e392d9e)

Many Spark operators allow a HashPartitioner (or a partition count) to be passed in, for example:
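
A small sketch (the reduceByKey and groupByKey overloads shown take a Partitioner or a partition count; the data is only illustrative):

import org.apache.spark.HashPartitioner

val pairs = sc.makeRDD(1 to 100).map((_, 1))

// Pass a Partitioner directly
val r1 = pairs.reduceByKey(new HashPartitioner(4), _ + _)
r1.partitioner        // Some(HashPartitioner)

// Or pass a partition count; a HashPartitioner is created under the hood
val r2 = pairs.groupByKey(4)
r2.getNumPartitions   // 4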


RangePartitioner: in short, it maps keys within a given range to a particular partition. In the implementation, computing the range boundaries is the key part, and it relies on reservoir sampling. sortByKey uses a RangePartitioner.


The problem: before partitioning, the data distribution is unknown; to learn the distribution, the data has to be sampled.

Spark's RangePartitioner uses the reservoir sampling algorithm while sampling the data.

Reservoir sampling: select k samples from a collection S of n items, where n is very large or unknown; it is especially suitable when not all n items fit in main memory.

The sampling process executes a collect(), which triggers an action.
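
A minimal sketch of reservoir sampling itself (a generic illustration, not Spark's internal implementation):

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Keep the first k items; afterwards, item i replaces a random slot with probability k / (i + 1)
def reservoirSample[T](input: Iterator[T], k: Int): Seq[T] = {
  val reservoir = ArrayBuffer.empty[T]
  var i = 0L
  while (input.hasNext) {
    val item = input.next()
    if (i < k) {
      reservoir += item
    } else {
      val j = (Random.nextDouble() * (i + 1)).toLong   // uniform in [0, i]
      if (j < k) reservoir(j.toInt) = item
    }
    i += 1
  }
  reservoir.toSeq
}

reservoirSample((1 to 100000).iterator, 10)   // 10 items sampled uniformly at random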

Custom partitioner: Spark lets users flexibly control how an RDD is partitioned by supplying their own Partitioner object.

Implement a custom partitioner that partitions data by the following rules:

  • Partition 0: key < 100

  • Partition 1: 100 <= key < 200

  • Partition 2: 200 <= key < 300

  • Partition 3: 300 <= key < 400

  • … …

  • Partition 9: 900 <= key < 1000

package cn.lagou.sparkcore

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

import scala.collection.immutable

class MyPartitioner(n: Int) extends Partitioner {
  // The number of partitions
  override def numPartitions: Int = n

  // Given a key, decide which partition it goes to
  override def getPartition(key: Any): Int = {
    val k = key.toString.toInt
    k / 100
  }
}

object UserDefinedPartitioner {
  def main(args: Array[String]): Unit = {
    // Create the SparkContext
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // Business logic: 100 random keys in [0, 1000), each paired with 1
    val random = scala.util.Random
    val arr: immutable.IndexedSeq[Int] = (1 to 100).map(idx => random.nextInt(1000))
    val rdd1: RDD[(Int, Int)] = sc.makeRDD(arr).map((_, 1))
    rdd1.glom.collect.foreach(x => println(x.toBuffer))

    println("************************************************************************")
    val rdd2 = rdd1.partitionBy(new MyPartitioner(11))
    rdd2.glom.collect.foreach(x => println(x.toBuffer))

    // Stop the SparkContext
    sc.stop()
  }
}
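
Since the generated keys fall in [0, 999], getPartition returns values in [0, 9]; with new MyPartitioner(11) the 11th partition (ID 10) therefore stays empty, and passing 10 instead would match the rules above exactly.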