Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][C++] Revise the ArrowChunkReader constructors by remove redundant parameter #360

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions cpp/include/gar/graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,10 +306,7 @@ class EdgeIter {
IdType global_chunk_index, IdType offset,
IdType chunk_begin, IdType chunk_end,
std::shared_ptr<util::IndexConverter> index_converter)
: adj_list_reader_(
edge_info, adj_list_type, prefix,
index_converter->GlobalChunkIndexToIndexPair(global_chunk_index)
.first),
: adj_list_reader_(edge_info, adj_list_type, prefix),
global_chunk_index_(global_chunk_index),
cur_offset_(offset),
chunk_size_(edge_info->GetChunkSize()),
Expand All @@ -322,10 +319,11 @@ class EdgeIter {
index_converter_(index_converter) {
vertex_chunk_index_ =
index_converter->GlobalChunkIndexToIndexPair(global_chunk_index).first;
adj_list_reader_.seek_chunk_index(vertex_chunk_index_);
const auto& property_groups = edge_info->GetPropertyGroups();
for (const auto& pg : property_groups) {
property_readers_.emplace_back(edge_info, pg, adj_list_type, prefix,
vertex_chunk_index_);
property_readers_.emplace_back(edge_info, pg, adj_list_type, prefix),
property_readers_.back().seek_chunk_index(vertex_chunk_index_);
}
if (adj_list_type == AdjListType::ordered_by_source ||
adj_list_type == AdjListType::ordered_by_dest) {
Expand Down
16 changes: 9 additions & 7 deletions cpp/include/gar/reader/arrow_chunk_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ class VertexPropertyArrowChunkReader {
VertexPropertyArrowChunkReader(
const std::shared_ptr<VertexInfo>& vertex_info,
const std::shared_ptr<PropertyGroup>& property_group,
const std::string& prefix, IdType chunk_index = 0,
const util::FilterOptions& options = {});
const std::string& prefix, const util::FilterOptions& options = {});

/**
* @brief Sets chunk position indicator for reader by internal vertex id.
Expand Down Expand Up @@ -161,11 +160,9 @@ class AdjListArrowChunkReader {
* @param edge_info The edge info that describes the edge type.
* @param adj_list_type The adj list type for the edge.
* @param prefix The absolute prefix.
* @param vertex_chunk_index The vertex chunk index, default is 0.
*/
AdjListArrowChunkReader(const std::shared_ptr<EdgeInfo>& edge_info,
AdjListType adj_list_type, const std::string& prefix,
IdType vertex_chunk_index = 0);
AdjListType adj_list_type, const std::string& prefix);

/**
* @brief Copy constructor.
Expand Down Expand Up @@ -249,6 +246,9 @@ class AdjListArrowChunkReader {
const std::string& src_label, const std::string& edge_label,
const std::string& dst_label, AdjListType adj_list_type);

private:
Status initOrUpdateEdgeChunkNum();

private:
std::shared_ptr<EdgeInfo> edge_info_;
AdjListType adj_list_type_;
Expand Down Expand Up @@ -359,13 +359,12 @@ class AdjListPropertyArrowChunkReader {
* group.
* @param adj_list_type The adj list type for the edges.
* @param prefix The absolute prefix.
* @param vertex_chunk_index The vertex chunk index, default is 0.
*/
AdjListPropertyArrowChunkReader(
const std::shared_ptr<EdgeInfo>& edge_info,
const std::shared_ptr<PropertyGroup>& property_group,
AdjListType adj_list_type, const std::string prefix,
IdType vertex_chunk_index = 0, const util::FilterOptions& options = {});
const util::FilterOptions& options = {});

/**
* @brief Copy constructor.
Expand Down Expand Up @@ -489,6 +488,9 @@ class AdjListPropertyArrowChunkReader {
const std::string& dst_label, const std::string& property_name,
AdjListType adj_list_type, const util::FilterOptions& options = {});

private:
Status initOrUpdateEdgeChunkNum();

private:
std::shared_ptr<EdgeInfo> edge_info_;
std::shared_ptr<PropertyGroup> property_group_;
Expand Down
107 changes: 59 additions & 48 deletions cpp/src/arrow_chunk_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,11 @@ namespace GAR_NAMESPACE_INTERNAL {
VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader(
const std::shared_ptr<VertexInfo>& vertex_info,
const std::shared_ptr<PropertyGroup>& property_group,
const std::string& prefix, IdType chunk_index,
const util::FilterOptions& options)
const std::string& prefix, const util::FilterOptions& options)
: vertex_info_(std::move(vertex_info)),
property_group_(std::move(property_group)),
chunk_index_(chunk_index),
seek_id_(chunk_index * vertex_info->GetChunkSize()),
chunk_index_(0),
seek_id_(0),
chunk_table_(nullptr),
filter_options_(options) {
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
Expand Down Expand Up @@ -108,7 +107,7 @@ VertexPropertyArrowChunkReader::Make(
const std::shared_ptr<PropertyGroup>& property_group,
const std::string& prefix, const util::FilterOptions& options) {
return std::make_shared<VertexPropertyArrowChunkReader>(
vertex_info, property_group, prefix, 0, options);
vertex_info, property_group, prefix, options);
}

Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
Expand Down Expand Up @@ -145,24 +144,22 @@ VertexPropertyArrowChunkReader::Make(

AdjListArrowChunkReader::AdjListArrowChunkReader(
const std::shared_ptr<EdgeInfo>& edge_info, AdjListType adj_list_type,
const std::string& prefix, IdType vertex_chunk_index)
const std::string& prefix)
: edge_info_(edge_info),
adj_list_type_(adj_list_type),
prefix_(prefix),
vertex_chunk_index_(vertex_chunk_index),
vertex_chunk_index_(0),
chunk_index_(0),
seek_offset_(0),
chunk_table_(nullptr) {
chunk_table_(nullptr),
chunk_num_(-1) /* -1 means uninitialized */ {
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
GAR_ASSIGN_OR_RAISE_ERROR(auto adj_list_path_prefix,
edge_info->GetAdjListPathPrefix(adj_list_type));
base_dir_ = prefix_ + adj_list_path_prefix;
GAR_ASSIGN_OR_RAISE_ERROR(
vertex_chunk_num_,
util::GetVertexChunkNum(prefix_, edge_info_, adj_list_type_));
GAR_ASSIGN_OR_RAISE_ERROR(
chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
vertex_chunk_index_));
}

AdjListArrowChunkReader::AdjListArrowChunkReader(
Expand Down Expand Up @@ -194,11 +191,10 @@ Status AdjListArrowChunkReader::seek_src(IdType id) {
edge_info_->GetSrcChunkSize() * vertex_chunk_num_, ") of edge ",
edge_info_->GetEdgeLabel(), " reader.");
}
if (vertex_chunk_index_ != new_vertex_chunk_index) {
if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
// initialize or update chunk_num_
vertex_chunk_index_ = new_vertex_chunk_index;
GAR_ASSIGN_OR_RAISE(
chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
vertex_chunk_index_));
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
chunk_table_.reset();
}

Expand Down Expand Up @@ -228,11 +224,10 @@ Status AdjListArrowChunkReader::seek_dst(IdType id) {
edge_info_->GetDstChunkSize() * vertex_chunk_num_, ") of edge ",
edge_info_->GetEdgeLabel(), " reader.");
}
if (vertex_chunk_index_ != new_vertex_chunk_index) {
if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
// initialize or update chunk_num_
vertex_chunk_index_ = new_vertex_chunk_index;
GAR_ASSIGN_OR_RAISE(
chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
vertex_chunk_index_));
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
chunk_table_.reset();
}

Expand All @@ -253,6 +248,10 @@ Status AdjListArrowChunkReader::seek(IdType offset) {
if (chunk_index_ != pre_chunk_index) {
chunk_table_.reset();
}
if (chunk_num_ < 0) {
// initialize chunk_num_
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
}
if (chunk_index_ >= chunk_num_) {
return Status::IndexError("The edge offset ", offset,
" is out of range [0,",
Expand All @@ -277,6 +276,10 @@ Result<std::shared_ptr<arrow::Table>> AdjListArrowChunkReader::GetChunk() {

Status AdjListArrowChunkReader::next_chunk() {
++chunk_index_;
if (chunk_num_ < 0) {
// initialize chunk_num_
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
}
while (chunk_index_ >= chunk_num_) {
++vertex_chunk_index_;
if (vertex_chunk_index_ >= vertex_chunk_num_) {
Expand All @@ -285,9 +288,7 @@ Status AdjListArrowChunkReader::next_chunk() {
vertex_chunk_num_);
}
chunk_index_ = 0;
GAR_ASSIGN_OR_RAISE_ERROR(
chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
vertex_chunk_index_));
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
}
seek_offset_ = chunk_index_ * edge_info_->GetChunkSize();
chunk_table_.reset();
Expand All @@ -296,11 +297,9 @@ Status AdjListArrowChunkReader::next_chunk() {

Status AdjListArrowChunkReader::seek_chunk_index(IdType vertex_chunk_index,
IdType chunk_index) {
if (vertex_chunk_index_ != vertex_chunk_index) {
if (chunk_num_ < 0 || vertex_chunk_index_ != vertex_chunk_index) {
vertex_chunk_index_ = vertex_chunk_index;
GAR_ASSIGN_OR_RAISE_ERROR(
chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
vertex_chunk_index_));
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
chunk_table_.reset();
}
if (chunk_index_ != chunk_index) {
Expand Down Expand Up @@ -347,6 +346,13 @@ Result<std::shared_ptr<AdjListArrowChunkReader>> AdjListArrowChunkReader::Make(
return Make(edge_info, adj_list_type, graph_info->GetPrefix());
}

Status AdjListArrowChunkReader::initOrUpdateEdgeChunkNum() {
GAR_ASSIGN_OR_RAISE(chunk_num_,
util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
vertex_chunk_index_));
return Status::OK();
}

AdjListOffsetArrowChunkReader::AdjListOffsetArrowChunkReader(
const std::shared_ptr<EdgeInfo>& edge_info, AdjListType adj_list_type,
const std::string& prefix)
Expand Down Expand Up @@ -452,16 +458,17 @@ AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader(
const std::shared_ptr<EdgeInfo>& edge_info,
const std::shared_ptr<PropertyGroup>& property_group,
AdjListType adj_list_type, const std::string prefix,
IdType vertex_chunk_index, const util::FilterOptions& options)
const util::FilterOptions& options)
: edge_info_(std::move(edge_info)),
property_group_(std::move(property_group)),
adj_list_type_(adj_list_type),
prefix_(prefix),
vertex_chunk_index_(vertex_chunk_index),
vertex_chunk_index_(0),
chunk_index_(0),
seek_offset_(0),
chunk_table_(nullptr),
filter_options_(options) {
filter_options_(options),
chunk_num_(-1) /* -1 means uninitialized */ {
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
GAR_ASSIGN_OR_RAISE_ERROR(
auto pg_path_prefix,
Expand All @@ -470,9 +477,6 @@ AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader(
GAR_ASSIGN_OR_RAISE_ERROR(
vertex_chunk_num_,
util::GetVertexChunkNum(prefix_, edge_info_, adj_list_type_));
GAR_ASSIGN_OR_RAISE_ERROR(
chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
vertex_chunk_index_));
}

AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader(
Expand Down Expand Up @@ -506,11 +510,9 @@ Status AdjListPropertyArrowChunkReader::seek_src(IdType id) {
edge_info_->GetSrcChunkSize() * vertex_chunk_num_, ") of edge ",
edge_info_->GetEdgeLabel(), " reader.");
}
if (vertex_chunk_index_ != new_vertex_chunk_index) {
if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
vertex_chunk_index_ = new_vertex_chunk_index;
GAR_ASSIGN_OR_RAISE(
chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
vertex_chunk_index_));
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
chunk_table_.reset();
}

Expand Down Expand Up @@ -540,11 +542,9 @@ Status AdjListPropertyArrowChunkReader::seek_dst(IdType id) {
edge_info_->GetDstChunkSize() * vertex_chunk_num_, ") of edge ",
edge_info_->GetEdgeLabel(), " reader.");
}
if (vertex_chunk_index_ != new_vertex_chunk_index) {
if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
vertex_chunk_index_ = new_vertex_chunk_index;
GAR_ASSIGN_OR_RAISE(
chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
vertex_chunk_index_));
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
chunk_table_.reset();
}

Expand All @@ -565,6 +565,10 @@ Status AdjListPropertyArrowChunkReader::seek(IdType offset) {
if (chunk_index_ != pre_chunk_index) {
chunk_table_.reset();
}
if (chunk_num_ < 0) {
// initialize chunk_num_
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
}
if (chunk_index_ >= chunk_num_) {
return Status::IndexError("The edge offset ", offset,
" is out of range [0,",
Expand Down Expand Up @@ -593,6 +597,10 @@ AdjListPropertyArrowChunkReader::GetChunk() {

Status AdjListPropertyArrowChunkReader::next_chunk() {
++chunk_index_;
if (chunk_num_ < 0) {
// initialize chunk_num_
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
}
while (chunk_index_ >= chunk_num_) {
++vertex_chunk_index_;
if (vertex_chunk_index_ >= vertex_chunk_num_) {
Expand All @@ -604,9 +612,7 @@ Status AdjListPropertyArrowChunkReader::next_chunk() {
property_group_, ".");
}
chunk_index_ = 0;
GAR_ASSIGN_OR_RAISE_ERROR(
chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
vertex_chunk_index_));
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
}
seek_offset_ = chunk_index_ * edge_info_->GetChunkSize();
chunk_table_.reset();
Expand All @@ -615,11 +621,9 @@ Status AdjListPropertyArrowChunkReader::next_chunk() {

Status AdjListPropertyArrowChunkReader::seek_chunk_index(
IdType vertex_chunk_index, IdType chunk_index) {
if (vertex_chunk_index_ != vertex_chunk_index) {
if (chunk_num_ < 0 || vertex_chunk_index_ != vertex_chunk_index) {
vertex_chunk_index_ = vertex_chunk_index;
GAR_ASSIGN_OR_RAISE_ERROR(
chunk_num_, util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
vertex_chunk_index_));
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
chunk_table_.reset();
}
if (chunk_index_ != chunk_index) {
Expand Down Expand Up @@ -650,7 +654,7 @@ AdjListPropertyArrowChunkReader::Make(
" doesn't exist in edge ", edge_info->GetEdgeLabel(), ".");
}
return std::make_shared<AdjListPropertyArrowChunkReader>(
edge_info, property_group, adj_list_type, prefix, 0, options);
edge_info, property_group, adj_list_type, prefix, options);
}

Result<std::shared_ptr<AdjListPropertyArrowChunkReader>>
Expand Down Expand Up @@ -689,4 +693,11 @@ AdjListPropertyArrowChunkReader::Make(
options);
}

Status AdjListPropertyArrowChunkReader::initOrUpdateEdgeChunkNum() {
GAR_ASSIGN_OR_RAISE(chunk_num_,
util::GetEdgeChunkNum(prefix_, edge_info_, adj_list_type_,
vertex_chunk_index_));
return Status::OK();
}

} // namespace GAR_NAMESPACE_INTERNAL
Loading
Loading