add more example #65

Merged: 1 commit, Feb 10, 2023
2 changes: 2 additions & 0 deletions README-CN.md
@@ -21,6 +21,7 @@ nebula-algorithm is a Spark application based on [GraphX](https://spark.apache.org/graphx/)
| ClusteringCoefficient | clustering coefficient | recommendation, telecom fraud analysis |
| Jaccard | Jaccard similarity computation | similarity calculation, recommendation |
| BFS | breadth-first traversal | level-order traversal, shortest path planning |
| DFS | depth-first traversal | graph traversal, path planning |
| Node2Vec | - | graph classification |

Using `nebula-algorithm`, you can run graph computation on data in the `Nebula Graph` database by submitting a `Spark` job with the complete algorithm tool, or call the algorithms under the `lib` package programmatically to run graph computation on a DataFrame.
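
For example, a minimal sketch of the programmatic route, mirroring the `PageRankAlgo`/`PRConfig` call pattern used in the example file added by this PR (the CSV path and column layout are hypothetical placeholders):

import org.apache.spark.sql.{DataFrame, SparkSession}
import com.vesoft.nebula.algorithm.config.PRConfig
import com.vesoft.nebula.algorithm.lib.PageRankAlgo

val spark = SparkSession.builder().getOrCreate()
// hypothetical edge list; the first two columns are treated as src and dst ids
val edgeDf: DataFrame =
  spark.read.option("header", "true").csv("hdfs://127.0.0.1:9000/tmp/edges.csv")
// PRConfig(10, 0.85) and the trailing `false` (no edge weights) mirror the example file below
val ranks = PageRankAlgo.apply(spark, edgeDf, PRConfig(10, 0.85), false)
ranks.show()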
@@ -101,6 +102,7 @@
| closeness | closeness |double/string|
| hanp | hanp | int/string |
| bfs | bfs | string |
| dfs | dfs | string |
| jaccard | jaccard | string |
| node2vec | node2vec | string |

2 changes: 2 additions & 0 deletions README.md
@@ -25,6 +25,7 @@ nebula-algorithm is a Spark Application based on [GraphX](https://spark.apache.org/graphx/)
| ClusteringCoefficient | recommendation, telecom fraud analysis|
| Jaccard | similarity calculation, recommendation|
| BFS | level-order traversal, shortest path planning|
| DFS | graph traversal, path planning|
| Node2Vec | graph machine learning, recommendation|


@@ -111,6 +112,7 @@ If you want to write the algorithm execution result into NebulaGraph (`sink: nebula`)
| closeness | closeness |double/string|
| hanp | hanp | int/string |
| bfs | bfs | string |
| dfs | dfs | string |
| jaccard | jaccard | string |
| node2vec | node2vec | string |
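
As a hedged illustration of that write path (this sketch assumes the nebula-spark-connector write API and that the tag `pagerank` was pre-created with a double property `pagerank`, per the table above; the addresses and the `_id` column name are placeholders):

import org.apache.spark.sql.DataFrame
import com.vesoft.nebula.connector.{NebulaConnectionConfig, WriteNebulaVertexConfig}
import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter

// `ranks` is an algorithm result DataFrame, e.g. the return value of PageRankAlgo.apply
def writeBack(ranks: DataFrame): Unit = {
  val connConfig = NebulaConnectionConfig
    .builder()
    .withMetaAddress("127.0.0.1:9559")  // placeholder meta address
    .withGraphAddress("127.0.0.1:9669") // writes go through graphd
    .build()
  val writeConfig = WriteNebulaVertexConfig
    .builder()
    .withSpace("twitter")
    .withTag("pagerank")  // tag must already define property "pagerank" (double)
    .withVidField("_id")  // assumes the result keeps vertex ids in column "_id"
    .withBatch(512)
    .build()
  ranks.write.nebula(connConfig, writeConfig).writeVertices()
}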

97 changes: 97 additions & 0 deletions AlgoPerformanceTest.scala
@@ -0,0 +1,97 @@
/* Copyright (c) 2022 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm

import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.algorithm.config.{CcConfig, LPAConfig, LouvainConfig, PRConfig}
import com.vesoft.nebula.algorithm.lib.{
  ConnectedComponentsAlgo,
  LabelPropagationAlgo,
  LouvainAlgo,
  PageRankAlgo
}
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

object AlgoPerformanceTest {

  def main(args: Array[String]): Unit = {
    // Kryo serialization is required for the Thrift classes used by the connector
    val sparkConf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()

    // read the edge data once, then time each algorithm on the cached DataFrame
    val df = readNebulaData(spark)
    lpa(spark, df)
    louvain(spark, df)
    pagerank(spark, df)
    wcc(spark, df)
  }

  /** Reads the FOLLOW edges of the twitter space into a cached DataFrame and logs the load time. */
  def readNebulaData(spark: SparkSession): DataFrame = {
    val start = System.currentTimeMillis()
    val config =
      NebulaConnectionConfig
        .builder()
        .withMetaAddress("127.0.0.1:9559")
        .withTimeout(6000)
        .withConenctionRetry(2)
        .build()
    val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
      .builder()
      .withSpace("twitter")
      .withLabel("FOLLOW")
      .withNoColumn(true)
      .withLimit(20000)
      .withPartitionNum(120)
      .build()
    val df: DataFrame =
      spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
    // materialize and cache so the read cost is not charged to the first algorithm
    df.cache()
    df.count()
    println(s"read data cost time ${System.currentTimeMillis() - start}")
    df
  }

  /** Label Propagation with 10 iterations; writes results as CSV to HDFS. */
  def lpa(spark: SparkSession, df: DataFrame): Unit = {
    val start = System.currentTimeMillis()
    val lpaConfig = LPAConfig(10)
    val lpa = LabelPropagationAlgo.apply(spark, df, lpaConfig, false)
    lpa.write.csv("hdfs://127.0.0.1:9000/tmp/lpa")
    println(s"lpa compute and save cost ${System.currentTimeMillis() - start}")
  }

  /** PageRank with PRConfig(10, 0.85); writes results as CSV to HDFS. */
  def pagerank(spark: SparkSession, df: DataFrame): Unit = {
    val start = System.currentTimeMillis()
    val pageRankConfig = PRConfig(10, 0.85)
    val pr = PageRankAlgo.apply(spark, df, pageRankConfig, false)
    pr.write.csv("hdfs://127.0.0.1:9000/tmp/pagerank")
    println(s"pagerank compute and save cost ${System.currentTimeMillis() - start}")
  }

  /** Weakly connected components with at most 20 iterations; writes results as CSV to HDFS. */
  def wcc(spark: SparkSession, df: DataFrame): Unit = {
    val start = System.currentTimeMillis()
    val ccConfig = CcConfig(20)
    val cc = ConnectedComponentsAlgo.apply(spark, df, ccConfig, false)
    cc.write.csv("hdfs://127.0.0.1:9000/tmp/wcc")
    println(s"wcc compute and save cost ${System.currentTimeMillis() - start}")
  }

  /** Louvain community detection with LouvainConfig(10, 5, 0.5); writes results as CSV to HDFS. */
  def louvain(spark: SparkSession, df: DataFrame): Unit = {
    val start = System.currentTimeMillis()
    val louvainConfig = LouvainConfig(10, 5, 0.5)
    val louvain = LouvainAlgo.apply(spark, df, louvainConfig, false)
    louvain.write.csv("hdfs://127.0.0.1:9000/tmp/louvain")
    println(s"louvain compute and save cost ${System.currentTimeMillis() - start}")
  }

}