Create a Maven project in IDEA, add Scala framework support, create a scala source folder, and add the dependencies to the pom file.
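The pom below references ${scala.version} and ${spark.version}, so a matching <properties> block is assumed; the versions here are illustrative and only need to agree with the _2.12 Scala suffix of spark-core:

<properties>
    <scala.version>2.12.10</scala.version>
    <spark.version>2.4.5</spark.version>
</properties>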


<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>

<build>
    <pluginManagement>
        <plugins>
            <!-- Plugin for compiling Scala -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
            <!-- Plugin for compiling Java -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- Plugin for building the shaded jar -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

WordCount - Scala


package cn.lagou.sparkcore

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkContext
    // For running directly from the main method:
    // val conf = new SparkConf().setMaster("local[*]").setAppName("ScalaWordCount")
    // For packaging and submitting to a cluster:
    val conf = new SparkConf().setAppName("ScalaWordCount")
    val sc = new SparkContext(conf)
    // Set the log level
    sc.setLogLevel("WARN")

    // 2. Read the file (on the cluster the path comes from the program arguments)
    // Local file, absolute path:
    // val lines: RDD[String] = sc.textFile("file:///c:/data/scala/wc.txt")
    val lines: RDD[String] = sc.textFile(args(0))

    // 3. RDD transformations
    val words: RDD[String] = lines.flatMap(line => line.split("\\s+"))
    val wordsMap: RDD[(String, Int)] = words.map(x => (x, 1))
    val result: RDD[(String, Int)] = wordsMap.reduceByKey(_ + _)

    // 4. Output
    result.foreach(println)

    // 5. Stop the SparkContext
    sc.stop()
  }
}

Note: build the jar (mvn clean package), upload it to the server, and submit it to the cluster with spark-submit.

# cd to the directory holding the uploaded jar and run spark-submit; the last argument is the HDFS input path
# Run in local mode
spark-submit --master local[*] --class cn.lagou.sparkcore.ScalaWordCount original-sparkcoreDemo-1.0-SNAPSHOT.jar /wcinput/*

# Run on YARN
spark-submit --master yarn --class cn.lagou.sparkcore.ScalaWordCount original-LagouBigData-1.0-SNAPSHOT.jar /wcinput/*

WordCount - Java

Spark provides APIs for Scala, Java, Python, and R;

Scala and Java are the best supported.



import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class JavaWordCount {
    public static void main(String[] args) {
        // 1. Create the JavaSparkContext
        SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        jsc.setLogLevel("WARN");

        // 2. Create the RDD
        JavaRDD<String> lines = jsc.textFile("file:///C:\\Project\\LagouBigData\\data\\wc.txt");

        // 3. RDD transformations
        JavaRDD<String> words = lines.flatMap(line -> Arrays.stream(line.split("\\s+")).iterator());
        JavaPairRDD<String, Integer> wordsMap = words.mapToPair(word -> new Tuple2<>(word, 1));
        JavaPairRDD<String, Integer> results = wordsMap.reduceByKey((x, y) -> x + y);

        // 4. Output the result
        results.foreach(elem -> System.out.println(elem));

        // 5. Stop the SparkContext
        jsc.stop();
    }
}

Notes:

  • Spark entry point: JavaSparkContext

  • Value RDD: JavaRDD; key-value RDD: JavaPairRDD

  • Converting between JavaRDD and JavaPairRDD (see the Scala comparison sketch after this list)

    • JavaRDD => JavaPairRDD: via mapToPair

    • JavaPairRDD => JavaRDD: via map

  • Lambda expressions use ->
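For comparison, a minimal Scala sketch of the same round trip (names are illustrative): the Scala API has no separate pair-RDD class, because an RDD of tuples picks up reduceByKey and the other key-value operations through the implicit conversion to PairRDDFunctions.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object PairRDDSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PairRDDSketch").setMaster("local[*]"))
    val words: RDD[String] = sc.makeRDD(Seq("a", "b", "a"))
    val pairs: RDD[(String, Int)] = words.map((_, 1))   // the "JavaRDD => JavaPairRDD" step
    val counts: RDD[(String, Int)] = pairs.reduceByKey(_ + _)
    val keys: RDD[String] = counts.map(_._1)             // the "JavaPairRDD => JavaRDD" step
    keys.collect().foreach(println)
    sc.stop()
  }
}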

Computing Pi


package cn.lagou.sparkcore

import org.apache.spark.{SparkConf, SparkContext}
import scala.math.random

object SparkPi {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkContext
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // 2. Create the RDD and transform it
    // Draw N random points in the unit square; the fraction that falls inside the
    // quarter circle x*x + y*y <= 1 approximates Pi/4
    val slices = if (args.length > 0) args(0).toInt else 10
    val N = 100000000
    val count = sc.makeRDD(1 to N, slices).map(idx => {
      val (x, y) = (random, random)
      if (x * x + y * y <= 1) 1 else 0
    }).reduce(_ + _)

    // 3. Output the result
    println(s"Pi is roughly ${4.0 * count / N}")

    // 4. Stop the SparkContext
    sc.stop()
  }
}

Ad Click Statistics

Data format: timestamp province city userid adid (time, province, city, user, ad)

Requirements: 1. For each province, find the top 3 ad IDs by click count. 2. For each province and each hour, find the top 3 ad IDs.


package cn.lagou.sparkcore

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object AdStat {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val N = 3

    // Read the file
    val lines: RDD[String] = sc.textFile("file:///C:\\Users\\Administrator\\IdeaProjects\\sparkcoreDemo\\data\\advert.log")
    // Fields: time, province, city, user, ad
    val rawRDD: RDD[(String, String, String)] = lines.map(line => {
      val arr: Array[String] = line.split("\\s+")
      (arr(0), arr(1), arr(4))
    })

    // rawRDD.foreach(println)

    println("--------------------------------------------------------------")

    // Requirement 1: top 3 ad IDs by clicks for each province
    rawRDD.map {
      case (_, province, adid) => ((province, adid), 1)
    }.reduceByKey(_ + _).map {
      case ((province, adid), count) => (province, (adid, count))
    }.groupByKey().mapValues(_.toList.sortWith(_._2 > _._2).take(N)).foreach(println)

    println("--------------------------------------------------------------")

    // Requirement 2: top 3 ad IDs for each province and each hour
    rawRDD.map {
      case (time, province, adid) => ((getHour(time), province, adid), 1)
    }.reduceByKey(_ + _).map {
      case ((hour, province, adid), count) => ((hour, province), (adid, count))
    }.groupByKey().mapValues(_.toList.sortWith(_._2 > _._2).take(N)).foreach(println)

    sc.stop()
  }

  def getHour(timelong: String): String = {
    import org.joda.time.DateTime
    val datetime = new DateTime(timelong.toLong)
    datetime.getHourOfDay.toString
  }
}

Joda-Time was the de facto standard for date and time handling in Java for a long time before Java 8, making up for the shortcomings of the JDK.

Joda classes are immutable, so their instances cannot be modified (one advantage of immutable classes is that they are thread safe).

When working with dates and times in a Spark Core program, do not use the pre-Java-8 date/time types; they are not thread safe.
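If you would rather not rely on the Joda-Time dependency, the hour extraction in AdStat.getHour can equally be written with the thread-safe java.time API; a minimal sketch (interpreting the timestamp in the system default time zone is an assumption):

import java.time.{Instant, ZoneId}

// Hypothetical replacement for getHour, using java.time instead of Joda-Time
def getHourJava8(timelong: String): String = {
  Instant.ofEpochMilli(timelong.toLong)
    .atZone(ZoneId.systemDefault())   // assumption: timestamps are interpreted in the local zone
    .getHour
    .toString
}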


Finding Common Friends


The first column is a user ID; the remaining columns are that user's friends.

Requirements: 1. Find the common friends of every pair of users. 2. Sort the final result by the first two IDs.

package cn.lagou.sparkcore

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FindFriends {
  def main(args: Array[String]): Unit = {
    // Create the SparkContext
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val lines: RDD[String] = sc.textFile("file:///C:\\Users\\Administrator\\IdeaProjects\\sparkcoreDemo\\data\\fields.dat")

    // Method 1: use a Cartesian product to build every pair of users, then drop the redundant pairs
    val friendsRDD: RDD[(String, Array[String])] = lines.map { line =>
      val fields: Array[String] = line.split(",")
      val userId = fields(0).trim
      val friends: Array[String] = fields(1).trim.split("\\s+")
      (userId, friends)
    }

    // friendsRDD.collect().foreach(println)

    println("*****************************************************************")

    friendsRDD.cartesian(friendsRDD)
      .filter { case ((id1, _), (id2, _)) => id1 < id2 }
      .map { case ((id1, friends1), (id2, friends2)) =>
        // ((id1, id2), friends1.toSet & friends2.toSet)
        ((id1, id2), friends1.intersect(friends2).sorted.toBuffer)
      }.sortByKey()
      .collect().foreach(println)

    // Method 2: avoid the Cartesian product; more efficient.
    // Core idea: reshape the data to produce every pair of friends, then merge the results
    println("*****************************************************************")
    // combinations returns an iterator over all possible n-element combinations of the array,
    // e.g. Array("a", "b", "b", "b", "c").combinations(2) == Iterator(Array(a, b), Array(a, c), Array(b, b), Array(b, c))
    friendsRDD.flatMapValues(friends => friends.combinations(2))
      // .map(x => (x._2.mkString(" & "), Set(x._1)))
      .map { case (k, v) => (v.mkString(" & "), Set(k)) }
      .reduceByKey(_ | _)
      .sortByKey()
      .collect().foreach(println)

    // Key points: flatMapValues / combinations / reshaping the data / reduceByKey / set operations

    // Stop the SparkContext
    sc.stop()
  }
}

Super WordCount

Requirements: convert all words to lower case, strip punctuation (harder), and remove stop words (harder); save the result to a file sorted by count in descending order, and also write the full result set to MySQL (harder). The punctuation characters and stop words can be customized.

Stop words: every language contains many function words, which carry little meaning of their own compared with other words. The most common function words are determiners (the, a, an, that, those), prepositions (on, in, to, from, over, etc.), pronouns, quantifiers, and so on.

Pipeline for saving: Array[(String, Int)] => Scala JDBC => MySQL


package cn.lagou.sparkcore

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SuperWordCount1 {
  private val stopWords = "in on to from by a an the is are were was i we you your he his some any of as can it each".split("\\s+")
  private val punctuation = "[\\)\\.,:;'!\\?]"

  def main(args: Array[String]): Unit = {
    // Create the SparkContext
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // RDD transformations:
    // convert to lower case, strip punctuation, remove stop words
    val lines: RDD[String] = sc.textFile("file:///C:\\Users\\Administrator\\IdeaProjects\\sparkcoreDemo\\data\\swc.dat")
    lines.flatMap(_.split("\\s+"))
      .map(_.toLowerCase)
      .map(_.replaceAll(punctuation, ""))
      .filter(word => !stopWords.contains(word) && word.trim.length > 0)
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)
      .collect.foreach(println)

    // Stop the SparkContext
    sc.stop()
  }
}

Add the MySQL JDBC driver dependency:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>
package cn.lagou.sparkcore

import java.sql.{Connection, DriverManager, PreparedStatement}

object JDBCDemo {
  def main(args: Array[String]): Unit = {
    // Build a sample result set of type Array[(String, Int)]
    val str = "hadoop spark java scala hbase hive sqoop hue tez atlas datax grinffin zk kafka"
    val result: Array[(String, Int)] = str.split("\\s+").zipWithIndex

    // Connection parameters
    val url = "jdbc:mysql://linux123:3306/ebiz?useUnicode=true&characterEncoding=utf-8&useSSL=false"
    val username = "hive"
    val password = "12345678"

    // Save the data over JDBC
    var conn: Connection = null
    var stmt: PreparedStatement = null
    val sql = "insert into wordcount values (?, ?)"
    try {
      conn = DriverManager.getConnection(url, username, password)
      stmt = conn.prepareStatement(sql)
      result.foreach { case (k, v) =>
        stmt.setString(1, k)
        stmt.setInt(2, v)
        stmt.executeUpdate()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (stmt != null) stmt.close()
      if (conn != null) conn.close()
    }
  }
}

Create the table in the MySQL database:

create table wordcount(word varchar(30), count int);

Unoptimized version: saving the data with foreach opens a large number of connections, one per record.

package cn.lagou.sparkcore

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SuperWordCount2 {
  private val stopWords = "in on to from by a an the is are were was i we you your he his some any of as can it each".split("\\s+")
  private val punctuation = "[\\)\\.,:;'!\\?]"
  private val url = "jdbc:mysql://linux123:3306/ebiz?useUnicode=true&characterEncoding=utf-8&useSSL=false"
  private val username = "hive"
  private val password = "12345678"

  def main(args: Array[String]): Unit = {
    // Create the SparkContext
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // RDD transformations:
    // convert to lower case, strip punctuation, remove stop words
    val lines: RDD[String] = sc.textFile("file:///C:\\Project\\LagouBigData\\data\\swc.dat")
    val resultRDD: RDD[(String, Int)] = lines.flatMap(_.split("\\s+"))
      .map(_.toLowerCase)
      .map(_.replaceAll(punctuation, ""))
      .filter(word => !stopWords.contains(word) && word.trim.length > 0)
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)

    // Output the results
    resultRDD.saveAsTextFile("file:///C:\\Project\\LagouBigData\\data\\superwc")
    // Using foreach: a connection is created for every record
    resultRDD.foreach { case (k, v) =>
      var conn: Connection = null
      var stmt: PreparedStatement = null
      val sql = "insert into wordcount values (?, ?)"
      try {
        conn = DriverManager.getConnection(url, username, password)
        stmt = conn.prepareStatement(sql)
        stmt.setString(1, k)
        stmt.setInt(2, v)
        stmt.executeUpdate()
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        if (stmt != null) stmt.close()
        if (conn != null) conn.close()
      }
    }

    // Stop the SparkContext
    sc.stop()
  }
}

Optimized version: save the data with foreachPartition, so one connection is created per partition; cache the RDD, since it is consumed by two actions.

package cn.lagou.sparkcore

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SuperWordCount3 {
  private val stopWords: Array[String] = "in on to from by a an the is are were was i we you your he his some any of as can it each".split("\\s+")
  private val punctuation = "[\\)\\.,:;'!\\?]"
  private val url = "jdbc:mysql://linux123:3306/ebiz?useUnicode=true&characterEncoding=utf-8&useSSL=false"
  private val username = "hive"
  private val password = "12345678"

  def main(args: Array[String]): Unit = {
    // Create the SparkContext
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // RDD transformations:
    // convert to lower case, strip punctuation, remove stop words
    val lines: RDD[String] = sc.textFile("file:///C:\\Project\\LagouBigData\\data\\swc.dat")
    val resultRDD: RDD[(String, Int)] = lines.flatMap(_.split("\\s+"))
      .map(_.toLowerCase)
      .map(_.replaceAll(punctuation, ""))
      .filter(word => !stopWords.contains(word) && word.trim.length > 0)
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)

    // resultRDD.foreach(println)
    // Cache the RDD: it is consumed by the two actions below
    resultRDD.cache()

    // Output the results
    resultRDD.saveAsTextFile("file:///C:\\Project\\LagouBigData\\data\\superwc")
    // Using foreachPartition: one connection is created per partition
    resultRDD.foreachPartition { iter => saveAsMySQL(iter) }

    // Stop the SparkContext
    sc.stop()
  }

  def saveAsMySQL(iter: Iterator[(String, Int)]): Unit = {
    var conn: Connection = null
    var stmt: PreparedStatement = null
    val sql = "insert into wordcount values (?, ?)"

    try {
      conn = DriverManager.getConnection(url, username, password)
      stmt = conn.prepareStatement(sql)
      iter.foreach { case (k, v) =>
        stmt.setString(1, k)
        stmt.setInt(2, v)
        stmt.executeUpdate()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (stmt != null) stmt.close()
      if (conn != null) conn.close()
    }
  }
}
Note: Spark SQL has a built-in data source for accessing MySQL, which is much more convenient to call; the manual JDBC approach above is for Spark Core, or for external stores that Spark SQL does not support.
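As a reference for that Spark SQL route, here is a minimal sketch of writing the same word-count result through the built-in JDBC data source (this assumes the spark-sql_2.12 dependency is added and reuses the connection settings defined above; it is not part of the original program, and the hard-coded pairs merely stand in for resultRDD):

import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}

object SuperWordCountSQLSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SuperWordCountSQLSketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // Assume the (word, count) pairs have been computed as in SuperWordCount3
    val result = Seq(("hadoop", 3), ("spark", 2))

    val props = new Properties()
    props.setProperty("user", "hive")
    props.setProperty("password", "12345678")

    // Write the result into the same wordcount table via the JDBC data source
    result.toDF("word", "count")
      .write.mode(SaveMode.Append)
      .jdbc("jdbc:mysql://linux123:3306/ebiz?useUnicode=true&characterEncoding=utf-8&useSSL=false",
        "wordcount", props)

    spark.stop()
  }
}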