Creating RDDs

SparkContext

SparkContext is the first class used when writing a Spark program and the main entry point to Spark; it is responsible for interacting with the whole cluster.

If you think of the Spark cluster as the server side, the Driver is the client, and SparkContext is the core of that client.

SparkContext is Spark's external interface, responsible for exposing Spark's functionality to callers.

SparkContext is used to connect to a Spark cluster and to create RDDs, accumulators, and broadcast variables.

In spark-shell, a SparkContext has already been created (as the variable sc) and can be used directly.


The first thing a Spark Driver program does is create a SparkContext.
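
A minimal sketch of doing this in a self-contained Driver program (the object name and master URL below are illustrative, not taken from the original text):

import org.apache.spark.{SparkConf, SparkContext}

object RDDDemo {
  def main(args: Array[String]): Unit = {
    // Build the configuration and the SparkContext (the entry point)
    val conf = new SparkConf()
      .setAppName("RDDDemo")
      .setMaster("local[*]")   // local mode; drop this when submitting to a cluster with spark-submit
    val sc = new SparkContext(conf)

    // Use the SparkContext to create an RDD and run a simple job
    val rdd = sc.parallelize(1 to 10)
    println(rdd.count())

    // Release resources when done
    sc.stop()
  }
}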

Recommendation: use Standalone or local mode when learning the various RDD operators.

No HA is needed; no IDE (IDEA) is needed.

Creating an RDD from a collection

Creating an RDD from an in-memory collection is mainly used for testing. Spark provides the following methods: parallelize, makeRDD, range.


scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(1 to 100)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> val rdd3 = sc.makeRDD(List(1,2,3,4,5))
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at <console>:24


scala> val rdd4 = sc.makeRDD(1 to 100)
rdd4: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at makeRDD at <console>:24

scala> val rdd5 = sc.range(1, 100, 3)
rdd5: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[5] at range at <console>:24

// Specify 10 partitions
scala> val rdd6 = sc.range(1, 100, 2, 10)
rdd6: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[7] at range at <console>:24

scala> val rdd7 = sc.range(1, 100, numSlices=10)
rdd7: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[9] at range at <console>:24

// Check the number of partitions of each RDD
scala> rdd1.getNumPartitions
res1: Int = 1

scala> rdd2.getNumPartitions
res2: Int = 1

scala> rdd3.getNumPartitions
res3: Int = 1

scala> rdd4.getNumPartitions
res1: Int = 1

scala> rdd5.getNumPartitions
res1: Int = 1

scala> rdd6.getNumPartitions
res1: Int = 10

scala> rdd7.getNumPartitions
res9: Int = 10

scala> rdd5.collect
res8: Array[Long] = Array(1, 4, 7, 10, 13, 16, 19, 22, 25, 28, 31, 34, 37, 40, 43, 46, 49, 52, 55, 58, 61, 64, 67, 70, 73, 76, 79, 82, 85, 88, 91, 94, 97)

// Another way to check the partition count
rdd1.partitions.length

Note: do not use rdd.collect in production; pulling the whole RDD back to the Driver can cause a Driver OOM.
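
If you only need to inspect some data on the Driver, prefer operators that return a bounded amount of data, or write the results out from the Executors instead. A small sketch (the output path is illustrative):

rdd1.take(10)                     // only the first 10 elements are brought to the Driver
rdd1.takeSample(false, 10)        // a bounded random sample
rdd1.count                        // a single number
rdd1.saveAsTextFile("data/rdd1")  // written out by the Executors, nothing is collected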

Creating an RDD from a file system

Use the textFile() method to load data from a file system and create an RDD. The method takes a file URI as its argument; the URI can point to:

  • The local file system

    • When using the local file system, note whether the file exists on every node (in Standalone mode)
  • An HDFS (distributed file system) address

  • An Amazon S3 address

// Load data from the local file system
val lines = sc.textFile("file:///root/data/wc.txt")

// Load data from HDFS (distributed file system)
// core-site.xml not configured
val lines = sc.textFile("hdfs://linux121:9000/user/root/data/uaction.dat")
// core-site.xml configured
val lines = sc.textFile("/user/root/data/uaction.dat")
val lines = sc.textFile("data/uaction.dat")

Creating an RDD from another RDD

Essentially this just transforms one RDD into another RDD. See the Transformations below for details.

Transformation [Important]

RDD operators fall into two categories:

  • Transformation: transforms an RDD; these operations are executed lazily (deferred);

  • Action: triggers the computation of an RDD and either returns the result or saves it to an external system;

  • A Transformation returns a new RDD

  • An Action returns a result (an Int, a Double, a collection, ...) and never a new RDD. Being able to tell Transformations and Actions apart precisely is essential.

Every Transformation produces a new RDD, which is fed to the next transformation;

RDDs obtained from transformations are lazily evaluated: the transformation steps only record the lineage, and no real computation happens until an Action is encountered, at which point the physical computation starts from the beginning of the lineage;
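
A quick way to see this in spark-shell: transformations return immediately and only record the lineage (which toDebugString prints); nothing is read or computed until an Action runs. A minimal sketch, reusing the data/wc.txt file that also appears later in this section:

val lines   = sc.textFile("data/wc.txt")        // nothing is read yet
val words   = lines.flatMap(_.split("\\s+"))    // still nothing
val lengths = words.map(_.length)               // still nothing

println(lengths.toDebugString)  // prints the recorded lineage; no job has run
lengths.count                   // Action: the file is actually read and the job executes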


Common Transformation operators:

Official documentation: http://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations

Common transformation operators, part 1

map(func): applies func to every element of the dataset and returns a new RDD

filter(func): applies func to every element and returns a new RDD made up of the elements for which func returns true

flatMap(func): similar to map, but each input element is mapped to 0 or more output elements

mapPartitions(func): much like map, but where map applies func to each individual element, mapPartitions applies func to a whole partition. If an RDD has N elements in M partitions (N >> M), map's function is invoked N times while mapPartitions' function is invoked only M times, processing all elements of a partition at once

mapPartitionsWithIndex(func): similar to mapPartitions, but additionally provides the partition index

All of these are narrow dependencies

scala> val rdd1 = sc.parallelize(1 to 10,6)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd1.getNumPartitions
res0: Int = 6

scala> val rdd2 = rdd1.map(_*2)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25

scala> rdd2.getNumPartitions
res1: Int = 6

scala> val rdd3 = rdd2.filter(_>10)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at filter at <console>:25

scala> rdd3.getNumPartitions
res2: Int = 6

// All of the above are Transformations and nothing has been executed yet. To prove these operations behave as expected, we need an Action operator
scala> rdd1.collect
res3: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> rdd2.collect
res4: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

scala> rdd3.collect
res5: Array[Int] = Array(12, 14, 16, 18, 20)
// collect is an Action: it triggers a Job and gathers all elements of the RDD from the Executors to the Driver. Forbidden in production

// flatMap example
val rdd4 = sc.textFile("data/wc.txt")
rdd4.collect
rdd4.flatMap(_.split("\\s+")).collect

scala> rdd1.mapPartitions{iter => Iterator(s"${iter.toList}") }.collect
res6: Array[String] = Array(List(1), List(2, 3), List(4, 5), List(6), List(7, 8), List(9, 10))

scala> rdd1.mapPartitions{iter => Iterator(s"${iter.toArray.mkString("-")}") }.collect
res7: Array[String] = Array(1, 2-3, 4-5, 6, 7-8, 9-10)

scala> rdd1.mapPartitionsWithIndex{(idx, iter) => Iterator(s"$idx:${iter.toArray.mkString("-")}") }.collect
res0: Array[String] = Array(0:1, 1:2-3, 2:4-5, 3:6, 4:7-8, 5:9-10)

// Multiply every element by 2
scala> val rdd5 = rdd1.mapPartitions(iter => iter.map(_*2))
rdd5: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at mapPartitions at <console>:25

scala> rdd5.collect
res1: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

Differences between map and mapPartitions

  • map: processes one element at a time

  • mapPartitions: processes one partition at a time; the partition's data can only be released after the whole partition has been processed, so it can easily cause OOM when memory is tight

  • Best practice: when memory is plentiful, prefer mapPartitions for better efficiency, as in the sketch below
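
A typical reason to prefer mapPartitions is per-partition setup: work that would be expensive to repeat per element (opening a connection, loading a dictionary, ...) is done once per partition instead. A minimal sketch using rdd1 from the transcript above; the println stands in for the expensive setup and runs on the Executors:

val doubled = rdd1.mapPartitions { iter =>
  println("expensive setup for this partition")  // runs once per partition (6 times for rdd1)
  iter.map(_ * 2)                                // then every element of the partition is processed
}
doubled.collect
// With map, the setup code above would have to run once per element instead.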

Common transformation operators, part 2

groupBy(func): groups the elements by the return value of func; values with the same key are put into one iterator

glom(): turns every partition into an array, yielding a new RDD of type RDD[Array[T]]

sample(withReplacement, fraction, seed): sampling operator. Randomly samples roughly a fraction of the data using the given random seed; withReplacement indicates whether elements are drawn with replacement (true) or without replacement (false)

distinct([numTasks]): removes duplicate elements and returns a new RDD. The optional numTasks parameter changes the partition count of the result

coalesce(numPartitions): reduces the number of partitions, without shuffle

repartition(numPartitions): increases or decreases the number of partitions, with shuffle

sortBy(func, [ascending], [numTasks]): applies func to the data and sorts by the processed result

Wide-dependency (shuffle) operators: groupBy, distinct, repartition, sortBy

// Group the RDD elements by their remainder modulo 3
scala> val rdd = sc.parallelize(1 to 10,6)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val group = rdd.groupBy(_%3)
group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:25

scala> group.collect
res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(3, 6, 9)), (1,CompactBuffer(1, 7, 4, 10)), (2,CompactBuffer(5, 2, 8)))

// Group the RDD elements into groups of 10
scala> val rdd1 = sc.parallelize(1 to 101,6)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd1.glom.map(_.sliding(10, 10).toArray)
res1: org.apache.spark.rdd.RDD[Array[Array[Int]]] = MapPartitionsRDD[5] at map at <console>:26

scala> rdd1.glom.map(_.sliding(10, 10).toArray).collect
res2: Array[Array[Array[Int]]] = Array(Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Array(11, 12, 13, 14, 15, 16)), Array(Array(17, 18, 19, 20, 21, 22, 23, 24, 25, 26), Array(27, 28, 29, 30, 31, 32, 33)), Array(Array(34, 35, 36, 37, 38, 39, 40, 41, 42, 43), Array(44, 45, 46, 47, 48, 49, 50)), Array(Array(51, 52, 53, 54, 55, 56, 57, 58, 59, 60), Array(61, 62, 63, 64, 65, 66, 67)), Array(Array(68, 69, 70, 71, 72, 73, 74, 75, 76, 77), Array(78, 79, 80, 81, 82, 83, 84)), Array(Array(85, 86, 87, 88, 89, 90, 91, 92, 93, 94), Array(95, 96, 97, 98, 99, 100, 101)))
// sliding is a method from the Scala collections library

// Sample the data. fraction is the approximate percentage to keep
// Sampling with replacement, fixed seed
scala> rdd.sample(true, 0.2, 2).collect
res3: Array[Int] = Array(6, 8, 8, 10)
// Sampling without replacement, fixed seed
scala> rdd.sample(false, 0.2, 2).collect
res4: Array[Int] = Array(1, 6, 8, 10)
// Sampling without replacement, no seed
scala> rdd.sample(false, 0.2).collect
res5: Array[Int] = Array(2, 5, 10)

// Deduplicate the data
scala> val random = scala.util.Random
random: util.Random.type = scala.util.Random$@7a248528

scala> val arr = (1 to 20).map(x => random.nextInt(10))
arr: scala.collection.immutable.IndexedSeq[Int] = Vector(9, 3, 4, 5, 1, 8, 9, 4, 3, 1, 0, 9, 7, 6, 9, 1, 1, 7, 8, 6)

scala> val rdd3 = sc.makeRDD(arr)
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at makeRDD at <console>:26

scala> rdd3.distinct.collect
res6: Array[Int] = Array(0, 1, 3, 4, 5, 6, 7, 8, 9)
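
// distinct can also take a target partition count (a small sketch, not part of the original session):
val rdd3dedup = rdd3.distinct(2)
rdd3dedup.getNumPartitions   // 2: distinct shuffles the data into the requested number of partitions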

// Repartitioning an RDD
scala> val rdd4 = sc.range(1, 10000, numSlices=10)
rdd4: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[16] at range at <console>:24

scala> val rdd5 = rdd4.filter(_%2==0)
rdd5: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[17] at filter at <console>:25

scala> rdd5.getNumPartitions
res7: Int = 10

// Reduce the partition count; both approaches take effect
scala> val rdd6 = rdd5.repartition(5)
rdd6: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[21] at repartition at <console>:25

scala> rdd6.getNumPartitions
res8: Int = 5

scala> val rdd7 = rdd5.coalesce(5)
rdd7: org.apache.spark.rdd.RDD[Long] = CoalescedRDD[22] at coalesce at <console>:25

scala> rdd7.getNumPartitions
res9: Int = 5

// Increase the partition count
scala> val rdd8 = rdd5.repartition(20)
rdd8: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[26] at repartition at <console>:25

scala> rdd8.getNumPartitions
res10: Int = 20

// Increasing the partition count this way has no effect
scala> val rdd9 = rdd5.coalesce(20)
rdd9: org.apache.spark.rdd.RDD[Long] = CoalescedRDD[27] at coalesce at <console>:25

scala> rdd9.getNumPartitions
res11: Int = 10

// The correct way to increase the partition count with coalesce
scala> val rdd10 = rdd5.coalesce(20, true)
rdd10: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[31] at coalesce at <console>:25

scala> rdd10.getNumPartitions
res12: Int = 20

// Sorting RDD elements
scala> val random = scala.util.Random
random: util.Random.type = scala.util.Random$@7a248528

scala> val arr = (1 to 20).map(x => random.nextInt(10))
arr: scala.collection.immutable.IndexedSeq[Int] = Vector(9, 4, 7, 8, 2, 3, 1, 3, 7, 3, 9, 6, 1, 8, 1, 4, 5, 6, 4, 0)

scala> val rdd = sc.makeRDD(arr)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at makeRDD at <console>:26

scala> rdd.collect
res13: Array[Int] = Array(9, 4, 7, 8, 2, 3, 1, 3, 7, 3, 9, 6, 1, 8, 1, 4, 5, 6, 4, 0)

// Globally sorted data, ascending by default
scala> rdd.sortBy(x=>x).collect
res14: Array[Int] = Array(0, 1, 1, 1, 2, 3, 3, 3, 4, 4, 4, 5, 6, 6, 7, 7, 8, 8, 9, 9)

// Descending
scala> rdd.sortBy(x=>x,false).collect
res15: Array[Int] = Array(9, 9, 8, 8, 7, 7, 6, 6, 5, 4, 4, 4, 3, 3, 3, 2, 1, 1, 1, 0)

Differences between coalesce and repartition


Summary:

  • repartition: increases or decreases the number of partitions; always shuffles

  • coalesce: generally used to reduce the number of partitions (no shuffle in that case); see the sketch below
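
In the Spark source, repartition(numPartitions) simply calls coalesce(numPartitions, shuffle = true), which is why coalesce without shuffle cannot increase the partition count. A two-line sketch with rdd5 from the transcript above:

rdd5.repartition(20)               // shuffles the data into 20 partitions
rdd5.coalesce(20, shuffle = true)  // equivalent: coalesce with shuffle forced on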

Common transformation operators, part 3

The intersection, union and difference operators between RDDs are:

  • intersection(otherRDD)

  • union(otherRDD)

  • subtract(otherRDD)

cartesian(otherRDD): Cartesian product

zip(otherRDD): combines two RDDs into an RDD of key-value pairs. The two RDDs are expected to have the same number of partitions and the same number of elements, otherwise an exception is thrown.

Wide-dependency (shuffle) operators: intersection, subtract

scala> val rdd1 = sc.range(1, 21, 6)
rdd1: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[1] at range at <console>:24

scala> rdd1.getNumPartitions
res0: Int = 12

scala> val rdd2 = sc.range(10, 31, 6)
rdd2: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[3] at range at <console>:24

scala> rdd2.getNumPartitions
res3: Int = 12

scala> rdd1.intersection(rdd2).sortBy(x=>x).collect
res0: Array[Long] = Array()

// Union of the elements, without deduplication
scala> rdd1.union(rdd2).sortBy(x=>x).collect
res4: Array[Long] = Array(1, 7, 10, 13, 16, 19, 22, 28)

scala> rdd1.subtract(rdd2).sortBy(x=>x).collect
res5: Array[Long] = Array(1, 7, 13, 19)

// Check the partition counts
scala> rdd1.intersection(rdd2).getNumPartitions
res6: Int = 12

scala> rdd1.union(rdd2).getNumPartitions
res7: Int = 24

scala> rdd1.subtract(rdd2).getNumPartitions
res8: Int = 12

// Cartesian product
scala> val rdd1 = sc.range(1, 5)
rdd1: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[35] at range at <console>:24

scala> val rdd2 = sc.range(6, 10)
rdd2: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[37] at range at <console>:24

scala> rdd1.cartesian(rdd2).collect
res11: Array[(Long, Long)] = Array((1,6), (1,7), (1,8), (1,9), (2,6), (2,7), (2,8), (2,9), (3,6), (3,7), (3,8), (3,9), (4,6), (4,7), (4,8), (4,9))

// Check the partition counts
scala> rdd1.cartesian(rdd2).getNumPartitions
res12: Int = 144

// Zip operation
scala> rdd1.zip(rdd2).collect
res13: Array[(Long, Long)] = Array((1,6), (2,7), (3,8), (4,9))

scala> rdd1.zip(rdd2).getNumPartitions
res14: Int = 12

// zip requires that the two RDDs have the same number of partitions and the same number of elements per partition, otherwise an exception is thrown
scala> val rdd2 = sc.range(6, 20)
rdd2: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[43] at range at <console>:24

scala> rdd1.zip(rdd2).collect
22/08/26 02:02:23 WARN scheduler.TaskSetManager: Lost task 11.0 in stage 16.0 (TID 323, 192.168.91.122, executor 1): org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition

Notes:

  • union is a narrow dependency. The resulting RDD's partition count is the sum of the two RDDs' partition counts

  • cartesian is a narrow dependency

    • the resulting RDD has as many elements as the product of the two RDDs' element counts

    • the resulting RDD has as many partitions as the product of the two RDDs' partition counts

    • this operation blows up the data volume; use with caution

Action

An Action triggers the computation of an RDD and produces the result;

An Action triggers a Job. A Spark program (Driver program) runs as many Jobs as it contains Action operators;

Typical Action operators: collect / count

collect() => sc.runJob() => ... => dagScheduler.runJob() => the Job is triggered

Requirement: be able to tell Transformations and Actions apart quickly and accurately (a small sketch follows below)
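
One way to observe the "one Action = one Job" rule from spark-shell is to register a SparkListener and count the job-start events (a sketch; listener events are delivered asynchronously, so the counter may lag slightly behind the actions):

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

var jobs = 0
sc.addSparkListener(new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    jobs += 1
    println(s"job ${jobStart.jobId} started")
  }
})

val demo = sc.parallelize(1 to 10).map(_ * 2)  // Transformation only: no job yet
demo.count                                     // Action: first job
demo.collect                                   // Action: second job
// jobs is now 2 (once the asynchronous events have been processed)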

collect() / collectAsMap()

stats / count / mean / stdev / max / min

reduce(func) / fold(func) / aggregate(func)


first():Return the first element in this RDD

take(n):Take the first num elements of the RDD

top(n): returns the first num elements according to the default (descending) ordering or a specified ordering.

takeSample(withReplacement, num, [seed]): returns a sample of the data

foreach(func) / foreachPartition(func): analogous to map and mapPartitions, except that foreach is an Action

saveAsTextFile(path) / saveAsSequenceFile(path) / saveAsObjectFile(path)

// Return summary statistics. Only available on RDDs with numeric elements (RDD[Double], RDD[Long], ...)
scala> val rdd1 = sc.range(1, 101)
rdd1: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[1] at range at <console>:24

scala> rdd1.stats
res0: org.apache.spark.util.StatCounter = (count: 100, mean: 50.500000, stdev: 28.866070, max: 100.000000, min: 1.000000)

scala> val rdd2 = sc.range(1, 101)
rdd2: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[5] at range at <console>:24

// Cannot be called here: the element type (Long, Long) is not numeric
scala> rdd1.zip(rdd2).stats
<console>:28: error: value stats is not a member of org.apache.spark.rdd.RDD[(Long, Long)]
rdd1.zip(rdd2).stats

// count can be called on an RDD of any element type
scala> rdd1.zip(rdd2).count
res2: Long = 100

// Aggregation operations
scala> val rdd = sc.makeRDD(1 to 10, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at makeRDD at <console>:24

scala> rdd.reduce(_+_)
res3: Int = 55

scala> rdd.fold(0)(_+_)
res4: Int = 55

scala> rdd.fold(1)(_+_)
res5: Int = 58

rdd.fold(1)((x, y) => {
println(s"x=$x, y=$y")
x+y
})
x=1, y=41
x=42, y=16
res7: Int = 58
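
// Why 58 and not 56? The rdd has 2 partitions, so the initial value 1 is applied once in each
// partition (1..5 + 1 = 16 and 6..10 + 1 = 41) and once more when the partition results are
// merged on the Driver: 1 + 41 + 16 = 58. Only the Driver-side printlns appear in the shell;
// in this session the per-partition ones went to the Executor logs.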

scala> rdd.aggregate(0)(_+_, _+_)
res8: Int = 55

scala> rdd.aggregate(1)(_+_, _+_)
res9: Int = 58

rdd.aggregate(1)(
(a, b) => {
println(s"a=$a, b=$b")
a+b
},
(x, y) => {
println(s"x=$x, y=$y")
x+y
})
x=1, y=41
x=42, y=16
res12: Int = 58

// first / take(n) / top(n): fetch elements from the RDD. Mostly used for testing
// The first element
scala> rdd.first
res14: Int = 1

// The first ten elements
scala> rdd.take(10)
res15: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// The first ten elements in descending order
scala> rdd.top(10)
res16: Array[Int] = Array(10, 9, 8, 7, 6, 5, 4, 3, 2, 1)

// Take a sample and return it
scala> rdd.takeSample(false, 5)
res17: Array[Int] = Array(5, 8, 2, 3, 1)

// Save to the given path (the RDD is written as one file per partition; beware of the small-files problem)
scala> rdd.saveAsTextFile("data/t1")
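
The transcript above does not show foreach / foreachPartition or collectAsMap; a minimal sketch follows (the function passed to foreach/foreachPartition runs on the Executors, so on a cluster its println output goes to the Executor logs, not to the shell):

// foreach: func is invoked once per element, on the Executors (an Action)
rdd.foreach(x => println(x))

// foreachPartition: func is invoked once per partition and receives an iterator;
// a common use is to open one connection (or build one batch) per partition instead of per element
rdd.foreachPartition { iter =>
  println(s"this partition holds ${iter.size} elements")
}

// collectAsMap: defined only on RDDs of key-value pairs; gathers them into a Map on the Driver
val pairs = sc.parallelize(List(("a", 1), ("b", 2)))
pairs.collectAsMap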