Skip to content

Support general project operator on graph. #239

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 31, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 185 additions & 11 deletions modules/graph/fragment/arrow_fragment.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,6 @@ class ArrowFragment

fid_t fnum() const { return fnum_; }

label_id_t vertex_label_num() const { return vertex_label_num_; }

label_id_t vertex_label(const vertex_t& v) const {
return vid_parser_.GetLabelId(v.GetValue());
}
Expand All @@ -308,10 +306,13 @@ class ArrowFragment
return vid_parser_.GetOffset(v.GetValue());
}

label_id_t edge_label_num() const { return edge_label_num_; }
label_id_t vertex_label_num() const { return schema_.vertex_label_num(); }

label_id_t edge_label_num() const { return schema_.edge_label_num(); }

prop_id_t vertex_property_num(label_id_t label) const {
return static_cast<prop_id_t>(vertex_tables_[label]->num_columns());
std::string type = "VERTEX";
return static_cast<prop_id_t>(schema_.GetEntry(label, type).property_num());
}

std::shared_ptr<arrow::DataType> vertex_property_type(label_id_t label,
Expand All @@ -320,7 +321,8 @@ class ArrowFragment
}

prop_id_t edge_property_num(label_id_t label) const {
return static_cast<prop_id_t>(edge_tables_[label]->num_columns());
std::string type = "EDGE";
return static_cast<prop_id_t>(schema_.GetEntry(label, type).property_num());
}

std::shared_ptr<arrow::DataType> edge_property_type(label_id_t label,
Expand Down Expand Up @@ -758,6 +760,7 @@ class ArrowFragment
new_meta.AddKeyValue("edge_label_num", edge_label_num_);

size_t nbytes = 0;
auto schema = schema_;
for (int i = 0; i < extra_vertex_label_num; ++i) {
int cur_label_id = vertex_label_num_ + i;
std::string table_name =
Expand All @@ -780,7 +783,7 @@ class ArrowFragment
"vertex_property_name_" + std::to_string(i) + "_";
std::string type_prefix =
"vertex_property_type_" + std::to_string(i) + "_";
auto entry = schema_.CreateEntry(label, "VERTEX");
auto entry = schema.CreateEntry(label, "VERTEX");
for (prop_id_t j = 0; j < prop_num; ++j) {
new_meta.AddKeyValue(name_prefix + std::to_string(j),
table->field(j)->name());
Expand All @@ -789,7 +792,7 @@ class ArrowFragment
entry->AddProperty(table->field(j)->name(), table->field(j)->type());
}
}
new_meta.AddKeyValue("schema", schema_.ToJSONString());
new_meta.AddKeyValue("schema", schema.ToJSONString());

vineyard::ArrayBuilder<vid_t> ivnums_builder(client, ivnums);
vineyard::ArrayBuilder<vid_t> ovnums_builder(client, ovnums);
Expand Down Expand Up @@ -1179,6 +1182,7 @@ class ArrowFragment
// Increase edge label num
new_meta.AddKeyValue("edge_label_num", total_edge_label_num);

auto schema = schema_;
size_t nbytes = 0;
for (label_id_t i = 0; i < total_edge_label_num; ++i) {
if (i >= edge_label_num_) {
Expand All @@ -1197,7 +1201,7 @@ class ArrowFragment
"edge_property_name_" + std::to_string(i) + "_";
std::string type_prefix =
"edge_property_type_" + std::to_string(i) + "_";
auto entry = schema_.CreateEntry(kvs["label"], "EDGE");
auto entry = schema.CreateEntry(kvs["label"], "EDGE");
for (prop_id_t j = 0; j < table->num_columns(); ++j) {
new_meta.AddKeyValue(name_prefix + std::to_string(j),
table->field(j)->name());
Expand Down Expand Up @@ -1232,7 +1236,7 @@ class ArrowFragment
}
}
}
new_meta.AddKeyValue("schema", schema_.ToJSONString());
new_meta.AddKeyValue("schema", schema.ToJSONString());

for (label_id_t i = 0; i < vertex_label_num_; ++i) {
std::shared_ptr<arrow::Table> table = this->vertex_tables_[i];
Expand Down Expand Up @@ -1409,6 +1413,7 @@ class ArrowFragment
new_meta.AddKeyValue("vertex_label_num", vertex_label_num_);
new_meta.AddKeyValue("edge_label_num", edge_label_num_);

auto schema = schema_;
for (label_id_t i = 0; i < vertex_label_num_; ++i) {
std::string table_name = generate_name_with_suffix("vertex_tables", i);
if (columns.find(i) != columns.end()) {
Expand Down Expand Up @@ -1436,7 +1441,7 @@ class ArrowFragment
"vertex_property_name_" + std::to_string(i) + "_";
std::string type_prefix =
"vertex_property_type_" + std::to_string(i) + "_";
auto& entry = schema_.GetMutableEntry(label, "VERTEX");
auto& entry = schema.GetMutableEntry(label, "VERTEX");
for (prop_id_t j = 0; j < prop_num; ++j) {
new_meta.AddKeyValue(name_prefix + std::to_string(j),
arrow_table->field(j)->name());
Expand Down Expand Up @@ -1470,7 +1475,7 @@ class ArrowFragment
}
}
}
new_meta.AddKeyValue("schema", schema_.ToJSONString());
new_meta.AddKeyValue("schema", schema.ToJSONString());

size_t nbytes = 0;
new_meta.AddMember("ivnums", old_meta.GetMemberMeta("ivnums"));
Expand Down Expand Up @@ -1522,6 +1527,175 @@ class ArrowFragment
return ret;
}

boost::leaf::result<vineyard::ObjectID> Project(
vineyard::Client& client,
std::map<label_id_t, std::vector<label_id_t>> vertices,
std::map<label_id_t, std::vector<label_id_t>> edges) {
vineyard::ObjectMeta old_meta, new_meta;
VINEYARD_CHECK_OK(client.GetMetaData(this->id_, old_meta));

new_meta.SetTypeName(type_name<ArrowFragment<oid_t, vid_t>>());
new_meta.AddKeyValue("fid", fid_);
new_meta.AddKeyValue("fnum", fnum_);
new_meta.AddKeyValue("directed", static_cast<int>(directed_));
new_meta.AddKeyValue("oid_type", TypeName<oid_t>::Get());
new_meta.AddKeyValue("vid_type", TypeName<vid_t>::Get());
new_meta.AddKeyValue("vertex_label_num", vertex_label_num_);
new_meta.AddKeyValue("edge_label_num", edge_label_num_);

auto schema = schema_;

std::vector<label_id_t> vertex_labels, edge_labels;
std::vector<std::vector<prop_id_t>> vertex_properties, edge_properties;

for (auto& pair : vertices) {
vertex_labels.push_back(pair.first);
vertex_properties.push_back(pair.second);
}
for (auto& pair : edges) {
edge_labels.push_back(pair.first);
edge_properties.push_back(pair.second);
}
std::string s1, s2, s3, s4;
for (auto x : vertex_labels) {
s1 = s1 + std::to_string(x) + ", ";
}
for (auto x : vertex_properties) {
std::string s = "[";
for (auto xx : x) {
s = s + std::to_string(xx) + ",";
}
s += "]";
s2 = s2 + s + ", ";
}
for (auto x : edge_labels) {
s3 = s3 + std::to_string(x) + ", ";
}
for (auto x : edge_properties) {
std::string s = "[";
for (auto xx : x) {
s = s + std::to_string(xx) + ",";
}
s += "]";
s4 = s4 + s + ", ";
}
LOG(INFO) << "Vertex labels " << s1;
LOG(INFO) << "Vertex properties " << s2;
LOG(INFO) << "Edge labels " << s3;
LOG(INFO) << "Edge properties " << s4;

// Compute the set difference of reserved labels and all labels.
auto invalidate_label = [&schema](const std::vector<label_id_t>& labels,
std::string type, size_t label_num) {
auto it = labels.begin();
for (size_t i = 0; i < label_num; ++i) {
if (it == labels.end() || i < *it) {
if (type == "VERTEX") {
schema.InvalidateVertex(i);
} else {
schema.InvalidateEdge(i);
}
} else {
++it;
}
}
};

auto invalidate_prop =
[&schema](const std::vector<label_id_t>& labels, std::string type,
const std::vector<std::vector<prop_id_t>>& props) {
for (size_t i = 0; i < labels.size(); ++i) {
auto& entry = schema.GetMutableEntry(labels[i], type);
auto it1 = props[i].begin();
auto it2 = props[i].end();
size_t prop_num = entry.props_.size();
for (size_t j = 0; j < prop_num; ++j) {
if (it1 == it2 || j < *it1) {
entry.InvalidateProperty(j);
} else {
++it1;
}
}
}
};
invalidate_prop(vertex_labels, "VERTEX", vertex_properties);
invalidate_prop(edge_labels, "EDGE", edge_properties);
invalidate_label(vertex_labels, "VERTEX", schema.vertex_entries().size());
invalidate_label(edge_labels, "EDGE", schema.edge_entries().size());

new_meta.AddKeyValue("schema", schema.ToJSONString());
LOG(INFO) << "Schema: " << schema.ToJSONString();

size_t nbytes = 0;
new_meta.AddMember("ivnums", old_meta.GetMemberMeta("ivnums"));
nbytes += old_meta.GetMemberMeta("ivnums").GetNBytes();
new_meta.AddMember("ovnums", old_meta.GetMemberMeta("ovnums"));
nbytes += old_meta.GetMemberMeta("ovnums").GetNBytes();
new_meta.AddMember("tvnums", old_meta.GetMemberMeta("tvnums"));
nbytes += old_meta.GetMemberMeta("tvnums").GetNBytes();

ASSIGN_IDENTICAL_VEC_META("ovgid_lists", vertex_label_num_);
ASSIGN_IDENTICAL_VEC_META("ovg2l_maps", vertex_label_num_);
ASSIGN_IDENTICAL_VEC_META("vertex_tables", vertex_label_num_);
ASSIGN_IDENTICAL_VEC_META("edge_tables", edge_label_num_);

for (label_id_t i = 0; i < vertex_label_num_; ++i) {
std::shared_ptr<arrow::Table> table = this->vertex_tables_[i];
prop_id_t prop_num = table->num_columns();
new_meta.AddKeyValue(
"vertex_label_name_" + std::to_string(i),
old_meta.GetKeyValue("vertex_label_name_" + std::to_string(i)));
new_meta.AddKeyValue("vertex_property_num_" + std::to_string(i),
std::to_string(prop_num));
std::string name_prefix =
"vertex_property_name_" + std::to_string(i) + "_";
std::string type_prefix =
"vertex_property_type_" + std::to_string(i) + "_";
for (prop_id_t j = 0; j < prop_num; ++j) {
new_meta.AddKeyValue(name_prefix + std::to_string(j),
table->field(j)->name());
new_meta.AddKeyValue(type_prefix + std::to_string(j),
arrow_type_to_string(table->field(j)->type()));
}
}

for (label_id_t i = 0; i < edge_label_num_; ++i) {
std::shared_ptr<arrow::Table> table = this->edge_tables_[i];
prop_id_t prop_num = table->num_columns();
new_meta.AddKeyValue(
"edge_label_name_" + std::to_string(i),
old_meta.GetKeyValue("edge_label_name_" + std::to_string(i)));
new_meta.AddKeyValue("edge_property_num_" + std::to_string(i),
std::to_string(prop_num));
std::string name_prefix = "edge_property_name_" + std::to_string(i) + "_";
std::string type_prefix = "edge_property_type_" + std::to_string(i) + "_";
for (prop_id_t j = 0; j < prop_num; ++j) {
new_meta.AddKeyValue(name_prefix + std::to_string(j),
table->field(j)->name());
new_meta.AddKeyValue(type_prefix + std::to_string(j),
arrow_type_to_string(table->field(j)->type()));
}
}

if (directed_) {
ASSIGN_IDENTICAL_VEC_VEC_META("ie_lists", vertex_label_num_,
edge_label_num_);
ASSIGN_IDENTICAL_VEC_VEC_META("ie_offsets_lists", vertex_label_num_,
edge_label_num_);
}
ASSIGN_IDENTICAL_VEC_VEC_META("oe_lists", vertex_label_num_,
edge_label_num_);
ASSIGN_IDENTICAL_VEC_VEC_META("oe_offsets_lists", vertex_label_num_,
edge_label_num_);

new_meta.AddMember("vertex_map", old_meta.GetMemberMeta("vertex_map"));

new_meta.SetNBytes(nbytes);

vineyard::ObjectID ret;
VINEYARD_CHECK_OK(client.CreateMetaData(new_meta, ret));
return ret;
}
#undef ASSIGN_IDENTICAL_VEC_META
#undef ASSIGN_IDENTICAL_VEC_VEC_META
#undef GENERATE_VEC_META
Expand Down
Loading