Skip to content

Commit

Permalink
Refactor: wrap filter and columns into FilterOptions
Browse files Browse the repository at this point in the history
Signed-off-by: Ziy1-Tan <[email protected]>
  • Loading branch information
Ziy1-Tan committed Jun 17, 2023
1 parent a5c2341 commit 6e5370e
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 56 deletions.
35 changes: 22 additions & 13 deletions cpp/include/gar/reader/arrow_chunk_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ limitations under the License.
#include <utility>
#include <vector>

#include "arrow/compute/api.h"
#include "gar/graph_info.h"
#include "gar/utils/data_type.h"
#include "gar/utils/filesystem.h"
Expand All @@ -41,6 +42,16 @@ class Expression;

namespace GAR_NAMESPACE_INTERNAL {

/** Names of the columns selected by a projection. */
using Columns = std::vector<std::string>;

/**
 * @brief Optional pushdown settings handed to a chunk reader: a row filter
 * and a column projection.
 *
 * Both members are empty by default. A member that holds no value means the
 * corresponding step is not requested.
 */
struct FilterOptions {
  /** Row predicate; empty when no filter is requested. */
  std::optional<arrow::compute::Expression> filter;
  /** Columns to keep; empty when no projection is requested. */
  std::optional<Columns> columns;

  /** Creates options with neither a filter nor a projection set. */
  FilterOptions() = default;

  /**
   * @brief Creates options with both a filter and a projection set.
   *
   * @param f The row predicate.
   * @param cols The names of the columns to keep.
   */
  FilterOptions(arrow::compute::Expression f, Columns cols)
      // Both parameters are by-value sinks: move them into place instead of
      // copying the expression and the vector a second time.
      : filter(std::move(f)), columns(std::move(cols)) {}
};

/**
* @brief The arrow chunk reader for vertex property group.
*/
Expand All @@ -53,11 +64,11 @@ class VertexPropertyArrowChunkReader {
* @param property_group The property group whose chunks are read.
* @param prefix The absolute prefix.
*/
VertexPropertyArrowChunkReader(
const VertexInfo& vertex_info, const PropertyGroup& property_group,
const std::string& prefix, IdType chunk_index = 0,
std::shared_ptr<arrow::compute::Expression> filter = nullptr,
std::optional<std::vector<std::string>> columns = std::nullopt);
VertexPropertyArrowChunkReader(const VertexInfo& vertex_info,
const PropertyGroup& property_group,
const std::string& prefix,
IdType chunk_index = 0,
const FilterOptions& opts = {});

/**
* @brief Sets chunk position indicator for reader by vertex id.
Expand Down Expand Up @@ -114,10 +125,10 @@ class VertexPropertyArrowChunkReader {
*/
IdType GetChunkNum() const noexcept { return chunk_num_; }

void Filter(std::shared_ptr<arrow::compute::Expression> filter);
void Filter(arrow::compute::Expression filter);
void ClearFilter();

void Project(std::vector<std::string> columns);
void Project(Columns columns);
void ClearProjection();

private:
Expand All @@ -128,8 +139,7 @@ class VertexPropertyArrowChunkReader {
IdType seek_id_;
IdType chunk_num_;
std::shared_ptr<arrow::Table> chunk_table_;
std::shared_ptr<arrow::compute::Expression> filter_;
std::optional<std::vector<std::string>> columns_;
FilterOptions filter_options_;
std::shared_ptr<FileSystem> fs_;
};

Expand Down Expand Up @@ -565,12 +575,11 @@ static inline Result<VertexPropertyArrowChunkReader>
ConstructVertexPropertyArrowChunkReader(
const GraphInfo& graph_info, const std::string& label,
const PropertyGroup& property_group,
std::shared_ptr<arrow::compute::Expression> filter = nullptr,
std::optional<std::vector<std::string>> columns = std::nullopt) noexcept {
const FilterOptions& opts = {}) noexcept {
VertexInfo vertex_info;
GAR_ASSIGN_OR_RAISE(vertex_info, graph_info.GetVertexInfo(label));
return VertexPropertyArrowChunkReader(
vertex_info, property_group, graph_info.GetPrefix(), 0, filter, columns);
return VertexPropertyArrowChunkReader(vertex_info, property_group,
graph_info.GetPrefix(), 0, opts);
}

/**
Expand Down
10 changes: 7 additions & 3 deletions cpp/include/gar/utils/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ limitations under the License.
#include "gar/utils/status.h"
#include "gar/utils/utils.h"

#include "arrow/compute/api.h"
#include "arrow/dataset/api.h"

// forward declarations
namespace arrow {
class Buffer;
Expand All @@ -46,6 +49,8 @@ class RandomAccessFile;

namespace GAR_NAMESPACE_INTERNAL {

struct FilterOptions;

/**
* This class wraps an arrow::fs::FileSystem and provides methods for
* reading and writing arrow::Table objects from and to files, as well as
Expand Down Expand Up @@ -78,14 +83,13 @@ class FileSystem {
*
* @param path The path of the file to read.
* @param file_type The type of the file to read.
* @param filters The predictors to apply to the file.
* @param opts The row filter and the columns to read; a member that is not set is not applied.
* @return A Result containing a std::shared_ptr to an arrow::Table if
* successful, or an error Status if unsuccessful.
*/
Result<std::shared_ptr<arrow::Table>> ReadAndFilterFileToTable(
const std::string& path, FileType file_type,
std::shared_ptr<arrow::compute::Expression> filter,
std::optional<std::vector<std::string>> columns) const noexcept;
const FilterOptions& opts) const noexcept;

/**
* @brief Read a file and convert its bytes to a value of type T.
Expand Down
28 changes: 9 additions & 19 deletions cpp/src/arrow_chunk_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,18 @@ limitations under the License.

#include "gar/reader/arrow_chunk_reader.h"
#include "gar/utils/reader_utils.h"
#if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000
#include "arrow/compute/expression.h"
#else
#include "arrow/compute/exec/expression.h"
#endif

namespace GAR_NAMESPACE_INTERNAL {
namespace cp = arrow::compute;
VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader(
const VertexInfo& vertex_info, const PropertyGroup& property_group,
const std::string& prefix, IdType chunk_index,
std::shared_ptr<cp::Expression> filter,
std::optional<std::vector<std::string>> columns)
const std::string& prefix, IdType chunk_index, const FilterOptions& opts)
: vertex_info_(vertex_info),
property_group_(property_group),
chunk_index_(chunk_index),
seek_id_(chunk_index * vertex_info.GetChunkSize()),
chunk_table_(nullptr),
filter_(filter ? filter
: std::make_shared<cp::Expression>(cp::literal(true))),
columns_(columns) {
filter_options_(opts) {
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
GAR_ASSIGN_OR_RAISE_ERROR(auto pg_path_prefix,
vertex_info.GetPathPrefix(property_group));
Expand All @@ -57,7 +48,7 @@ VertexPropertyArrowChunkReader::GetChunk() noexcept {
std::string path = prefix_ + chunk_file_path;
GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadAndFilterFileToTable(
path, property_group_.GetFileType(),
filter_, columns_));
filter_options_));
}
IdType row_offset = seek_id_ - chunk_index_ * vertex_info_.GetChunkSize();
return chunk_table_->Slice(row_offset);
Expand All @@ -73,21 +64,20 @@ VertexPropertyArrowChunkReader::GetRange() noexcept {
seek_id_ + chunk_table_->num_rows() - row_offset);
}

void VertexPropertyArrowChunkReader::Filter(
std::shared_ptr<cp::Expression> filter) {
filter_ = filter;
void VertexPropertyArrowChunkReader::Filter(cp::Expression filter) {
filter_options_.filter = filter;
}

void VertexPropertyArrowChunkReader::ClearFilter() {
filter_ = std::make_shared<cp::Expression>(cp::literal(true));
filter_options_.filter = {};
}

void VertexPropertyArrowChunkReader::Project(std::vector<std::string> columns) {
columns_ = std::optional<std::vector<std::string>>(columns);
void VertexPropertyArrowChunkReader::Project(Columns columns) {
filter_options_.columns = columns;
}

void VertexPropertyArrowChunkReader::ClearProjection() {
columns_ = std::nullopt;
filter_options_.columns = {};
}

Status AdjListArrowChunkReader::seek_src(IdType id) noexcept {
Expand Down
17 changes: 7 additions & 10 deletions cpp/src/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,11 @@ limitations under the License.
#include "arrow/util/uri.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/writer.h"
#if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000
#include "arrow/compute/expression.h"
#else
#include "arrow/compute/exec/expression.h"
#endif
#include "arrow/dataset/api.h"

#include "gar/reader/arrow_chunk_reader.h"
#include "gar/utils/filesystem.h"

namespace GAR_NAMESPACE_INTERNAL {
namespace cp = arrow::compute;
namespace ds = arrow::dataset;
namespace detail {
template <typename U, typename T>
Expand Down Expand Up @@ -176,8 +171,7 @@ Result<std::shared_ptr<arrow::Table>> FileSystem::ReadFileToTable(

Result<std::shared_ptr<arrow::Table>> FileSystem::ReadAndFilterFileToTable(
const std::string& path, FileType file_type,
std::shared_ptr<cp::Expression> filter,
std::optional<std::vector<std::string>> columns) const noexcept {
const FilterOptions& opts) const noexcept {
std::shared_ptr<ds::FileFormat> format = GetFileFormat(file_type);
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto factory, arrow::dataset::FileSystemDatasetFactory::Make(
Expand All @@ -186,7 +180,10 @@ Result<std::shared_ptr<arrow::Table>> FileSystem::ReadAndFilterFileToTable(
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto dataset, factory->Finish());
// Read specified columns with a row filter
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto scan_builder, dataset->NewScan());
RETURN_NOT_ARROW_OK(scan_builder->Filter(*filter));
auto&& [filter, columns] = opts;
if (filter) {
RETURN_NOT_ARROW_OK(scan_builder->Filter(filter.value()));
}
if (columns) {
RETURN_NOT_ARROW_OK(scan_builder->Project(columns.value()));
}
Expand Down
20 changes: 9 additions & 11 deletions cpp/test/test_arrow_chunk_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,8 @@ TEST_CASE("test_vertex_property_pushdown") {
auto group = maybe_group.value();

// pushdown options
auto filter = std::make_shared<cp::Expression>(
cp::equal(cp::field_ref("gender"), cp::literal("female")));
std::vector<std::string> column_names = {"firstName", "lastName"};
auto filter = cp::equal(cp::field_ref("gender"), cp::literal("female"));
std::vector<std::string> columns = {"firstName", "lastName"};

auto walkReader = [&](GAR_NAMESPACE::VertexPropertyArrowChunkReader& reader) {
int i = 0;
Expand All @@ -124,7 +123,7 @@ TEST_CASE("test_vertex_property_pushdown") {
auto [l, r] = reader.GetRange().value();
auto table = result.value();
names = table->ColumnNames();
std::cout << "Chunk : " << i << ",\tNums: " << table->num_rows()
std::cout << "Chunk: " << i << ",\tNums: " << table->num_rows()
<< ",\tRange: [" << l << ", " << r << "]" << '\n';
i++;
sum += table->num_rows();
Expand All @@ -141,23 +140,22 @@ TEST_CASE("test_vertex_property_pushdown") {
std::cout << '\n';
};

SECTION("filter by helper function") {
std::cout << "filter by ConstructVertexPropertyArrowChunkReader():"
<< std::endl;
SECTION("pushdown by helper function") {
std::cout << "pushdown by helper function: \n";
auto maybe_reader = GAR_NAMESPACE::ConstructVertexPropertyArrowChunkReader(
graph_info, label, group, filter, column_names);
graph_info, label, group, {filter, columns});
REQUIRE(maybe_reader.status().ok());
walkReader(maybe_reader.value());
}

SECTION("filter by function Filter()") {
std::cout << "\nfilter by Filter():" << std::endl;
SECTION("pushdown by function Filter()") {
std::cout << "\npushdown by Filter():" << std::endl;
auto maybe_reader = GAR_NAMESPACE::ConstructVertexPropertyArrowChunkReader(
graph_info, label, group);
REQUIRE(maybe_reader.status().ok());
auto reader = maybe_reader.value();
reader.Filter(filter);
reader.Project(column_names);
reader.Project(columns);
walkReader(reader);
}
}
Expand Down

0 comments on commit 6e5370e

Please sign in to comment.