Skip to content

Commit 571a92a

Browse files
author
David Taieb
committed
Initial check-in for HelloGraphx
1 parent 4133091 commit 571a92a

File tree

4 files changed

+202
-0
lines changed

Diff for: helloGraphx/build.sbt

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
// Build definition for the helloGraphx sample application.

name := "helloGraphx"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= {
  val sparkVersion  = "1.6.0"
  val http4sVersion = "0.8.2" // single source of truth, mirroring sparkVersion
  Seq(
    // Spark artifacts are "provided": supplied by the cluster at runtime,
    // so they are excluded from the assembly jar.
    "org.apache.spark" %% "spark-core"   % sparkVersion % "provided",
    "org.apache.spark" %% "spark-sql"    % sparkVersion % "provided",
    "org.apache.spark" %% "spark-graphx" % sparkVersion % "provided",
    "org.apache.spark" %% "spark-repl"   % sparkVersion % "provided",
    // http4s client used to post Gremlin scripts to the graph server.
    "org.http4s" %% "http4s-core"        % http4sVersion,
    "org.http4s" %% "http4s-client"      % http4sVersion,
    "org.http4s" %% "http4s-blazeclient" % http4sVersion
  )
}

// Spark already ships the Scala library; keep it out of the fat jar.
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

Diff for: helloGraphx/project/assembly.sbt

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
// sbt-assembly plugin: builds the fat jar that is deployed with spark-submit.
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")

Diff for: helloGraphx/readme.md

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
# Start Developing with GraphX
Diff for: helloGraphx/src/main/scala/com/ibm/cds/spark/samples/HelloGraphx.scala

Original file line number | Diff line number | Diff line change
@@ -0,0 +1,180 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.ibm.cds.spark.samples

import java.net.{URL, URLEncoder}
import java.text.SimpleDateFormat
import java.util.Calendar

import scala.collection.immutable.Map

import org.apache.spark._
import org.apache.spark.graphx.{Edge, EdgeTriplet, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}

import org.codehaus.jettison.json.JSONObject

import org.http4s.{BasicCredentials, EntityDecoder, EntityEncoder, Header, Headers, Method, Request, Uri}
import org.http4s.client.Client
import org.http4s.client.blaze.PooledHttp1Client
import org.http4s.headers.Authorization

import scalaz._
47+
/**
 * Base type for every graph element: an immutable bag of string properties
 * parsed from the GraphML <data> children of a node or edge.
 * Serializable so instances can travel inside Spark RDDs.
 *
 * NOTE(review): the subclasses below are case classes extending a regular
 * class — equals/hashCode only compare `properties`, which is the intended
 * behavior here, but extending case classes further would be unsafe.
 */
class Node(val properties: Map[String, String]) extends Serializable

/** An airport vertex. */
case class Airport(override val properties: Map[String, String]) extends Node(properties)
/** A country vertex. */
case class Country(override val properties: Map[String, String]) extends Node(properties)
/** A continent vertex. */
case class Continent(override val properties: Map[String, String]) extends Node(properties)
/** The payload attached to a route edge between two airports. */
case class Route(override val properties: Map[String, String]) extends Node(properties)
52+
53+
object HelloGraphx {

  /**
   * Entry point when running as a standalone Spark application.
   * Loads airports (GraphML nodes) and routes (GraphML edges) from a
   * GraphML file into GraphX RDDs, builds a Graph, then replicates every
   * triplet (source vertex, edge, target vertex) to a Gremlin server.
   *
   * @param args optional; args(0) overrides the default GraphML file path
   */
  def main(args: Array[String]) {
    // NOTE(review): the client is captured by the RDD closure in
    // graph.triplets.foreach below — confirm it initializes correctly on
    // executors (lazy val defers creation until first use).
    lazy val client = PooledHttp1Client()
    val conf = new SparkConf().setAppName("Hello Graphx")
    val sc = new SparkContext(conf)

    println("Hello Graphx Demo. Load/Save a graph to/from Graphx RDDs")

    val sqlContext = new SQLContext(sc)

    // Allow the GraphML location to be overridden on the command line instead
    // of relying only on the hard-coded development path.
    val graphmlPath =
      if (args.length > 0) args(0)
      else "/Users/dtaieb/Downloads/air-routes-graph/air-routes.graphml"

    //Load airports (GraphML <node> elements)
    val airportsDF = sqlContext.read.format("com.databricks.spark.xml")
      .option("rowTag", "node")
      .option("rootTag", "graphml/graph")
      .load(graphmlPath)
    airportsDF.printSchema()
    println(airportsDF.count())

    val airportsRdd: RDD[(VertexId, Node with Product)] =
      airportsDF.map { x => {
        // Each <data key="...">value</data> child contributes one property
        val propertiesMap: Map[String, String] = x.getAs[Seq[Row]]("data")
          .map { row => row.getAs[String]("@key") -> row.getAs[String]("#VALUE") }.toMap
        val id = x.getAs[Long]("@id")
        val nodeType: String = propertiesMap.getOrElse("type", "")
        nodeType match {
          case "airport"   => (id, Airport(propertiesMap))
          case "country"   => (id, Country(propertiesMap))
          case "continent" => (id, Continent(propertiesMap))
          // Unknown node types are logged and dropped by the filter below
          case _ => println("Skip node with type " + nodeType); (id, null)
        }
      }}.filter(f => f._2 != null)
    println(airportsRdd.take(5).deep.mkString("\n"))

    //Load routes (GraphML <edge> elements)
    val routesDF = sqlContext.read.format("com.databricks.spark.xml")
      .option("rowTag", "edge")
      .option("rootTag", "graphml/graph")
      .load(graphmlPath)
    routesDF.printSchema()
    println(routesDF.count())

    val routesRdd: RDD[(Edge[Route])] =
      routesDF.map { x => {
        // Edge properties, plus the GraphML edge id stored under key "id"
        val propertiesMap: Map[String, String] = x.getAs[Seq[Row]]("data")
          .map { row => row.getAs[String]("@key") -> row.getAs[String]("#VALUE") }.toMap +
          ("id" -> x.getAs[Long]("@id").toString)
        Edge(x.getAs[Long]("@source"), x.getAs[Long]("@target"), Route(propertiesMap))
      }}
    println(routesRdd.take(5).deep.mkString("\n"))

    val graph = Graph(airportsRdd, routesRdd)

    //Iterate over the graph and send the vertices/edges to Gremlin Server
    graph.triplets.foreach(f => {
      addTriplet(client, f)
    })

    sc.stop()
  }

  /**
   * Escapes single quotes so a value can be embedded in a single-quoted
   * Groovy/Gremlin string literal.
   *
   * NOTE(review): backslashes themselves are not escaped; a value ending in
   * "\" would still corrupt the generated script — confirm whether property
   * values can contain backslashes.
   */
  def escape(s: String): String = {
    s.replace("'", "\\'")
  }

  /**
   * Sends one GraphX triplet to the Gremlin server: creates the source and
   * target vertices if they do not already exist, then adds an edge carrying
   * all of the route's properties.
   */
  def addTriplet(client: Client, f: EdgeTriplet[Node with Product, Route]) {
    val sb = new StringBuilder()

    //Add the source vertex if necessary
    sb.append("v1=graph.traversal().V(" + f.srcId + ").tryNext().orElse(null);")
    sb.append(" if(!v1) v1=graph.addVertex(id, " + f.srcId)
    f.srcAttr.properties.foreach { case (k, v) => sb.append(",'" + escape(k) + "','" + escape(v) + "'") }
    sb.append(");")

    //Add the target vertex if necessary
    sb.append("v2=graph.traversal().V(" + f.dstId + ").tryNext().orElse(null);")
    sb.append(" if(!v2) v2=graph.addVertex(id, " + f.dstId)
    f.dstAttr.properties.foreach { case (k, v) => sb.append(",'" + escape(k) + "','" + escape(v) + "'") }
    sb.append(");")

    //Add the edge between the two vertices
    sb.append("v1.addEdge('edge', v2")
    f.attr.properties.foreach { case (k, v) => sb.append(",'" + escape(k) + "','" + escape(v) + "'") }
    sb.append(");")

    runScript(client, sb.toString)
  }

  /**
   * Adds a single vertex (if absent) with the given id and key/value
   * properties.
   *
   * FIX(review): keys and values were previously appended without quoting or
   * escaping, producing invalid Groovy (bare identifiers) — now quoted and
   * escaped consistently with addTriplet.
   */
  def addVertex(client: Client, id: Long, keyValues: Seq[(String, String)]) {
    val sb = new StringBuilder()
    sb.append("if(!graph.traversal().V(" + id + ")) graph.addVertex(id, " + id)
    keyValues.foreach { case (k, v) => sb.append(",'" + escape(k) + "','" + escape(v) + "'") }
    sb.append(")")
    runScript(client, sb.toString())
  }

  /**
   * POSTs a Gremlin script (wrapped in the server's JSON envelope) to the
   * Gremlin HTTP endpoint and prints the response body. Non-200 responses are
   * logged; transport-level failures are deliberately ignored (best-effort).
   */
  def runScript(client: Client, script: String) {
    EntityEncoder[String].toEntity("{\"gremlin\":" + JSONObject.quote(script) + "}").flatMap {
      entity =>
        // NOTE(review): endpoint is hard-coded; consider making it
        // configurable. Uri.fromString cannot fail on this literal, so the
        // getOrElse(null) fallback is never taken in practice.
        val gremlinUri = Uri.fromString("http://localhost:8182").getOrElse(null)
        client(
          Request(
            method = Method.POST,
            uri = gremlinUri,
            headers = Headers(
              Header("Accept", "application/json"),
              Header("Content-Type", "application/json")
            ),
            body = entity.body
          )
        ).flatMap { response =>
          val res = response.as[String]
          if (response.status.code == 200) {
            res
          } else {
            println("Error received from Gremlin. Code : " + response.status.code + " reason: " + response.status.reason)
            res
          }
        }
    }.attemptRun match {
      case -\/(e) => //Ignore: replication to Gremlin is best-effort
      case \/-(a) => println(a)
    }
  }
}

0 commit comments

Comments
 (0)