From 496856839ad170de40b42158cb906474374c7d3d Mon Sep 17 00:00:00 2001 From: Ziyi Tan Date: Mon, 24 Jul 2023 11:02:11 +0800 Subject: [PATCH] [C++] Support property filter pushdown by utilizing payload file formats (#178) --------- Signed-off-by: Ziy1-Tan Co-authored-by: Weibin Zeng --- cpp/CMakeLists.txt | 2 + cpp/cmake/apache-arrow.cmake | 10 +- cpp/include/gar/graph_info.h | 12 + cpp/include/gar/reader/arrow_chunk_reader.h | 79 ++++- cpp/include/gar/utils/expression.h | 356 ++++++++++++++++++++ cpp/include/gar/utils/filesystem.h | 15 +- cpp/include/gar/utils/reader_utils.h | 21 ++ cpp/src/arrow_chunk_reader.cc | 42 ++- cpp/src/expression.cc | 83 +++++ cpp/src/filesystem.cc | 79 +++-- cpp/src/reader_utils.cc | 34 ++ cpp/test/test_arrow_chunk_reader.cc | 205 ++++++++++- docs/reference/api-reference-cpp.rst | 70 ++++ 13 files changed, 931 insertions(+), 77 deletions(-) create mode 100644 cpp/include/gar/utils/expression.h create mode 100644 cpp/src/expression.cc diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 63440c90c..37e0e2835 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -191,11 +191,13 @@ macro(build_gar) if(APPLE) target_link_libraries(gar PRIVATE -Wl,-force_load gar_arrow_static "${GAR_PARQUET_STATIC_LIB}" + "${GAR_DATASET_STATIC_LIB}" "${GAR_ACERO_STATIC_LIB}" "${GAR_ARROW_BUNDLED_DEPS_STATIC_LIB}") else() target_link_libraries(gar PRIVATE -Wl,--exclude-libs,ALL -Wl,--whole-archive gar_arrow_static "${GAR_PARQUET_STATIC_LIB}" + "${GAR_DATASET_STATIC_LIB}" "${GAR_ARROW_ACERO_STATIC_LIB}" "${GAR_ARROW_BUNDLED_DEPS_STATIC_LIB}" -Wl,--no-whole-archive) endif() diff --git a/cpp/cmake/apache-arrow.cmake b/cpp/cmake/apache-arrow.cmake index 950f026b6..4a37486c6 100644 --- a/cpp/cmake/apache-arrow.cmake +++ b/cpp/cmake/apache-arrow.cmake @@ -44,6 +44,9 @@ function(build_arrow) set(GAR_PARQUET_STATIC_LIB_FILENAME "${CMAKE_STATIC_LIBRARY_PREFIX}parquet${CMAKE_STATIC_LIBRARY_SUFFIX}") set(GAR_PARQUET_STATIC_LIB "${GAR_ARROW_STATIC_LIBRARY_DIR}/${GAR_PARQUET_STATIC_LIB_FILENAME}" CACHE INTERNAL "parquet lib") + set(GAR_DATASET_STATIC_LIB_FILENAME + "${CMAKE_STATIC_LIBRARY_PREFIX}arrow_dataset${CMAKE_STATIC_LIBRARY_SUFFIX}") + set(GAR_DATASET_STATIC_LIB "${GAR_ARROW_STATIC_LIBRARY_DIR}/${GAR_DATASET_STATIC_LIB_FILENAME}" CACHE INTERNAL "arrow dataset lib") set(GAR_ARROW_BUNDLED_DEPS_STATIC_LIB_FILENAME "${CMAKE_STATIC_LIBRARY_PREFIX}arrow_bundled_dependencies${CMAKE_STATIC_LIBRARY_SUFFIX}") set(GAR_ARROW_BUNDLED_DEPS_STATIC_LIB @@ -83,7 +86,7 @@ function(build_arrow) "-DARROW_S3=ON") set(GAR_ARROW_INCLUDE_DIR "${GAR_ARROW_PREFIX}/include" CACHE INTERNAL "arrow include directory") - set(GAR_ARROW_BUILD_BYPRODUCTS "${GAR_ARROW_STATIC_LIB}" "${GAR_PARQUET_STATIC_LIB}") + set(GAR_ARROW_BUILD_BYPRODUCTS "${GAR_ARROW_STATIC_LIB}" "${GAR_PARQUET_STATIC_LIB}" "${GAR_DATASET_STATIC_LIB}") find_package(Threads) find_package(Arrow QUIET) @@ -104,16 +107,21 @@ function(build_arrow) set(GAR_ARROW_LIBRARY_TARGET gar_arrow_static) set(GAR_PARQUET_LIBRARY_TARGET gar_parquet_static) + set(GAR_DATASET_LIBRARY_TARGET gar_dataset_static) file(MAKE_DIRECTORY "${GAR_ARROW_INCLUDE_DIR}") add_library(${GAR_ARROW_LIBRARY_TARGET} STATIC IMPORTED) add_library(${GAR_PARQUET_LIBRARY_TARGET} STATIC IMPORTED) + add_library(${GAR_DATASET_LIBRARY_TARGET} STATIC IMPORTED) set_target_properties(${GAR_ARROW_LIBRARY_TARGET} PROPERTIES INTERFACE_INCLUDE_DIRECTORIES ${GAR_ARROW_INCLUDE_DIR} IMPORTED_LOCATION ${GAR_ARROW_STATIC_LIB}) set_target_properties(${GAR_PARQUET_LIBRARY_TARGET} PROPERTIES INTERFACE_INCLUDE_DIRECTORIES ${GAR_ARROW_INCLUDE_DIR} IMPORTED_LOCATION ${GAR_PARQUET_STATIC_LIB}) + set_target_properties(${GAR_DATASET_LIBRARY_TARGET} + PROPERTIES INTERFACE_INCLUDE_DIRECTORIES ${GAR_ARROW_INCLUDE_DIR} + IMPORTED_LOCATION ${GAR_DATASET_STATIC_LIB}) if (ARROW_VERSION_TO_BUILD GREATER_EQUAL "12.0.0") set(GAR_ARROW_ACERO_STATIC_LIB_FILENAME "${CMAKE_STATIC_LIBRARY_PREFIX}arrow_acero${CMAKE_STATIC_LIBRARY_SUFFIX}") diff --git a/cpp/include/gar/graph_info.h b/cpp/include/gar/graph_info.h index 5e6997f20..df6d34ce4 100644 --- a/cpp/include/gar/graph_info.h +++ b/cpp/include/gar/graph_info.h @@ -19,6 +19,7 @@ limitations under the License. #include #include #include +#include #include #include "utils/adj_list_type.h" @@ -43,6 +44,11 @@ struct Property { std::string name; // property name DataType type; // property data type bool is_primary; // primary key tag + + Property() {} + explicit Property(const std::string& name) : name(name) {} + Property(const std::string& name, const DataType& type, bool is_primary) + : name(name), type(type), is_primary(is_primary) {} }; static bool operator==(const Property& lhs, const Property& rhs) { @@ -87,6 +93,7 @@ class PropertyGroup { std::vector names; for (auto& property : properties_) { names.push_back(property.name); + property_names_.insert(property.name); } prefix_ = util::ConcatStringWithDelimiter(names, REGULAR_SEPERATOR) + "/"; } @@ -121,6 +128,10 @@ class PropertyGroup { return properties_; } + inline bool ContainProperty(const std::string& property_name) const { + return property_names_.find(property_name) != property_names_.end(); + } + /** Get the file type of property group chunk file. * * @return The file type of group. @@ -146,6 +157,7 @@ class PropertyGroup { private: std::vector properties_; + std::unordered_set property_names_; FileType file_type_; std::string prefix_; }; diff --git a/cpp/include/gar/reader/arrow_chunk_reader.h b/cpp/include/gar/reader/arrow_chunk_reader.h index abbd9ae9a..e350d9376 100644 --- a/cpp/include/gar/reader/arrow_chunk_reader.h +++ b/cpp/include/gar/reader/arrow_chunk_reader.h @@ -24,7 +24,6 @@ limitations under the License. #include "gar/graph_info.h" #include "gar/utils/data_type.h" #include "gar/utils/filesystem.h" -#include "gar/utils/reader_utils.h" #include "gar/utils/result.h" #include "gar/utils/status.h" #include "gar/utils/utils.h" @@ -52,18 +51,22 @@ class VertexPropertyArrowChunkReader { VertexPropertyArrowChunkReader(const VertexInfo& vertex_info, const PropertyGroup& property_group, const std::string& prefix, - IdType chunk_index = 0) + IdType chunk_index = 0, + const utils::FilterOptions& options = {}) : vertex_info_(vertex_info), property_group_(property_group), chunk_index_(chunk_index), seek_id_(chunk_index * vertex_info.GetChunkSize()), - chunk_table_(nullptr) { + chunk_table_(nullptr), + filter_options_(options) { GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_)); GAR_ASSIGN_OR_RAISE_ERROR(auto pg_path_prefix, vertex_info.GetPathPrefix(property_group)); std::string base_dir = prefix_ + pg_path_prefix; GAR_ASSIGN_OR_RAISE_ERROR(chunk_num_, utils::GetVertexChunkNum(prefix_, vertex_info)); + GAR_ASSIGN_OR_RAISE_ERROR(vertex_num_, + utils::GetVertexNum(prefix_, vertex_info_)); } /** @@ -126,6 +129,22 @@ class VertexPropertyArrowChunkReader { */ IdType GetChunkNum() const noexcept { return chunk_num_; } + /** + * @brief Apply the row filter to the table. No parameter call Filter() will + * clear the filter. + * + * @param filter Predicate expression to filter rows. + */ + void Filter(utils::Filter filter = nullptr); + + /** + * @brief Apply the projection to the table to be read. No parameter call + * Select() will clear the projection. + * + * @param column_names The name of columns to be selected. + */ + void Select(utils::ColumnNames column_names = std::nullopt); + private: VertexInfo vertex_info_; PropertyGroup property_group_; @@ -133,7 +152,9 @@ class VertexPropertyArrowChunkReader { IdType chunk_index_; IdType seek_id_; IdType chunk_num_; + IdType vertex_num_; std::shared_ptr chunk_table_; + utils::FilterOptions filter_options_; std::shared_ptr fs_; }; @@ -227,7 +248,8 @@ class AdjListArrowChunkReader { } /** - * @brief Return the current chunk of chunk position indicator as arrow::Table + * @brief Return the current chunk of chunk position indicator as + * arrow::Table */ Result> GetChunk() noexcept; @@ -420,7 +442,8 @@ class AdjListPropertyArrowChunkReader { * @brief Initialize the AdjListPropertyArrowChunkReader. * * @param edge_info The edge info that describes the edge type. - * @param property_group The property group that describes the property group. + * @param property_group The property group that describes the property + * group. * @param adj_list_type The adj list type for the edges. * @param prefix The absolute prefix. * @param vertex_chunk_index The vertex chunk index, default is 0. @@ -429,7 +452,8 @@ class AdjListPropertyArrowChunkReader { const PropertyGroup& property_group, AdjListType adj_list_type, const std::string prefix, - IdType vertex_chunk_index = 0) + IdType vertex_chunk_index = 0, + const utils::FilterOptions& options = {}) : edge_info_(edge_info), property_group_(property_group), adj_list_type_(adj_list_type), @@ -437,7 +461,8 @@ class AdjListPropertyArrowChunkReader { vertex_chunk_index_(vertex_chunk_index), chunk_index_(0), seek_offset_(0), - chunk_table_(nullptr) { + chunk_table_(nullptr), + filter_options_(options) { GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_)); GAR_ASSIGN_OR_RAISE_ERROR( auto pg_path_prefix, @@ -463,6 +488,7 @@ class AdjListPropertyArrowChunkReader { chunk_index_(other.chunk_index_), seek_offset_(other.seek_offset_), chunk_table_(nullptr), + filter_options_(other.filter_options_), vertex_chunk_num_(other.vertex_chunk_num_), chunk_num_(other.chunk_num_), base_dir_(other.base_dir_), @@ -506,7 +532,8 @@ class AdjListPropertyArrowChunkReader { } /** - * @brief Return the current chunk of chunk position indicator as arrow::Table + * @brief Return the current chunk of chunk position indicator as + * arrow::Table */ Result> GetChunk() noexcept; @@ -564,6 +591,22 @@ class AdjListPropertyArrowChunkReader { return Status::OK(); } + /** + * @brief Apply the row filter to the table. No parameter call Filter() will + * clear the filter. + * + * @param filter Predicate expression to filter rows. + */ + void Filter(utils::Filter filter = nullptr); + + /** + * @brief Apply the projection to the table to be read. No parameter call + * Select() will clear the projection. + * + * @param column_names The name of columns to be selected. + */ + void Select(utils::ColumnNames column_names = std::nullopt); + private: EdgeInfo edge_info_; PropertyGroup property_group_; @@ -572,6 +615,7 @@ class AdjListPropertyArrowChunkReader { IdType vertex_chunk_index_, chunk_index_; IdType seek_offset_; std::shared_ptr chunk_table_; + utils::FilterOptions filter_options_; IdType vertex_chunk_num_, chunk_num_; std::string base_dir_; std::shared_ptr fs_; @@ -587,7 +631,8 @@ class AdjListPropertyArrowChunkReader { static inline Result ConstructVertexPropertyArrowChunkReader( const GraphInfo& graph_info, const std::string& label, - const PropertyGroup& property_group) noexcept { + const PropertyGroup& property_group, + const utils::FilterOptions& options = {}) noexcept { VertexInfo vertex_info; GAR_ASSIGN_OR_RAISE(vertex_info, graph_info.GetVertexInfo(label)); if (!vertex_info.ContainPropertyGroup(property_group)) { @@ -595,7 +640,7 @@ ConstructVertexPropertyArrowChunkReader( label, "."); } return VertexPropertyArrowChunkReader(vertex_info, property_group, - graph_info.GetPrefix()); + graph_info.GetPrefix(), 0, options); } /** @@ -663,12 +708,11 @@ ConstructAdjListOffsetArrowChunkReader(const GraphInfo& graph_info, * @param adj_list_type The adj list type for the edges. */ static inline Result -ConstructAdjListPropertyArrowChunkReader(const GraphInfo& graph_info, - const std::string& src_label, - const std::string& edge_label, - const std::string& dst_label, - const PropertyGroup& property_group, - AdjListType adj_list_type) noexcept { +ConstructAdjListPropertyArrowChunkReader( + const GraphInfo& graph_info, const std::string& src_label, + const std::string& edge_label, const std::string& dst_label, + const PropertyGroup& property_group, AdjListType adj_list_type, + const utils::FilterOptions& options = {}) noexcept { EdgeInfo edge_info; GAR_ASSIGN_OR_RAISE(edge_info, graph_info.GetEdgeInfo(src_label, edge_label, dst_label)); @@ -683,7 +727,8 @@ ConstructAdjListPropertyArrowChunkReader(const GraphInfo& graph_info, AdjListTypeToString(adj_list_type), "."); } return AdjListPropertyArrowChunkReader(edge_info, property_group, - adj_list_type, graph_info.GetPrefix()); + adj_list_type, graph_info.GetPrefix(), + 0, options); } } // namespace GAR_NAMESPACE_INTERNAL diff --git a/cpp/include/gar/utils/expression.h b/cpp/include/gar/utils/expression.h new file mode 100644 index 000000000..cc53505fc --- /dev/null +++ b/cpp/include/gar/utils/expression.h @@ -0,0 +1,356 @@ +/** Copyright 2023 Alibaba Group Holding Limited. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include +#include + +#include "arrow/compute/api.h" + +#include "gar/graph_info.h" + +#ifndef GAR_UTILS_EXPRESSION_H_ +#define GAR_UTILS_EXPRESSION_H_ + +namespace GAR_NAMESPACE_INTERNAL { + +using ArrowExpression = arrow::compute::Expression; + +/** + * This class wraps an arrow::compute::Expression and provides a way to + * construct it + */ +class Expression { + public: + Expression() = default; + Expression(const Expression& other) = default; + virtual ~Expression() = default; + + /** + * @brief Evaluate Expression as arrow::compute::Expression e.g. new + * ExpressionEqual(new ExpressionProperty("a"), new + * ExpressionLiteral(1)) will be parsed as + * arrow::compute::equal(arrow::compute::field_ref("a"), + * arrow::compute::literal(1)) + * + * @return The arrow::compute::Expression instance + */ + virtual Result Evaluate() = 0; +}; + +/** + * This class wraps the Property and provides a way to construct property + * expression + */ +class ExpressionProperty : public Expression { + public: + explicit ExpressionProperty(const Property& property) : property_(property) {} + explicit ExpressionProperty(const std::string& name) + : property_(Property(name)) {} + ExpressionProperty(const ExpressionProperty& other) = default; + ~ExpressionProperty() = default; + + Result Evaluate() override; + + private: + Property property_; +}; + +/** + * This class wraps the literal. Only bool, int32, int64, float, double and + * string are allowed. + */ +template || std::is_same_v || + std::is_same_v || std::is_same_v || + std::is_same_v || std::is_same_v || + std::is_same_v || + std::is_same_v, + typename = std::enable_if_t> +class ExpressionLiteral : public Expression { + public: + explicit ExpressionLiteral(T value) : value_(value) {} + ExpressionLiteral(const ExpressionLiteral& other) = default; + ~ExpressionLiteral() = default; + + Result Evaluate() { return arrow::compute::literal(value_); } + + private: + T value_; +}; + +/** + * This class constructs a unary operator expression that accepts only one + * expression + */ +class ExpressionUnaryOp : public Expression { + public: + ExpressionUnaryOp() = default; + explicit ExpressionUnaryOp(std::shared_ptr expr) : expr_(expr) {} + ExpressionUnaryOp(const ExpressionUnaryOp& other) = default; + virtual ~ExpressionUnaryOp() {} + + protected: + std::shared_ptr expr_; +}; + +/** + * This class constructs a NOT operator expression. e.g. new ExpressionNot(new + * ExpressionLiteral(true)) => NOT TRUE + */ +class ExpressionNot : public ExpressionUnaryOp { + public: + ExpressionNot() = default; + explicit ExpressionNot(std::shared_ptr expr) + : ExpressionUnaryOp(expr) {} + ExpressionNot(const ExpressionNot& other) = default; + ~ExpressionNot() = default; + + Result Evaluate() override; +}; + +/** + * This class constructs a binary operator expression that accepts two + * expressions e.g. a = 1, a > 1, a AND b, a OR b + */ +class ExpressionBinaryOp : public Expression { + public: + ExpressionBinaryOp() = default; + ExpressionBinaryOp(std::shared_ptr lhs, + std::shared_ptr rhs) + : lhs_(lhs), rhs_(rhs) {} + ExpressionBinaryOp(const ExpressionBinaryOp& other) = default; + ~ExpressionBinaryOp() = default; + + protected: + inline Status CheckNullArgs(std::shared_ptr lhs, + std::shared_ptr rhs) noexcept { + if (lhs == nullptr || rhs == nullptr) { + return Status::Invalid("Invalid expression: lhs or rhs is null"); + } + return Status::OK(); + } + + protected: + std::shared_ptr lhs_; + std::shared_ptr rhs_; +}; + +/** + * This class constructs a EQUAL operator expression. + * e.g. new ExpressionEqual(new ExpressionProperty("a"), new + * ExpressionLiteral(1)) => a = 1 + */ +class ExpressionEqual : public ExpressionBinaryOp { + public: + ExpressionEqual() = default; + ExpressionEqual(std::shared_ptr lhs, + std::shared_ptr rhs) + : ExpressionBinaryOp(lhs, rhs) {} + ExpressionEqual(const ExpressionEqual& other) = default; + ~ExpressionEqual() = default; + + Result Evaluate() override; +}; + +/** + * This class constructs a NOT EQUAL operator expression. + * e.g. new ExpressionNotEqual(new ExpressionProperty("a"), new + * ExpressionLiteral(1)) => a != 1 + */ +class ExpressionNotEqual : public ExpressionBinaryOp { + public: + ExpressionNotEqual() = default; + ExpressionNotEqual(std::shared_ptr lhs, + std::shared_ptr rhs) + : ExpressionBinaryOp(lhs, rhs) {} + ExpressionNotEqual(const ExpressionNotEqual& other) = default; + ~ExpressionNotEqual() = default; + + Result Evaluate() override; +}; + +/** + * This class constructs a GREATER THAN operator expression. + * e.g. new ExpressionGreaterThan(new ExpressionProperty("a"), new + * ExpressionLiteral(1)) => a > 1 + */ +class ExpressionGreaterThan : public ExpressionBinaryOp { + public: + ExpressionGreaterThan() = default; + ExpressionGreaterThan(std::shared_ptr lhs, + std::shared_ptr rhs) + : ExpressionBinaryOp(lhs, rhs) {} + ExpressionGreaterThan(const ExpressionGreaterThan& other) = default; + ~ExpressionGreaterThan() = default; + + Result Evaluate() override; +}; + +/** + * This class constructs a GREATER EQUAL operator expression. + * e.g. new ExpressionGreaterEqual(new ExpressionProperty("a"), new + * ExpressionLiteral(1)) => a >= 1 + */ +class ExpressionGreaterEqual : public ExpressionBinaryOp { + public: + ExpressionGreaterEqual() = default; + ExpressionGreaterEqual(std::shared_ptr lhs, + std::shared_ptr rhs) + : ExpressionBinaryOp(lhs, rhs) {} + ExpressionGreaterEqual(const ExpressionGreaterEqual& other) = default; + ~ExpressionGreaterEqual() = default; + + Result Evaluate() override; +}; + +/** + * This class constructs a LESS THAN operator expression. + * e.g. new ExpressionLessThan(new ExpressionProperty("a"), new + * ExpressionLiteral(1)) => a < 1 + */ +class ExpressionLessThan : public ExpressionBinaryOp { + public: + ExpressionLessThan() = default; + ExpressionLessThan(std::shared_ptr lhs, + std::shared_ptr rhs) + : ExpressionBinaryOp(lhs, rhs) {} + ExpressionLessThan(const ExpressionLessThan& other) = default; + ~ExpressionLessThan() = default; + + Result Evaluate() override; +}; + +/** + * This class constructs a LESS EQUAL operator expression. + * e.g. new ExpressionLessEqual(new ExpressionProperty("a"), new + * ExpressionLiteral(1)) => a <= 1 + */ +class ExpressionLessEqual : public ExpressionBinaryOp { + public: + ExpressionLessEqual() = default; + ExpressionLessEqual(std::shared_ptr lhs, + std::shared_ptr rhs) + : ExpressionBinaryOp(lhs, rhs) {} + ExpressionLessEqual(const ExpressionLessEqual& other) = default; + ~ExpressionLessEqual() = default; + + Result Evaluate() override; +}; + +/** + * This class constructs a AND operator expression. + * e.g. new ExpressionAnd(new ExpressionLiteral(true), new + * ExpressionLiteral(1)) => TRUE AND 1 + */ +class ExpressionAnd : public ExpressionBinaryOp { + public: + ExpressionAnd() = default; + ExpressionAnd(std::shared_ptr lhs, + std::shared_ptr rhs) + : ExpressionBinaryOp(lhs, rhs) {} + ExpressionAnd(const ExpressionAnd& other) = default; + ~ExpressionAnd() = default; + + Result Evaluate() override; +}; + +/** + * This class constructs a OR operator expression. + * e.g. new ExpressionOr(new ExpressionLiteral(0), new + * ExpressionLiteral(true)) => 0 OR TRUE + */ +class ExpressionOr : public ExpressionBinaryOp { + public: + ExpressionOr() = default; + ExpressionOr(std::shared_ptr lhs, std::shared_ptr rhs) + : ExpressionBinaryOp(lhs, rhs) {} + ExpressionOr(const ExpressionOr& other) = default; + ~ExpressionOr() = default; + + Result Evaluate() override; +}; + +/** + * Helper functions to construct a Expression. + */ +[[nodiscard]] static inline std::shared_ptr _Property( + const Property& property) { + return std::make_shared(property); +} + +[[nodiscard]] static inline std::shared_ptr _Property( + const std::string& name) { + return std::make_shared(name); +} + +template || std::is_same_v || + std::is_same_v || std::is_same_v || + std::is_same_v || std::is_same_v || + std::is_same_v || + std::is_same_v, + typename = std::enable_if_t> +[[nodiscard]] static inline std::shared_ptr _Literal(T value) { + return std::make_shared>(value); +} + +[[nodiscard]] static inline std::shared_ptr _Not( + std::shared_ptr expr) { + return std::make_shared(expr); +} + +[[nodiscard]] static inline std::shared_ptr _Equal( + std::shared_ptr lhs, std::shared_ptr rhs) { + return std::make_shared(lhs, rhs); +} + +[[nodiscard]] static inline std::shared_ptr _NotEqual( + std::shared_ptr lhs, std::shared_ptr rhs) { + return std::make_shared(lhs, rhs); +} + +[[nodiscard]] static inline std::shared_ptr _GreaterThan( + std::shared_ptr lhs, std::shared_ptr rhs) { + return std::make_shared(lhs, rhs); +} + +[[nodiscard]] static inline std::shared_ptr _GreaterEqual( + std::shared_ptr lhs, std::shared_ptr rhs) { + return std::make_shared(lhs, rhs); +} + +[[nodiscard]] static inline std::shared_ptr _LessThan( + std::shared_ptr lhs, std::shared_ptr rhs) { + return std::make_shared(lhs, rhs); +} + +[[nodiscard]] static inline std::shared_ptr _LessEqual( + std::shared_ptr lhs, std::shared_ptr rhs) { + return std::make_shared(lhs, rhs); +} + +[[nodiscard]] static inline std::shared_ptr _And( + std::shared_ptr lhs, std::shared_ptr rhs) { + return std::make_shared(lhs, rhs); +} + +[[nodiscard]] static inline std::shared_ptr _Or( + std::shared_ptr lhs, std::shared_ptr rhs) { + return std::make_shared(lhs, rhs); +} +} // namespace GAR_NAMESPACE_INTERNAL +#endif // GAR_UTILS_EXPRESSION_H_ diff --git a/cpp/include/gar/utils/filesystem.h b/cpp/include/gar/utils/filesystem.h index 5313a4dd2..34afccd6c 100644 --- a/cpp/include/gar/utils/filesystem.h +++ b/cpp/include/gar/utils/filesystem.h @@ -17,13 +17,18 @@ limitations under the License. #define GAR_UTILS_FILESYSTEM_H_ #include +#include #include +#include #include "gar/utils/file_type.h" #include "gar/utils/result.h" #include "gar/utils/status.h" #include "gar/utils/utils.h" +#include "arrow/dataset/api.h" +#include "gar/utils/reader_utils.h" + // forward declarations namespace arrow { class Buffer; @@ -55,15 +60,17 @@ class FileSystem { ~FileSystem() = default; /** - * @brief Read a file as an arrow::Table. + * @brief Read and filter a file as an arrow::Table. * * @param path The path of the file to read. * @param file_type The type of the file to read. + * @param options Row filter and columns to be selected * @return A Result containing a std::shared_ptr to an arrow::Table if * successful, or an error Status if unsuccessful. */ Result> ReadFileToTable( - const std::string& path, FileType file_type) const noexcept; + const std::string& path, FileType file_type, + const utils::FilterOptions& options = {}) const noexcept; /** * @brief Read a file and convert its bytes to a value of type T. @@ -116,6 +123,10 @@ class FileSystem { Result GetFileNumOfDir(const std::string& dir_path, bool recursive = false) const noexcept; + private: + std::shared_ptr GetFileFormat( + const FileType file_type) const; + private: std::shared_ptr arrow_fs_; }; diff --git a/cpp/include/gar/utils/reader_utils.h b/cpp/include/gar/utils/reader_utils.h index c63f36011..88d80e17a 100644 --- a/cpp/include/gar/utils/reader_utils.h +++ b/cpp/include/gar/utils/reader_utils.h @@ -16,15 +16,36 @@ limitations under the License. #ifndef GAR_UTILS_READER_UTILS_H_ #define GAR_UTILS_READER_UTILS_H_ +#include #include #include +#include #include "gar/graph_info.h" +#include "gar/utils/expression.h" namespace GAR_NAMESPACE_INTERNAL { namespace utils { +using Filter = std::shared_ptr; +using ColumnNames = + std::optional>>; + +struct FilterOptions { + // The row filter to apply to the table. + Filter filter = nullptr; + // The columns to include in the table. Select all columns by default. + ColumnNames columns = std::nullopt; + + FilterOptions() {} + FilterOptions(Filter filter, ColumnNames columns) + : filter(filter), columns(columns) {} +}; + +Status CheckFilterOptions(const FilterOptions& filter_options, + const PropertyGroup& property_group) noexcept; + Result> GetAdjListOffsetOfVertex( const EdgeInfo& edge_info, const std::string& prefix, AdjListType adj_list_type, IdType vid) noexcept; diff --git a/cpp/src/arrow_chunk_reader.cc b/cpp/src/arrow_chunk_reader.cc index c80a774cd..ec1e9a1f0 100644 --- a/cpp/src/arrow_chunk_reader.cc +++ b/cpp/src/arrow_chunk_reader.cc @@ -13,8 +13,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -#include - #include "arrow/api.h" #include "gar/reader/arrow_chunk_reader.h" @@ -24,13 +22,16 @@ namespace GAR_NAMESPACE_INTERNAL { Result> VertexPropertyArrowChunkReader::GetChunk() noexcept { + GAR_RETURN_NOT_OK( + utils::CheckFilterOptions(filter_options_, property_group_)); if (chunk_table_ == nullptr) { GAR_ASSIGN_OR_RAISE( auto chunk_file_path, vertex_info_.GetFilePath(property_group_, chunk_index_)); std::string path = prefix_ + chunk_file_path; - GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable( - path, property_group_.GetFileType())); + GAR_ASSIGN_OR_RAISE( + chunk_table_, fs_->ReadFileToTable(path, property_group_.GetFileType(), + filter_options_)); } IdType row_offset = seek_id_ - chunk_index_ * vertex_info_.GetChunkSize(); return chunk_table_->Slice(row_offset); @@ -43,9 +44,21 @@ VertexPropertyArrowChunkReader::GetRange() noexcept { "The chunk table is not initialized, please call " "GetChunk() first."); } - IdType row_offset = seek_id_ - chunk_index_ * vertex_info_.GetChunkSize(); - return std::make_pair(seek_id_, - seek_id_ + chunk_table_->num_rows() - row_offset); + const auto chunk_size = vertex_info_.GetChunkSize(); + IdType row_offset = seek_id_ - chunk_index_ * chunk_size; + bool is_last_chunk = (chunk_index_ == chunk_num_ - 1); + const auto curr_chunk_size = + is_last_chunk ? (vertex_num_ - chunk_index_ * chunk_size) : chunk_size; + + return std::make_pair(seek_id_, seek_id_ + curr_chunk_size - row_offset); +} + +void VertexPropertyArrowChunkReader::Filter(utils::Filter filter) { + filter_options_.filter = filter; +} + +void VertexPropertyArrowChunkReader::Select(utils::ColumnNames column_names) { + filter_options_.columns = column_names; } Status AdjListArrowChunkReader::seek_src(IdType id) noexcept { @@ -224,17 +237,28 @@ AdjListOffsetArrowChunkReader::GetChunk() noexcept { Result> AdjListPropertyArrowChunkReader::GetChunk() noexcept { + GAR_RETURN_NOT_OK( + utils::CheckFilterOptions(filter_options_, property_group_)); if (chunk_table_ == nullptr) { GAR_ASSIGN_OR_RAISE( auto chunk_file_path, edge_info_.GetPropertyFilePath(property_group_, adj_list_type_, vertex_chunk_index_, chunk_index_)); std::string path = prefix_ + chunk_file_path; - GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable( - path, property_group_.GetFileType())); + GAR_ASSIGN_OR_RAISE( + chunk_table_, fs_->ReadFileToTable(path, property_group_.GetFileType(), + filter_options_)); } IdType row_offset = seek_offset_ - chunk_index_ * edge_info_.GetChunkSize(); return chunk_table_->Slice(row_offset); } +void AdjListPropertyArrowChunkReader::Filter(utils::Filter filter) { + filter_options_.filter = filter; +} + +void AdjListPropertyArrowChunkReader::Select(utils::ColumnNames column_names) { + filter_options_.columns = column_names; +} + } // namespace GAR_NAMESPACE_INTERNAL diff --git a/cpp/src/expression.cc b/cpp/src/expression.cc new file mode 100644 index 000000000..e968efa34 --- /dev/null +++ b/cpp/src/expression.cc @@ -0,0 +1,83 @@ +/** Copyright 2023 Alibaba Group Holding Limited. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#include "gar/utils/expression.h" + +namespace GAR_NAMESPACE_INTERNAL { + +Result ExpressionProperty::Evaluate() { + return arrow::compute::field_ref(property_.name); +} +Result ExpressionNot::Evaluate() { + GAR_ASSIGN_OR_RAISE(auto expr, expr_->Evaluate()); + return arrow::compute::not_(expr); +} + +Result ExpressionEqual::Evaluate() { + GAR_RETURN_NOT_OK(CheckNullArgs(lhs_, rhs_)); + GAR_ASSIGN_OR_RAISE(auto lexpr, lhs_->Evaluate()); + GAR_ASSIGN_OR_RAISE(auto rexpr, rhs_->Evaluate()); + return arrow::compute::equal(lexpr, rexpr); +} + +Result ExpressionNotEqual::Evaluate() { + GAR_RETURN_NOT_OK(CheckNullArgs(lhs_, rhs_)); + GAR_ASSIGN_OR_RAISE(auto lexpr, lhs_->Evaluate()); + GAR_ASSIGN_OR_RAISE(auto rexpr, rhs_->Evaluate()); + return arrow::compute::not_equal(lexpr, rexpr); +} + +Result ExpressionGreaterThan::Evaluate() { + GAR_RETURN_NOT_OK(CheckNullArgs(lhs_, rhs_)); + GAR_ASSIGN_OR_RAISE(auto lexpr, lhs_->Evaluate()); + GAR_ASSIGN_OR_RAISE(auto rexpr, rhs_->Evaluate()); + return arrow::compute::greater(lexpr, rexpr); +} + +Result ExpressionGreaterEqual::Evaluate() { + GAR_RETURN_NOT_OK(CheckNullArgs(lhs_, rhs_)); + GAR_ASSIGN_OR_RAISE(auto lexpr, lhs_->Evaluate()); + GAR_ASSIGN_OR_RAISE(auto rexpr, rhs_->Evaluate()); + return arrow::compute::greater_equal(lexpr, rexpr); +} + +Result ExpressionLessThan::Evaluate() { + GAR_RETURN_NOT_OK(CheckNullArgs(lhs_, rhs_)); + GAR_ASSIGN_OR_RAISE(auto lexpr, lhs_->Evaluate()); + GAR_ASSIGN_OR_RAISE(auto rexpr, rhs_->Evaluate()); + return arrow::compute::less(lexpr, rexpr); +} + +Result ExpressionLessEqual::Evaluate() { + GAR_RETURN_NOT_OK(CheckNullArgs(lhs_, rhs_)); + GAR_ASSIGN_OR_RAISE(auto lexpr, lhs_->Evaluate()); + GAR_ASSIGN_OR_RAISE(auto rexpr, rhs_->Evaluate()); + return arrow::compute::less_equal(lexpr, rexpr); +} + +Result ExpressionAnd::Evaluate() { + GAR_RETURN_NOT_OK(CheckNullArgs(lhs_, rhs_)); + GAR_ASSIGN_OR_RAISE(auto lexpr, lhs_->Evaluate()); + GAR_ASSIGN_OR_RAISE(auto rexpr, rhs_->Evaluate()); + return arrow::compute::and_(lexpr, rexpr); +} + +Result ExpressionOr::Evaluate() { + GAR_RETURN_NOT_OK(CheckNullArgs(lhs_, rhs_)); + GAR_ASSIGN_OR_RAISE(auto lexpr, lhs_->Evaluate()); + GAR_ASSIGN_OR_RAISE(auto rexpr, rhs_->Evaluate()); + return arrow::compute::or_(lexpr, rexpr); +} + +} // namespace GAR_NAMESPACE_INTERNAL diff --git a/cpp/src/filesystem.cc b/cpp/src/filesystem.cc index 14bb9e7db..d10b45c3b 100644 --- a/cpp/src/filesystem.cc +++ b/cpp/src/filesystem.cc @@ -17,16 +17,13 @@ limitations under the License. #include "arrow/api.h" #include "arrow/csv/api.h" #include "arrow/filesystem/api.h" -#include "arrow/io/api.h" #include "arrow/ipc/writer.h" -#include "arrow/util/uri.h" -#include "parquet/arrow/reader.h" #include "parquet/arrow/writer.h" #include "gar/utils/filesystem.h" namespace GAR_NAMESPACE_INTERNAL { - +namespace ds = arrow::dataset; namespace detail { template static Status CastToLargeOffsetArray( @@ -81,44 +78,42 @@ Result ParseFileSystemUri(const std::string& uri_string) { } } // namespace detail -Result> FileSystem::ReadFileToTable( - const std::string& path, FileType file_type) const noexcept { - arrow::MemoryPool* pool = arrow::default_memory_pool(); - std::shared_ptr table; - switch (file_type) { - case FileType::CSV: { - GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto is, - arrow_fs_->OpenInputStream(path)); - auto read_options = arrow::csv::ReadOptions::Defaults(); - auto parse_options = arrow::csv::ParseOptions::Defaults(); - auto convert_options = arrow::csv::ConvertOptions::Defaults(); - GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN( - auto reader, arrow::csv::TableReader::Make( - arrow::io::IOContext(pool), is, read_options, - parse_options, convert_options)); - GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(table, reader->Read()); - break; - } - case FileType::PARQUET: { - GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto input, - arrow_fs_->OpenInputFile(path)); - std::unique_ptr reader; - RETURN_NOT_ARROW_OK(parquet::arrow::OpenFile(input, pool, &reader)); - RETURN_NOT_ARROW_OK(reader->ReadTable(&table)); - break; +std::shared_ptr FileSystem::GetFileFormat( + const FileType type) const { + switch (type) { + case CSV: + return std::make_shared(); + case PARQUET: + return std::make_shared(); + case ORC: + return std::make_shared(); + default: + return nullptr; } - case FileType::ORC: { - GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto input, - arrow_fs_->OpenInputFile(path)); - GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN( - auto reader, arrow::adapters::orc::ORCFileReader::Open(input, pool)); - GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(table, reader->Read()); - break; +} + +Result> FileSystem::ReadFileToTable( + const std::string& path, FileType file_type, + const utils::FilterOptions& options) const noexcept { + std::shared_ptr format = GetFileFormat(file_type); + GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN( + auto factory, arrow::dataset::FileSystemDatasetFactory::Make( + arrow_fs_, {path}, format, + arrow::dataset::FileSystemFactoryOptions())); + GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto dataset, factory->Finish()); + GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto scan_builder, dataset->NewScan()); + + // Apply the row filter and select the specified columns + if (options.filter) { + GAR_ASSIGN_OR_RAISE(auto filter, options.filter->Evaluate()); + RETURN_NOT_ARROW_OK(scan_builder->Filter(filter)); } - default: - return Status::Invalid("Unsupported file type: ", - FileTypeToString(file_type)); + if (options.columns) { + RETURN_NOT_ARROW_OK(scan_builder->Project(*options.columns)); } + + GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto scanner, scan_builder->Finish()); + GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto table, scanner->ToTable()); // cast string array to large string array as we need concatenate chunks in // some places, e.g., in vineyard for (int i = 0; i < table->num_columns(); ++i) { @@ -134,7 +129,11 @@ Result> FileSystem::ReadFileToTable( // do casting auto field = table->field(i)->WithType(type); std::shared_ptr chunked_array; - if (type->Equals(arrow::large_utf8())) { + + if (table->num_rows() == 0) { + GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN( + chunked_array, arrow::ChunkedArray::MakeEmpty(type)); + } else if (type->Equals(arrow::large_utf8())) { auto status = detail::CastToLargeOffsetArray( table->column(i), type, chunked_array); diff --git a/cpp/src/reader_utils.cc b/cpp/src/reader_utils.cc index 08b0cc49f..a0e6ea5e2 100644 --- a/cpp/src/reader_utils.cc +++ b/cpp/src/reader_utils.cc @@ -27,6 +27,40 @@ limitations under the License. namespace GAR_NAMESPACE_INTERNAL { namespace utils { + +/** + * @brief Checks whether the property names in the FilterOptions match the + * properties in the property group + * + * @param filter_options filter options + * @param property_group property group + * @return Status error if the property names in the FilterOptions do not match + */ +Status CheckFilterOptions(const FilterOptions& filter_options, + const PropertyGroup& property_group) noexcept { + if (filter_options.filter) { + GAR_ASSIGN_OR_RAISE(auto filter, filter_options.filter->Evaluate()); + for (const auto& field : arrow::compute::FieldsInExpression(filter)) { + auto property_name = *field.name(); + if (!property_group.ContainProperty(property_name)) { + return Status::Invalid( + property_name, " in the filter does not match the property group: ", + property_group); + } + } + } + if (filter_options.columns.has_value()) { + for (const auto& col : filter_options.columns.value().get()) { + if (!property_group.ContainProperty(col)) { + return Status::Invalid( + col, " in the columns does not match the property group: ", + property_group); + } + } + } + return Status::OK(); +} + /** * @brief parse the vertex id to related adj list offset * diff --git a/cpp/test/test_arrow_chunk_reader.cc b/cpp/test/test_arrow_chunk_reader.cc index 386140c2a..2801e2c08 100644 --- a/cpp/test/test_arrow_chunk_reader.cc +++ b/cpp/test/test_arrow_chunk_reader.cc @@ -15,22 +15,21 @@ limitations under the License. #include -#include "arrow/adapters/orc/adapter.h" #include "arrow/api.h" -#include "arrow/csv/api.h" -#include "arrow/filesystem/api.h" -#include "arrow/io/api.h" -#include "arrow/stl.h" -#include "arrow/util/uri.h" -#include "parquet/arrow/writer.h" #include "./util.h" #include "gar/reader/arrow_chunk_reader.h" -#include "gar/writer/arrow_chunk_writer.h" #define CATCH_CONFIG_MAIN #include +using GAR_NAMESPACE::_And; +using GAR_NAMESPACE::_Equal; +using GAR_NAMESPACE::_LessThan; +using GAR_NAMESPACE::_Literal; +using GAR_NAMESPACE::_Property; +using GAR_NAMESPACE::utils::FilterOptions; + TEST_CASE("test_vertex_property_arrow_chunk_reader") { std::string root; REQUIRE(GetTestResourceRoot(&root).ok()); @@ -90,6 +89,111 @@ TEST_CASE("test_vertex_property_arrow_chunk_reader") { REQUIRE(reader.seek(1024).IsIndexError()); } +TEST_CASE("test_vertex_property_pushdown") { + std::string root; + REQUIRE(GetTestResourceRoot(&root).ok()); + std::string path = root + "/ldbc_sample/parquet/ldbc_sample.graph.yml"; + std::string label = "person", property_name = "gender"; + + auto filter = _Equal(_Property("gender"), _Literal("female")); + std::vector expected_cols{"firstName", "lastName"}; + + // read file and construct graph info + auto maybe_graph_info = GAR_NAMESPACE::GraphInfo::Load(path); + REQUIRE(maybe_graph_info.status().ok()); + auto graph_info = maybe_graph_info.value(); + // construct vertex chunk reader + REQUIRE(graph_info.GetVertexInfo(label).status().ok()); + const auto chunk_size = graph_info.GetVertexInfo(label)->GetChunkSize(); + auto maybe_group = graph_info.GetVertexPropertyGroup(label, property_name); + REQUIRE(maybe_group.status().ok()); + auto group = maybe_group.value(); + + // print reader result + auto walkReader = [&](GAR_NAMESPACE::VertexPropertyArrowChunkReader& reader) { + int idx = 0, sum = 0; + std::shared_ptr table; + + do { + auto result = reader.GetChunk(); + REQUIRE(!result.has_error()); + table = result.value(); + auto [start, end] = reader.GetRange().value(); + std::cout << "Chunk: " << idx << ",\tNums: " << table->num_rows() << "/" + << chunk_size << ",\tRange: (" << start << ", " << end << "]" + << '\n'; + idx++; + sum += table->num_rows(); + } while (!reader.next_chunk().IsIndexError()); + REQUIRE(idx == reader.GetChunkNum()); + REQUIRE(table->num_columns() == (int) expected_cols.size()); + + std::cout << "Total Nums: " << sum << "/" + << reader.GetChunkNum() * chunk_size << '\n'; + std::cout << "Column Nums: " << table->num_columns() << "\n"; + std::cout << "Column Names: "; + for (int i = 0; i < table->num_columns(); i++) { + REQUIRE(table->ColumnNames()[i] == expected_cols[i]); + std::cout << "`" << table->ColumnNames()[i] << "` "; + } + std::cout << "\n\n"; + }; + + SECTION("pushdown by helper function") { + std::cout << "Vertex property pushdown by helper function:\n"; + // construct pushdown options + FilterOptions options; + options.filter = filter; + options.columns = expected_cols; + auto maybe_reader = GAR_NAMESPACE::ConstructVertexPropertyArrowChunkReader( + graph_info, label, group, options); + REQUIRE(maybe_reader.status().ok()); + walkReader(maybe_reader.value()); + } + + SECTION("pushdown by function Filter() & Select()") { + std::cout << "Vertex property pushdown by Filter() & Select():\n"; + auto maybe_reader = GAR_NAMESPACE::ConstructVertexPropertyArrowChunkReader( + graph_info, label, group); + REQUIRE(maybe_reader.status().ok()); + auto reader = maybe_reader.value(); + reader.Filter(filter); + reader.Select(expected_cols); + walkReader(reader); + } + + SECTION("pushdown property that don't exist") { + std::cout << "Vertex property pushdown property that don't exist:\n"; + auto filter = _Equal(_Property("id"), _Literal(933)); + FilterOptions options; + options.filter = filter; + options.columns = expected_cols; + auto maybe_reader = GAR_NAMESPACE::ConstructVertexPropertyArrowChunkReader( + graph_info, label, group, options); + REQUIRE(maybe_reader.status().ok()); + auto reader = maybe_reader.value(); + auto result = reader.GetChunk(); + REQUIRE(result.error().IsInvalid()); + std::cerr << result.error().message() << std::endl; + } + + SECTION("pushdown column that don't exist") { + std::cout << "Vertex property pushdown column that don't exist:\n"; + auto filter = _Literal(true); + std::vector expected_cols{"id"}; + FilterOptions options; + options.filter = filter; + options.columns = expected_cols; + auto maybe_reader = GAR_NAMESPACE::ConstructVertexPropertyArrowChunkReader( + graph_info, label, group, options); + REQUIRE(maybe_reader.status().ok()); + auto reader = maybe_reader.value(); + auto result = reader.GetChunk(); + REQUIRE(result.error().IsInvalid()); + std::cerr << result.error().message() << std::endl; + } +} + TEST_CASE("test_adj_list_arrow_chunk_reader") { std::string root; REQUIRE(GetTestResourceRoot(&root).ok()); @@ -201,6 +305,91 @@ TEST_CASE("test_adj_list_property_arrow_chunk_reader") { REQUIRE(reader.next_chunk().IsIndexError()); } +TEST_CASE("test_adj_list_property_pushdown") { + std::string root; + REQUIRE(GetTestResourceRoot(&root).ok()); + + // read file and construct graph info + std::string path = root + "/ldbc_sample/parquet/ldbc_sample.graph.yml"; + auto maybe_graph_info = GAR_NAMESPACE::GraphInfo::Load(path); + REQUIRE(maybe_graph_info.status().ok()); + auto graph_info = maybe_graph_info.value(); + + std::string src_label = "person", edge_label = "knows", dst_label = "person", + property_name = "creationDate"; + REQUIRE( + graph_info.GetEdgeInfo(src_label, edge_label, dst_label).status().ok()); + const auto chunk_size = + graph_info.GetEdgeInfo(src_label, edge_label, dst_label)->GetChunkSize(); + auto maybe_group = graph_info.GetEdgePropertyGroup( + src_label, edge_label, dst_label, property_name, + GAR_NAMESPACE::AdjListType::ordered_by_source); + REQUIRE(maybe_group.status().ok()); + auto group = maybe_group.value(); + + // construct pushdown options + auto expr1 = _LessThan(_Literal("2012-06-02T04:30:44.526+0000"), + _Property(property_name)); + auto expr2 = _Equal(_Property(property_name), _Property(property_name)); + auto filter = _And(expr1, expr2); + + std::vector expected_cols{"creationDate"}; + + FilterOptions options; + options.filter = filter; + options.columns = expected_cols; + + // print reader result + auto walkReader = + [&](GAR_NAMESPACE::AdjListPropertyArrowChunkReader& reader) { + int idx = 0, sum = 0; + std::shared_ptr table; + + do { + auto result = reader.GetChunk(); + REQUIRE(!result.has_error()); + table = result.value(); + std::cout << "Chunk: " << idx << ",\tNums: " << table->num_rows() + << "/" << chunk_size << '\n'; + idx++; + sum += table->num_rows(); + } while (!reader.next_chunk().IsIndexError()); + REQUIRE(table->num_columns() == (int) expected_cols.size()); + + std::cout << "Total Nums: " << sum << "/" << idx * chunk_size << '\n'; + std::cout << "Column Nums: " << table->num_columns() << "\n"; + std::cout << "Column Names: "; + for (int i = 0; i < table->num_columns(); i++) { + REQUIRE(table->ColumnNames()[i] == expected_cols[i]); + std::cout << "`" << table->ColumnNames()[i] << "` "; + } + std::cout << "\n\n"; + }; + + SECTION("pushdown by helper function") { + std::cout << "Adj list property pushdown by helper function: \n"; + auto maybe_reader = GAR_NAMESPACE::ConstructAdjListPropertyArrowChunkReader( + graph_info, src_label, edge_label, dst_label, group, + GAR_NAMESPACE::AdjListType::ordered_by_source, options); + REQUIRE(maybe_reader.status().ok()); + auto reader = maybe_reader.value(); + walkReader(reader); + } + + SECTION("pushdown by function Filter() & Select()") { + std::cout << "Adj list property pushdown by Filter() & Select():" + << std::endl; + auto maybe_reader = GAR_NAMESPACE::ConstructAdjListPropertyArrowChunkReader( + graph_info, src_label, edge_label, dst_label, group, + GAR_NAMESPACE::AdjListType::ordered_by_source); + REQUIRE(maybe_reader.status().ok()); + auto reader = maybe_reader.value(); + reader.Filter(filter); + reader.Select(expected_cols); + walkReader(reader); + } +} + TEST_CASE("test_read_adj_list_offset_chunk_example") { std::string root; REQUIRE(GetTestResourceRoot(&root).ok()); diff --git a/docs/reference/api-reference-cpp.rst b/docs/reference/api-reference-cpp.rst index 8066772c8..5ee9c4796 100644 --- a/docs/reference/api-reference-cpp.rst +++ b/docs/reference/api-reference-cpp.rst @@ -220,3 +220,73 @@ Info Version .. doxygenclass:: GraphArchive::InfoVersion :members: :undoc-members: + +Expression +~~~~~~~~~~~~~~~~~~~ + +.. doxygenclass:: GraphArchive::Expression + :members: + :undoc-members: + +.. doxygenclass:: GraphArchive::ExpressionProperty + :members: + :undoc-members: + +.. doxygenclass:: GraphArchive::ExpressionLiteral + :members: + :undoc-members: + +.. doxygenclass:: GraphArchive::ExpressionNot + :members: + :undoc-members: + +.. doxygenclass:: GraphArchive::ExpressionIsNull + :members: + :undoc-members: + +.. doxygenclass:: GraphArchive::ExpressionEqual + :members: + :undoc-members: + +.. doxygenclass:: GraphArchive::ExpressionNotEqual + :members: + :undoc-members: + +.. doxygenclass:: GraphArchive::ExpressionGreaterThan + :members: + :undoc-members: + +.. doxygenclass:: GraphArchive::ExpressionGreaterEqual + :members: + :undoc-members: + + +.. doxygenclass:: GraphArchive::ExpressionLessThan + :members: + :undoc-members: + +.. doxygenclass:: GraphArchive::ExpressionLessEqual + :members: + :undoc-members: + +.. doxygenclass:: GraphArchive::ExpressionAnd + :members: + :undoc-members: + +.. doxygenclass:: GraphArchive::ExpressionOr + :members: + :undoc-members: + +.. doxygenfunction:: GraphArchive::_Property +.. doxygenfunction:: GraphArchive::_Literal +.. doxygenfunction:: GraphArchive::_Not +.. doxygenfunction:: GraphArchive::_IsNull +.. doxygenfunction:: GraphArchive::_Equal +.. doxygenfunction:: GraphArchive::_NotEqual +.. doxygenfunction:: GraphArchive::_GreaterThan +.. doxygenfunction:: GraphArchive::_GreaterEqual +.. doxygenfunction:: GraphArchive::_LessThan +.. doxygenfunction:: GraphArchive::_LessEqual +.. doxygenfunction:: GraphArchive::_And +.. doxygenfunction:: GraphArchive::_Or +