Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C++] Support property filter pushdown by utilizing payload file formats #178

Merged
merged 30 commits into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
a7b09f1
Feat: C++ SDK pushdown prototype
Ziy1-Tan Jun 14, 2023
2760b3e
Refactor: helper function support pushdown
Ziy1-Tan Jun 16, 2023
f0c68b4
Feat: column projection support
Ziy1-Tan Jun 16, 2023
68b1133
Refactor: wrap filter and columns into FilterOptions
Ziy1-Tan Jun 17, 2023
47129e8
Feat: Adj list filter pushdown support
Ziy1-Tan Jun 19, 2023
22587ba
Refactor: improved the usability of filter pushdown API
Ziy1-Tan Jun 22, 2023
3bd1931
Refactor: wrapper for arrow::compute::Expression
Ziy1-Tan Jun 23, 2023
5036b2a
Refactor: make friend class
Ziy1-Tan Jun 23, 2023
bdb8483
Refactor: Fix style
Ziy1-Tan Jun 24, 2023
11e00f4
Refactor: Fix style
Ziy1-Tan Jun 24, 2023
b1d536a
Refactor: more friendly Expression API
Ziy1-Tan Jun 24, 2023
b9cb893
Refactor: switch case to template call
Ziy1-Tan Jun 25, 2023
ee09d89
Docs: filter Expression
Ziy1-Tan Jun 26, 2023
c3fdd5e
Bugfix: type casting when table is empty
Ziy1-Tan Jun 26, 2023
52b2092
Refactor: clean header files
Ziy1-Tan Jun 27, 2023
70ae4fc
Refactor: Fix style
Ziy1-Tan Jun 30, 2023
d24d269
Refactor: ut for reader
Ziy1-Tan Jul 4, 2023
94aae02
Refactor: fix the GetRange()
Ziy1-Tan Jul 4, 2023
0375538
Refactor: improve API docs
Ziy1-Tan Jul 5, 2023
8e59514
Refactor: improve expression usability
Ziy1-Tan Jul 5, 2023
2246136
Refactor: fix style
Ziy1-Tan Jul 6, 2023
0d03fe7
Refactor: fix style
Ziy1-Tan Jul 6, 2023
8e27aff
Docs: Expression
Ziy1-Tan Jul 12, 2023
bf47cad
Refactor: fix style
Ziy1-Tan Jul 12, 2023
8a83246
Merge branch 'main' into pushdown
acezen Jul 18, 2023
a10b456
Merge branch 'main' into pushdown
acezen Jul 21, 2023
eab9013
Merge branch 'main' into pushdown
acezen Jul 21, 2023
38282a1
Refactor: filter expression validation & ut
Ziy1-Tan Jul 23, 2023
08dec6f
Merge branch 'pushdown' of github.com:Ziy1-Tan/GraphAr into pushdown
Ziy1-Tan Jul 23, 2023
8524fbf
Refactor: fix style
Ziy1-Tan Jul 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,13 @@ macro(build_gar)
if(APPLE)
target_link_libraries(gar PRIVATE -Wl,-force_load gar_arrow_static
"${GAR_PARQUET_STATIC_LIB}"
"${GAR_DATASET_STATIC_LIB}"
"${GAR_ACERO_STATIC_LIB}"
"${GAR_ARROW_BUNDLED_DEPS_STATIC_LIB}")
else()
target_link_libraries(gar PRIVATE -Wl,--exclude-libs,ALL -Wl,--whole-archive gar_arrow_static
"${GAR_PARQUET_STATIC_LIB}"
"${GAR_DATASET_STATIC_LIB}"
"${GAR_ARROW_ACERO_STATIC_LIB}"
"${GAR_ARROW_BUNDLED_DEPS_STATIC_LIB}" -Wl,--no-whole-archive)
endif()
Expand Down
10 changes: 9 additions & 1 deletion cpp/cmake/apache-arrow.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ function(build_arrow)
set(GAR_PARQUET_STATIC_LIB_FILENAME
"${CMAKE_STATIC_LIBRARY_PREFIX}parquet${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(GAR_PARQUET_STATIC_LIB "${GAR_ARROW_STATIC_LIBRARY_DIR}/${GAR_PARQUET_STATIC_LIB_FILENAME}" CACHE INTERNAL "parquet lib")
set(GAR_DATASET_STATIC_LIB_FILENAME
"${CMAKE_STATIC_LIBRARY_PREFIX}arrow_dataset${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(GAR_DATASET_STATIC_LIB "${GAR_ARROW_STATIC_LIBRARY_DIR}/${GAR_DATASET_STATIC_LIB_FILENAME}" CACHE INTERNAL "arrow dataset lib")
set(GAR_ARROW_BUNDLED_DEPS_STATIC_LIB_FILENAME
"${CMAKE_STATIC_LIBRARY_PREFIX}arrow_bundled_dependencies${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(GAR_ARROW_BUNDLED_DEPS_STATIC_LIB
Expand Down Expand Up @@ -83,7 +86,7 @@ function(build_arrow)
"-DARROW_S3=ON")

set(GAR_ARROW_INCLUDE_DIR "${GAR_ARROW_PREFIX}/include" CACHE INTERNAL "arrow include directory")
set(GAR_ARROW_BUILD_BYPRODUCTS "${GAR_ARROW_STATIC_LIB}" "${GAR_PARQUET_STATIC_LIB}")
set(GAR_ARROW_BUILD_BYPRODUCTS "${GAR_ARROW_STATIC_LIB}" "${GAR_PARQUET_STATIC_LIB}" "${GAR_DATASET_STATIC_LIB}")

find_package(Threads)
find_package(Arrow QUIET)
Expand All @@ -104,16 +107,21 @@ function(build_arrow)

set(GAR_ARROW_LIBRARY_TARGET gar_arrow_static)
set(GAR_PARQUET_LIBRARY_TARGET gar_parquet_static)
set(GAR_DATASET_LIBRARY_TARGET gar_dataset_static)

file(MAKE_DIRECTORY "${GAR_ARROW_INCLUDE_DIR}")
add_library(${GAR_ARROW_LIBRARY_TARGET} STATIC IMPORTED)
add_library(${GAR_PARQUET_LIBRARY_TARGET} STATIC IMPORTED)
add_library(${GAR_DATASET_LIBRARY_TARGET} STATIC IMPORTED)
set_target_properties(${GAR_ARROW_LIBRARY_TARGET}
PROPERTIES INTERFACE_INCLUDE_DIRECTORIES ${GAR_ARROW_INCLUDE_DIR}
IMPORTED_LOCATION ${GAR_ARROW_STATIC_LIB})
set_target_properties(${GAR_PARQUET_LIBRARY_TARGET}
PROPERTIES INTERFACE_INCLUDE_DIRECTORIES ${GAR_ARROW_INCLUDE_DIR}
IMPORTED_LOCATION ${GAR_PARQUET_STATIC_LIB})
set_target_properties(${GAR_DATASET_LIBRARY_TARGET}
PROPERTIES INTERFACE_INCLUDE_DIRECTORIES ${GAR_ARROW_INCLUDE_DIR}
IMPORTED_LOCATION ${GAR_DATASET_STATIC_LIB})
if (ARROW_VERSION_TO_BUILD GREATER_EQUAL "12.0.0")
set(GAR_ARROW_ACERO_STATIC_LIB_FILENAME
"${CMAKE_STATIC_LIBRARY_PREFIX}arrow_acero${CMAKE_STATIC_LIBRARY_SUFFIX}")
Expand Down
12 changes: 12 additions & 0 deletions cpp/include/gar/graph_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ limitations under the License.
#include <map>
#include <memory>
#include <string>
#include <unordered_set>
#include <vector>

#include "utils/adj_list_type.h"
Expand All @@ -43,6 +44,11 @@ struct Property {
std::string name; // property name
DataType type; // property data type
bool is_primary; // primary key tag

Property() {}
explicit Property(const std::string& name) : name(name) {}
Property(const std::string& name, const DataType& type, bool is_primary)
: name(name), type(type), is_primary(is_primary) {}
};

static bool operator==(const Property& lhs, const Property& rhs) {
Expand Down Expand Up @@ -87,6 +93,7 @@ class PropertyGroup {
std::vector<std::string> names;
for (auto& property : properties_) {
names.push_back(property.name);
property_names_.insert(property.name);
}
prefix_ = util::ConcatStringWithDelimiter(names, REGULAR_SEPERATOR) + "/";
}
Expand Down Expand Up @@ -121,6 +128,10 @@ class PropertyGroup {
return properties_;
}

inline bool ContainProperty(const std::string& property_name) const {
return property_names_.find(property_name) != property_names_.end();
}

/** Get the file type of property group chunk file.
*
* @return The file type of group.
Expand All @@ -146,6 +157,7 @@ class PropertyGroup {

private:
std::vector<Property> properties_;
std::unordered_set<std::string> property_names_;
FileType file_type_;
std::string prefix_;
};
Expand Down
79 changes: 62 additions & 17 deletions cpp/include/gar/reader/arrow_chunk_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ limitations under the License.
#include "gar/graph_info.h"
#include "gar/utils/data_type.h"
#include "gar/utils/filesystem.h"
#include "gar/utils/reader_utils.h"
#include "gar/utils/result.h"
#include "gar/utils/status.h"
#include "gar/utils/utils.h"
Expand Down Expand Up @@ -52,18 +51,22 @@ class VertexPropertyArrowChunkReader {
VertexPropertyArrowChunkReader(const VertexInfo& vertex_info,
const PropertyGroup& property_group,
const std::string& prefix,
IdType chunk_index = 0)
IdType chunk_index = 0,
const utils::FilterOptions& options = {})
: vertex_info_(vertex_info),
property_group_(property_group),
chunk_index_(chunk_index),
seek_id_(chunk_index * vertex_info.GetChunkSize()),
chunk_table_(nullptr) {
chunk_table_(nullptr),
filter_options_(options) {
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
GAR_ASSIGN_OR_RAISE_ERROR(auto pg_path_prefix,
vertex_info.GetPathPrefix(property_group));
std::string base_dir = prefix_ + pg_path_prefix;
GAR_ASSIGN_OR_RAISE_ERROR(chunk_num_,
utils::GetVertexChunkNum(prefix_, vertex_info));
GAR_ASSIGN_OR_RAISE_ERROR(vertex_num_,
utils::GetVertexNum(prefix_, vertex_info_));
}

/**
Expand Down Expand Up @@ -126,14 +129,32 @@ class VertexPropertyArrowChunkReader {
*/
IdType GetChunkNum() const noexcept { return chunk_num_; }

/**
* @brief Apply the row filter to the table. No parameter call Filter() will
* clear the filter.
*
* @param filter Predicate expression to filter rows.
*/
void Filter(utils::Filter filter = nullptr);

/**
* @brief Apply the projection to the table to be read. No parameter call
* Select() will clear the projection.
*
* @param column_names The name of columns to be selected.
*/
void Select(utils::ColumnNames column_names = std::nullopt);

private:
VertexInfo vertex_info_;
PropertyGroup property_group_;
std::string prefix_;
IdType chunk_index_;
IdType seek_id_;
IdType chunk_num_;
IdType vertex_num_;
std::shared_ptr<arrow::Table> chunk_table_;
utils::FilterOptions filter_options_;
std::shared_ptr<FileSystem> fs_;
};

Expand Down Expand Up @@ -227,7 +248,8 @@ class AdjListArrowChunkReader {
}

/**
* @brief Return the current chunk of chunk position indicator as arrow::Table
* @brief Return the current chunk of chunk position indicator as
* arrow::Table
*/
Result<std::shared_ptr<arrow::Table>> GetChunk() noexcept;

Expand Down Expand Up @@ -420,7 +442,8 @@ class AdjListPropertyArrowChunkReader {
* @brief Initialize the AdjListPropertyArrowChunkReader.
*
* @param edge_info The edge info that describes the edge type.
* @param property_group The property group that describes the property group.
* @param property_group The property group that describes the property
* group.
* @param adj_list_type The adj list type for the edges.
* @param prefix The absolute prefix.
* @param vertex_chunk_index The vertex chunk index, default is 0.
Expand All @@ -429,15 +452,17 @@ class AdjListPropertyArrowChunkReader {
const PropertyGroup& property_group,
AdjListType adj_list_type,
const std::string prefix,
IdType vertex_chunk_index = 0)
IdType vertex_chunk_index = 0,
const utils::FilterOptions& options = {})
: edge_info_(edge_info),
property_group_(property_group),
adj_list_type_(adj_list_type),
prefix_(prefix),
vertex_chunk_index_(vertex_chunk_index),
chunk_index_(0),
seek_offset_(0),
chunk_table_(nullptr) {
chunk_table_(nullptr),
filter_options_(options) {
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
GAR_ASSIGN_OR_RAISE_ERROR(
auto pg_path_prefix,
Expand All @@ -463,6 +488,7 @@ class AdjListPropertyArrowChunkReader {
chunk_index_(other.chunk_index_),
seek_offset_(other.seek_offset_),
chunk_table_(nullptr),
filter_options_(other.filter_options_),
vertex_chunk_num_(other.vertex_chunk_num_),
chunk_num_(other.chunk_num_),
base_dir_(other.base_dir_),
Expand Down Expand Up @@ -506,7 +532,8 @@ class AdjListPropertyArrowChunkReader {
}

/**
* @brief Return the current chunk of chunk position indicator as arrow::Table
* @brief Return the current chunk of chunk position indicator as
* arrow::Table
*/
Result<std::shared_ptr<arrow::Table>> GetChunk() noexcept;

Expand Down Expand Up @@ -564,6 +591,22 @@ class AdjListPropertyArrowChunkReader {
return Status::OK();
}

/**
* @brief Apply the row filter to the table. No parameter call Filter() will
* clear the filter.
*
* @param filter Predicate expression to filter rows.
*/
void Filter(utils::Filter filter = nullptr);

/**
* @brief Apply the projection to the table to be read. No parameter call
* Select() will clear the projection.
*
* @param column_names The name of columns to be selected.
*/
void Select(utils::ColumnNames column_names = std::nullopt);

private:
EdgeInfo edge_info_;
PropertyGroup property_group_;
Expand All @@ -572,6 +615,7 @@ class AdjListPropertyArrowChunkReader {
IdType vertex_chunk_index_, chunk_index_;
IdType seek_offset_;
std::shared_ptr<arrow::Table> chunk_table_;
utils::FilterOptions filter_options_;
IdType vertex_chunk_num_, chunk_num_;
std::string base_dir_;
std::shared_ptr<FileSystem> fs_;
Expand All @@ -587,15 +631,16 @@ class AdjListPropertyArrowChunkReader {
static inline Result<VertexPropertyArrowChunkReader>
ConstructVertexPropertyArrowChunkReader(
const GraphInfo& graph_info, const std::string& label,
const PropertyGroup& property_group) noexcept {
const PropertyGroup& property_group,
const utils::FilterOptions& options = {}) noexcept {
VertexInfo vertex_info;
GAR_ASSIGN_OR_RAISE(vertex_info, graph_info.GetVertexInfo(label));
if (!vertex_info.ContainPropertyGroup(property_group)) {
return Status::KeyError("No property group ", property_group, " in vertex ",
label, ".");
}
return VertexPropertyArrowChunkReader(vertex_info, property_group,
graph_info.GetPrefix());
graph_info.GetPrefix(), 0, options);
}

/**
Expand Down Expand Up @@ -663,12 +708,11 @@ ConstructAdjListOffsetArrowChunkReader(const GraphInfo& graph_info,
* @param adj_list_type The adj list type for the edges.
*/
static inline Result<AdjListPropertyArrowChunkReader>
ConstructAdjListPropertyArrowChunkReader(const GraphInfo& graph_info,
const std::string& src_label,
const std::string& edge_label,
const std::string& dst_label,
const PropertyGroup& property_group,
AdjListType adj_list_type) noexcept {
ConstructAdjListPropertyArrowChunkReader(
const GraphInfo& graph_info, const std::string& src_label,
const std::string& edge_label, const std::string& dst_label,
const PropertyGroup& property_group, AdjListType adj_list_type,
const utils::FilterOptions& options = {}) noexcept {
EdgeInfo edge_info;
GAR_ASSIGN_OR_RAISE(edge_info,
graph_info.GetEdgeInfo(src_label, edge_label, dst_label));
Expand All @@ -683,7 +727,8 @@ ConstructAdjListPropertyArrowChunkReader(const GraphInfo& graph_info,
AdjListTypeToString(adj_list_type), ".");
}
return AdjListPropertyArrowChunkReader(edge_info, property_group,
adj_list_type, graph_info.GetPrefix());
adj_list_type, graph_info.GetPrefix(),
0, options);
}

} // namespace GAR_NAMESPACE_INTERNAL
Expand Down
Loading