diff --git a/analytical_engine/core/context/java_context_base.h b/analytical_engine/core/context/java_context_base.h index 2ca4ad242c8e..cec36bc86619 100644 --- a/analytical_engine/core/context/java_context_base.h +++ b/analytical_engine/core/context/java_context_base.h @@ -132,7 +132,8 @@ class JavaContextBase : public grape::ContextBase { LOG(ERROR) << "Exception occurred when calling write back method"; } } else { - VLOG(2) << "Not write back method found"; + VLOG(2) << "Not write back method found, try to call output"; + callJavaContextOutput(); } } } @@ -237,6 +238,31 @@ class JavaContextBase : public grape::ContextBase { } } + // We expect user to write the sync back logic in their java code. + // i.e. copy the data from java heap to cpp context heap. + void callJavaContextOutput() { + JNIEnvMark m; + if (m.env()) { + JNIEnv* env = m.env(); + + jclass context_class = env->GetObjectClass(this->context_object()); + CHECK_NOTNULL(context_class); + + const char* descriptor = "(Lcom/alibaba/graphscope/fragment/IFragment;)V"; + jmethodID output_methodID = + env->GetMethodID(context_class, "Output", descriptor); + if (output_methodID) { + VLOG(1) << "Found output method in java context."; + env->CallVoidMethod(this->context_object(), output_methodID, + this->fragment_object()); + } else { + VLOG(1) << "Output method not found, skip."; + } + } else { + LOG(ERROR) << "JNI env not available."; + } + } + private: /** * @brief Generate user class path, i.e. URLClassLoader class path, from lib diff --git a/analytical_engine/core/context/java_pie_projected_context.h b/analytical_engine/core/context/java_pie_projected_context.h index fbac9ccbb381..6f4c3c536f41 100644 --- a/analytical_engine/core/context/java_pie_projected_context.h +++ b/analytical_engine/core/context/java_pie_projected_context.h @@ -71,30 +71,12 @@ class JavaPIEProjectedContext : public JavaContextBase { } void Output(std::ostream& os) override { - JNIEnvMark m; - if (m.env()) { - JNIEnv* env = m.env(); - - jclass context_class = env->GetObjectClass(this->context_object()); - CHECK_NOTNULL(context_class); - - const char* descriptor = "(Lcom/alibaba/graphscope/fragment/IFragment;)V"; - jmethodID output_methodID = - env->GetMethodID(context_class, "Output", descriptor); - if (output_methodID) { - VLOG(1) << "Found output method in java context."; - env->CallVoidMethod(this->context_object(), output_methodID, - this->fragment_object()); - } else { - VLOG(1) << "Output method not found, skip."; - } - } else { - LOG(ERROR) << "JNI env not available."; - } + LOG(WARNING) << "Output is not supported for JavaPIEProjectedContext"; } std::shared_ptr CreateInnerCtxWrapper( const std::string& id, std::shared_ptr frag_wrapper) { + JavaContextBase::WriteBackJVMHeapToCppContext(); std::string java_ctx_type_name = getJavaCtxTypeName(this->context_object()); VLOG(1) << "Java ctx type name" << java_ctx_type_name; if (java_ctx_type_name == "VertexDataContext") { @@ -144,6 +126,15 @@ class JavaPIEProjectedContext : public JavaContextBase { std::shared_ptr inner_ctx_impl_shared(inner_ctx_impl); return std::make_shared(id, frag_wrapper, inner_ctx_impl_shared); + } else if (data_type == "std::string") { + using inner_ctx_type = grape::VertexDataContext; + using inner_ctx_wrapper_type = + VertexDataContextWrapper; + auto inner_ctx_impl = + reinterpret_cast(this->inner_context_addr()); + std::shared_ptr inner_ctx_impl_shared(inner_ctx_impl); + return std::make_shared(id, frag_wrapper, + inner_ctx_impl_shared); } else { LOG(ERROR) << "Unrecognizable data type: " << data_type; } diff --git a/analytical_engine/core/context/java_pie_property_context.h b/analytical_engine/core/context/java_pie_property_context.h index 3eab4a5135d6..cc5ce8353a19 100644 --- a/analytical_engine/core/context/java_pie_property_context.h +++ b/analytical_engine/core/context/java_pie_property_context.h @@ -94,6 +94,7 @@ class JavaPIEPropertyContext : public JavaContextBase { std::shared_ptr CreateInnerCtxWrapper( const std::string& id, std::shared_ptr frag_wrapper) { + JavaContextBase::WriteBackJVMHeapToCppContext(); std::string java_ctx_type_name = getJavaCtxTypeName(this->context_object()); VLOG(1) << "Java ctx type name" << java_ctx_type_name; if (java_ctx_type_name == "LabeledVertexDataContext") { diff --git a/analytical_engine/core/worker/default_worker.h b/analytical_engine/core/worker/default_worker.h index 4f70d763a347..9c4587951074 100644 --- a/analytical_engine/core/worker/default_worker.h +++ b/analytical_engine/core/worker/default_worker.h @@ -131,7 +131,6 @@ class DefaultWorker { MPI_Barrier(comm_spec_.comm()); messages_.Finalize(); - finishQuery(); } std::shared_ptr GetContext() { return context_; } @@ -139,24 +138,6 @@ class DefaultWorker { void Output(std::ostream& os) { context_->Output(os); } private: - template - typename std::enable_if< - std::is_base_of, T>::value>::type - finishQuery() { - auto java_context = - std::dynamic_pointer_cast>(context_); - if (java_context) { - VLOG(1) << "Write java heap data back to cpp context since it is java " - "context"; - java_context->WriteBackJVMHeapToCppContext(); - } - } - - template - typename std::enable_if< - !std::is_base_of, T>::value>::type - finishQuery() {} - std::shared_ptr app_; std::shared_ptr context_; message_manager_t messages_; diff --git a/analytical_engine/frame/app_frame.cc b/analytical_engine/frame/app_frame.cc index 39bf7ccd975b..7fb7ace4c306 100644 --- a/analytical_engine/frame/app_frame.cc +++ b/analytical_engine/frame/app_frame.cc @@ -120,7 +120,7 @@ void Query(void* worker_handler, const gs::rpc::QueryArgs& query_args, const std::string& context_key, std::shared_ptr frag_wrapper, std::shared_ptr& ctx_wrapper, - bl::result& wrapper_error) { + bl::result& wrapper_error) { __FRAME_CATCH_AND_ASSIGN_GS_ERROR( wrapper_error, detail::Query(worker_handler, query_args, context_key, frag_wrapper, ctx_wrapper)); diff --git a/analytical_engine/java/grape-demo/src/main/java/com/alibaba/graphscope/example/giraph/circle/Circle.java b/analytical_engine/java/grape-demo/src/main/java/com/alibaba/graphscope/example/giraph/circle/Circle.java new file mode 100644 index 000000000000..7c65aec4b221 --- /dev/null +++ b/analytical_engine/java/grape-demo/src/main/java/com/alibaba/graphscope/example/giraph/circle/Circle.java @@ -0,0 +1,181 @@ +/* + * Copyright 2022 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.graphscope.example.giraph.circle; + +import com.google.common.collect.Lists; + +import org.apache.giraph.Algorithm; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.graph.BasicComputation; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.LongWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +@Algorithm(name = "Circle", description = "Finds Circle") +public class Circle + extends BasicComputation< + LongWritable, VertexAttrWritable, LongWritable, VertexAttrWritable> { + private static final Logger logger = LoggerFactory.getLogger(Circle.class); + int maxIteration = 3; + + public Circle() {} + + public void preSuperstep() { + this.maxIteration = Integer.parseInt(this.getConf().get("max", "3")); + logger.info("[preSuperstep] max is {}", this.maxIteration); + } + + public void compute( + Vertex vertex, + Iterable messages) + throws IOException { + this.maxIteration = Integer.parseInt(this.getConf().get("max", "3")); + long superStep = this.getSuperstep(); + + List writables = new ArrayList(); + for (int i = 0; i < vertex.getId().get() % 5; ++i) { + writables.add(new MsgWritable(Arrays.asList(1L, 2L, 3L), Arrays.asList(12L, 23L))); + } + vertex.setValue(new VertexAttrWritable(writables)); + vertex.voteToHalt(); + } + + private VertexAttrWritable mergeMsg(Iterable messages) { + VertexAttrWritable merged = new VertexAttrWritable(); + + VertexAttrWritable mess; + for (Iterator var3 = messages.iterator(); + var3.hasNext(); + merged = this.merge(merged, mess)) { + mess = (VertexAttrWritable) var3.next(); + } + + return merged; + } + + private void vprog(long vid, VertexAttrWritable vdata, VertexAttrWritable message) { + long superStep = this.getSuperstep(); + List nodeAttr = vdata.getVertexAttr(); + List messageAttr = message.getVertexAttr(); + List processedMsg = + (List) + messageAttr.stream() + .peek( + (item) -> { + List vlist = item.getVertexPath(); + this.addVertexToPath(vid, vlist); + }) + .collect(Collectors.toList()); + nodeAttr.addAll(processedMsg); + List finalNodeAttr = + (List) + nodeAttr.stream() + .distinct() + .filter( + (item) -> { + List vertexPath = item.getVertexPath(); + return this.filterPathInVertexAttr( + vertexPath, superStep + 1L); + }) + .collect(Collectors.toList()); + vdata.setVertexAttr(finalNodeAttr); + } + + private void addVertexToPath(long vid, List path) { + if (path.isEmpty()) { + path.add(vid); + } else { + int pathSize = path.size(); + if (pathSize == 1 && !path.contains(vid)) { + path.add(vid); + } else { + if (!path.subList(1, pathSize).contains(vid)) { + path.add(vid); + } + } + } + } + + private boolean filterPathInVertexAttr(List vertexPath, long iteration) { + int pathSize = vertexPath.size(); + return MsgWritable.isCircle(vertexPath) || (long) pathSize == iteration; + } + + private VertexAttrWritable merge(VertexAttrWritable attr1, VertexAttrWritable attr2) { + List sp1 = attr1.getVertexAttr(); + sp1.addAll(attr2.getVertexAttr()); + return new VertexAttrWritable((List) sp1.stream().distinct().collect(Collectors.toList())); + } + + private void sendMsg(Vertex vertex) { + List currVertexAttr = ((VertexAttrWritable) vertex.getValue()).getVertexAttr(); + long superStep = this.getSuperstep(); + if (!currVertexAttr.isEmpty()) { + Iterator var5 = vertex.getEdges().iterator(); + + while (var5.hasNext()) { + Edge edge = (Edge) var5.next(); + long edgeId = ((LongWritable) edge.getValue()).get(); + List finalMsgs = + (List) + currVertexAttr.stream() + .filter( + (path) -> { + return this.meetMsgCondition( + ((LongWritable) vertex.getId()).get(), + edgeId, + path, + superStep); + }) + .map( + (path) -> { + List newEdgeSet = + Lists.newArrayList(path.getEdgePath()); + newEdgeSet.add(edgeId); + return new MsgWritable( + path.getVertexPath(), newEdgeSet); + }) + .collect(Collectors.toList()); + if (!finalMsgs.isEmpty()) { + this.sendMessage(edge.getTargetVertexId(), new VertexAttrWritable(finalMsgs)); + } + } + } + } + + private boolean meetMsgCondition( + long currVertexId, + long edgeIdToSendMsg, + MsgWritable onePathInCurrVertex, + long superStep) { + List vertexPath = onePathInCurrVertex.getVertexPath(); + List edgePath = onePathInCurrVertex.getEdgePath(); + int vertexPathSize = vertexPath.size(); + return (long) vertexPathSize == superStep + 1L + && !MsgWritable.isCircle(vertexPath) + && currVertexId == (Long) vertexPath.get(vertexPathSize - 1) + && !edgePath.contains(edgeIdToSendMsg); + } +} diff --git a/analytical_engine/java/grape-demo/src/main/java/com/alibaba/graphscope/example/giraph/circle/CircleEdgeInputFormat.java b/analytical_engine/java/grape-demo/src/main/java/com/alibaba/graphscope/example/giraph/circle/CircleEdgeInputFormat.java new file mode 100644 index 000000000000..5891a619257f --- /dev/null +++ b/analytical_engine/java/grape-demo/src/main/java/com/alibaba/graphscope/example/giraph/circle/CircleEdgeInputFormat.java @@ -0,0 +1,73 @@ +/* + * Copyright 2022 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.graphscope.example.giraph.circle; + +import org.apache.giraph.io.EdgeReader; +import org.apache.giraph.io.formats.TextEdgeInputFormat; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +public class CircleEdgeInputFormat extends TextEdgeInputFormat { + public CircleEdgeInputFormat() {} + + public EdgeReader createEdgeReader( + InputSplit split, TaskAttemptContext context) throws IOException { + return new CircleEdgeInputFormat.P2PEdgeReader(); + } + + public class P2PEdgeReader + extends TextEdgeInputFormat + .TextEdgeReaderFromEachLineProcessed< + String[]> { + String SEPARATOR = " "; + private LongWritable srcId; + private LongWritable dstId; + private LongWritable edgeValue; + + public P2PEdgeReader() { + super(); + } + + protected String[] preprocessLine(Text line) throws IOException { + String[] tokens = line.toString().split(this.SEPARATOR); + if (tokens.length != 3) { + throw new IllegalStateException("expect 3 ele in edge line"); + } else { + this.srcId = new LongWritable(Long.parseLong(tokens[0])); + this.dstId = new LongWritable(Long.parseLong(tokens[1])); + this.edgeValue = new LongWritable(Long.parseLong(tokens[2])); + return tokens; + } + } + + protected LongWritable getTargetVertexId(String[] line) throws IOException { + return this.dstId; + } + + protected LongWritable getSourceVertexId(String[] line) throws IOException { + return this.srcId; + } + + protected LongWritable getValue(String[] line) throws IOException { + return this.edgeValue; + } + } +} diff --git a/analytical_engine/java/grape-demo/src/main/java/com/alibaba/graphscope/example/giraph/circle/CircleVertexInputFormat.java b/analytical_engine/java/grape-demo/src/main/java/com/alibaba/graphscope/example/giraph/circle/CircleVertexInputFormat.java new file mode 100644 index 000000000000..dc0d682aa866 --- /dev/null +++ b/analytical_engine/java/grape-demo/src/main/java/com/alibaba/graphscope/example/giraph/circle/CircleVertexInputFormat.java @@ -0,0 +1,77 @@ +/* + * Copyright 2022 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.graphscope.example.giraph.circle; + +import com.google.common.collect.Lists; + +import org.apache.giraph.edge.Edge; +import org.apache.giraph.io.formats.TextVertexInputFormat; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class CircleVertexInputFormat + extends TextVertexInputFormat { + public CircleVertexInputFormat() {} + + public TextVertexInputFormat.TextVertexReader + createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException { + return new CircleVertexInputFormat.P2PVertexReader(); + } + + public class P2PVertexReader + extends TextVertexInputFormat + .TextVertexReaderFromEachLineProcessed< + String[]> { + String SEPARATOR = " "; + private LongWritable id; + private VertexAttrWritable value; + + public P2PVertexReader() { + super(); + } + + protected String[] preprocessLine(Text line) throws IOException { + String[] tokens = line.toString().split(this.SEPARATOR); + this.id = new LongWritable(Long.parseLong(tokens[0])); + List writables = new ArrayList(); + writables.add(new MsgWritable(Arrays.asList(1L, 2L, 3L), Arrays.asList(12L, 23L))); + this.value = new VertexAttrWritable(writables); + return tokens; + } + + protected LongWritable getId(String[] tokens) throws IOException { + return this.id; + } + + protected VertexAttrWritable getValue(String[] tokens) throws IOException { + return this.value; + } + + protected Iterable> getEdges(String[] tokens) + throws IOException { + List> edges = Lists.newArrayListWithCapacity(0); + return edges; + } + } +} diff --git a/analytical_engine/java/grape-demo/src/main/java/com/alibaba/graphscope/example/giraph/circle/MsgWritable.java b/analytical_engine/java/grape-demo/src/main/java/com/alibaba/graphscope/example/giraph/circle/MsgWritable.java new file mode 100644 index 000000000000..bec8f77ab5dd --- /dev/null +++ b/analytical_engine/java/grape-demo/src/main/java/com/alibaba/graphscope/example/giraph/circle/MsgWritable.java @@ -0,0 +1,165 @@ +/* + * Copyright 2022 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.graphscope.example.giraph.circle; + +import com.alibaba.fastjson.JSONObject; + +import org.apache.hadoop.io.Writable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +public class MsgWritable implements Writable { + private List vertexPath; + private List edgePath; + + public MsgWritable() { + this.vertexPath = new ArrayList(); + this.edgePath = new ArrayList(); + } + + public MsgWritable(List vertexPath, List edgePath) { + this.vertexPath = vertexPath; + this.edgePath = edgePath; + } + + public static boolean isCircle(List vertexList) { + int size = vertexList.size(); + return size > 1 && (Long) vertexList.get(0) == (Long) vertexList.get(size - 1); + } + + public List getVertexPath() { + return this.vertexPath; + } + + public void setVertexPath(List vertexPath) { + this.vertexPath = vertexPath; + } + + public List getEdgePath() { + return this.edgePath; + } + + public void setEdgePath(List edgePath) { + this.edgePath = edgePath; + } + + public void write(DataOutput dataOutput) throws IOException { + int vSize = this.vertexPath.size(); + dataOutput.writeInt(vSize); + Iterator var3 = this.vertexPath.iterator(); + + while (var3.hasNext()) { + long v = (Long) var3.next(); + dataOutput.writeLong(v); + } + + int eSize = this.edgePath.size(); + dataOutput.writeInt(eSize); + Iterator var8 = this.edgePath.iterator(); + + while (var8.hasNext()) { + long e = (Long) var8.next(); + dataOutput.writeLong(e); + } + } + + public void readFields(DataInput dataInput) throws IOException { + this.vertexPath = this.readLongList(dataInput); + this.edgePath = this.readLongList(dataInput); + } + + private List readLongList(DataInput dataInput) throws IOException { + int size = dataInput.readInt(); + List list = new ArrayList(); + if (size != 0) { + for (int i = 0; i < size; ++i) { + list.add(dataInput.readLong()); + } + } + + return list; + } + + @Override + public String toString() { + JSONObject json = new JSONObject(); + json.put("v", this.vertexPath); + json.put("e", this.edgePath); + return json.toJSONString(); + } + + public boolean equals(Object otherObj) { + if (!(otherObj instanceof MsgWritable)) { + return false; + } else { + MsgWritable other = (MsgWritable) otherObj; + return ((String) + this.vertexPath.stream() + .map( + (i) -> { + return i + ""; + }) + .collect(Collectors.joining(","))) + .equals( + other.vertexPath.stream() + .map( + (i) -> { + return i + ""; + }) + .collect(Collectors.joining(","))) + && ((String) + this.edgePath.stream() + .map( + (i) -> { + return i + ""; + }) + .collect(Collectors.joining(","))) + .equals( + other.edgePath.stream() + .map( + (i) -> { + return i + ""; + }) + .collect(Collectors.joining(","))); + } + } + + public int hashCode() { + return Objects.hash( + new Object[] { + this.vertexPath.stream() + .map( + (i) -> { + return i + ""; + }) + .collect(Collectors.joining(",")), + this.edgePath.stream() + .map( + (i) -> { + return i + ""; + }) + .collect(Collectors.joining(",")) + }); + } +} diff --git a/analytical_engine/java/grape-demo/src/main/java/com/alibaba/graphscope/example/giraph/circle/VertexAttrWritable.java b/analytical_engine/java/grape-demo/src/main/java/com/alibaba/graphscope/example/giraph/circle/VertexAttrWritable.java new file mode 100644 index 000000000000..a84c3832eec7 --- /dev/null +++ b/analytical_engine/java/grape-demo/src/main/java/com/alibaba/graphscope/example/giraph/circle/VertexAttrWritable.java @@ -0,0 +1,91 @@ +/* + * Copyright 2022 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.graphscope.example.giraph.circle; + +import com.google.common.collect.Lists; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.io.Writable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +public class VertexAttrWritable implements Writable { + private List vertexAttr; + + public VertexAttrWritable() { + this.vertexAttr = Lists.newArrayList(new MsgWritable[] {new MsgWritable()}); + } + + public VertexAttrWritable(List values) { + this.vertexAttr = values; + } + + public List getVertexAttr() { + return this.vertexAttr; + } + + public void setVertexAttr(List vertexAttr) { + this.vertexAttr = vertexAttr; + } + + public void readFields(DataInput in) throws IOException { + int size = in.readInt(); + List vertexAttr = new ArrayList(); + if (size != 0) { + for (int i = 0; i < size; ++i) { + MsgWritable msgWritable = new MsgWritable(); + msgWritable.readFields(in); + vertexAttr.add(msgWritable); + } + } + + this.vertexAttr = vertexAttr; + } + + public void write(DataOutput out) throws IOException { + out.writeInt(this.vertexAttr.size()); + Iterator var2 = this.vertexAttr.iterator(); + + while (var2.hasNext()) { + MsgWritable msgWritable = (MsgWritable) var2.next(); + msgWritable.write(out); + } + } + + @Override + public String toString() { + List pathList = + (List) + this.vertexAttr.stream() + .filter( + (path) -> { + return MsgWritable.isCircle(path.getVertexPath()); + }) + .map( + (path) -> { + return StringUtils.join(path.getEdgePath(), "&"); + }) + .collect(Collectors.toList()); + return !pathList.isEmpty() ? StringUtils.join(pathList, "|") : "No Circle"; + } +} diff --git a/analytical_engine/java/grape-demo/src/main/java/com/alibaba/graphscope/example/stringApp/StringApp.java b/analytical_engine/java/grape-demo/src/main/java/com/alibaba/graphscope/example/stringApp/StringApp.java index 110ca76c7a23..2f6342d69fa4 100644 --- a/analytical_engine/java/grape-demo/src/main/java/com/alibaba/graphscope/example/stringApp/StringApp.java +++ b/analytical_engine/java/grape-demo/src/main/java/com/alibaba/graphscope/example/stringApp/StringApp.java @@ -17,7 +17,7 @@ public class StringApp implements ParallelAppBase, ParallelEngine { - private static Logger logger = LoggerFactory.getLogger(ParallelAppBase.class); + private static Logger logger = LoggerFactory.getLogger(StringApp.class); /** * Partial Evaluation to implement. diff --git a/analytical_engine/java/grape-demo/src/main/java/com/alibaba/graphscope/example/stringApp/StringAppContext.java b/analytical_engine/java/grape-demo/src/main/java/com/alibaba/graphscope/example/stringApp/StringAppContext.java index 80a6f61b1701..b3a0759dded0 100644 --- a/analytical_engine/java/grape-demo/src/main/java/com/alibaba/graphscope/example/stringApp/StringAppContext.java +++ b/analytical_engine/java/grape-demo/src/main/java/com/alibaba/graphscope/example/stringApp/StringAppContext.java @@ -3,15 +3,19 @@ import com.alibaba.fastjson.JSONObject; import com.alibaba.graphscope.context.ParallelContextBase; import com.alibaba.graphscope.context.VertexDataContext; +import com.alibaba.graphscope.ds.GSVertexArray; import com.alibaba.graphscope.ds.StringView; +import com.alibaba.graphscope.ds.Vertex; import com.alibaba.graphscope.fragment.IFragment; import com.alibaba.graphscope.parallel.ParallelMessageManager; +import com.alibaba.graphscope.stdcxx.StdString; +import com.alibaba.graphscope.utils.FFITypeFactoryhelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class StringAppContext - extends VertexDataContext, Integer> + extends VertexDataContext, StdString> implements ParallelContextBase { private static Logger logger = LoggerFactory.getLogger(StringAppContext.class); @@ -34,7 +38,7 @@ public void Init( IFragment frag, ParallelMessageManager messageManager, JSONObject jsonObject) { - createFFIContext(frag, Integer.class, false); + createFFIContext(frag, StdString.class, false); } /** @@ -45,5 +49,16 @@ public void Init( * @see IFragment */ @Override - public void Output(IFragment frag) {} + public void Output(IFragment frag) { + // output to inner vertex data + GSVertexArray vertexData = data(); + Vertex vertex = FFITypeFactoryhelper.newVertexLong(); + logger.info("Begin output"); + for (long i = 0; i < vertexData.size(); ++i) { + vertex.setValue(i); + StdString string = vertexData.get(vertex); + string.fromJavaString("vertex: " + i); + } + logger.info("Finish out"); + } } diff --git a/analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/context/GiraphComputationAdaptorContext.java b/analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/context/GiraphComputationAdaptorContext.java index 43931d6e77fb..0d669773f10b 100644 --- a/analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/context/GiraphComputationAdaptorContext.java +++ b/analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/context/GiraphComputationAdaptorContext.java @@ -171,9 +171,11 @@ public void writeBackVertexData() { { long previous = 0; try { - if (conf.getGrapeVdataClass().equals(String.class)) { + if (conf.getGrapeVdataClass().equals(String.class) + || conf.getGrapeVdataClass().equals(StringView.class)) { for (long lid = 0; lid < innerVerticesNum; ++lid) { - vertexDataManager.getVertexData(lid).write(outputStream); + // Write the output of toString(). + outputStream.writeBytes(vertexDataManager.getVertexData(lid).toString()); long cur = outputStream.bytesWriten(); offsets[(int) lid] = cur - previous; maxOffset = Math.max(offsets[(int) lid], maxOffset); @@ -258,13 +260,14 @@ public void writeBackVertexData() { throw new IllegalStateException( "Input stream too short for " + innerVerticesNum + " vertices"); } - if (inputStream.read(bytes, 0, (int) offsets[(int) lid]) == -1) { + int bytes_read = inputStream.read(bytes, 0, (int) offsets[(int) lid]); + if (bytes_read == -1) { throw new IllegalStateException("read input stream failed"); } // This string is not readable. StdString value = (StdString) vertexArray.get(grapeVertex); // TODO: can be optimized without creating a java string - value.fromJavaString(new String(bytes)); + value.fromJavaString(new String(bytes, 0, bytes_read)); } } else { throw new IllegalStateException( diff --git a/analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/graph/impl/VertexDataManagerImpl.java b/analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/graph/impl/VertexDataManagerImpl.java index 3390dbe3a356..e145b130cfd0 100644 --- a/analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/graph/impl/VertexDataManagerImpl.java +++ b/analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/graph/impl/VertexDataManagerImpl.java @@ -112,9 +112,9 @@ private void readVertexDataFromIFragment(FFIByteVectorOutputStream outputStream) // We need to form all vdata as a stream, so java writables can read from this stream. Iterable> iterable; if (conf.getGrapeVidClass().equals(Long.class)) { - iterable = (Iterable>) fragment.vertices().longIterable(); + iterable = (Iterable>) fragment.innerVertices().longIterable(); } else if (conf.getGrapeVidClass().equals(Integer.class)) { - iterable = (Iterable>) fragment.vertices().intIterable(); + iterable = (Iterable>) fragment.innerVertices().intIterable(); } else { throw new IllegalStateException( "No recognizable vid" + conf.getGrapeVidClass().getName()); diff --git a/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/utils/AppContextGetter.java b/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/utils/AppContextGetter.java index 28a34b528a41..c12a831cdab5 100644 --- a/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/utils/AppContextGetter.java +++ b/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/utils/AppContextGetter.java @@ -172,12 +172,16 @@ public static String getContextName(Object obj) { public static String getLabeledVertexDataContextDataType(LabeledVertexDataContext ctxObj) { Class ctxClass = ctxObj.getClass(); Class ret = getBaseClassTemplateType(ctxClass, 1); - if (ret.getName() == "java.lang.Double") { + if (ret.getName().equals("java.lang.Double")) { return "double"; - } else if (ret.getName() == "java.lang.Integer") { + } else if (ret.getName().equals("java.lang.Integer")) { return "uint32_t"; - } else if (ret.getName() == "java.lang.Long") { + } else if (ret.getName().equals("java.lang.Long")) { return "uint64_t"; + } else if (ret.getName().equals("com.alibaba.graphscope.ds.StringView") + || ret.getName().equals("java.lang.String") + || ret.getName().equals("com.alibaba.graphscope.stdcxx.StdString")) { + return "std::string"; } return null; } @@ -194,12 +198,16 @@ public static String getVertexDataContextDataType(VertexDataContext ctxObj) { logger.info("vertex data context class: " + ret.getName()); } - if (ret.getName() == "java.lang.Double") { + if (ret.getName().equals("java.lang.Double")) { return "double"; - } else if (ret.getName() == "java.lang.Integer") { + } else if (ret.getName().equals("java.lang.Integer")) { return "int32_t"; - } else if (ret.getName() == "java.lang.Long") { + } else if (ret.getName().equals("java.lang.Long")) { return "int64_t"; + } else if (ret.getName().equals("com.alibaba.graphscope.ds.StringView") + || ret.getName().equals("java.lang.String") + || ret.getName().equals("com.alibaba.graphscope.stdcxx.StdString")) { + return "std::string"; } return null; } diff --git a/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/utils/FFITypeFactoryhelper.java b/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/utils/FFITypeFactoryhelper.java index 1e2020cef981..371fb0673530 100644 --- a/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/utils/FFITypeFactoryhelper.java +++ b/analytical_engine/java/grape-jdk/src/main/java/com/alibaba/graphscope/utils/FFITypeFactoryhelper.java @@ -85,6 +85,8 @@ public static String javaType2CppType(Class clz) { return "std::string"; } else if (clz.getName() == StringView.class.getName()) { return "std::string"; + } else if (clz.getName() == StdString.class.getName()) { + return "std::string"; } else { logger.error("Must be one of long, double, integer, but got: " + clz.getName()); return "null"; diff --git a/analytical_engine/java/grape-runtime/CMakeLists.txt b/analytical_engine/java/grape-runtime/CMakeLists.txt index 90227978d45e..61e78827ff07 100644 --- a/analytical_engine/java/grape-runtime/CMakeLists.txt +++ b/analytical_engine/java/grape-runtime/CMakeLists.txt @@ -42,7 +42,12 @@ if (CMAKE_CXX_COMPILER_ID MATCHES "Clang") message(STATUS "Obtain directory: ${COMPILER_DIR}") if (NOT APPLE) #ld.lld can't work on mac. - set(CMAKE_JNI_LINKER_FLAGS "-fuse-ld=${COMPILER_DIR}/ld.lld -Xlinker -mllvm=-lto-embed-bitcode") + set(CMAKE_JNI_LINKER_FLAGS "-fuse-ld=${COMPILER_DIR}/ld.lld -Xlinker") + include(CheckCXXCompilerFlag) + check_cxx_compiler_flag(-mllvm=-lto-embed-bitcode SUPPORTS_LTO_EMBED_BITCODE) + if(SUPPORTS_LTO_EMBED_BITCODE) + set(CMAKE_JNI_LINKER_FLAGS "${CMAKE_JNI_LINKER_FLAGS}-mllvm=-lto-embed-bitcode") + endif() endif() else() message(STATUS "Using a non-clang compiler will lose performance evaluation provided by LLVM4JNI. Clang-11 compiler is recommended") diff --git a/analytical_engine/java/grape-runtime/src/main/java/com/alibaba/graphscope/annotation/AnnotationInvoker.java b/analytical_engine/java/grape-runtime/src/main/java/com/alibaba/graphscope/annotation/AnnotationInvoker.java index e1a6a46fc7da..75c69c03e39f 100644 --- a/analytical_engine/java/grape-runtime/src/main/java/com/alibaba/graphscope/annotation/AnnotationInvoker.java +++ b/analytical_engine/java/grape-runtime/src/main/java/com/alibaba/graphscope/annotation/AnnotationInvoker.java @@ -1166,6 +1166,25 @@ + ">", "Integer" }), + @CXXTemplate( + cxx = { + CPP_ARROW_PROJECTED_FRAGMENT + + "", + "std::string" + }, + java = { + JAVA_ARROW_PROJECTED_FRAGMENT + + "<" + + LONG + + "," + + LONG + + "," + + STRING + + "," + + LONG + + ">", + STD_STRING + }), @CXXTemplate( cxx = { CPP_ARROW_PROJECTED_FRAGMENT diff --git a/analytical_engine/java/grape-runtime/src/main/java/com/alibaba/graphscope/annotation/Utils.java b/analytical_engine/java/grape-runtime/src/main/java/com/alibaba/graphscope/annotation/Utils.java index 322f6fe6af49..95fa081de126 100644 --- a/analytical_engine/java/grape-runtime/src/main/java/com/alibaba/graphscope/annotation/Utils.java +++ b/analytical_engine/java/grape-runtime/src/main/java/com/alibaba/graphscope/annotation/Utils.java @@ -238,6 +238,8 @@ public static String java2Cpp(String javaType, boolean signed) { return "double"; } else if (javaType.equals("com.alibaba.graphscope.ds.StringView")) { return "std::string"; + } else if (javaType.equals("com.alibaba.graphscope.stdcxx.StdString")) { + return "std::string"; } throw new IllegalStateException("Unrecognized type " + javaType + " sign: " + signed); } diff --git a/analytical_engine/test/app_tests.sh b/analytical_engine/test/app_tests.sh index 649fb6bd8984..5a8552c931b8 100755 --- a/analytical_engine/test/app_tests.sh +++ b/analytical_engine/test/app_tests.sh @@ -443,6 +443,12 @@ then then echo "Running Java tests..." run_vy_2 ${np} ./run_java_app "${socket_file}" 1 "${test_dir}"/projected_property/twitter_property_e "${test_dir}"/projected_property/twitter_property_v 1 0 1 com.alibaba.graphscope.example.bfs.BFS + + GLOG_v=10 mpirun -n 1 ./run_java_app "${socket_file}" 1 \ + "${test_dir}/property/p2p-31_property_e_0#header_row=True#src_label=v&dst_label=v&label=e&delimiter=," \ + 1 "${test_dir}/property/p2p-31_property_v_0#header_row=True#label=v&included_column=id,age&delimiter=," \ + 1 0 1 com.alibaba.graphscope.example.sssp.SSSP + GLOG_v=10 ./run_java_string_app /tmp/vineyard.sock \ 1 "${test_dir}/projected_property/twitter_property_e_0#header_row=True#src_label=v&dst_label=v&label=e&include_all_columns=true&column_types=int64_t,int64_t,int32_t,int32_t,std::string" \ 1 "${test_dir}/projected_property/twitter_property_v_0#header_row=True#label=v&include_all_columns=true&column_types=int64_t,std::string" \ @@ -459,6 +465,12 @@ then --edge_input_format_class giraph:com.alibaba.graphscope.example.giraph.format.P2PEdgeMultipleLongInputFormat --vfile "${test_dir}"/p2p-31.v \ --efile "${test_dir}"/p2p-31.e --ipc_socket /tmp/vineyard.sock --lib_path /opt/graphscope/lib/libgrape-jni.so \ --user_app_class com.alibaba.graphscope.example.giraph.MessageAppWithUserWritable + + echo "Test Giraph app user Circle App" + GLOG_v=10 ./giraph_runner --vertex_input_format_class giraph:com.alibaba.graphscope.example.giraph.circle.CircleVertexInputFormat \ + --edge_input_format_class giraph:com.alibaba.graphscope.example.giraph.circle.CircleEdgeInputFormat --vfile "${test_dir}"/p2p-31.v \ + --efile "${test_dir}"/p2p-31.e --ipc_socket /tmp/vineyard.sock --lib_path /opt/graphscope/lib/libgrape-jni.so \ + --user_app_class com.alibaba.graphscope.example.giraph.circle.Circle fi fi diff --git a/analytical_engine/test/giraph_runner.h b/analytical_engine/test/giraph_runner.h index 88eda0c1c553..8ccde4d653fd 100644 --- a/analytical_engine/test/giraph_runner.h +++ b/analytical_engine/test/giraph_runner.h @@ -322,6 +322,18 @@ void CreateAndQuery(std::string params) { ArrowProjectedFragment; ProjectAndQuery( comm_spec, fragment, frag_name, new_params, user_lib_path, query_times); + } else if (vertex_data_type == "STRING" && edge_data_type == "LONG") { + std::string frag_name = + "gs::ArrowProjectedFragment"; + pt.put("frag_name", frag_name); + std::stringstream ss; + boost::property_tree::json_parser::write_json(ss, pt); + std::string new_params = ss.str(); + using ProjectedFragmentType = + ArrowProjectedFragment; + ProjectAndQuery( + comm_spec, fragment, frag_name, new_params, user_lib_path, query_times); + } else { std::string frag_name = "gs::ArrowProjectedFragment"; diff --git a/analytical_engine/test/run_java_app.cc b/analytical_engine/test/run_java_app.cc index 571080829ac8..7dade292bf7d 100644 --- a/analytical_engine/test/run_java_app.cc +++ b/analytical_engine/test/run_java_app.cc @@ -58,8 +58,7 @@ namespace bl = boost::leaf; using FragmentType = vineyard::ArrowFragment; -using ProjectedFragmentType = - gs::ArrowProjectedFragment; + void output_nd_array(const grape::CommSpec& comm_spec, std::unique_ptr arc, const std::string& output_path, int data_type_expected) { @@ -95,6 +94,12 @@ void output_nd_array(const grape::CommSpec& comm_spec, oarc >> v; assembled_ostream << v << std::endl; } + } else if (data_type_expected == vineyard::TypeToInt::value) { + for (int64_t i = 0; i < length1; ++i) { + double v; + oarc >> v; + assembled_ostream << v << std::endl; + } } else { LOG(FATAL) << "Unregonizable data type " << data_type_expected; } @@ -151,12 +156,18 @@ void output_data_frame(const grape::CommSpec& comm_spec, oarc >> data; assembled_col2_ostream << data << std::endl; } - } else if (expected_data_type == 4) { + } else if (expected_data_type == vineyard::TypeToInt::value) { for (int64_t i = 0; i < length; ++i) { int64_t data; oarc >> data; assembled_col2_ostream << data << std::endl; } + } else if (expected_data_type == vineyard::TypeToInt::value) { + for (int64_t i = 0; i < length; ++i) { + double data; + oarc >> data; + assembled_col2_ostream << data << std::endl; + } } else { LOG(FATAL) << "Unregonizable data type " << expected_data_type; } @@ -260,7 +271,8 @@ void QueryProperty(vineyard::Client& client, // 1. Test data frame { - // auto selectors = gs::gs::Selector::ParseSelectors(s_selectors).value(); + // auto selectors = + // gs::gs::Selector::ParseSelectors(s_selectors).value(); std::unique_ptr arc = std::move( vp_ctx_wrapper->ToDataframe(comm_spec, selectors, range).value()); std::string java_data_frame_out_prefix = out_prefix + "/java"; @@ -298,7 +310,8 @@ void QueryProperty(vineyard::Client& client, // 1. Test data frame { - // auto selectors = gs::gs::Selector::ParseSelectors(s_selectors).value(); + // auto selectors = + // gs::gs::Selector::ParseSelectors(s_selectors).value(); std::unique_ptr arc = std::move( vd_ctx_wrapper->ToDataframe(comm_spec, selectors, range).value()); std::string java_data_frame_out_prefix = out_prefix + "/java"; @@ -325,13 +338,13 @@ void QueryProperty(vineyard::Client& client, } } -void QueryProjected(vineyard::Client& client, - std::shared_ptr fragment, - const grape::CommSpec& comm_spec, - const std::string& app_name, const std::string& out_prefix, - const std::string& basic_params, - const std::string& selector_string, - const std::string& selectors_string) { +template +void QueryProjected( + vineyard::Client& client, std::shared_ptr fragment, + const grape::CommSpec& comm_spec, const std::string& app_name, + const std::string& out_prefix, const std::string& basic_params, + const std::string& selector_string, const std::string& selectors_string, + int32_t expected_data_type, vineyard::AnyType expected_tensor_type) { using AppType = gs::JavaPIEProjectedParallelAppOE; auto app = std::make_shared(); auto worker = AppType::CreateWorker(app, fragment); @@ -375,18 +388,19 @@ void QueryProjected(vineyard::Client& client, std::string java_out_prefix = out_prefix + "/java_projected_assembled_ndarray.dat"; output_nd_array(comm_spec, std::move(arc), java_out_prefix, - 4); // 4 for int64_t + expected_data_type); // 4 for int64_t } VLOG(1) << "[0] java projected finish test ndarray"; // 1. Test data frame { - // auto selectors = gs::gs::Selector::ParseSelectors(s_selectors).value(); + // auto selectors = + // gs::gs::Selector::ParseSelectors(s_selectors).value(); std::unique_ptr arc = std::move( vp_ctx_wrapper->ToDataframe(comm_spec, selectors, range).value()); std::string java_data_frame_out_prefix = out_prefix + "/java_projected"; output_data_frame(comm_spec, std::move(arc), java_data_frame_out_prefix, - 4); + expected_data_type); } VLOG(1) << "[1] java projected finish test dataframe"; @@ -397,10 +411,9 @@ void QueryProjected(vineyard::Client& client, CHECK(tmp); vineyard::ObjectID ndarray_object = tmp.value(); std::string java_v6d_tensor_prefix = out_prefix + "/java_projected"; - vineyard::AnyType expected_data_type = vineyard::AnyType::Int64; // 4 output_vineyard_tensor(client, ndarray_object, comm_spec, java_v6d_tensor_prefix, - expected_data_type); + expected_tensor_type); } VLOG(1) << "[2] java projected finish test vineyard tensor"; @@ -414,18 +427,17 @@ void QueryProjected(vineyard::Client& client, std::string java_out_prefix = out_prefix + "/java_projected_assembled_ndarray.dat"; output_nd_array(comm_spec, std::move(arc), java_out_prefix, - 4); // 4 for int64_t + expected_data_type); // 4 for int64_t } VLOG(1) << "[0] java projected finish test ndarray"; // 1. Test data frame { - // auto selectors = gs::gs::Selector::ParseSelectors(s_selectors).value(); std::unique_ptr arc = std::move( vd_ctx_wrapper->ToDataframe(comm_spec, selectors, range).value()); std::string java_data_frame_out_prefix = out_prefix + "/java_projected"; output_data_frame(comm_spec, std::move(arc), java_data_frame_out_prefix, - 4); + expected_data_type); } VLOG(1) << "[1] java projected finish test dataframe"; @@ -436,17 +448,22 @@ void QueryProjected(vineyard::Client& client, CHECK(tmp); vineyard::ObjectID ndarray_object = tmp.value(); std::string java_v6d_tensor_prefix = out_prefix + "/java_projected"; - vineyard::AnyType expected_data_type = vineyard::AnyType::Int64; // 4 - output_vineyard_tensor(client, ndarray_object, comm_spec, - java_v6d_tensor_prefix, - expected_data_type); + if (expected_tensor_type == vineyard::AnyType::Double) { + output_vineyard_tensor(client, ndarray_object, comm_spec, + java_v6d_tensor_prefix, + expected_tensor_type); + } else if (expected_tensor_type == vineyard::AnyType::Int64) { + output_vineyard_tensor(client, ndarray_object, comm_spec, + java_v6d_tensor_prefix, + expected_tensor_type); + } else { + LOG(FATAL) << "Unregonizable data type " << expected_tensor_type; + } } VLOG(1) << "[2] java projected finish test vineyard tensor"; - } else { LOG(ERROR) << "Unrecognized ctx type: " << ctx_wrapper->context_type(); } - // auto selector = gs::LabeledSelector::parse("r:label0.property0").value(); } // Running test doesn't require codegen. @@ -501,8 +518,13 @@ void Run(vineyard::Client& client, const grape::CommSpec& comm_spec, QueryProperty(client, fragment, comm_spec, app_name, "/tmp", basic_params, selector_string, selectors_string); } else { // 3. run projected - pt.put("frag_name", - "gs::ArrowProjectedFragment"); + if (app_name.find("SSSP") != std::string::npos) { + pt.put("frag_name", + "gs::ArrowProjectedFragment"); + } else { + pt.put("frag_name", + "gs::ArrowProjectedFragment"); + } std::stringstream ss; boost::property_tree::json_parser::write_json(ss, pt); std::string basic_params = ss.str(); @@ -510,13 +532,6 @@ void Run(vineyard::Client& client, const grape::CommSpec& comm_spec, VLOG(1) << "running projected"; VLOG(1) << "vertex properties num: " << fragment->vertex_property_num(0); VLOG(1) << "edge properties num: " << fragment->edge_property_num(0); - std::shared_ptr projected_fragment = - ProjectedFragmentType::Project(fragment, 0, 0, 0, 0); - // test get data - using vertex_t = ProjectedFragmentType::vertex_t; - vertex_t vertex; - projected_fragment->GetInnerVertex(4, vertex); - VLOG(1) << "source vertex" << vertex.GetValue(); { std::string selector_string; std::string selectors_string; @@ -539,8 +554,35 @@ void Run(vineyard::Client& client, const grape::CommSpec& comm_spec, selectors_string = gs::generate_selectors(selector_list); } } - QueryProjected(client, projected_fragment, comm_spec, app_name, "/tmp", - basic_params, selector_string, selectors_string); + if (app_name.find("SSSP") != std::string::npos) { + using ProjectedFragmentType = + gs::ArrowProjectedFragment; + std::shared_ptr projected_fragment = + ProjectedFragmentType::Project(fragment, 0, 0, 0, 0); + // test get data + using vertex_t = ProjectedFragmentType::vertex_t; + vertex_t vertex; + projected_fragment->GetInnerVertex(4, vertex); + VLOG(1) << "source vertex" << vertex.GetValue(); + QueryProjected(client, projected_fragment, comm_spec, app_name, "/tmp", + basic_params, selector_string, selectors_string, + vineyard::TypeToInt::value, + vineyard::AnyType::Double); + } else { + using ProjectedFragmentType = + gs::ArrowProjectedFragment; + std::shared_ptr projected_fragment = + ProjectedFragmentType::Project(fragment, 0, 0, 0, 0); + // test get data + using vertex_t = ProjectedFragmentType::vertex_t; + vertex_t vertex; + projected_fragment->GetInnerVertex(4, vertex); + VLOG(1) << "source vertex" << vertex.GetValue(); + QueryProjected(client, projected_fragment, comm_spec, app_name, "/tmp", + basic_params, selector_string, selectors_string, + vineyard::TypeToInt::value, + vineyard::AnyType::Int64); + } } } } diff --git a/coordinator/gscoordinator/template/CMakeLists.template b/coordinator/gscoordinator/template/CMakeLists.template index bd528e72f4be..6fce44e245e4 100644 --- a/coordinator/gscoordinator/template/CMakeLists.template +++ b/coordinator/gscoordinator/template/CMakeLists.template @@ -96,6 +96,7 @@ check_cxx_compiler_flag(-Wno-unknown-pragmas W_NO_UNKNOWN_PRAGMAS) check_cxx_compiler_flag(-Wno-stringop-overflow W_NO_STRINGOP_OVERFLOW) check_cxx_compiler_flag(-Wno-unused-variable W_NO_UNUSED_VARIABLE) check_cxx_compiler_flag(-Wno-redundant-move W_NO_REDUNDANT_MOVE) +check_cxx_compiler_flag(-mllvm=-lto-embed-bitcode SUPPORTS_LTO_EMBED_BITCODE) if(W_NO_CLASS_MEMACCESS) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-class-memaccess") @@ -374,7 +375,10 @@ elseif (JAVA_PIE_APP) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -flto") get_filename_component(COMPILER_DIR ${CMAKE_CXX_COMPILER} DIRECTORY) message(STATUS "Obtain directory: ${COMPILER_DIR}") - set(CMAKE_JNI_LINKER_FLAGS "-fuse-ld=${COMPILER_DIR}/ld.lld -Xlinker -mllvm=-lto-embed-bitcode") + set(CMAKE_JNI_LINKER_FLAGS "-fuse-ld=${COMPILER_DIR}/ld.lld -Xlinker") + if (SUPPORTS_LTO_EMBED_BITCODE) + set(CMAKE_JNI_LINKER_FLAGS "${CMAKE_JNI_LINKER_FLAGS} -mllvm=-lto-embed-bitcode") + endif() endif() else() message(WARNING "Compiling with ENABLE_JAVA_SDK ON expects a minimum Clang-11 compiler, " diff --git a/coordinator/gscoordinator/utils.py b/coordinator/gscoordinator/utils.py index 78bbf68833dc..e6c6b2b0a99d 100644 --- a/coordinator/gscoordinator/utils.py +++ b/coordinator/gscoordinator/utils.py @@ -252,7 +252,7 @@ def get_app_sha256(attr, java_class_path: str): app_sha256 = hashlib.sha256( f"{app_type}.{app_class}.{graph_type}".encode("utf-8", errors="ignore") ).hexdigest() - elif app_type == "java_pie": + elif app_type == "java_pie" or app_type == "giraph": s = hashlib.sha256() s.update(f"{graph_type}.{vd_type}".encode("utf-8", errors="ignore")) app_sha256 = s.hexdigest() @@ -317,6 +317,40 @@ def check_java_app_graph_consistency( return True +def check_giraph_app_graph_consistency( + app_class, cpp_graph_type, java_class_template_str +): + # Split the C++ graph type to get the parameters + split_cpp_graph_type = cpp_graph_type.split("<") + java_app_type_params = java_class_template_str[:-1].split("<")[-1].split(",") + + # Ensure the graph type is supported + if split_cpp_graph_type[0] != "gs::ArrowProjectedFragment": + raise RuntimeError("Giraph app only supports projected graph") + + # Extract actual type parameters from the graph + graph_actual_type_params = split_cpp_graph_type[1][:-1].split(",") + + # Define expected mapping between graph and app parameters + # (cpp index: java index) + index_mapping = {0: 0, 2: 1, 3: 2} # oid_t # vdata_t # edata_t + + # Check consistency between graph and app type parameters + for cpp_index, java_index in index_mapping.items(): + if not _type_param_consistent( + graph_actual_type_params[cpp_index], java_app_type_params[java_index] + ): + raise RuntimeError( + "Error in check app and graph consistency, type params index {}, cpp: {}, java: {}".format( + cpp_index, + graph_actual_type_params[cpp_index], + java_app_type_params[java_index], + ) + ) + + return True + + def run_command(args: str, cwd=None, **kwargs): logger.info("Running command: %s, cwd: %s", args, cwd) cp = subprocess.run(shlex.split(args), capture_output=True, cwd=cwd, **kwargs) @@ -487,6 +521,13 @@ def compile_app( graph_type, ) check_java_app_graph_consistency(app_class, graph_type, java_app_class) + if app_type == "giraph": + logger.info( + "Check consistent between giraph app %s and graph %s", + java_app_class, + graph_type, + ) + check_giraph_app_graph_consistency(app_class, graph_type, java_app_class) os.chdir(library_dir) @@ -512,7 +553,7 @@ def compile_app( if os.environ.get("GRAPHSCOPE_ANALYTICAL_DEBUG", "") == "1": cmake_commands.append("-DCMAKE_BUILD_TYPE=Debug") - if app_type == "java_pie": + if app_type == "java_pie" or app_type == "giraph": # for java need to run preprocess, and the generated files can be reused, # if the fragment & vd type is same. java_codegen_out_dir = os.path.join( @@ -695,6 +736,10 @@ def _type_param_consistent(graph_actucal_type_param, java_app_type_param): if graph_actucal_type_param in {"int32_t", "uint32_t"}: return True return False + if java_app_type_param == "com.alibaba.graphscope.ds.StringView": + if graph_actucal_type_param in {"std::string"}: + return True + return False return False @@ -1622,13 +1667,13 @@ def _codegen_app_info(attr, meta_file: str, java_class_path: str): algo = attr[types_pb2.APP_ALGO].s.decode("utf-8", errors="ignore") # for algo start with giraph:, we don't find info in meta if algo.startswith("giraph:") or algo.startswith("java_pie:"): - real_algo = algo.split(":")[1] + (app_type, real_algo) = algo.split(":") logger.info("codegen app info for java app: %s", real_algo) src_header, app_class, vd_type, java_app_template_str = _probe_for_java_app( attr, java_class_path, real_algo ) return ( - "java_pie", + app_type, src_header, "{}<_GRAPH_TYPE>".format(app_class), vd_type, diff --git a/python/graphscope/tests/conftest.py b/python/graphscope/tests/conftest.py index 45c563ad4678..4bb448919b74 100644 --- a/python/graphscope/tests/conftest.py +++ b/python/graphscope/tests/conftest.py @@ -391,6 +391,23 @@ def arrow_project_undirected_graph(arrow_property_graph_undirected): yield pg +@pytest.fixture(scope="module") +def p2p_property_graph_string_prop(graphscope_session): + g = graphscope_session.g(generate_eid=False, retain_oid=True, directed=True) + g = g.add_vertices( + f"{property_dir}/p2p-31_property_v_0", "person", properties=[("weight", "str")] + ) + g = g.add_edges( + f"{property_dir}/p2p-31_property_e_0", + label="knows", + src_label="person", + dst_label="person", + properties=[("dist", "str")], + ) + yield g + del g + + @pytest.fixture(scope="module") def p2p_property_graph(graphscope_session): g = graphscope_session.g(generate_eid=False, retain_oid=True, directed=True) @@ -638,6 +655,14 @@ def p2p_project_directed_graph(p2p_property_graph): yield pg +@pytest.fixture(scope="module") +def p2p_project_directed_graph_string_prop(p2p_property_graph_string_prop): + pg = p2p_property_graph_string_prop.project( + vertices={"person": ["weight"]}, edges={"knows": ["dist"]} + ) + yield pg + + @pytest.fixture(scope="module") def p2p_project_undirected_graph(p2p_property_graph_undirected): pg = p2p_property_graph_undirected.project( diff --git a/python/graphscope/tests/unittest/test_java_app.py b/python/graphscope/tests/unittest/test_java_app.py index 548cdd101f05..aa6352457868 100644 --- a/python/graphscope/tests/unittest/test_java_app.py +++ b/python/graphscope/tests/unittest/test_java_app.py @@ -59,6 +59,16 @@ def projected_graph_sssp_class(): return "com.alibaba.graphscope.example.sssp.SSSP" +@pytest.fixture(scope="module") +def projected_graph_stringApp_class(): + return "com.alibaba.graphscope.example.stringApp.StringApp" + + +@pytest.fixture(scope="module") +def projected_graph_circle_class(): + return "com.alibaba.graphscope.example.giraph.circle.Circle" + + @pytest.fixture(scope="module") def non_exist_java_class(): return "com.alibaba.graphscope.example.non.existing.java.class" @@ -173,5 +183,57 @@ def test_giraph_app( ) giraph_sssp = load_app(algo="giraph:com.alibaba.graphscope.example.giraph.SSSP") - giraph_sssp(g, sourceId=6) - del g + ctx = giraph_sssp(g, sourceId=6) + frame = ctx.to_dataframe({"id": "v.id", "r": "r"}) + + +@pytest.mark.skipif( + os.environ.get("RUN_JAVA_TESTS") != "ON", + reason="Java SDK is disabled, skip this test.", +) +@pytest.mark.timeout(3600) +def test_string_app( + demo_jar, + graphscope_session, + p2p_project_directed_graph_string_prop, + projected_graph_stringApp_class, +): + graphscope_session.add_lib(demo_jar) + string_app = load_app(algo="java_pie:{}".format(projected_graph_stringApp_class)) + ctx = string_app(p2p_project_directed_graph_string_prop._project_to_simple()) + + frame = ctx.to_dataframe({"id": "v.id", "r": "r"}) + frame + num_rows = frame.shape[0] + for i in range(min(num_rows, 10)): + # size should large than 0 + assert len(frame["r"][i]) > 0 + + +@pytest.mark.skipif( + os.environ.get("RUN_JAVA_TESTS") != "ON", + reason="Java SDK is disabled, skip this test.", +) +@pytest.mark.timeout(3600) +def test_giraph_circle_app( + demo_jar, + graphscope_session, + projected_graph_circle_class, +): + graphscope_session.add_lib(demo_jar) + vformat = ( + "giraph:com.alibaba.graphscope.example.giraph.circle.CircleVertexInputFormat" + ) + eformat = ( + "giraph:com.alibaba.graphscope.example.giraph.circle.CircleEdgeInputFormat" + ) + graph = projected_p2p_graph_loaded_by_giraph( + graphscope_session, demo_jar, vformat, eformat + ) + giraph_circle = load_app( + algo="giraph:com.alibaba.graphscope.example.giraph.circle.Circle" + ) + ctx = giraph_circle(graph) + frame = ctx.to_dataframe({"id": "v.id", "r": "r"}) + for i in range(10): + assert frame["r"][i] == "No Circle"