Skip to content

Commit

Permalink
[Improve][C++] Revise the ArrowChunkReader constructors by remove red…
Browse files Browse the repository at this point in the history
…undant parameter (#360)

* [Improve][C++] Revise the ArrowChunkReader construcors by remove redundant parameter

* Update doc

* Add test case
  • Loading branch information
acezen authored Feb 6, 2024
1 parent 4d092b3 commit 229021a
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 61 deletions.
10 changes: 4 additions & 6 deletions cpp/include/gar/graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,10 +306,7 @@ class EdgeIter {
IdType global_chunk_index, IdType offset,
IdType chunk_begin, IdType chunk_end,
std::shared_ptr<util::IndexConverter> index_converter)
: adj_list_reader_(
edge_info, adj_list_type, prefix,
index_converter->GlobalChunkIndexToIndexPair(global_chunk_index)
.first),
: adj_list_reader_(edge_info, adj_list_type, prefix),
global_chunk_index_(global_chunk_index),
cur_offset_(offset),
chunk_size_(edge_info->GetChunkSize()),
Expand All @@ -322,10 +319,11 @@ class EdgeIter {
index_converter_(index_converter) {
vertex_chunk_index_ =
index_converter->GlobalChunkIndexToIndexPair(global_chunk_index).first;
adj_list_reader_.seek_chunk_index(vertex_chunk_index_);
const auto& property_groups = edge_info->GetPropertyGroups();
for (const auto& pg : property_groups) {
property_readers_.emplace_back(edge_info, pg, adj_list_type, prefix,
vertex_chunk_index_);
property_readers_.emplace_back(edge_info, pg, adj_list_type, prefix),
property_readers_.back().seek_chunk_index(vertex_chunk_index_);
}
if (adj_list_type == AdjListType::ordered_by_source ||
adj_list_type == AdjListType::ordered_by_dest) {
Expand Down
16 changes: 9 additions & 7 deletions cpp/include/gar/reader/arrow_chunk_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ class VertexPropertyArrowChunkReader {
VertexPropertyArrowChunkReader(
const std::shared_ptr<VertexInfo>& vertex_info,
const std::shared_ptr<PropertyGroup>& property_group,
const std::string& prefix, IdType chunk_index = 0,
const util::FilterOptions& options = {});
const std::string& prefix, const util::FilterOptions& options = {});

/**
* @brief Sets chunk position indicator for reader by internal vertex id.
Expand Down Expand Up @@ -161,11 +160,9 @@ class AdjListArrowChunkReader {
* @param edge_info The edge info that describes the edge type.
* @param adj_list_type The adj list type for the edge.
* @param prefix The absolute prefix.
* @param vertex_chunk_index The vertex chunk index, default is 0.
*/
AdjListArrowChunkReader(const std::shared_ptr<EdgeInfo>& edge_info,
AdjListType adj_list_type, const std::string& prefix,
IdType vertex_chunk_index = 0);
AdjListType adj_list_type, const std::string& prefix);

/**
* @brief Copy constructor.
Expand Down Expand Up @@ -249,6 +246,9 @@ class AdjListArrowChunkReader {
const std::string& src_label, const std::string& edge_label,
const std::string& dst_label, AdjListType adj_list_type);

private:
Status initOrUpdateEdgeChunkNum();

private:
std::shared_ptr<EdgeInfo> edge_info_;
AdjListType adj_list_type_;
Expand Down Expand Up @@ -359,13 +359,12 @@ class AdjListPropertyArrowChunkReader {
* group.
* @param adj_list_type The adj list type for the edges.
* @param prefix The absolute prefix.
* @param vertex_chunk_index The vertex chunk index, default is 0.
*/
AdjListPropertyArrowChunkReader(
const std::shared_ptr<EdgeInfo>& edge_info,
const std::shared_ptr<PropertyGroup>& property_group,
AdjListType adj_list_type, const std::string prefix,
IdType vertex_chunk_index = 0, const util::FilterOptions& options = {});
const util::FilterOptions& options = {});

/**
* @brief Copy constructor.
Expand Down Expand Up @@ -489,6 +488,9 @@ class AdjListPropertyArrowChunkReader {
const std::string& dst_label, const std::string& property_name,
AdjListType adj_list_type, const util::FilterOptions& options = {});

private:
Status initOrUpdateEdgeChunkNum();

private:
std::shared_ptr<EdgeInfo> edge_info_;
std::shared_ptr<PropertyGroup> property_group_;
Expand Down
107 changes: 59 additions & 48 deletions cpp/src/arrow_chunk_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,11 @@ namespace GAR_NAMESPACE_INTERNAL {
VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader(
const std::shared_ptr<VertexInfo>& vertex_info,
const std::shared_ptr<PropertyGroup>& property_group,
const std::string& prefix, IdType chunk_index,
const util::FilterOptions& options)
const std::string& prefix, const util::FilterOptions& options)
: vertex_info_(std::move(vertex_info)),
property_group_(std::move(property_group)),
chunk_index_(chunk_index),
seek_id_(chunk_index * vertex_info->GetChunkSize()),
chunk_index_(0),
seek_id_(0),
chunk_table_(nullptr),
filter_options_(options) {
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
Expand Down Expand Up @@ -108,7 +107,7 @@ VertexPropertyArrowChunkReader::Make(
const std::shared_ptr<PropertyGroup>& property_group,
const std::string& prefix, const util::FilterOptions& options) {
return std::make_shared<VertexPropertyArrowChunkReader>(
vertex_info, property_group, prefix, 0, options);
vertex_info, property_group, prefix, options);
}

Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
Expand Down Expand Up @@ -145,24 +144,22 @@ VertexPropertyArrowChunkReader::Make(

AdjListArrowChunkReader::AdjListArrowChunkReader(
const std::shared_ptr<EdgeInfo>& edge_info, AdjListType adj_list_type,
const std::string& prefix, IdType vertex_chunk_index)
const std::string& prefix)
: edge_info_(edge_info),
adj_list_type_(adj_list_type),
prefix_(prefix),
vertex_chunk_index_(vertex_chunk_index),
vertex_chunk_index_(0),
chunk_index_(0),
seek_offset_(0),
chunk_table_(nullptr) {
chunk_table_(nullptr),
chunk_num_(-1) /* -1 means uninitialized */ {
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
GAR_ASSIGN_OR_RAISE_ERROR(auto adj_list_path_prefix,
edge_info->GetAdjListPathPrefix(adj_list_type));
base_dir_ = prefix_ + adj_list_path_prefix;
GAR_ASSIGN_OR_RAISE_ERROR(
vertex_chunk_num_,
util::GetVertexChunkNum(prefix_, edge_info_, adj_list_type_));
GAR_ASSIGN_OR_RAISE_ERROR(
chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
vertex_chunk_index_));
}

AdjListArrowChunkReader::AdjListArrowChunkReader(
Expand Down Expand Up @@ -194,11 +191,10 @@ Status AdjListArrowChunkReader::seek_src(IdType id) {
edge_info_->GetSrcChunkSize() * vertex_chunk_num_, ") of edge ",
edge_info_->GetEdgeLabel(), " reader.");
}
if (vertex_chunk_index_ != new_vertex_chunk_index) {
if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
// initialize or update chunk_num_
vertex_chunk_index_ = new_vertex_chunk_index;
GAR_ASSIGN_OR_RAISE(
chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
vertex_chunk_index_));
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
chunk_table_.reset();
}

Expand Down Expand Up @@ -228,11 +224,10 @@ Status AdjListArrowChunkReader::seek_dst(IdType id) {
edge_info_->GetDstChunkSize() * vertex_chunk_num_, ") of edge ",
edge_info_->GetEdgeLabel(), " reader.");
}
if (vertex_chunk_index_ != new_vertex_chunk_index) {
if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
// initialize or update chunk_num_
vertex_chunk_index_ = new_vertex_chunk_index;
GAR_ASSIGN_OR_RAISE(
chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
vertex_chunk_index_));
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
chunk_table_.reset();
}

Expand All @@ -253,6 +248,10 @@ Status AdjListArrowChunkReader::seek(IdType offset) {
if (chunk_index_ != pre_chunk_index) {
chunk_table_.reset();
}
if (chunk_num_ < 0) {
// initialize chunk_num_
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
}
if (chunk_index_ >= chunk_num_) {
return Status::IndexError("The edge offset ", offset,
" is out of range [0,",
Expand All @@ -277,6 +276,10 @@ Result<std::shared_ptr<arrow::Table>> AdjListArrowChunkReader::GetChunk() {

Status AdjListArrowChunkReader::next_chunk() {
++chunk_index_;
if (chunk_num_ < 0) {
// initialize chunk_num_
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
}
while (chunk_index_ >= chunk_num_) {
++vertex_chunk_index_;
if (vertex_chunk_index_ >= vertex_chunk_num_) {
Expand All @@ -285,9 +288,7 @@ Status AdjListArrowChunkReader::next_chunk() {
vertex_chunk_num_);
}
chunk_index_ = 0;
GAR_ASSIGN_OR_RAISE_ERROR(
chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
vertex_chunk_index_));
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
}
seek_offset_ = chunk_index_ * edge_info_->GetChunkSize();
chunk_table_.reset();
Expand All @@ -296,11 +297,9 @@ Status AdjListArrowChunkReader::next_chunk() {

Status AdjListArrowChunkReader::seek_chunk_index(IdType vertex_chunk_index,
IdType chunk_index) {
if (vertex_chunk_index_ != vertex_chunk_index) {
if (chunk_num_ < 0 || vertex_chunk_index_ != vertex_chunk_index) {
vertex_chunk_index_ = vertex_chunk_index;
GAR_ASSIGN_OR_RAISE_ERROR(
chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
vertex_chunk_index_));
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
chunk_table_.reset();
}
if (chunk_index_ != chunk_index) {
Expand Down Expand Up @@ -347,6 +346,13 @@ Result<std::shared_ptr<AdjListArrowChunkReader>> AdjListArrowChunkReader::Make(
return Make(edge_info, adj_list_type, graph_info->GetPrefix());
}

Status AdjListArrowChunkReader::initOrUpdateEdgeChunkNum() {
GAR_ASSIGN_OR_RAISE(chunk_num_,
util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
vertex_chunk_index_));
return Status::OK();
}

AdjListOffsetArrowChunkReader::AdjListOffsetArrowChunkReader(
const std::shared_ptr<EdgeInfo>& edge_info, AdjListType adj_list_type,
const std::string& prefix)
Expand Down Expand Up @@ -452,16 +458,17 @@ AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader(
const std::shared_ptr<EdgeInfo>& edge_info,
const std::shared_ptr<PropertyGroup>& property_group,
AdjListType adj_list_type, const std::string prefix,
IdType vertex_chunk_index, const util::FilterOptions& options)
const util::FilterOptions& options)
: edge_info_(std::move(edge_info)),
property_group_(std::move(property_group)),
adj_list_type_(adj_list_type),
prefix_(prefix),
vertex_chunk_index_(vertex_chunk_index),
vertex_chunk_index_(0),
chunk_index_(0),
seek_offset_(0),
chunk_table_(nullptr),
filter_options_(options) {
filter_options_(options),
chunk_num_(-1) /* -1 means uninitialized */ {
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
GAR_ASSIGN_OR_RAISE_ERROR(
auto pg_path_prefix,
Expand All @@ -470,9 +477,6 @@ AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader(
GAR_ASSIGN_OR_RAISE_ERROR(
vertex_chunk_num_,
util::GetVertexChunkNum(prefix_, edge_info_, adj_list_type_));
GAR_ASSIGN_OR_RAISE_ERROR(
chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
vertex_chunk_index_));
}

AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader(
Expand Down Expand Up @@ -506,11 +510,9 @@ Status AdjListPropertyArrowChunkReader::seek_src(IdType id) {
edge_info_->GetSrcChunkSize() * vertex_chunk_num_, ") of edge ",
edge_info_->GetEdgeLabel(), " reader.");
}
if (vertex_chunk_index_ != new_vertex_chunk_index) {
if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
vertex_chunk_index_ = new_vertex_chunk_index;
GAR_ASSIGN_OR_RAISE(
chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
vertex_chunk_index_));
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
chunk_table_.reset();
}

Expand Down Expand Up @@ -540,11 +542,9 @@ Status AdjListPropertyArrowChunkReader::seek_dst(IdType id) {
edge_info_->GetDstChunkSize() * vertex_chunk_num_, ") of edge ",
edge_info_->GetEdgeLabel(), " reader.");
}
if (vertex_chunk_index_ != new_vertex_chunk_index) {
if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
vertex_chunk_index_ = new_vertex_chunk_index;
GAR_ASSIGN_OR_RAISE(
chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
vertex_chunk_index_));
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
chunk_table_.reset();
}

Expand All @@ -565,6 +565,10 @@ Status AdjListPropertyArrowChunkReader::seek(IdType offset) {
if (chunk_index_ != pre_chunk_index) {
chunk_table_.reset();
}
if (chunk_num_ < 0) {
// initialize chunk_num_
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
}
if (chunk_index_ >= chunk_num_) {
return Status::IndexError("The edge offset ", offset,
" is out of range [0,",
Expand Down Expand Up @@ -593,6 +597,10 @@ AdjListPropertyArrowChunkReader::GetChunk() {

Status AdjListPropertyArrowChunkReader::next_chunk() {
++chunk_index_;
if (chunk_num_ < 0) {
// initialize chunk_num_
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
}
while (chunk_index_ >= chunk_num_) {
++vertex_chunk_index_;
if (vertex_chunk_index_ >= vertex_chunk_num_) {
Expand All @@ -604,9 +612,7 @@ Status AdjListPropertyArrowChunkReader::next_chunk() {
property_group_, ".");
}
chunk_index_ = 0;
GAR_ASSIGN_OR_RAISE_ERROR(
chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
vertex_chunk_index_));
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
}
seek_offset_ = chunk_index_ * edge_info_->GetChunkSize();
chunk_table_.reset();
Expand All @@ -615,11 +621,9 @@ Status AdjListPropertyArrowChunkReader::next_chunk() {

Status AdjListPropertyArrowChunkReader::seek_chunk_index(
IdType vertex_chunk_index, IdType chunk_index) {
if (vertex_chunk_index_ != vertex_chunk_index) {
if (chunk_num_ < 0 || vertex_chunk_index_ != vertex_chunk_index) {
vertex_chunk_index_ = vertex_chunk_index;
GAR_ASSIGN_OR_RAISE_ERROR(
chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
vertex_chunk_index_));
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
chunk_table_.reset();
}
if (chunk_index_ != chunk_index) {
Expand Down Expand Up @@ -650,7 +654,7 @@ AdjListPropertyArrowChunkReader::Make(
" doesn't exist in edge ", edge_info->GetEdgeLabel(), ".");
}
return std::make_shared<AdjListPropertyArrowChunkReader>(
edge_info, property_group, adj_list_type, prefix, 0, options);
edge_info, property_group, adj_list_type, prefix, options);
}

Result<std::shared_ptr<AdjListPropertyArrowChunkReader>>
Expand Down Expand Up @@ -689,4 +693,11 @@ AdjListPropertyArrowChunkReader::Make(
options);
}

Status AdjListPropertyArrowChunkReader::initOrUpdateEdgeChunkNum() {
GAR_ASSIGN_OR_RAISE(chunk_num_,
util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
vertex_chunk_index_));
return Status::OK();
}

} // namespace GAR_NAMESPACE_INTERNAL
Loading

0 comments on commit 229021a

Please sign in to comment.