Add the Maven dependency:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-graphx_2.12</artifactId>
    <version>${spark.version}</version>
</dependency>
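
For an sbt build, the equivalent dependency (assuming a sparkVersion value holding the same Spark version) is:

libraryDependencies += "org.apache.spark" %% "spark-graphx" % sparkVersion

where %% appends the Scala binary version (2.12 here) to the artifact name, matching spark-graphx_2.12.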

Basic Graph Operations
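
The example below builds a small property graph of users and their relationships, then walks through the basic operations in turn: property access, degrees, transformations, structural operations, joins, and aggregation with the Pregel API.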

package cn.lagou.streaming.graphx

import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

case class User(name: String, age: Int, inDegrees: Int, outDegrees: Int)

object GraphXExample1 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getCanonicalName)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // Define the vertices
    val vertexArray: Array[(VertexId, (String, Int))] = Array(
      (1L, ("Alice", 28)),
      (2L, ("Bob", 27)),
      (3L, ("Charlie", 65)),
      (4L, ("David", 42)),
      (5L, ("Ed", 55)),
      (6L, ("Fran", 50))
    )
    val vertexRDD: RDD[(VertexId, (String, Int))] = sc.makeRDD(vertexArray)

    // Define the edges
    val edgeArray: Array[Edge[Int]] = Array(
      Edge(2L, 1L, 7),
      Edge(2L, 4L, 2),
      Edge(3L, 2L, 4),
      Edge(3L, 6L, 6),
      Edge(4L, 1L, 1),
      Edge(5L, 2L, 2),
      Edge(5L, 3L, 8),
      Edge(5L, 6L, 3)
    )
    val edgeRDD: RDD[Edge[Int]] = sc.makeRDD(edgeArray)

    // Build the graph
    val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)

    // Property operations: vertices with age > 30; edges with attr > 5; triplets with attr > 5
    graph.vertices
      .filter { case (_, (_, age)) => age > 30 }
      .foreach(println)

    graph.edges
      .filter(edge => edge.attr > 5)
      .foreach(println)

    graph.triplets
      .filter(t => t.attr > 5)
      .foreach(println)

    println("Property operations ----------------------------------------------------------------------------")

    // Degree operations: find the vertex with the largest in-degree, out-degree, and total degree
    val maxInDegree: (VertexId, Int) = graph.inDegrees
      .reduce((x, y) => if (x._2 > y._2) x else y)
    println(s"max inDegrees = $maxInDegree")

    val maxOutDegree: (VertexId, Int) = graph.outDegrees
      .reduce((x, y) => if (x._2 > y._2) x else y)
    println(s"max outDegrees = $maxOutDegree")

    val maxDegree: (VertexId, Int) = graph.degrees
      .reduce((x, y) => if (x._2 > y._2) x else y)
    println(s"max degrees = $maxDegree")

    println("Degree operations ----------------------------------------------------------------------------")

    // Transformation: add 100 to every vertex's age
    graph.mapVertices { case (_, (name, age)) => (name, age + 100) }
      .vertices
      .foreach(println)

    // Transformation: double every edge's attribute
    graph.mapEdges(e => e.attr * 2)
      .edges
      .foreach(println)

    println("Transformations ----------------------------------------------------------------------------")

    // Structural operation: the subgraph of vertices with age > 30
    val subGraph: Graph[(String, Int), Int] = graph.subgraph(vpred = (_, vd) => vd._2 > 30)
    subGraph.edges.foreach(println)
    subGraph.vertices.foreach(println)

    println("Structural operations ----------------------------------------------------------------------------")

    // Join operations: find users whose in-degree equals their out-degree.
    // Idea: graph + each vertex's in-degree + each vertex's out-degree => joins
    val initialUserGraph: Graph[User, Int] = graph.mapVertices { case (_, (name, age)) =>
      User(name, age, 0, 0)
    }

    val userGraph: Graph[User, Int] = initialUserGraph
      .outerJoinVertices(initialUserGraph.inDegrees) {
        case (_, u, inDeg) => User(u.name, u.age, inDeg.getOrElse(0), u.outDegrees)
      }
      .outerJoinVertices(initialUserGraph.outDegrees) {
        case (_, u, outDeg) => User(u.name, u.age, u.inDegrees, outDeg.getOrElse(0))
      }
    userGraph.vertices
      .filter { case (_, user) => user.inDegrees == user.outDegrees }
      .foreach(println)

    println("Join operations ----------------------------------------------------------------------------")

    // Shortest distance from vertex 5 to every other vertex, using the Pregel API
    val sourceId: VertexId = 5L
    val initialGraph: Graph[Double, Int] =
      graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

    val distGraph: Graph[Double, Int] = initialGraph.pregel(Double.PositiveInfinity)(
      // vprog: when a message arrives, keep the minimum of the two distances
      (_, dist, newDist) => math.min(dist, newDist),

      // sendMsg: offer the destination a shorter path if one exists through this edge
      triplet => {
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        } else {
          Iterator.empty
        }
      },

      // mergeMsg: combine two messages by taking the minimum
      (distA, distB) => math.min(distA, distB)
    )

    distGraph.vertices.foreach(println)

    println("Aggregation (Pregel) ----------------------------------------------------------------------------")

    sc.stop()
  }
}
  • Pregel API

    A graph is an inherently recursive data structure: a vertex's properties depend on those of its neighbors, which in turn depend on the properties of their own neighbors.

    Many important graph algorithms therefore iteratively recompute each vertex's properties until some convergence condition is met.

    A series of graph-parallel abstractions have been proposed to express such iterative algorithms, and GraphX exposes a Pregel-like operator. It takes three functions:

    • vprog: the user-defined vertex program. It runs on every vertex, receives the incoming message, and computes the vertex's new value

    • sendMsg: generates the messages to send along edges

    • mergeMsg: merges messages destined for the same vertex
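
As a minimal sketch of how the three functions fit together (not part of the original course code): each vertex adopts the largest vertex ID it can hear about, iterated to a fixed point.

package cn.lagou.streaming.graphx

import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.{SparkConf, SparkContext}

object PregelSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("PregelSketch").setMaster("local[*]"))
    sc.setLogLevel("WARN")

    // A 3-vertex cycle: 1 -> 2 -> 3 -> 1
    val edges = sc.makeRDD(Seq(Edge(1L, 2L, 0), Edge(2L, 3L, 0), Edge(3L, 1L, 0)))
    // Start every vertex off with its own ID as its value
    val graph: Graph[VertexId, Int] =
      Graph.fromEdges(edges, defaultValue = 0L).mapVertices((id, _) => id)

    val maxIdGraph = graph.pregel(initialMsg = Long.MinValue)(
      // vprog: adopt the larger of the current value and the incoming message
      (_, value, msg) => math.max(value, msg),
      // sendMsg: notify the destination when the source knows a larger ID
      t => if (t.srcAttr > t.dstAttr) Iterator((t.dstId, t.srcAttr)) else Iterator.empty,
      // mergeMsg: keep the larger of two messages
      (a, b) => math.max(a, b)
    )
    maxIdGraph.vertices.foreach(println) // every vertex ends up with 3

    sc.stop()
  }
}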

Connected Components Algorithm

Given a data file, find the connected components it contains.
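
The contents of data/graph.dat are not reproduced here. GraphLoader.edgeListFile expects one edge per line as a whitespace-separated source/destination vertex ID pair, and skips lines beginning with #. A made-up file with two components might look like:

# hypothetical data/graph.dat
1 2
2 3
6 7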


package cn.lagou.streaming.graphx

import org.apache.spark.graphx.{Graph, GraphLoader}
import org.apache.spark.{SparkConf, SparkContext}

object GraphXExample2 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getCanonicalName)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // Build the graph from an edge list file
    val graph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "data/graph.dat")

    graph.vertices.foreach(println)
    graph.edges.foreach(println)

    // Run the connected components algorithm
    graph.connectedComponents()
      .vertices
      .sortBy(_._2)
      .foreach(println)

    sc.stop()
  }
}
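
connectedComponents labels each vertex with the smallest vertex ID in its component, so two vertices share a label exactly when they are connected. Note also that foreach(println) on a distributed RDD does not print in a guaranteed order even after sortBy; to see the grouping reliably, collect to the driver first:

graph.connectedComponents().vertices.sortBy(_._2).collect().foreach(println)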

Finding Identical Users and Merging Their Information

Assumptions:

  • There are five kinds of information that can serve as a user identifier, denoted 1X, 2X, 3X, 4X, 5X;

  • Each record may use any subset of these fields as its identifiers;

  • Some identifiers may change over time, e.g. 12 => 13 or 24 => 25.

Based on these rules, decide whether the following identifier sets represent the same user:

  • 11-21-32, 12-22-33 (X)

  • 11-21-32, 11-21-52 (OK)

  • 21-32, 11-21-33 (OK)

  • 11-21-32, 32-48 (OK)

Problem: in the data below, find the records that belong to the same user and merge them:

  • User identifiers (ids): concatenate, then deduplicate;

  • User information: when keys match, sum the weights.

List(11L, 21L, 31L), List("kw$北京" -> 1.0, "kw$上海" -> 1.0, "area$中关村" -> 1.0)
List(21L, 32L, 41L), List("kw$上海" -> 1.0, "kw$天津" -> 1.0, "area$回龙观" -> 1.0)
List(41L), List("kw$天津" -> 1.0, "area$中关村" -> 1.0)

List(12L, 22L, 33L), List("kw$大数据" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0)
List(22L, 34L, 44L), List("kw$spark" -> 1.0, "area$五道口" -> 1.0)
List(33L, 53L), List("kw$hive" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0)
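
The solution below turns this into a connected components problem: every identifier becomes a vertex, and each record contributes a synthetic "center" vertex (the concatenation of its IDs) with an edge from every identifier to that center. Records that share any identifier then land in the same connected component, whose label becomes the key for merging.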
package cn.lagou.streaming.graphx

import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object GraphXExample3 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getCanonicalName)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // Raw data set
    val lst: List[(List[Long], List[(String, Double)])] = List(
      (List(11L, 21L, 31L), List("kw$北京" -> 1.0, "kw$上海" -> 1.0, "area$中关村" -> 1.0)),
      (List(21L, 32L, 41L), List("kw$上海" -> 1.0, "kw$天津" -> 1.0, "area$回龙观" -> 1.0)),
      (List(41L), List("kw$天津" -> 1.0, "area$中关村" -> 1.0)),
      (List(12L, 22L, 33L), List("kw$大数据" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0)),
      (List(22L, 34L, 44L), List("kw$spark" -> 1.0, "area$五道口" -> 1.0)),
      (List(33L, 53L), List("kw$hive" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0))
    )
    val rawRDD: RDD[(List[Long], List[(String, Double)])] = sc.makeRDD(lst)

    // Create the edges: RDD[Edge[Int]].
    // Each identifier points to a synthetic "center" vertex formed by concatenating
    // the record's IDs, e.g. List(11L, 21L, 31L) => 11 -> 112131, 21 -> 112131, 31 -> 112131.
    // (Concatenation is fine for this demo but could overflow Long for longer ID lists.)
    val dotRDD: RDD[(Long, Long)] = rawRDD.flatMap { case (ids, _) =>
      ids.map(id => (id, ids.mkString.toLong))
    }
    val edgesRDD: RDD[Edge[Int]] = dotRDD.map { case (id, center) => Edge(id, center, 0) }

    // Create the vertices: RDD[(Long, String)]; the vertex attribute is unused here
    val verticesRDD: RDD[(Long, String)] = dotRDD.map { case (id, _) => (id, "") }

    // Build the graph
    val graph: Graph[String, Int] = Graph(verticesRDD, edgesRDD)

    // Run the connected components algorithm; the 6 records represent 2 distinct users
    val connectedRDD: VertexRDD[VertexId] = graph.connectedComponents()
      .vertices
    // connectedRDD.foreach(println)

    // Key the original records by their center vertex ID
    val centerVertexRDD: RDD[(VertexId, (List[VertexId], List[(String, Double)]))] =
      rawRDD.map { case (ids, info) => (ids.mkString.toLong, (ids, info)) }
    // centerVertexRDD.foreach(println)

    // Join to attach the component label to each record
    val dataRDD: RDD[(VertexId, (List[VertexId], List[(String, Double)]))] =
      connectedRDD.join(centerVertexRDD)
        .map { case (_, (componentId, record)) => (componentId, record) }

    // Aggregate and merge the records within each component
    val resultRDD: RDD[(VertexId, (List[VertexId], List[(String, Double)]))] =
      dataRDD.reduceByKey { case ((bufIds, bufInfo), (ids, info)) =>
        val newIds: List[VertexId] = bufIds ++ ids
        val newInfo: List[(String, Double)] = bufInfo ++ info

        // Deduplicate the user IDs; merge the tags by summing weights per key
        (newIds.distinct, newInfo.groupBy(_._1).mapValues(_.map(_._2).sum).toList)
      }

    resultRDD.foreach(println)

    sc.stop()
  }
}
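
Run locally, this prints two merged users. The ordering of list elements and of the two lines may vary between runs, but the merged content works out to:

(11,(List(11, 21, 31, 32, 41),List((kw$北京,1.0), (kw$上海,2.0), (kw$天津,2.0), (area$中关村,2.0), (area$回龙观,1.0))))
(12,(List(12, 22, 33, 34, 44, 53),List((kw$大数据,1.0), (kw$spark,3.0), (area$西二旗,2.0), (area$五道口,1.0), (kw$hive,1.0))))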