算法介绍

AllPairNodeConnectivity是基于 Spark Graphx 中的 Pregel 机制实现的算法。关于Pregel机制的理解可参考【大数据分析】基于Graphx的shortestpath源码解析。AllPairNodeConnectivity可以在有限的迭代次数内尽可能多地计算出图的所有节点(作为起始点) S S S 到目标点集 T T T 之间的独立路径。所谓独立路径,指的是它与其他路径除了 S S S T T T 之外,没有重合的点。

算法解析

数据的准备

样例数据可以参考已有的文章【大数据分析】基于Spark Graphx的图路径规划算法PathPlanning实现

关于图算法需要思考的问题

基于Pregel机制实现的算法,一般需要考虑几个问题
(1)如何定义节点的属性结构?
(2)如何初始化节点的属性?
(3)在进行第一次迭代前,如何激活所有的节点?
(4)如何传递消息(节点状态如何变化,消息传递的方向,如何进行消息的更新)
(5)接收到多个消息如何将它们进行组合(merge)
(6)最终接收到的消息如何与当前节点的属性组合(vertex_program)

定义节点的属性类型

type MsgType = Map[VertexId, Set[List[VertexId]]]

(1)VertexId 代表目标点集。
(2)Set[List[VertexId]] 代表从 S S S T T T 的独立路径集合

节点属性的初始化

    val APNCGraph = graph.mapVertices { (vid, attr) =>
      if (landmarks.contains(vid)) {
        val path = List(vid)
        val paths = Set(path)
        makeMsg(vid -> paths)
      } else
        makeMsg()
    }

结果如下图所示。这里要求取的目标点是3,4,5,8。所以所有的节点属性都会以它们的节点ID作为key,对应的值是当前点到目标点的路径集合
在这里插入图片描述

初始化激活所有的节点

激活所有节点需要一个初始消息,这里是一个空的Map

val initialMessage = makeMsg()

然后激活所有节点时,会直接触发vertexProgram方法。initialMsg 会作为msg参数传入。

消息的传递

消息的传递由sendMsg决定。假设将所有的triplet定义为:A → \rightarrow B。并假设某一次迭代开始前,节点的激活情况以及节点属性如下图所示
在这里插入图片描述
(1)筛选出 A 或 B 处于激活态的三元组 A → \rightarrow B。
(2)是否产生消息传递。对于 A 或 B 是出于激活态的 A → \rightarrow B ,有两种情况不发生消息传递。如下图所示,
在这里插入图片描述

其中7 → \rightarrow 6,5 → \rightarrow 8,4 → \rightarrow 8,3 → \rightarrow 8,不发生消息传递,因为B的属性为空,而其他元组则因为所构建的消息在A中已存在。
(3)消息的构建,如下图所示
在这里插入图片描述
已知元组7 → \rightarrow 1,7 → \rightarrow 2,都会基于2的属性构建消息,然后发给7,每一个路径集合仅需要一个路径用于产生新的路径,例如节点1的属性,红色标识的路径会被舍弃。

4、消息传递的方向
消息传递的方向从总体上是从B到A,即将(3)构建的消息发送给A。

消息与消息的合并

已知两个三元组7 → \rightarrow 1 和 7 → \rightarrow 2,它们都会将消息发给节点7,而在节点4收到消息前,两个消息需要合并,mergeMsg负责合并两条消息。

  private def mergeMsg(msg1: MsgType, msg2: MsgType): MsgType = {
    (msg1.keySet ++ msg2.keySet).map {
      k => k -> (msg1.getOrElse(k, Set()) ++ msg2.getOrElse(k, Set()))
    }(collection.breakOut)
  }

在这里插入图片描述

消息与属性的合并

vertexProgram负责将合并后的消息和当前接收消息的节点的属性进一步合并。

  def vertexProgram(id: VertexId, attr: MsgType, msg: MsgType): MsgType = {
    val attr_new = mergeMsg(attr, msg).map(data => {
      val k = data._1
      val v = filterPathsWithCommonNodes(data._2)
      k -> v
    })
    attr_new
  }
  private def filterPathsWithCommonNodes(paths: Set[List[VertexId]]): Set[List[VertexId]] = {
    paths.scan(List())((x, y) => {
      val x_new = x.slice(1, x.length - 1)
      val y_new = y.slice(1, y.length - 1)
      val z_new = x_new ++ y_new
      if (z_new.distinct.length == z_new.length) {
        y
      } else {
        List()
      }
    }).filterNot(data => data.isEmpty)
  }

在这里插入图片描述

完整代码

package com.edata.bigdata.algorithm.networks.approximation

import org.apache.spark.graphx.{EdgeTriplet, Graph, Pregel, VertexId}

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

/**
 * @author: Alan Sword
 */

object AllPairNodeConnectivity extends Serializable {

  type MsgType = Map[VertexId, Set[List[VertexId]]]

  /**
   * @param x
   * @return
   */
  private def makeMsg(x: (VertexId, Set[List[VertexId]])*) = Map(x: _*)

  private def updateMsg(edge: EdgeTriplet[MsgType, _]): MsgType = {
    val src_id = edge.srcId
    val msg_new = edge.dstAttr.map(data => {
      val k = data._1
      val v = data._2
      val v_new = v.map(p => p.::(src_id)).filterNot(p => p.distinct.length < p.length).slice(0, 1)
      k -> v_new
    })
    msg_new
  }

  private def mergeMsg(msg1: MsgType, msg2: MsgType): MsgType = {
    (msg1.keySet ++ msg2.keySet).map {
      k => k -> (msg1.getOrElse(k, Set()) ++ msg2.getOrElse(k, Set()))
    }(collection.breakOut)
  }

  private def filterPathsWithCommonNodes(paths: Set[List[VertexId]]): Set[List[VertexId]] = {
    paths.scan(List())((x, y) => {
      val x_new = x.slice(1, x.length - 1)
      val y_new = y.slice(1, y.length - 1)
      val z_new = x_new ++ y_new
      if (z_new.distinct.length == z_new.length) {
        y
      } else {
        List()
      }
    }).filterNot(data => data.isEmpty)
  }

  private def pathsExisted(msg1: MsgType, msg2: MsgType): Boolean = {
    val paths_1 = msg1.values.reduce((x, y) => x ++ y)
    val paths_2 = msg2.values.reduce((x, y) => x ++ y)
    if ((paths_2 -- paths_1).isEmpty) {
      true
    } else {
      false
    }
  }


  def vertexProgram(id: VertexId, attr: MsgType, msg: MsgType): MsgType = {
    val attr_new = mergeMsg(attr, msg).map(data => {
      val k = data._1
      val v = filterPathsWithCommonNodes(data._2)
      k -> v
    })
    attr_new
  }
  //

  /**
   * @param edge a edge triplet (A->B).
   * @return
   * @Description Send message in 'Iterator[(VertexId,MsgType)]' format between node and node.
   */
  private def sendMsg(edge: EdgeTriplet[MsgType, _]): Iterator[(VertexId, MsgType)] = {
    if (edge.dstAttr.isEmpty) return Iterator.empty
    val msg_new = updateMsg(edge)
    if(edge.srcAttr.isEmpty) return Iterator((edge.srcId, msg_new))
    if (pathsExisted(edge.srcAttr, msg_new)) return Iterator.empty
    Iterator((edge.srcId, msg_new))
  }

  def run[VD, ED: ClassTag](graph: Graph[VD, ED], landmarks: Seq[VertexId], maxIterations: Int = 10): Graph[MsgType, ED] = {
    val APNCGraph = graph.mapVertices { (vid, attr) =>
      if (landmarks.contains(vid)) {
        val path = List(vid)
        val paths = Set(path)
        makeMsg(vid -> paths)
      } else
        makeMsg()
    }
    val initialMessage = makeMsg()
    Pregel(APNCGraph, initialMessage, maxIterations = maxIterations)(vertexProgram, sendMsg, mergeMsg)
  }
}
Logo

永洪科技,致力于打造全球领先的数据技术厂商,具备从数据应用方案咨询、BI、AIGC智能分析、数字孪生、数据资产、数据治理、数据实施的端到端大数据价值服务能力。

更多推荐