The basic data sources include the file stream, the socket stream, and the RDD queue stream; these sources are mainly used for testing.


Create a Maven project and add the following dependencies to the pom file:

<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.version>2.12.10</scala.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.9.2</hadoop.version>
    <encoding>UTF-8</encoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>joda-time</groupId>
        <artifactId>joda-time</artifactId>
        <version>2.9.7</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.44</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>

<build>
    <pluginManagement>
        <plugins>
            <!-- Plugin for compiling Scala -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
            <!-- Plugin for compiling Java -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <!-- Plugin for packaging the jar -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

File data stream

File data stream: the textFileStream(directory) method reads files from an HDFS-compatible file system.


Spark Streaming monitors the directory and continuously processes files that are moved into it.

  • Nested directories are not supported

  • All files must have the same data format

  • Files must enter the directory by being moved or renamed into it (a sketch of this is shown after the example code below)

  • Once a file has been moved into the directory it must not be modified; even if it is modified, the new data will not be read

  • The file stream does not need a receiver, so no CPU core has to be dedicated to one

package cn.lagou.streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileDStream {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf()
      .setAppName(this.getClass.getCanonicalName)
      .setMaster("local[*]")

    // Create the StreamingContext.
    // The StreamingContext is the main entry point for all streaming functionality;
    // here it runs locally with multiple executor threads and a 5-second batch interval,
    // i.e. one batch every 5 seconds.
    val ssc = new StreamingContext(conf, Seconds(5))

    // A local directory is used here; an HDFS directory works as well
    val lines = ssc.textFileStream("data/log/")
    val words = lines.flatMap(_.split("\\s+"))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    // Print the counts obtained in each batch interval
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
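As noted in the list above, files must enter the monitored directory atomically, i.e. by a move or rename. A minimal sketch of feeding the stream this way, assuming a hypothetical staging path data/tmp/ next to the monitored data/log/ directory:

import java.nio.file.{Files, Paths, StandardCopyOption}

// Write the data to a staging location first (file names here are just examples) ...
val staging = Paths.get("data/tmp/words-001.txt")
Files.write(staging, "hello spark hello streaming".getBytes("UTF-8"))

// ... then move it into the monitored directory in a single step,
// so that Spark Streaming only ever sees a complete file.
Files.move(staging, Paths.get("data/log/words-001.txt"), StandardCopyOption.ATOMIC_MOVE)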

Socket data stream

Spark Streaming can listen on a socket port, receive the data, and process it accordingly;

package cn.lagou.streaming.basic

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketDStream {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]")

    // Create the StreamingContext with a 1-second batch interval
    val ssc = new StreamingContext(conf, Seconds(1))
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("linux122", 9999)
    val words: DStream[String] = lines.flatMap(_.split("\\s+"))
    val wordCounts: DStream[(String, Int)] = words.map(x => (x.trim, 1)).reduceByKey(_ + _)

    // Print the counts obtained in each batch interval
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Note: the default StorageLevel of this DStream is MEMORY_AND_DISK_SER_2.
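The replicated default makes sense on a cluster; for a single-machine test, socketTextStream also accepts an explicit StorageLevel as its third argument. A minimal sketch, reusing the ssc and host from the example above:

import org.apache.spark.storage.StorageLevel

// The default is MEMORY_AND_DISK_SER_2 (serialized, replicated twice);
// a non-replicated level can be passed explicitly for local testing.
val lines = ssc.socketTextStream("linux122", 9999, StorageLevel.MEMORY_AND_DISK_SER)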

Open a new terminal on the linux122 virtual machine and start the nc program:

# install nc first if it is not present: yum install nc
nc -lk 9999

After nc is running, start the Spark Streaming program, then type words into the nc window, for example:


The Spark Streaming program automatically picks up the word stream and the results appear on the console.


A single-threaded SocketServer program that listens on a given local port and sends messages once a socket connection is established:

package cn.lagou.streaming.basic

import java.io.PrintWriter
import java.net.{ServerSocket, Socket}
import scala.util.Random

object SocketLikeNC {
  def main(args: Array[String]): Unit = {
    val words: Array[String] = "Hello World Hello Hadoop Hello spark kafka hive zookeeper hbase flume sqoop".split("\\s+")
    val n: Int = words.length
    val port: Int = 9999

    val random: Random = scala.util.Random
    val server = new ServerSocket(port)
    val socket: Socket = server.accept()
    println("Connection accepted from: " + socket.getInetAddress)

    // Reuse one writer for the whole connection and send two random words every 100 ms
    val out = new PrintWriter(socket.getOutputStream)
    while (true) {
      out.println(words(random.nextInt(n)) + " " + words(random.nextInt(n)))
      out.flush()
      Thread.sleep(100)
    }
  }
}

Here SocketLikeNC acts as the sender and SocketDStream as the receiver; change the host in SocketDStream to localhost. Note that running with local[*] may cause problems.

If the virtual machine is configured with only one CPU core, local[*] starts just a single thread; that thread is taken by the receiver task, leaving no resources to process the data that arrives.
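In that situation it is safer to spell out the number of local threads explicitly, so that at least one core is left for processing besides the receiver. A minimal sketch of the changed configuration (the application name is just an example):

import org.apache.spark.SparkConf

// One thread for the receiver, one for processing,
// instead of local[*] (which equals the number of available cores).
val conf = new SparkConf()
  .setAppName("SocketDStream")
  .setMaster("local[2]")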


A multi-threaded SocketServer program:

package cn.lagou.streaming.basic

import java.io.PrintWriter
import java.net.Socket
import java.net.ServerSocket

class ServerThread(sock: Socket) extends Thread {
  val words = "hello world hello spark hello word hello java hello hadoop hello kafka".split("\\s+")
  val length = words.length

  override def run(): Unit = {
    // Write newline-terminated text so that socketTextStream on the receiving
    // side can split the stream into lines
    val out = new PrintWriter(sock.getOutputStream)
    val random = scala.util.Random
    while (true) {
      val (wordx, wordy) = (words(random.nextInt(length)), words(random.nextInt(length)))
      out.println(s"$wordx $wordy")
      out.flush()
      Thread.sleep(100)
    }
  }
}

object SocketServer {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(9999)
    println(s"Socket Server started: ${server.getInetAddress}:${server.getLocalPort}")

    while (true) {
      val socket = server.accept()
      println("Connection accepted from: " + socket.getInetAddress)
      new ServerThread(socket).start()
    }
  }
}

RDD queue stream

When debugging a Spark Streaming application, streamingContext.queueStream(queueOfRDDs) can be used to create a DStream backed by a queue of RDDs;


Notes:

  • oneAtATime: defaults to true, i.e. one RDD is processed per batch; set it to false to process all queued RDDs in a single batch (see the sketch after the example below)

  • An RDD queue stream can run with local[1]

  • RDDs are enqueued and dequeued concurrently, so access to the queue must be synchronized

The example below pushes an RDD holding 100 integers into the queue every 2 seconds; Streaming processes the data every second, counting how many elements fall into each remainder class modulo 10.

package cn.lagou.streaming.basic

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable.Queue

object RDDQueueDStream {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val sparkConf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[2]")

    // Process the data every 1 second
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val rddQueue = new Queue[RDD[Int]]()
    val queueStream = ssc.queueStream(rddQueue)
    val mappedStream = queueStream.map(r => (r % 10, 1))
    val reducedStream = mappedStream.reduceByKey(_ + _)
    reducedStream.print()
    ssc.start()

    // Push an RDD of 100 integers into the queue every 2 seconds
    for (i <- 1 to 5) {
      rddQueue.synchronized {
        val range = (1 to 100).map(_ * i)
        rddQueue += ssc.sparkContext.makeRDD(range, 2)
      }
      Thread.sleep(2000)
    }

    ssc.stop()
  }
}
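As mentioned in the notes above, queueStream also takes an oneAtATime flag. A minimal sketch of the variant that drains the whole queue in every batch, reusing the ssc and rddQueue from the example:

// Process all RDDs currently in the queue in one batch
// instead of one RDD per batch (oneAtATime = true is the default).
val allAtOnce = ssc.queueStream(rddQueue, oneAtATime = false)
allAtOnce.map(r => (r % 10, 1)).reduceByKey(_ + _).print()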