From 2d9106befed0b10a112a92259aeada8bae46cbf5 Mon Sep 17 00:00:00 2001 From: acezen Date: Tue, 6 Feb 2024 17:07:41 +0800 Subject: [PATCH 1/3] [Improve][C++] Revise the ArrowChunkReader construcors by remove redundant parameter --- cpp/include/gar/graph.h | 10 +- cpp/include/gar/reader/arrow_chunk_reader.h | 14 ++- cpp/src/arrow_chunk_reader.cc | 107 +++++++++++--------- 3 files changed, 72 insertions(+), 59 deletions(-) diff --git a/cpp/include/gar/graph.h b/cpp/include/gar/graph.h index e8d90b423..c18cc592a 100644 --- a/cpp/include/gar/graph.h +++ b/cpp/include/gar/graph.h @@ -306,10 +306,7 @@ class EdgeIter { IdType global_chunk_index, IdType offset, IdType chunk_begin, IdType chunk_end, std::shared_ptr index_converter) - : adj_list_reader_( - edge_info, adj_list_type, prefix, - index_converter->GlobalChunkIndexToIndexPair(global_chunk_index) - .first), + : adj_list_reader_(edge_info, adj_list_type, prefix), global_chunk_index_(global_chunk_index), cur_offset_(offset), chunk_size_(edge_info->GetChunkSize()), @@ -322,10 +319,11 @@ class EdgeIter { index_converter_(index_converter) { vertex_chunk_index_ = index_converter->GlobalChunkIndexToIndexPair(global_chunk_index).first; + adj_list_reader_.seek_chunk_index(vertex_chunk_index_); const auto& property_groups = edge_info->GetPropertyGroups(); for (const auto& pg : property_groups) { - property_readers_.emplace_back(edge_info, pg, adj_list_type, prefix, - vertex_chunk_index_); + property_readers_.emplace_back(edge_info, pg, adj_list_type, prefix), + property_readers_.back().seek_chunk_index(vertex_chunk_index_); } if (adj_list_type == AdjListType::ordered_by_source || adj_list_type == AdjListType::ordered_by_dest) { diff --git a/cpp/include/gar/reader/arrow_chunk_reader.h b/cpp/include/gar/reader/arrow_chunk_reader.h index 697b025ab..d3b97608c 100644 --- a/cpp/include/gar/reader/arrow_chunk_reader.h +++ b/cpp/include/gar/reader/arrow_chunk_reader.h @@ -48,8 +48,7 @@ class VertexPropertyArrowChunkReader { VertexPropertyArrowChunkReader( const std::shared_ptr& vertex_info, const std::shared_ptr& property_group, - const std::string& prefix, IdType chunk_index = 0, - const util::FilterOptions& options = {}); + const std::string& prefix, const util::FilterOptions& options = {}); /** * @brief Sets chunk position indicator for reader by internal vertex id. @@ -164,8 +163,7 @@ class AdjListArrowChunkReader { * @param vertex_chunk_index The vertex chunk index, default is 0. */ AdjListArrowChunkReader(const std::shared_ptr& edge_info, - AdjListType adj_list_type, const std::string& prefix, - IdType vertex_chunk_index = 0); + AdjListType adj_list_type, const std::string& prefix); /** * @brief Copy constructor. @@ -249,6 +247,9 @@ class AdjListArrowChunkReader { const std::string& src_label, const std::string& edge_label, const std::string& dst_label, AdjListType adj_list_type); + private: + Status initOrUpdateEdgeChunkNum(); + private: std::shared_ptr edge_info_; AdjListType adj_list_type_; @@ -365,7 +366,7 @@ class AdjListPropertyArrowChunkReader { const std::shared_ptr& edge_info, const std::shared_ptr& property_group, AdjListType adj_list_type, const std::string prefix, - IdType vertex_chunk_index = 0, const util::FilterOptions& options = {}); + const util::FilterOptions& options = {}); /** * @brief Copy constructor. @@ -489,6 +490,9 @@ class AdjListPropertyArrowChunkReader { const std::string& dst_label, const std::string& property_name, AdjListType adj_list_type, const util::FilterOptions& options = {}); + private: + Status initOrUpdateEdgeChunkNum(); + private: std::shared_ptr edge_info_; std::shared_ptr property_group_; diff --git a/cpp/src/arrow_chunk_reader.cc b/cpp/src/arrow_chunk_reader.cc index e8fcd1c24..9183d57f4 100644 --- a/cpp/src/arrow_chunk_reader.cc +++ b/cpp/src/arrow_chunk_reader.cc @@ -31,12 +31,11 @@ namespace GAR_NAMESPACE_INTERNAL { VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader( const std::shared_ptr& vertex_info, const std::shared_ptr& property_group, - const std::string& prefix, IdType chunk_index, - const util::FilterOptions& options) + const std::string& prefix, const util::FilterOptions& options) : vertex_info_(std::move(vertex_info)), property_group_(std::move(property_group)), - chunk_index_(chunk_index), - seek_id_(chunk_index * vertex_info->GetChunkSize()), + chunk_index_(0), + seek_id_(0), chunk_table_(nullptr), filter_options_(options) { GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_)); @@ -108,7 +107,7 @@ VertexPropertyArrowChunkReader::Make( const std::shared_ptr& property_group, const std::string& prefix, const util::FilterOptions& options) { return std::make_shared( - vertex_info, property_group, prefix, 0, options); + vertex_info, property_group, prefix, options); } Result> @@ -145,14 +144,15 @@ VertexPropertyArrowChunkReader::Make( AdjListArrowChunkReader::AdjListArrowChunkReader( const std::shared_ptr& edge_info, AdjListType adj_list_type, - const std::string& prefix, IdType vertex_chunk_index) + const std::string& prefix) : edge_info_(edge_info), adj_list_type_(adj_list_type), prefix_(prefix), - vertex_chunk_index_(vertex_chunk_index), + vertex_chunk_index_(0), chunk_index_(0), seek_offset_(0), - chunk_table_(nullptr) { + chunk_table_(nullptr), + chunk_num_(-1) /* -1 means uninitialized */ { GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_)); GAR_ASSIGN_OR_RAISE_ERROR(auto adj_list_path_prefix, edge_info->GetAdjListPathPrefix(adj_list_type)); @@ -160,9 +160,6 @@ AdjListArrowChunkReader::AdjListArrowChunkReader( GAR_ASSIGN_OR_RAISE_ERROR( vertex_chunk_num_, util::GetVertexChunkNum(prefix_, edge_info_, adj_list_type_)); - GAR_ASSIGN_OR_RAISE_ERROR( - chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_, - vertex_chunk_index_)); } AdjListArrowChunkReader::AdjListArrowChunkReader( @@ -194,11 +191,10 @@ Status AdjListArrowChunkReader::seek_src(IdType id) { edge_info_->GetSrcChunkSize() * vertex_chunk_num_, ") of edge ", edge_info_->GetEdgeLabel(), " reader."); } - if (vertex_chunk_index_ != new_vertex_chunk_index) { + if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) { + // initialize or update chunk_num_ vertex_chunk_index_ = new_vertex_chunk_index; - GAR_ASSIGN_OR_RAISE( - chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_, - vertex_chunk_index_)); + GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); chunk_table_.reset(); } @@ -228,11 +224,10 @@ Status AdjListArrowChunkReader::seek_dst(IdType id) { edge_info_->GetDstChunkSize() * vertex_chunk_num_, ") of edge ", edge_info_->GetEdgeLabel(), " reader."); } - if (vertex_chunk_index_ != new_vertex_chunk_index) { + if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) { + // initialize or update chunk_num_ vertex_chunk_index_ = new_vertex_chunk_index; - GAR_ASSIGN_OR_RAISE( - chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_, - vertex_chunk_index_)); + GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); chunk_table_.reset(); } @@ -253,6 +248,10 @@ Status AdjListArrowChunkReader::seek(IdType offset) { if (chunk_index_ != pre_chunk_index) { chunk_table_.reset(); } + if (chunk_num_ < 0) { + // initialize chunk_num_ + GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); + } if (chunk_index_ >= chunk_num_) { return Status::IndexError("The edge offset ", offset, " is out of range [0,", @@ -277,6 +276,10 @@ Result> AdjListArrowChunkReader::GetChunk() { Status AdjListArrowChunkReader::next_chunk() { ++chunk_index_; + if (chunk_num_ < 0) { + // initialize chunk_num_ + GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); + } while (chunk_index_ >= chunk_num_) { ++vertex_chunk_index_; if (vertex_chunk_index_ >= vertex_chunk_num_) { @@ -285,9 +288,7 @@ Status AdjListArrowChunkReader::next_chunk() { vertex_chunk_num_); } chunk_index_ = 0; - GAR_ASSIGN_OR_RAISE_ERROR( - chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_, - vertex_chunk_index_)); + GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); } seek_offset_ = chunk_index_ * edge_info_->GetChunkSize(); chunk_table_.reset(); @@ -296,11 +297,9 @@ Status AdjListArrowChunkReader::next_chunk() { Status AdjListArrowChunkReader::seek_chunk_index(IdType vertex_chunk_index, IdType chunk_index) { - if (vertex_chunk_index_ != vertex_chunk_index) { + if (chunk_num_ < 0 || vertex_chunk_index_ != vertex_chunk_index) { vertex_chunk_index_ = vertex_chunk_index; - GAR_ASSIGN_OR_RAISE_ERROR( - chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_, - vertex_chunk_index_)); + GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); chunk_table_.reset(); } if (chunk_index_ != chunk_index) { @@ -347,6 +346,13 @@ Result> AdjListArrowChunkReader::Make( return Make(edge_info, adj_list_type, graph_info->GetPrefix()); } +Status AdjListArrowChunkReader::initOrUpdateEdgeChunkNum() { + GAR_ASSIGN_OR_RAISE(chunk_num_, + util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_, + vertex_chunk_index_)); + return Status::OK(); +} + AdjListOffsetArrowChunkReader::AdjListOffsetArrowChunkReader( const std::shared_ptr& edge_info, AdjListType adj_list_type, const std::string& prefix) @@ -452,16 +458,17 @@ AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader( const std::shared_ptr& edge_info, const std::shared_ptr& property_group, AdjListType adj_list_type, const std::string prefix, - IdType vertex_chunk_index, const util::FilterOptions& options) + const util::FilterOptions& options) : edge_info_(std::move(edge_info)), property_group_(std::move(property_group)), adj_list_type_(adj_list_type), prefix_(prefix), - vertex_chunk_index_(vertex_chunk_index), + vertex_chunk_index_(0), chunk_index_(0), seek_offset_(0), chunk_table_(nullptr), - filter_options_(options) { + filter_options_(options), + chunk_num_(-1) /* -1 means uninitialized */ { GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_)); GAR_ASSIGN_OR_RAISE_ERROR( auto pg_path_prefix, @@ -470,9 +477,6 @@ AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader( GAR_ASSIGN_OR_RAISE_ERROR( vertex_chunk_num_, util::GetVertexChunkNum(prefix_, edge_info_, adj_list_type_)); - GAR_ASSIGN_OR_RAISE_ERROR( - chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_, - vertex_chunk_index_)); } AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader( @@ -506,11 +510,9 @@ Status AdjListPropertyArrowChunkReader::seek_src(IdType id) { edge_info_->GetSrcChunkSize() * vertex_chunk_num_, ") of edge ", edge_info_->GetEdgeLabel(), " reader."); } - if (vertex_chunk_index_ != new_vertex_chunk_index) { + if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) { vertex_chunk_index_ = new_vertex_chunk_index; - GAR_ASSIGN_OR_RAISE( - chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_, - vertex_chunk_index_)); + GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); chunk_table_.reset(); } @@ -540,11 +542,9 @@ Status AdjListPropertyArrowChunkReader::seek_dst(IdType id) { edge_info_->GetDstChunkSize() * vertex_chunk_num_, ") of edge ", edge_info_->GetEdgeLabel(), " reader."); } - if (vertex_chunk_index_ != new_vertex_chunk_index) { + if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) { vertex_chunk_index_ = new_vertex_chunk_index; - GAR_ASSIGN_OR_RAISE( - chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_, - vertex_chunk_index_)); + GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); chunk_table_.reset(); } @@ -565,6 +565,10 @@ Status AdjListPropertyArrowChunkReader::seek(IdType offset) { if (chunk_index_ != pre_chunk_index) { chunk_table_.reset(); } + if (chunk_num_ < 0) { + // initialize chunk_num_ + GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); + } if (chunk_index_ >= chunk_num_) { return Status::IndexError("The edge offset ", offset, " is out of range [0,", @@ -593,6 +597,10 @@ AdjListPropertyArrowChunkReader::GetChunk() { Status AdjListPropertyArrowChunkReader::next_chunk() { ++chunk_index_; + if (chunk_num_ < 0) { + // initialize chunk_num_ + GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); + } while (chunk_index_ >= chunk_num_) { ++vertex_chunk_index_; if (vertex_chunk_index_ >= vertex_chunk_num_) { @@ -604,9 +612,7 @@ Status AdjListPropertyArrowChunkReader::next_chunk() { property_group_, "."); } chunk_index_ = 0; - GAR_ASSIGN_OR_RAISE_ERROR( - chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_, - vertex_chunk_index_)); + GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); } seek_offset_ = chunk_index_ * edge_info_->GetChunkSize(); chunk_table_.reset(); @@ -615,11 +621,9 @@ Status AdjListPropertyArrowChunkReader::next_chunk() { Status AdjListPropertyArrowChunkReader::seek_chunk_index( IdType vertex_chunk_index, IdType chunk_index) { - if (vertex_chunk_index_ != vertex_chunk_index) { + if (chunk_num_ < 0 || vertex_chunk_index_ != vertex_chunk_index) { vertex_chunk_index_ = vertex_chunk_index; - GAR_ASSIGN_OR_RAISE_ERROR( - chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_, - vertex_chunk_index_)); + GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); chunk_table_.reset(); } if (chunk_index_ != chunk_index) { @@ -650,7 +654,7 @@ AdjListPropertyArrowChunkReader::Make( " doesn't exist in edge ", edge_info->GetEdgeLabel(), "."); } return std::make_shared( - edge_info, property_group, adj_list_type, prefix, 0, options); + edge_info, property_group, adj_list_type, prefix, options); } Result> @@ -689,4 +693,11 @@ AdjListPropertyArrowChunkReader::Make( options); } +Status AdjListPropertyArrowChunkReader::initOrUpdateEdgeChunkNum() { + GAR_ASSIGN_OR_RAISE(chunk_num_, + util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_, + vertex_chunk_index_)); + return Status::OK(); +} + } // namespace GAR_NAMESPACE_INTERNAL From 56560c10762deb2a8ae3837f294789d8dcbae465 Mon Sep 17 00:00:00 2001 From: acezen Date: Tue, 6 Feb 2024 17:13:36 +0800 Subject: [PATCH 2/3] Update doc --- cpp/include/gar/reader/arrow_chunk_reader.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/cpp/include/gar/reader/arrow_chunk_reader.h b/cpp/include/gar/reader/arrow_chunk_reader.h index d3b97608c..730cf2228 100644 --- a/cpp/include/gar/reader/arrow_chunk_reader.h +++ b/cpp/include/gar/reader/arrow_chunk_reader.h @@ -160,7 +160,6 @@ class AdjListArrowChunkReader { * @param edge_info The edge info that describes the edge type. * @param adj_list_type The adj list type for the edge. * @param prefix The absolute prefix. - * @param vertex_chunk_index The vertex chunk index, default is 0. */ AdjListArrowChunkReader(const std::shared_ptr& edge_info, AdjListType adj_list_type, const std::string& prefix); @@ -360,7 +359,6 @@ class AdjListPropertyArrowChunkReader { * group. * @param adj_list_type The adj list type for the edges. * @param prefix The absolute prefix. - * @param vertex_chunk_index The vertex chunk index, default is 0. */ AdjListPropertyArrowChunkReader( const std::shared_ptr& edge_info, From 5cd58924609a41e2db5c33d3a323e10d0cdcd8da Mon Sep 17 00:00:00 2001 From: acezen Date: Tue, 6 Feb 2024 17:45:34 +0800 Subject: [PATCH 3/3] Add test case --- cpp/test/test_arrow_chunk_reader.cc | 37 +++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/cpp/test/test_arrow_chunk_reader.cc b/cpp/test/test_arrow_chunk_reader.cc index 930c5017f..3dc030e2d 100644 --- a/cpp/test/test_arrow_chunk_reader.cc +++ b/cpp/test/test_arrow_chunk_reader.cc @@ -254,6 +254,24 @@ TEST_CASE("ArrowChunkReader") { edge_info, AdjListType::ordered_by_source, graph_info->GetPrefix()); REQUIRE(maybe_reader.status().ok()); } + + SECTION("set start vertex chunk index by seek_chunk_index") { + auto maybe_reader = AdjListArrowChunkReader::Make( + graph_info, src_label, edge_label, dst_label, + AdjListType::ordered_by_source); + auto reader = maybe_reader.value(); + // check reader start from vertex chunk 0 + auto result = reader->GetChunk(); + REQUIRE(!result.has_error()); + auto table = result.value(); + REQUIRE(table->num_rows() == 667); + // set start vertex chunk index to 1 + reader->seek_chunk_index(1); + result = reader->GetChunk(); + REQUIRE(!result.has_error()); + table = result.value(); + REQUIRE(table->num_rows() == 644); + } } SECTION("AdjListPropertyArrowChunkReader") { @@ -364,6 +382,25 @@ TEST_CASE("ArrowChunkReader") { walkReader(reader); } } + + SECTION("set start vertex chunk index by seek_chunk_index") { + auto maybe_reader = AdjListPropertyArrowChunkReader::Make( + graph_info, src_label, edge_label, dst_label, edge_property_name, + GAR_NAMESPACE::AdjListType::ordered_by_source); + REQUIRE(maybe_reader.status().ok()); + auto reader = maybe_reader.value(); + // check reader start from vertex chunk 0 + auto result = reader->GetChunk(); + REQUIRE(!result.has_error()); + auto table = result.value(); + REQUIRE(table->num_rows() == 667); + // set start vertex chunk index to 1 + reader->seek_chunk_index(1); + result = reader->GetChunk(); + REQUIRE(!result.has_error()); + table = result.value(); + REQUIRE(table->num_rows() == 644); + } } SECTION("AdjListOffsetArrowChunkReader") {