RDD编程之基本使用
RDD的创建
SparkContext
SparkContext是编写Spark程序用到的第一个类,是Spark的主要入口点,它负责和整个集群的交互;
如果把Spark集群当作服务端,那么Driver就是客户端,SparkContext 是客户端的核心;
SparkContext是Spark的对外接口,负责向调用者提供 Spark 的各种功能;
SparkContext用于连接Spark集群、创建RDD、累加器、广播变量;
在 spark-shell 中 SparkContext 已经创建好了,可直接使用;
编写Spark Driver程序第一件事就是:创建SparkContext;
建议:Standalone模式或本地模式学习RDD的各种算子;
不需要HA;不需要IDEA
从集合创建RDD
从集合中创建RDD,主要用于测试。Spark 提供了以下函数:parallelize、makeRDD、range
1 | scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5)) |
备注:rdd.collect 方法在生产环境中不要使用,会造成Driver OOM
从文件系统创建RDD
用 textFile() 方法来从文件系统中加载数据创建RDD。方法将文件的 URI 作为参数,这个URI可以是:
-
本地文件系统
- 使用本地文件系统要注意:该文件是不是在所有的节点存在(在Standalone模式下)
-
分布式文件系统HDFS的地址
-
Amazon S3的地址
1 | // 从本地文件系统加载数据 |
从RDD创建RDD
本质是将一个RDD转换为另一个RDD。详细信息参见以下Transformation
Transformation【重要】
RDD的操作算子分为两类:
-
Transformation。用来对RDD进行转化,这个操作时延迟执行的(或者说是Lazy 的);
-
Action。用来触发RDD的计算;得到相关计算结果 或者 将结果保存的外部系统中;
-
Transformation:返回一个新的RDD
-
Action:返回结果int、double、集合(不会返回新的RDD)要很准确区分Transformation、Action
每一次 Transformation 操作都会产生新的RDD,供给下一个“转换”使用;
转换得到的RDD是惰性求值的。也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到 Action 操作时,才会发生真正的计算,开始从血缘关系(lineage)源头开始,进行物理的转换操作;
常见的 Transformation 算子:
官方文档:http://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations
常见转换算子1
map(func):对数据集中的每个元素都使用func,然后返回一个新的RDD
filter(func):对数据集中的每个元素都使用func,然后返回一个包含使func为true的元素构成的RDD
flatMap(func):与 map 类似,每个输入元素被映射为0或多个输出元素
mapPartitions(func):和map很像,但是map是将func作用在每个元素上,而mapPartitions是func作用在整个分区上。假设一个RDD有N个元素,M个分区(N >> M),那么map的函数将被调用N次,而mapPartitions中的函数仅被调用M次,一次处理一个分区中的所有元素
mapPartitionsWithIndex(func):与 mapPartitions 类似,多了分区索引值信息
全部都是窄依赖
1 | scala> val rdd1 = sc.parallelize(1 to 10,6) |
map 与 mapPartitions 的区别
-
map:每次处理一条数据
-
mapPartitions:每次处理一个分区的数据,分区的数据处理完成后,数据才能释放,资源不足时容易导致OOM
-
最佳实践:当内存资源充足时,建议使用mapPartitions,以提高处理效率
常见转换算子2
groupBy(func):按照传入函数的返回值进行分组。将key相同的值放入一个迭代器
glom():将每一个分区形成一个数组,形成新的RDD类型 RDD[Array[T]]
sample(withReplacement, fraction, seed):采样算子。以指定的随机种子(seed)随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样
distinct([numTasks])):对RDD元素去重后,返回一个新的RDD。可传入numTasks参数改变RDD分区数
coalesce(numPartitions):缩减分区数,无shuffle
repartition(numPartitions):增加或减少分区数,有shuffle
sortBy(func, [ascending], [numTasks]):使用 func 对数据进行处理,对处理后的结果进行排序
宽依赖的算子(shuffle):groupBy、distinct、repartition、sortBy
1 | // 将 RDD 中的元素按照3的余数分组 |
coalesce 与 repartition 的区别
小结:
-
repartition:增大或减少分区数;有shuffle
-
coalesce:一般用于减少分区数(此时无shuffle)
常见转换算子3
RDD之间的交、并、差算子,分别如下:
-
intersection(otherRDD)
-
union(otherRDD)
-
subtract (otherRDD)
cartesian(otherRDD):笛卡尔积
zip(otherRDD):将两个RDD组合成 key-value 形式的RDD,默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。
宽依赖的算子(shuffle):intersection、subtract
1 | scala> val rdd1 = sc.range(1, 21, 6) |
备注:
-
union是窄依赖。得到的RDD分区数为:两个RDD分区数之和
-
cartesian是窄依赖
-
得到RDD的元素个数为:两个RDD元素个数的乘积
-
得到RDD的分区数为:两个RDD分区数的乘积
-
使用该操作会导致数据膨胀,慎用
-
Action
Action 用来触发RDD的计算,得到相关计算结果;
1 | Action触发Job。一个Spark程序(Driver程序)包含了多少 Action 算子,那么 就有多少Job; |
collect() / collectAsMap()
stats / count / mean / stdev / max / min
reduce(func) / fold(func) / aggregate(func)
first():Return the first element in this RDD
take(n):Take the first num elements of the RDD
top(n):按照默认(降序)或者指定的排序规则,返回前num个元素。
takeSample(withReplacement, num, [seed]):返回采样的数据
foreach(func) / foreachPartition(func):与map、mapPartitions类似,区别是 foreach 是 Action
saveAsTextFile(path) / saveAsSequenceFile(path) / saveAsObjectFile(path)
1 | // 返回统计信息。仅能作用 RDD[Double] 类型上调用 |