【大数据分析】基于Spark Graphx的AllPairNodeConnectivity算法实现
localnodeconnectivity是基于 Spark Graphx 中的 Pregel 机制实现的算法。关于Pregel机制的理解可参考【大数据分析】基于Graphx的shortestpath源码解析。localnodeconnectivity可以在有限的迭代次数内尽可能多地计算出起始点SSS和目标点TTT之间的独立路径。所谓独立路径,指的是它与其他路径除了SSS和TTT之外,没有重合的点
算法介绍
AllPairNodeConnectivity是基于 Spark Graphx 中的 Pregel 机制实现的算法。关于Pregel机制的理解可参考【大数据分析】基于Graphx的shortestpath源码解析。AllPairNodeConnectivity可以在有限的迭代次数内尽可能多地计算出图的所有节点(作为起始点) S S S 到目标点集 T T T 之间的独立路径。所谓独立路径,指的是它与其他路径除了 S S S 和 T T T 之外,没有重合的点。
算法解析
数据的准备
样例数据可以参考已有的文章【大数据分析】基于Spark Graphx的图路径规划算法PathPlanning实现
关于图算法需要思考的问题
基于Pregel机制实现的算法,一般需要考虑几个问题
(1)如何定义节点的属性结构?
(2)如何初始化节点的属性?
(3)在进行第一次迭代前,如何激活所有的节点?
(4)如何传递消息(节点状态如何变化,消息传递的方向,如何进行消息的更新)
(5)接收到多个消息如何将它们进行组合(merge)
(6)最终接收到的消息如何与当前节点的属性组合(vertex_program)
定义节点的属性类型
type MsgType = Map[VertexId, Set[List[VertexId]]]
(1)VertexId 代表目标点集。
(2)Set[List[VertexId]] 代表从 S S S 到 T T T 的独立路径集合
节点属性的初始化
val APNCGraph = graph.mapVertices { (vid, attr) =>
if (landmarks.contains(vid)) {
val path = List(vid)
val paths = Set(path)
makeMsg(vid -> paths)
} else
makeMsg()
}
结果如下图所示。这里要求取的目标点是3,4,5,8。所以所有的节点属性都会以它们的节点ID作为key,对应的值是当前点到目标点的路径集合
初始化激活所有的节点
激活所有节点需要一个初始消息,这里是一个空的Map
val initialMessage = makeMsg()
然后激活所有节点时,会直接触发vertexProgram方法。initialMsg 会作为msg参数传入。
消息的传递
消息的传递由sendMsg决定。假设将所有的triplet定义为:A → \rightarrow → B。并假设某一次迭代开始前,节点的激活情况以及节点属性如下图所示
(1)筛选出 A 或 B 处于激活态的三元组 A → \rightarrow → B。
(2)是否产生消息传递。对于 A 或 B 是出于激活态的 A → \rightarrow → B ,有两种情况不发生消息传递。如下图所示,
其中7 → \rightarrow → 6,5 → \rightarrow → 8,4 → \rightarrow → 8,3 → \rightarrow → 8,不发生消息传递,因为B的属性为空,而其他元组则因为所构建的消息在A中已存在。
(3)消息的构建,如下图所示
已知元组7 → \rightarrow → 1,7 → \rightarrow → 2,都会基于2的属性构建消息,然后发给7,每一个路径集合仅需要一个路径用于产生新的路径,例如节点1的属性,红色标识的路径会被舍弃。
4、消息传递的方向
消息传递的方向从总体上是从B到A,即将(3)构建的消息发送给A。
消息与消息的合并
已知两个三元组7 → \rightarrow → 1 和 7 → \rightarrow → 2,它们都会将消息发给节点7,而在节点4收到消息前,两个消息需要合并,mergeMsg负责合并两条消息。
private def mergeMsg(msg1: MsgType, msg2: MsgType): MsgType = {
(msg1.keySet ++ msg2.keySet).map {
k => k -> (msg1.getOrElse(k, Set()) ++ msg2.getOrElse(k, Set()))
}(collection.breakOut)
}
消息与属性的合并
vertexProgram负责将合并后的消息和当前接收消息的节点的属性进一步合并。
def vertexProgram(id: VertexId, attr: MsgType, msg: MsgType): MsgType = {
val attr_new = mergeMsg(attr, msg).map(data => {
val k = data._1
val v = filterPathsWithCommonNodes(data._2)
k -> v
})
attr_new
}
private def filterPathsWithCommonNodes(paths: Set[List[VertexId]]): Set[List[VertexId]] = {
paths.scan(List())((x, y) => {
val x_new = x.slice(1, x.length - 1)
val y_new = y.slice(1, y.length - 1)
val z_new = x_new ++ y_new
if (z_new.distinct.length == z_new.length) {
y
} else {
List()
}
}).filterNot(data => data.isEmpty)
}
完整代码
package com.edata.bigdata.algorithm.networks.approximation
import org.apache.spark.graphx.{EdgeTriplet, Graph, Pregel, VertexId}
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
/**
* @author: Alan Sword
*/
object AllPairNodeConnectivity extends Serializable {
type MsgType = Map[VertexId, Set[List[VertexId]]]
/**
* @param x
* @return
*/
private def makeMsg(x: (VertexId, Set[List[VertexId]])*) = Map(x: _*)
private def updateMsg(edge: EdgeTriplet[MsgType, _]): MsgType = {
val src_id = edge.srcId
val msg_new = edge.dstAttr.map(data => {
val k = data._1
val v = data._2
val v_new = v.map(p => p.::(src_id)).filterNot(p => p.distinct.length < p.length).slice(0, 1)
k -> v_new
})
msg_new
}
private def mergeMsg(msg1: MsgType, msg2: MsgType): MsgType = {
(msg1.keySet ++ msg2.keySet).map {
k => k -> (msg1.getOrElse(k, Set()) ++ msg2.getOrElse(k, Set()))
}(collection.breakOut)
}
private def filterPathsWithCommonNodes(paths: Set[List[VertexId]]): Set[List[VertexId]] = {
paths.scan(List())((x, y) => {
val x_new = x.slice(1, x.length - 1)
val y_new = y.slice(1, y.length - 1)
val z_new = x_new ++ y_new
if (z_new.distinct.length == z_new.length) {
y
} else {
List()
}
}).filterNot(data => data.isEmpty)
}
private def pathsExisted(msg1: MsgType, msg2: MsgType): Boolean = {
val paths_1 = msg1.values.reduce((x, y) => x ++ y)
val paths_2 = msg2.values.reduce((x, y) => x ++ y)
if ((paths_2 -- paths_1).isEmpty) {
true
} else {
false
}
}
def vertexProgram(id: VertexId, attr: MsgType, msg: MsgType): MsgType = {
val attr_new = mergeMsg(attr, msg).map(data => {
val k = data._1
val v = filterPathsWithCommonNodes(data._2)
k -> v
})
attr_new
}
//
/**
* @param edge a edge triplet (A->B).
* @return
* @Description Send message in 'Iterator[(VertexId,MsgType)]' format between node and node.
*/
private def sendMsg(edge: EdgeTriplet[MsgType, _]): Iterator[(VertexId, MsgType)] = {
if (edge.dstAttr.isEmpty) return Iterator.empty
val msg_new = updateMsg(edge)
if(edge.srcAttr.isEmpty) return Iterator((edge.srcId, msg_new))
if (pathsExisted(edge.srcAttr, msg_new)) return Iterator.empty
Iterator((edge.srcId, msg_new))
}
def run[VD, ED: ClassTag](graph: Graph[VD, ED], landmarks: Seq[VertexId], maxIterations: Int = 10): Graph[MsgType, ED] = {
val APNCGraph = graph.mapVertices { (vid, attr) =>
if (landmarks.contains(vid)) {
val path = List(vid)
val paths = Set(path)
makeMsg(vid -> paths)
} else
makeMsg()
}
val initialMessage = makeMsg()
Pregel(APNCGraph, initialMessage, maxIterations = maxIterations)(vertexProgram, sendMsg, mergeMsg)
}
}
更多推荐
所有评论(0)