Skip to content

Commit

Permalink
Fixes the global property graph stream for subgraph. (#118)
Browse files Browse the repository at this point in the history
* Fixes the global property graph stream for subgraph.
* Persist before ConstructFragmentGroup.

Signed-off-by: Tao He <[email protected]>
  • Loading branch information
sighingnow authored Feb 1, 2021
1 parent 30022c9 commit 2693c12
Show file tree
Hide file tree
Showing 6 changed files with 8 additions and 2 deletions.
1 change: 1 addition & 0 deletions analytical_engine/core/loader/arrow_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ class ArrowFragmentLoader {

boost::leaf::result<vineyard::ObjectID> LoadFragmentAsFragmentGroup() {
BOOST_LEAF_AUTO(frag_id, LoadFragment());
VY_OK_OR_RAISE(client_.Persist(frag_id));
return vineyard::ConstructFragmentGroup(client_, frag_id, comm_spec_);
}

Expand Down
2 changes: 2 additions & 0 deletions analytical_engine/core/object/fragment_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class FragmentWrapper<vineyard::ArrowFragment<OID_T, VID_T>>
VINEYARD_CHECK_OK(client->GetMetaData(fragment_->id(), obj_meta));
vineyard::ObjectID new_frag_id;
VINEYARD_CHECK_OK(client->CreateMetaData(obj_meta, new_frag_id));
VINEYARD_CHECK_OK(client->Persist(new_frag_id));
BOOST_LEAF_AUTO(frag_group_id, vineyard::ConstructFragmentGroup(
*client, new_frag_id, comm_spec));
auto new_frag =
Expand Down Expand Up @@ -206,6 +207,7 @@ class FragmentWrapper<vineyard::ArrowFragment<OID_T, VID_T>>

auto new_frag_id = fragment_->AddVertexColumns(*client, columns);

VINEYARD_CHECK_OK(client->Persist(new_frag_id));
BOOST_LEAF_AUTO(frag_group_id, vineyard::ConstructFragmentGroup(
*client, new_frag_id, comm_spec));
auto new_frag = client->GetObject<fragment_t>(new_frag_id);
Expand Down
2 changes: 2 additions & 0 deletions analytical_engine/frame/property_graph_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ void LoadGraph(
auto new_frag = std::static_pointer_cast<_GRAPH_TYPE>(
client.GetObject(new_frag_id));

VINEYARD_CHECK_OK(client.Persist(new_frag_id));
BOOST_LEAF_AUTO(
new_frag_group_id,
vineyard::ConstructFragmentGroup(client, new_frag_id, comm_spec));
Expand Down Expand Up @@ -172,6 +173,7 @@ void ToArrowFragment(

gs::DynamicToArrowConverter<oid_t> converter(comm_spec, client);
BOOST_LEAF_AUTO(arrow_frag, converter.Convert(dynamic_frag));
VINEYARD_CHECK_OK(client.Persist(arrow_frag->id()));
BOOST_LEAF_AUTO(frag_group_id,
vineyard::ConstructFragmentGroup(
client, arrow_frag->id(), comm_spec));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ std::shared_ptr<Object> GlobalPGStreamBuilder::_Seal(Client& client) {
auto gstream = std::make_shared<GlobalPGStream>();
gstream->total_stream_chunks_ = total_stream_chunks_;
gstream->meta_.SetTypeName(type_name<GlobalPGStream>());
gstream->meta_.SetGlobal(true);
gstream->meta_.AddKeyValue("total_stream_chunks", total_stream_chunks_);

for (size_t idx = 0; idx < stream_chunks_.size(); ++idx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ class PropertyGraphInStream {

class GlobalPGStreamBuilder;

class GlobalPGStream : public Registered<GlobalPGStream> {
class GlobalPGStream : public Registered<GlobalPGStream>, GlobalObject {
public:
static std::shared_ptr<Object> Create() __attribute__((used)) {
return std::static_pointer_cast<Object>(
Expand Down
2 changes: 1 addition & 1 deletion k8s/gsvineyard.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ RUN cd /tmp && \
make -j`nproc` && \
make install && \
cd /tmp && \
git clone -b v0.1.7 https://github.com/alibaba/libvineyard.git && \
git clone -b v0.1.8 https://github.com/alibaba/libvineyard.git && \
cd libvineyard && \
git submodule update --init && \
mkdir -p /tmp/libvineyard/build && \
Expand Down

0 comments on commit 2693c12

Please sign in to comment.