Spark-Streaming概述
随着大数据技术的不断发展,人们对于大数据的实时性处理要求也在不断提高,传统的 MapReduce 等批处理框架在某些特定领域,例如实时用户推荐、用户行为分析这些应用场景上逐渐不能满足人们对实时性的需求,因此诞生了一批如 S3、Samza、Storm、Flink等流式分析、实时计算框架。
Spark 由于其内部优秀的调度机制、快速的分布式计算能力,能够以极快的速度进行迭代计算。正是由于具有这样的优势,Spark 能够在某些程度上进行实时处理,Spark Streaming 正是构建在此之上的流式框架。
什么是Spark Streaming
Spark Streaming类似于Apache S ...
Spark-SQL原理
SparkSQL中的join
数据分析中将两个数据集进行 Join 操作是很常见的场景。在 Spark 的物理计划阶段,Spark 的 Join Selection 类会根据 Join hints 策略、Join 表的大小、 Join 是等值Join 还是不等值以及参与 Join 的 key 是否可以排序等条件来选择最终的 Join 策略,最后 Spark 会利用选择好的 Join 策略执行最终的计算。当前 Spark 一共支持五种 Join 策略:
Broadcast hash join (BHJ)
Shuffle hash join(SHJ)
Shuffle sort merg ...
Spark-SQL编程之UDF和UDAF以及访问Hive
UDF & UDAF
UDF
UDF(User Defined Function),自定义函数。函数的输入、输出都是一条数据记录,类似于Spark SQL中普通的数学或字符串函数。实现上看就是普通的Scala函数;
用Scala编写的UDF与普通的Scala函数几乎没有任何区别,唯一需要多执行的一个步骤是要在SQLContext注册它。
123def len(bookTitle: String):Int = bookTitle.length spark.udf.register("len", len _)val booksWithLongTitle = spar ...
Spark-SQL编程之SQL语句与输入输出
SQL语句
总体而言:SparkSQL与HQL兼容;与HQL相比,SparkSQL更简洁。
createTempView、createOrReplaceTempView、spark.sql(“SQL”)
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647package cn.lagou.sparksqlimport org.apache.spark.rdd.RDDimport org.apache.spark.sql.{Dataset, SparkS ...
Spark-SQL编程之Action操作与Transformation操作
Action操作
与RDD类似的操作
show、collect、collectAsList、head、first、count、take、takeAsList、reduce
与结构相关
printSchema、explain、columns、dtypes、col
123456789101112131415EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO7369,SMITH,CLERK,7902,2001-01-02 22:12:13,800,,207499,ALLEN,SALESMAN,7698,2002-01-02 22:12:13,1600,300 ...
Spark-SQL编程之DataFrame和Dataset的创建
官方文档:http://spark.apache.org/docs/latest/sql-getting-started.html
SparkSession
在 Spark 2.0 之前:
SQLContext 是创建 DataFrame 和执行 SQL 的入口
HiveContext通过Hive sql语句操作Hive数据,兼Hhive操作,HiveContext继承自SQLContext
在 Spark 2.0 之后:
将这些入口点统一到了SparkSession,SparkSession 封装了 SqlContext 及HiveContext;
实现了 ...
Spark-SQL概述
Hive的诞生,主要是因为开发MapReduce程序对 Java 要求比较高,为了让他们能够操作HDFS上的数据,推出了Hive。Hive与RDBMS的SQL模型比较类似,容易掌握。Hive的主要缺陷在于它的底层是基于MapReduce的,执行比较慢。
在Spark 0.x版的时候推出了Shark,Shark与Hive是紧密关联的,Shark底层很多东西还是依赖于Hive,修改了内存管理、物理计划、执行三个模块,底层使用Spark的基于内存的计算模型,性能上比Hive提升了很多倍。
Shark更多是对Hive的改造,替换了Hive的物理执行引擎,提高了执行速度。但Shark继承了大量的Hive ...
Spark原理初探
Standalone模式作业提交
Standalone 模式下有四个重要组成部分,分别是:
Driver:用户编写的 Spark 应用程序就运行在 Driver 上,由Driver 进程执行
Master:主要负责资源的调度和分配,并进行集群的监控等职责
Worker:Worker 运行在集群中的一台服务器上。负责管理该节点上的资源,负责启动启动节点上的 Executor
Executor:一个 Worker 上可以运行多个 Executor,Executor通过启动多个线程(task)对 RDD 的分区进行并行计算
SparkContext 中的三大组件:
DAGSc ...
RDD编程之广播变量和累加器以及TopN的优化
有时候需要在多个任务之间共享变量,或者在任务(Task)和Driver Program之间共享变量。
为了满足这种需求,Spark提供了两种类型的变量:
广播变量(broadcast variables)
累加器(accumulators)
广播变量、累加器主要作用是为了优化Spark程序。
广播变量
广播变量将变量在节点的 Executor 之间进行共享(由Driver广播出去);
广播变量用来高效分发较大的对象。向所有工作节点(Executor)发送一个较大的只读值,以供一个或多个操作使用。
使用广播变量的过程如下:
对一个类型 T 的对象调用 SparkContext.b ...
RDD编程之分区与分区器
RDD的分区
spark.default.parallelism:(默认的并发数)= 2
当配置文件spark-default.conf中没有显示的配置,则按照如下规则取值:
1、本地模式
12spark-shell --master local[N] spark.default.parallelism = Nspark-shell --master local spark.default.parallelism = 1
2、伪分布式(x为本机上启动的executor数,y为每个executor使用的core数,z为每个 executor使用的内存)
1spark-shell --ma ...