diff --git a/docs/api-reference.rst b/docs/api-reference.rst index d79e64097..f4e3322cd 100644 --- a/docs/api-reference.rst +++ b/docs/api-reference.rst @@ -81,6 +81,10 @@ Vertices Collection :members: :undoc-members: +.. doxygenclass:: GraphArchive::VertexIter + :members: + :undoc-members: + .. doxygenclass:: GraphArchive::VerticesCollection :members: :undoc-members: diff --git a/include/gar/graph.h b/include/gar/graph.h index 24438384e..1ec1c0850 100644 --- a/include/gar/graph.h +++ b/include/gar/graph.h @@ -24,12 +24,15 @@ limitations under the License. #include #include -#include "arrow/api.h" - #include "gar/reader/arrow_chunk_reader.h" #include "gar/utils/reader_utils.h" #include "gar/utils/utils.h" +// forward declarations +namespace arrow { +class ChunkedArray; +} + namespace GAR_NAMESPACE_INTERNAL { /** @@ -133,6 +136,90 @@ class Edge { std::map properties_; }; +/** + * @brief The iterator for traversing a type of vertices. + * + */ +class VertexIter { + public: + /** + * Initialize the iterator. + * + * @param vertex_info The vertex info that describes the vertex type. + * @param prefix The absolute prefix. + * @param offset The current offset of the readers. + */ + explicit VertexIter(const VertexInfo& vertex_info, const std::string& prefix, + IdType offset) noexcept { + for (const auto& pg : vertex_info.GetPropertyGroups()) { + readers_.emplace_back(vertex_info, pg, prefix); + } + cur_offset_ = offset; + } + + /// Copy constructor. + VertexIter(const VertexIter& other) + : readers_(other.readers_), cur_offset_(other.cur_offset_) {} + + /// Construct and return the vertex of the current offset. + Vertex operator*() noexcept { + for (auto& reader : readers_) { + reader.seek(cur_offset_); + } + return Vertex(cur_offset_, readers_); + } + + /// Get the vertex id of the current offset. + IdType id() { return cur_offset_; } + + /// Get the value for a property of the current vertex. + template + Result property(const std::string& property) noexcept { + std::shared_ptr column(nullptr); + for (auto& reader : readers_) { + reader.seek(cur_offset_); + GAR_ASSIGN_OR_RAISE(auto chunk_table, reader.GetChunk()); + column = util::GetArrowColumnByName(chunk_table, property); + if (column != nullptr) { + break; + } + } + if (column != nullptr) { + auto array = util::GetArrowArrayByChunkIndex(column, 0); + GAR_ASSIGN_OR_RAISE(auto data, util::GetArrowArrayData(array)); + return util::ValueGetter::Value(data, 0); + } + return Status::KeyError("The property is not exist."); + } + + /// The prefix increment operator. + VertexIter& operator++() noexcept { + ++cur_offset_; + return *this; + } + + /// The postfix increment operator. + VertexIter operator++(int) { + VertexIter ret(*this); + ++cur_offset_; + return ret; + } + + /// The equality operator. + bool operator==(const VertexIter& rhs) const noexcept { + return cur_offset_ == rhs.cur_offset_; + } + + /// The inequality operator. + bool operator!=(const VertexIter& rhs) const noexcept { + return cur_offset_ != rhs.cur_offset_; + } + + private: + std::vector readers_; + IdType cur_offset_; +}; + /** * @brief VerticesCollection is designed for reading a collection of vertices. * @@ -159,100 +246,16 @@ class VerticesCollection { fs->ReadFileToValue(vertex_num_path)); } - /** - * @brief The iterator for traversing a type of vertices. - * - */ - class iterator { - public: - /** - * Initialize the iterator. - * - * @param vertex_info The vertex info that describes the vertex type. - * @param prefix The absolute prefix. - * @param offset The current offset of the readers. - */ - explicit iterator(const VertexInfo& vertex_info, const std::string& prefix, - IdType offset) noexcept { - for (const auto& pg : vertex_info.GetPropertyGroups()) { - readers_.emplace_back(vertex_info, pg, prefix); - } - cur_offset_ = offset; - } - - /// Copy constructor. - iterator(const iterator& other) - : readers_(other.readers_), cur_offset_(other.cur_offset_) {} - - /// Construct and return the vertex of the current offset. - Vertex operator*() noexcept { - for (auto& reader : readers_) { - reader.seek(cur_offset_); - } - return Vertex(cur_offset_, readers_); - } - - /// Get the vertex id of the current offset. - IdType id() { return cur_offset_; } - - /// Get the value for a property of the current vertex. - template - Result property(const std::string& property) noexcept { - std::shared_ptr column(nullptr); - for (auto& reader : readers_) { - reader.seek(cur_offset_); - GAR_ASSIGN_OR_RAISE(auto chunk_table, reader.GetChunk()); - column = chunk_table->GetColumnByName(property); - if (column != nullptr) { - break; - } - } - if (column != nullptr) { - GAR_ASSIGN_OR_RAISE(auto data, - util::GetArrowArrayData(column->chunk(0))); - return util::ValueGetter::Value(data, 0); - } - return Status::KeyError("The property is not exist."); - } - - /// The prefix increment operator. - iterator& operator++() noexcept { - ++cur_offset_; - return *this; - } - - /// The postfix increment operator. - iterator operator++(int) { - iterator ret(*this); - ++cur_offset_; - return ret; - } - - /// The equality operator. - bool operator==(const iterator& rhs) const noexcept { - return cur_offset_ == rhs.cur_offset_; - } - - /// The inequality operator. - bool operator!=(const iterator& rhs) const noexcept { - return cur_offset_ != rhs.cur_offset_; - } - - private: - std::vector readers_; - IdType cur_offset_; - }; - /// The iterator pointing to the first vertex. - iterator begin() noexcept { return iterator(vertex_info_, prefix_, 0); } + VertexIter begin() noexcept { return VertexIter(vertex_info_, prefix_, 0); } /// The iterator pointing to the past-the-end element. - iterator end() noexcept { - return iterator(vertex_info_, prefix_, vertex_num_); + VertexIter end() noexcept { + return VertexIter(vertex_info_, prefix_, vertex_num_); } /// The iterator pointing to the vertex with specific id. - iterator find(IdType id) { return iterator(vertex_info_, prefix_, id); } + VertexIter find(IdType id) { return VertexIter(vertex_info_, prefix_, id); } /// Get the number of vertices in the collection. size_t size() const noexcept { return vertex_num_; } @@ -356,22 +359,10 @@ class EdgeIter { } /// Get the source vertex id for the current edge. - IdType source() { - adj_list_reader_.seek(cur_offset_); - GAR_ASSIGN_OR_RAISE_ERROR(auto chunk, adj_list_reader_.GetChunk()); - auto src_column = chunk->column(0); - return std::dynamic_pointer_cast(src_column->chunk(0)) - ->GetView(0); - } + IdType source(); /// Get the destination vertex id for the current edge. - IdType destination() { - adj_list_reader_.seek(cur_offset_); - GAR_ASSIGN_OR_RAISE_ERROR(auto chunk, adj_list_reader_.GetChunk()); - auto src_column = chunk->column(1); - return std::dynamic_pointer_cast(src_column->chunk(0)) - ->GetView(0); - } + IdType destination(); /// Get the value of a property for the current edge. template @@ -380,13 +371,14 @@ class EdgeIter { for (auto& reader : property_readers_) { reader.seek(cur_offset_); GAR_ASSIGN_OR_RAISE(auto chunk_table, reader.GetChunk()); - column = chunk_table->GetColumnByName(property); + column = util::GetArrowColumnByName(chunk_table, property); if (column != nullptr) { break; } } if (column != nullptr) { - GAR_ASSIGN_OR_RAISE(auto data, util::GetArrowArrayData(column->chunk(0))); + auto array = util::GetArrowArrayByChunkIndex(column, 0); + GAR_ASSIGN_OR_RAISE(auto data, util::GetArrowArrayData(array)); return util::ValueGetter::Value(data, 0); } return Status::KeyError("The property is not exist."); @@ -488,128 +480,7 @@ class EdgeIter { * @param id The vertex id. * @return If such edge is found or not. */ - bool first_src(const EdgeIter& from, IdType id) { - if (from.is_end()) - return false; - - // ordered_by_dest or unordered_by_dest - if (adj_list_type_ == AdjListType::ordered_by_dest || - adj_list_type_ == AdjListType::unordered_by_dest) { - if (from.global_chunk_index_ > chunk_end_ || - (from.global_chunk_index_ == chunk_end_ && - from.cur_offset_ > offset_of_chunk_end_)) { - return false; - } - if (from.global_chunk_index_ == global_chunk_index_) { - cur_offset_ = from.cur_offset_; - } else if (from.global_chunk_index_ < chunk_begin_ || - (from.global_chunk_index_ == chunk_begin_ && - from.cur_offset_ < offset_of_chunk_begin_)) { - this->to_begin(); - } else { - global_chunk_index_ = from.global_chunk_index_; - cur_offset_ = from.cur_offset_; - vertex_chunk_index_ = from.vertex_chunk_index_; - this->refresh(); - } - while (!this->is_end()) { - if (this->source() == id) - return true; - this->operator++(); - } - return false; - } - - // unordered_by_source - if (adj_list_type_ == AdjListType::unordered_by_source) { - IdType expect_chunk_index = index_converter_->IndexPairToGlobalChunkIndex( - id / src_chunk_size_, 0); - if (expect_chunk_index > chunk_end_) - return false; - if (from.global_chunk_index_ > chunk_end_ || - (from.global_chunk_index_ == chunk_end_ && - from.cur_offset_ > offset_of_chunk_end_)) { - return false; - } - bool need_refresh = false; - if (from.global_chunk_index_ == global_chunk_index_) { - cur_offset_ = from.cur_offset_; - } else if (from.global_chunk_index_ < chunk_begin_ || - (from.global_chunk_index_ == chunk_begin_ && - from.cur_offset_ < offset_of_chunk_begin_)) { - this->to_begin(); - } else { - global_chunk_index_ = from.global_chunk_index_; - cur_offset_ = from.cur_offset_; - vertex_chunk_index_ = from.vertex_chunk_index_; - need_refresh = true; - } - if (global_chunk_index_ < expect_chunk_index) { - global_chunk_index_ = expect_chunk_index; - cur_offset_ = 0; - vertex_chunk_index_ = id / src_chunk_size_; - need_refresh = true; - } - if (need_refresh) - this->refresh(); - while (!this->is_end()) { - if (this->source() == id) - return true; - if (vertex_chunk_index_ > id / src_chunk_size_) - return false; - this->operator++(); - } - return false; - } - - // ordered_by_source - auto st = offset_reader_->seek(id); - if (!st.ok()) { - return false; - } - auto maybe_offset_chunk = offset_reader_->GetChunk(); - if (!maybe_offset_chunk.status().ok()) { - return false; - } - auto offset_array = std::dynamic_pointer_cast( - maybe_offset_chunk.value()); - auto begin_offset = static_cast(offset_array->Value(0)); - auto end_offset = static_cast(offset_array->Value(1)); - if (begin_offset >= end_offset) { - return false; - } - auto vertex_chunk_index_of_id = offset_reader_->GetChunkIndex(); - auto begin_global_index = index_converter_->IndexPairToGlobalChunkIndex( - vertex_chunk_index_of_id, begin_offset / chunk_size_); - auto end_global_index = index_converter_->IndexPairToGlobalChunkIndex( - vertex_chunk_index_of_id, end_offset / chunk_size_); - if (begin_global_index <= from.global_chunk_index_ && - from.global_chunk_index_ <= end_global_index) { - if (begin_offset < from.cur_offset_ && from.cur_offset_ < end_offset) { - global_chunk_index_ = from.global_chunk_index_; - cur_offset_ = from.cur_offset_; - vertex_chunk_index_ = from.vertex_chunk_index_; - refresh(); - return true; - } else if (from.cur_offset_ <= begin_offset) { - global_chunk_index_ = begin_global_index; - cur_offset_ = begin_offset; - vertex_chunk_index_ = vertex_chunk_index_of_id; - refresh(); - return true; - } else { - return false; - } - } else if (from.global_chunk_index_ < begin_global_index) { - global_chunk_index_ = begin_global_index; - cur_offset_ = begin_offset; - vertex_chunk_index_ = vertex_chunk_index_of_id; - refresh(); - return true; - } else { - return false; - } - } + bool first_src(const EdgeIter& from, IdType id); /** * Let the input iterator to point to the first incoming edge of the @@ -619,128 +490,7 @@ class EdgeIter { * @param id The vertex id. * @return If such edge is found or not. */ - bool first_dst(const EdgeIter& from, IdType id) { - if (from.is_end()) - return false; - - // ordered_by_source or unordered_by_source - if (adj_list_type_ == AdjListType::ordered_by_source || - adj_list_type_ == AdjListType::unordered_by_source) { - if (from.global_chunk_index_ > chunk_end_ || - (from.global_chunk_index_ == chunk_end_ && - from.cur_offset_ > offset_of_chunk_end_)) { - return false; - } - if (from.global_chunk_index_ == global_chunk_index_) { - cur_offset_ = from.cur_offset_; - } else if (from.global_chunk_index_ < chunk_begin_ || - (from.global_chunk_index_ == chunk_begin_ && - from.cur_offset_ < offset_of_chunk_begin_)) { - this->to_begin(); - } else { - global_chunk_index_ = from.global_chunk_index_; - cur_offset_ = from.cur_offset_; - vertex_chunk_index_ = from.vertex_chunk_index_; - this->refresh(); - } - while (!this->is_end()) { - if (this->destination() == id) - return true; - this->operator++(); - } - return false; - } - - // unordered_by_dest - if (adj_list_type_ == AdjListType::unordered_by_dest) { - IdType expect_chunk_index = index_converter_->IndexPairToGlobalChunkIndex( - id / dst_chunk_size_, 0); - if (expect_chunk_index > chunk_end_) - return false; - if (from.global_chunk_index_ > chunk_end_ || - (from.global_chunk_index_ == chunk_end_ && - from.cur_offset_ > offset_of_chunk_end_)) { - return false; - } - bool need_refresh = false; - if (from.global_chunk_index_ == global_chunk_index_) { - cur_offset_ = from.cur_offset_; - } else if (from.global_chunk_index_ < chunk_begin_ || - (from.global_chunk_index_ == chunk_begin_ && - from.cur_offset_ < offset_of_chunk_begin_)) { - this->to_begin(); - } else { - global_chunk_index_ = from.global_chunk_index_; - cur_offset_ = from.cur_offset_; - vertex_chunk_index_ = from.vertex_chunk_index_; - need_refresh = true; - } - if (global_chunk_index_ < expect_chunk_index) { - global_chunk_index_ = expect_chunk_index; - cur_offset_ = 0; - vertex_chunk_index_ = id / dst_chunk_size_; - need_refresh = true; - } - if (need_refresh) - this->refresh(); - while (!this->is_end()) { - if (this->destination() == id) - return true; - if (vertex_chunk_index_ > id / dst_chunk_size_) - return false; - this->operator++(); - } - return false; - } - - // ordered_by_dest - auto st = offset_reader_->seek(id); - if (!st.ok()) { - return false; - } - auto maybe_offset_chunk = offset_reader_->GetChunk(); - if (!maybe_offset_chunk.status().ok()) { - return false; - } - auto offset_array = std::dynamic_pointer_cast( - maybe_offset_chunk.value()); - auto begin_offset = static_cast(offset_array->Value(0)); - auto end_offset = static_cast(offset_array->Value(1)); - if (begin_offset >= end_offset) { - return false; - } - auto vertex_chunk_index_of_id = offset_reader_->GetChunkIndex(); - auto begin_global_index = index_converter_->IndexPairToGlobalChunkIndex( - vertex_chunk_index_of_id, begin_offset / chunk_size_); - auto end_global_index = index_converter_->IndexPairToGlobalChunkIndex( - vertex_chunk_index_of_id, end_offset / chunk_size_); - if (begin_global_index <= from.global_chunk_index_ && - from.global_chunk_index_ <= end_global_index) { - if (begin_offset < from.cur_offset_ && from.cur_offset_ < end_offset) { - global_chunk_index_ = from.global_chunk_index_; - cur_offset_ = from.cur_offset_; - vertex_chunk_index_ = from.vertex_chunk_index_; - refresh(); - return true; - } else if (from.cur_offset_ <= begin_offset) { - global_chunk_index_ = begin_global_index; - cur_offset_ = begin_offset; - vertex_chunk_index_ = vertex_chunk_index_of_id; - refresh(); - return true; - } else { - return false; - } - } else if (from.global_chunk_index_ < begin_global_index) { - global_chunk_index_ = begin_global_index; - cur_offset_ = begin_offset; - vertex_chunk_index_ = vertex_chunk_index_of_id; - refresh(); - return true; - } else { - return false; - } - } + bool first_dst(const EdgeIter& from, IdType id); /// Let the iterator to point to the begin. void to_begin() { diff --git a/include/gar/utils/utils.h b/include/gar/utils/utils.h index aa26fe412..6f2a0b57b 100644 --- a/include/gar/utils/utils.h +++ b/include/gar/utils/utils.h @@ -26,9 +26,12 @@ limitations under the License. #define REGULAR_SEPERATOR "_" +// forward declarations namespace arrow { +class Table; +class ChunkedArray; class Array; -} +} // namespace arrow namespace GAR_NAMESPACE_INTERNAL { @@ -92,6 +95,13 @@ static inline std::pair GlobalChunkIndexToIndexPair( return index_pair; } +std::shared_ptr GetArrowColumnByName( + std::shared_ptr const& table, const std::string& name); + +std::shared_ptr GetArrowArrayByChunkIndex( + std::shared_ptr const& chunk_array, + int64_t chunk_index); + Result GetArrowArrayData( std::shared_ptr const& array); diff --git a/src/graph.cc b/src/graph.cc index 9389f99a6..2e3523cfd 100644 --- a/src/graph.cc +++ b/src/graph.cc @@ -97,4 +97,267 @@ Edge::Edge( } } } + +IdType EdgeIter::source() { + adj_list_reader_.seek(cur_offset_); + GAR_ASSIGN_OR_RAISE_ERROR(auto chunk, adj_list_reader_.GetChunk()); + auto src_column = chunk->column(0); + return std::dynamic_pointer_cast(src_column->chunk(0)) + ->GetView(0); +} + +IdType EdgeIter::destination() { + adj_list_reader_.seek(cur_offset_); + GAR_ASSIGN_OR_RAISE_ERROR(auto chunk, adj_list_reader_.GetChunk()); + auto src_column = chunk->column(1); + return std::dynamic_pointer_cast(src_column->chunk(0)) + ->GetView(0); +} + +bool EdgeIter::first_src(const EdgeIter& from, IdType id) { + if (from.is_end()) + return false; + + // ordered_by_dest or unordered_by_dest + if (adj_list_type_ == AdjListType::ordered_by_dest || + adj_list_type_ == AdjListType::unordered_by_dest) { + if (from.global_chunk_index_ > chunk_end_ || + (from.global_chunk_index_ == chunk_end_ && + from.cur_offset_ > offset_of_chunk_end_)) { + return false; + } + if (from.global_chunk_index_ == global_chunk_index_) { + cur_offset_ = from.cur_offset_; + } else if (from.global_chunk_index_ < chunk_begin_ || + (from.global_chunk_index_ == chunk_begin_ && + from.cur_offset_ < offset_of_chunk_begin_)) { + this->to_begin(); + } else { + global_chunk_index_ = from.global_chunk_index_; + cur_offset_ = from.cur_offset_; + vertex_chunk_index_ = from.vertex_chunk_index_; + this->refresh(); + } + while (!this->is_end()) { + if (this->source() == id) + return true; + this->operator++(); + } + return false; + } + + // unordered_by_source + if (adj_list_type_ == AdjListType::unordered_by_source) { + IdType expect_chunk_index = + index_converter_->IndexPairToGlobalChunkIndex(id / src_chunk_size_, 0); + if (expect_chunk_index > chunk_end_) + return false; + if (from.global_chunk_index_ > chunk_end_ || + (from.global_chunk_index_ == chunk_end_ && + from.cur_offset_ > offset_of_chunk_end_)) { + return false; + } + bool need_refresh = false; + if (from.global_chunk_index_ == global_chunk_index_) { + cur_offset_ = from.cur_offset_; + } else if (from.global_chunk_index_ < chunk_begin_ || + (from.global_chunk_index_ == chunk_begin_ && + from.cur_offset_ < offset_of_chunk_begin_)) { + this->to_begin(); + } else { + global_chunk_index_ = from.global_chunk_index_; + cur_offset_ = from.cur_offset_; + vertex_chunk_index_ = from.vertex_chunk_index_; + need_refresh = true; + } + if (global_chunk_index_ < expect_chunk_index) { + global_chunk_index_ = expect_chunk_index; + cur_offset_ = 0; + vertex_chunk_index_ = id / src_chunk_size_; + need_refresh = true; + } + if (need_refresh) + this->refresh(); + while (!this->is_end()) { + if (this->source() == id) + return true; + if (vertex_chunk_index_ > id / src_chunk_size_) + return false; + this->operator++(); + } + return false; + } + + // ordered_by_source + auto st = offset_reader_->seek(id); + if (!st.ok()) { + return false; + } + auto maybe_offset_chunk = offset_reader_->GetChunk(); + if (!maybe_offset_chunk.status().ok()) { + return false; + } + auto offset_array = + std::dynamic_pointer_cast(maybe_offset_chunk.value()); + auto begin_offset = static_cast(offset_array->Value(0)); + auto end_offset = static_cast(offset_array->Value(1)); + if (begin_offset >= end_offset) { + return false; + } + auto vertex_chunk_index_of_id = offset_reader_->GetChunkIndex(); + auto begin_global_index = index_converter_->IndexPairToGlobalChunkIndex( + vertex_chunk_index_of_id, begin_offset / chunk_size_); + auto end_global_index = index_converter_->IndexPairToGlobalChunkIndex( + vertex_chunk_index_of_id, end_offset / chunk_size_); + if (begin_global_index <= from.global_chunk_index_ && + from.global_chunk_index_ <= end_global_index) { + if (begin_offset < from.cur_offset_ && from.cur_offset_ < end_offset) { + global_chunk_index_ = from.global_chunk_index_; + cur_offset_ = from.cur_offset_; + vertex_chunk_index_ = from.vertex_chunk_index_; + refresh(); + return true; + } else if (from.cur_offset_ <= begin_offset) { + global_chunk_index_ = begin_global_index; + cur_offset_ = begin_offset; + vertex_chunk_index_ = vertex_chunk_index_of_id; + refresh(); + return true; + } else { + return false; + } + } else if (from.global_chunk_index_ < begin_global_index) { + global_chunk_index_ = begin_global_index; + cur_offset_ = begin_offset; + vertex_chunk_index_ = vertex_chunk_index_of_id; + refresh(); + return true; + } else { + return false; + } +} + +bool EdgeIter::first_dst(const EdgeIter& from, IdType id) { + if (from.is_end()) + return false; + + // ordered_by_source or unordered_by_source + if (adj_list_type_ == AdjListType::ordered_by_source || + adj_list_type_ == AdjListType::unordered_by_source) { + if (from.global_chunk_index_ > chunk_end_ || + (from.global_chunk_index_ == chunk_end_ && + from.cur_offset_ > offset_of_chunk_end_)) { + return false; + } + if (from.global_chunk_index_ == global_chunk_index_) { + cur_offset_ = from.cur_offset_; + } else if (from.global_chunk_index_ < chunk_begin_ || + (from.global_chunk_index_ == chunk_begin_ && + from.cur_offset_ < offset_of_chunk_begin_)) { + this->to_begin(); + } else { + global_chunk_index_ = from.global_chunk_index_; + cur_offset_ = from.cur_offset_; + vertex_chunk_index_ = from.vertex_chunk_index_; + this->refresh(); + } + while (!this->is_end()) { + if (this->destination() == id) + return true; + this->operator++(); + } + return false; + } + + // unordered_by_dest + if (adj_list_type_ == AdjListType::unordered_by_dest) { + IdType expect_chunk_index = + index_converter_->IndexPairToGlobalChunkIndex(id / dst_chunk_size_, 0); + if (expect_chunk_index > chunk_end_) + return false; + if (from.global_chunk_index_ > chunk_end_ || + (from.global_chunk_index_ == chunk_end_ && + from.cur_offset_ > offset_of_chunk_end_)) { + return false; + } + bool need_refresh = false; + if (from.global_chunk_index_ == global_chunk_index_) { + cur_offset_ = from.cur_offset_; + } else if (from.global_chunk_index_ < chunk_begin_ || + (from.global_chunk_index_ == chunk_begin_ && + from.cur_offset_ < offset_of_chunk_begin_)) { + this->to_begin(); + } else { + global_chunk_index_ = from.global_chunk_index_; + cur_offset_ = from.cur_offset_; + vertex_chunk_index_ = from.vertex_chunk_index_; + need_refresh = true; + } + if (global_chunk_index_ < expect_chunk_index) { + global_chunk_index_ = expect_chunk_index; + cur_offset_ = 0; + vertex_chunk_index_ = id / dst_chunk_size_; + need_refresh = true; + } + if (need_refresh) + this->refresh(); + while (!this->is_end()) { + if (this->destination() == id) + return true; + if (vertex_chunk_index_ > id / dst_chunk_size_) + return false; + this->operator++(); + } + return false; + } + + // ordered_by_dest + auto st = offset_reader_->seek(id); + if (!st.ok()) { + return false; + } + auto maybe_offset_chunk = offset_reader_->GetChunk(); + if (!maybe_offset_chunk.status().ok()) { + return false; + } + auto offset_array = + std::dynamic_pointer_cast(maybe_offset_chunk.value()); + auto begin_offset = static_cast(offset_array->Value(0)); + auto end_offset = static_cast(offset_array->Value(1)); + if (begin_offset >= end_offset) { + return false; + } + auto vertex_chunk_index_of_id = offset_reader_->GetChunkIndex(); + auto begin_global_index = index_converter_->IndexPairToGlobalChunkIndex( + vertex_chunk_index_of_id, begin_offset / chunk_size_); + auto end_global_index = index_converter_->IndexPairToGlobalChunkIndex( + vertex_chunk_index_of_id, end_offset / chunk_size_); + if (begin_global_index <= from.global_chunk_index_ && + from.global_chunk_index_ <= end_global_index) { + if (begin_offset < from.cur_offset_ && from.cur_offset_ < end_offset) { + global_chunk_index_ = from.global_chunk_index_; + cur_offset_ = from.cur_offset_; + vertex_chunk_index_ = from.vertex_chunk_index_; + refresh(); + return true; + } else if (from.cur_offset_ <= begin_offset) { + global_chunk_index_ = begin_global_index; + cur_offset_ = begin_offset; + vertex_chunk_index_ = vertex_chunk_index_of_id; + refresh(); + return true; + } else { + return false; + } + } else if (from.global_chunk_index_ < begin_global_index) { + global_chunk_index_ = begin_global_index; + cur_offset_ = begin_offset; + vertex_chunk_index_ = vertex_chunk_index_of_id; + refresh(); + return true; + } else { + return false; + } +} + } // namespace GAR_NAMESPACE_INTERNAL diff --git a/src/utils.cc b/src/utils.cc index 69a9d9454..4194e2b7c 100644 --- a/src/utils.cc +++ b/src/utils.cc @@ -24,6 +24,17 @@ namespace GAR_NAMESPACE_INTERNAL { namespace util { +std::shared_ptr GetArrowColumnByName( + std::shared_ptr const& table, const std::string& name) { + return table->GetColumnByName(name); +} + +std::shared_ptr GetArrowArrayByChunkIndex( + std::shared_ptr const& chunk_array, + int64_t chunk_index) { + return chunk_array->chunk(chunk_index); +} + Result GetArrowArrayData( std::shared_ptr const& array) { if (array->type()->Equals(arrow::int8())) {