Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug Fix] fix some bugs found in cypher e2e test #2579

Merged
merged 6 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
"name": "person"
},
"dst": {
"id": 0,
"id": 1,
"name": "software"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
"name": "person"
},
"dst": {
"id": 0,
"id": 1,
"name": "software"
}
}
Expand Down
32 changes: 17 additions & 15 deletions interactive_engine/executor/ir/core/src/glogue/pattern.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,18 +647,31 @@ fn generate_source_operator(
pattern: &Pattern, source_extend: &ExactExtendStep,
) -> IrPatternResult<pb::logical_plan::Operator> {
let source_vertex_id = source_extend.get_target_vertex_id();
let source_vertex_table = generate_vertex_table(pattern, source_vertex_id)?;
let inferred_vertex_table = generate_vertex_table(pattern, source_vertex_id)?;
// Fuse `source_vertex_label` into source, for efficiently scan
let source_vertex_param = if let Some(mut vertex_param) = pattern
.get_vertex_parameters(source_vertex_id)?
.cloned()
{
if source_vertex_table.len() > 0 {
vertex_param.tables = source_vertex_table;
// if user given vertex table is empty, using the inferred vertex table
// and vise versa
if vertex_param.tables.len() == 0 {
vertex_param.tables = inferred_vertex_table;
}
// if the two tables are both not empty
// intersect user given labels and inferred labels
else if inferred_vertex_table.len() > 0 {
let user_given_vertex_table = vertex_param.tables;
vertex_param.tables = vec![];
for inferred_vertex_label in inferred_vertex_table {
if user_given_vertex_table.contains(&inferred_vertex_label) {
vertex_param.tables.push(inferred_vertex_label)
}
}
}
vertex_param
} else {
query_params(source_vertex_table, vec![], None)
query_params(inferred_vertex_table, vec![], None)
};
let source_scan = pb::Scan {
scan_opt: 0,
Expand Down Expand Up @@ -876,17 +889,6 @@ fn get_edge_expand_from_binder<'a, 'b>(
) -> IrPatternResult<&'a pb::EdgeExpand> {
use pb::pattern::binder::Item as BinderItem;
if let Some(BinderItem::Path(path_expand)) = binder.item.as_ref() {
let hop_range = path_expand
.hop_range
.as_ref()
.ok_or(ParsePbError::EmptyFieldError("pb::PathExpand::hop_range".to_string()))?;
if hop_range.lower < 1 {
// The path with range from 0 cannot be translated to oprs that can be intersected.
return Err(IrPatternError::Unsupported(format!(
"PathExpand in Pattern with lower range of {:?}",
hop_range.lower
)))?;
}
let expand_base = path_expand
.base
.as_ref()
Expand Down
38 changes: 32 additions & 6 deletions interactive_engine/executor/ir/core/src/plan/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -805,9 +805,17 @@ fn add_intersect_job_builder(
if get_v.alias.is_none() || !get_v.alias.as_ref().unwrap().eq(intersect_tag) {
Err(IrError::InvalidPattern("Cannot intersect on different tags".to_string()))?
}
// If the first operator is PathExpand, pick its last expand out from the path
let mut edge_expand = if let Some(Edge(edge_expand)) = first_opr.borrow().opr.opr.as_ref() {
Ok(edge_expand.clone())
edge_expand.clone()
} else if let Some(Path(path_expand)) = first_opr.borrow().opr.opr.as_ref() {
// Process path_expand as follows:
// 1. If path_expand range from 0, it is unsupported;
// 2. If it is path_expand(1,2), optimized as edge_expand;
// 3. Otherwise, translate path_expand(l,h) to path_expand(l-1, h-1) + endV() + edge_expand,
// and the last edge_expand is the one to intersect.
// Notice that if we have predicates for vertices in path_expand, or for the last vertex of path_expand,
// do the filtering after intersection.
let mut path_expand = path_expand.clone();
let path_expand_base = path_expand
.base
Expand All @@ -828,37 +836,52 @@ fn add_intersect_job_builder(
Err(IrError::Unsupported(
"Edge Only PathExpand in Intersection's subplan has not been supported yet"
.to_string(),
))?;
))?
}
// Combine the params for the last vertex in path.
// That is, it should satisfy both params in `GetV` in PathExpand's ExpandBase,
// and the params in `EndV` following PathExpand.
if let Some(path_get_v) = path_get_v_opt {
get_v = combine_get_v_by_query_params(get_v, path_get_v);
}
// pick the last edge expand out from the path expand
let mut last_edge_expand = base_edge_expand.clone();
last_edge_expand.v_tag = None;
let hop_range = path_expand
.hop_range
.as_mut()
.ok_or(ParsePbError::EmptyFieldError("pb::PathExpand::hop_range".to_string()))?;
if hop_range.lower < 1 {
Err(IrError::Unsupported(format!(
"PathExpand in Intersection with lower range of {:?}",
hop_range.lower
)))?
}
if hop_range.lower == 1 && hop_range.upper == 2 {
// optimized Path(1..2) to as EdgeExpand
last_edge_expand.v_tag = path_expand.start_tag;
} else {
// translate path_expand(l,h) to path_expand(l-1, h-1) + endV() + edge_expand,
hop_range.lower -= 1;
hop_range.upper -= 1;
let mut end_v = pb::GetV::default();
end_v.opt = pb::get_v::VOpt::End as i32;
// build the path expansion
path_expand.add_job_builder(builder, plan_meta)?;
end_v.add_job_builder(builder, plan_meta)?;
}
Ok(last_edge_expand)
last_edge_expand
} else {
Err(IrError::InvalidPattern(
"First node of Intersection's subplan is neither EdgeExpand or PathExpand".to_string(),
))
}?;
))?
};
// build the edge expansion
// the opt should be vertex because now only intersection on vertex is supported
edge_expand.expand_opt = pb::edge_expand::ExpandOpt::Vertex as i32;
edge_expand.alias = get_v.alias.clone();
edge_expand.add_job_builder(&mut sub_bldr, plan_meta)?;

// vertex parameter after the intersection
if let Some(params) = get_v.params.as_ref() {
// the case that we need to further process getV's filter.
if params.is_queryable() || !params.tables.is_empty() {
Expand All @@ -870,13 +893,16 @@ fn add_intersect_job_builder(
let sub_plan = sub_bldr.take_plan();
intersect_plans.push(sub_plan);
}
// intersect
builder.intersect(intersect_plans, intersect_tag.clone());
// unfold the intersection
let unfold = pb::Unfold {
tag: Some(intersect_tag.clone()),
alias: Some(intersect_tag.clone()),
meta_data: None,
};
unfold.add_job_builder(builder, plan_meta)?;
// add vertex filters
if let Some(mut auxilia) = auxilia {
auxilia.tag = Some(intersect_tag.clone());
builder.get_v(auxilia);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,10 @@ impl FilterMapFunction<Record, Record> for GetVertexOperator {
input.append(vertex, self.alias.clone());
Ok(Some(input))
} else if let Some(graph_path) = entry.as_graph_path() {
if let VOpt::End = self.opt {
let path_end = graph_path.get_path_end().clone();
input.append(path_end, self.alias.clone());
Ok(Some(input))
} else {
Err(FnExecError::unsupported_error(
"Only support `GetV` with VOpt::End on a path entry",
))?
}
// TODO: we do not check VOpt here, and we treat all cases as to get the end vertex of the path.
let path_end = graph_path.get_path_end().clone();
input.append(path_end, self.alias.clone());
Ok(Some(input))
} else {
Err(FnExecError::unexpected_data_error(
"Can only apply `GetV` (`Auxilia` instead) on an edge or path entry",
Expand Down Expand Up @@ -146,6 +141,17 @@ impl FilterMapFunction<Record, Record> for AuxiliaOperator {
// e.g., for g.V().out().as("a").has("name", "marko"), we should compile as:
// g.V().out().auxilia(as("a"))... where we give alias in auxilia,
// then we set tag=None and alias="a" in auxilia
// 1. filter by labels.
if !self.query_params.labels.is_empty() && entry.label().is_some() {
if !self
.query_params
.labels
.contains(&entry.label().unwrap())
{
return Ok(None);
}
}
// 2. further fetch properties, e.g., filter by columns.
match entry.get_type() {
EntryType::Vertex => {
let graph = get_graph().ok_or(FnExecError::NullGraphError)?;
Expand Down