-
Notifications
You must be signed in to change notification settings - Fork 44
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Spark] Support property filter pushdown by utilizing payload file formats #221
Conversation
ab0cd84
to
ba8dbdb
Compare
cc @lixueclaire @acezen. Currently, reading a single property group is easy to push down: val property_group = vertex_info.getPropertyGroup("gender")
// test reading a single property chunk
val single_chunk_df = reader.readVertexPropertyChunk(property_group, 0)
assert(single_chunk_df.columns.length == 3)
assert(single_chunk_df.count() == 100)
val cond = "gender = 'female'"
var df_pd = single_chunk_df.select("firstName", "gender").filter(cond)
df_pd.explain()
df_pd.show() == Physical Plan ==
*(1) Filter (isnotnull(gender#2) AND (gender#2 = female))
+- *(1) ColumnarToRow
+- BatchScan[firstName#0, gender#2] GarScan DataFilters: [isnotnull(gender#2), (gender#2 = female)], Format: gar, Location: InMemoryFileIndex(1 paths)[file:/home/simple/code/cpp/GraphAr/spark/src/test/resources/gar-test/l..., PartitionFilters: [], PushedFilters: [IsNotNull(gender), EqualTo(gender,female)], ReadSchema: struct<firstName:string,gender:string>, PushedFilters: [IsNotNull(gender), EqualTo(gender,female)] RuntimeFilters: []
2
+------------+------+
| firstName|gender|
+------------+------+
| Eli|female|
| Joseph|female|
| Jose|female|
| Jun|female|
| A. C.|female|
| Karim|female|
| Poul|female|
| Chipo|female|
| Dovid|female|
| Ashin|female|
| Cam|female|
| Kurt|female|
|Daouda Malam|female|
| David|female|
| Batong|female|
| Zheng|female|
| Gabriel|female|
| Boris|female|
| Jose|female|
| Fernando|female|
+------------+------+ But it is difficult to push down for reading multiple property groups: val vertex_df_with_index = reader.readAllVertexPropertyGroups()
assert(vertex_df_with_index.columns.length == 5)
assert(vertex_df_with_index.count() == 903)
df_pd = vertex_df_with_index.filter(cond).select("firstName", "gender")
df_pd.explain()
df_pd.show() == Physical Plan ==
*(1) Project [firstName#196, gender#198]
+- *(1) Filter (isnotnull(gender#198) AND (gender#198 = female))
+- *(1) Scan ExistingRDD[_graphArVertexIndex#194L,id#195L,firstName#196,lastName#197,gender#198]
2
+------------+------+
| firstName|gender|
+------------+------+
| Eli|female|
| Joseph|female|
| Jose|female|
| Jun|female|
| A. C.|female|
| Karim|female|
| Poul|female|
| Chipo|female|
| Dovid|female|
| Ashin|female|
| Cam|female|
| Kurt|female|
|Daouda Malam|female|
| David|female|
| Batong|female|
| Zheng|female|
| Gabriel|female|
| Boris|female|
| Jose|female|
| Fernando|female|
+------------+------+ Because different property groups are actually stored in different parquet. |
Hi, @Ziy1-Tan, thanks for your proposal. Currently, it is OK for me. We do not intend to propose a method to support filter pushdown between different parquet files. In GraphAr's design, properties are encouraged to be in the same group if they are accessed together, thus pushdown is supported as well. |
Got it. I'm going to test the performance improvement based on spark filter pushdown.
|
|
We don't have large scale ldbc dataset yet. I can generate a copy of ldbc-sf30, ldbc-100 to OSS for performance test. |
Got it. Can't wait to test the performance improvement. |
hi, @Ziy1-Tan, the large scale ldbc dataset has been publish to here, you can download it to test the performance. |
hi, @Ziy1-Tan ,code format for scala and java has been merge into main, you can rebase and apply the format with |
assert(property_df.columns.size == 3) | ||
val cond = "gender = 'female'" | ||
var df_pd = single_chunk_df.select("firstName","gender").filter(cond) | ||
df_pd.explain() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please include the resulting physical plan in the comments? This would effectively demonstrate the filter pushdown in a more intuitive manner.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I will apply plan on it.
b75b081
to
44f2e80
Compare
Signed-off-by: Ziy1-Tan <[email protected]>
Signed-off-by: Ziy1-Tan <[email protected]>
Signed-off-by: Ziy1-Tan <[email protected]>
44f2e80
to
d9e4e6e
Compare
Signed-off-by: Ziy1-Tan <[email protected]>
Signed-off-by: Ziy1-Tan <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM~ This is highly appreciated and valuable, thank you for your contribution!
Signed-off-by: Ziy1-Tan <[email protected]>
4ffa007
to
b277a38
Compare
This PR is about C++ SDK for OSPP 2023
Issue number: #98.
You can find more detail about this feature here
Proposed changes
Now we support filter pushdown for spark
Types of changes
What types of changes does your code introduce to GraphAr?
Put an
x
in the boxes that applyChecklist
Put an
x
in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your code.