diff --git a/src/graph/context/CMakeLists.txt b/src/graph/context/CMakeLists.txt index 69636e4f2c0..2cbcbe26d37 100644 --- a/src/graph/context/CMakeLists.txt +++ b/src/graph/context/CMakeLists.txt @@ -7,9 +7,12 @@ nebula_add_library( QueryContext.cpp QueryExpressionContext.cpp ExecutionContext.cpp - Iterator.cpp Result.cpp Symbols.cpp + iterator/GetNeighborsIter.cpp + iterator/Iterator.cpp + iterator/PropIter.cpp + iterator/SequentialIter.cpp iterator/GetNbrsRespDataSetIter.cpp ) diff --git a/src/graph/context/Iterator.h b/src/graph/context/Iterator.h index 1b4119f0a32..ae8b0deef59 100644 --- a/src/graph/context/Iterator.h +++ b/src/graph/context/Iterator.h @@ -6,591 +6,11 @@ #ifndef GRAPH_CONTEXT_ITERATOR_H_ #define GRAPH_CONTEXT_ITERATOR_H_ -#include - -#include - -#include "common/algorithm/ReservoirSampling.h" -#include "common/datatypes/DataSet.h" -#include "common/datatypes/List.h" -#include "common/datatypes/Value.h" -#include "parser/TraverseSentences.h" - -namespace nebula { -namespace graph { - -class Iterator { - public: - template - using RowsType = std::vector; - template - using RowsIter = typename RowsType::iterator; - - // Warning this will break the origin order of elements! - template - static RowsIter eraseBySwap(RowsType& rows, RowsIter i) { - DCHECK(!rows.empty()); - std::swap(rows.back(), *i); - rows.pop_back(); - return i; - } - - enum class Kind : uint8_t { - kDefault, - kGetNeighbors, - kSequential, - kProp, - }; - - Iterator(std::shared_ptr value, Kind kind, bool checkMemory = false) - : checkMemory_(checkMemory), kind_(kind), numRowsModN_(0), value_(value) {} - - virtual ~Iterator() = default; - - Kind kind() const { - return kind_; - } - - virtual std::unique_ptr copy() const = 0; - - virtual bool valid() const { - return !hitsSysMemoryHighWatermark(); - } - - virtual void next() = 0; - - // erase current iter - virtual void erase() = 0; - - // Warning this will break the origin order of elements! - virtual void unstableErase() = 0; - - // remain the select data in range - virtual void select(std::size_t offset, std::size_t count) = 0; - - // Sample the elements - virtual void sample(int64_t count) = 0; - - virtual const Row* row() const = 0; - - virtual Row moveRow() = 0; - - // erase range, no include last position, if last > size(), erase to the end - // position - virtual void eraseRange(size_t first, size_t last) = 0; - - // Reset iterator position to `pos' from begin. Must be sure that the `pos' - // position is lower than `size()' before resetting - void reset(size_t pos = 0) { - numRowsModN_ = 0; - doReset(pos); - } - - virtual void clear() = 0; - - void operator++() { - next(); - } - - virtual std::shared_ptr valuePtr() const { - return value_; - } - - virtual size_t size() const = 0; - - bool empty() const { - return size() == 0; - } - - bool isDefaultIter() const { - return kind_ == Kind::kDefault; - } - - bool isGetNeighborsIter() const { - return kind_ == Kind::kGetNeighbors; - } - - bool isSequentialIter() const { - return kind_ == Kind::kSequential; - } - - bool isPropIter() const { - return kind_ == Kind::kProp; - } - - // The derived class should rewrite get prop if the Value is kind of dataset. - virtual const Value& getColumn(const std::string& col) const = 0; - - virtual const Value& getColumn(int32_t index) const = 0; - - // Get index of the column in tuple - virtual StatusOr getColumnIndex(const std::string& col) const = 0; - - template - const Value& getColumnByIndex(int32_t index, Iter iter) const { - int32_t size = static_cast(iter->size()); - if ((index > 0 && index >= size) || (index < 0 && -index > size)) { - return Value::kNullBadType; - } - return iter->operator[]((size + index) % size); - } - - virtual const Value& getTagProp(const std::string&, const std::string&) const { - DLOG(FATAL) << "Shouldn't call the unimplemented method"; - return Value::kEmpty; - } - - virtual const Value& getEdgeProp(const std::string&, const std::string&) const { - DLOG(FATAL) << "Shouldn't call the unimplemented method"; - return Value::kEmpty; - } - - virtual Value getVertex(const std::string& name = "") { - UNUSED(name); - return Value(); - } - - virtual Value getEdge() const { - return Value(); - } - - bool checkMemory() const { - return checkMemory_; - } - void setCheckMemory(bool checkMemory) { - checkMemory_ = checkMemory; - } - - protected: - virtual void doReset(size_t pos) = 0; - bool hitsSysMemoryHighWatermark() const; - - bool checkMemory_{false}; - Kind kind_; - mutable int64_t numRowsModN_{0}; - std::shared_ptr value_; -}; - -class DefaultIter final : public Iterator { - public: - explicit DefaultIter(std::shared_ptr value, bool checkMemory = false) - : Iterator(value, Kind::kDefault, checkMemory) {} - - std::unique_ptr copy() const override { - return std::make_unique(*this); - } - - bool valid() const override { - return Iterator::valid() && !(counter_ > 0); - } - - void next() override { - numRowsModN_++; - counter_++; - } - - void erase() override { - counter_--; - } - - void unstableErase() override { - DLOG(ERROR) << "Unimplemented default iterator."; - counter_--; - } - - void eraseRange(size_t, size_t) override { - return; - } - - void select(std::size_t, std::size_t) override { - DLOG(FATAL) << "Unimplemented method for default iterator."; - } - - void sample(int64_t) override { - DLOG(FATAL) << "Unimplemented default iterator."; - } - - void clear() override { - reset(); - } - - size_t size() const override { - return 1; - } - - const Value& getColumn(const std::string& /* col */) const override { - DLOG(FATAL) << "This method should not be invoked"; - return Value::kEmpty; - } - - const Value& getColumn(int32_t) const override { - DLOG(FATAL) << "This method should not be invoked"; - return Value::kEmpty; - } - - StatusOr getColumnIndex(const std::string&) const override { - DLOG(FATAL) << "This method should not be invoked"; - return Status::Error("Unimplemented method"); - } - - const Row* row() const override { - DLOG(FATAL) << "This method should not be invoked"; - return nullptr; - } - - Row moveRow() override { - DLOG(FATAL) << "This method should not be invoked"; - return Row{}; - } - - private: - void doReset(size_t pos) override { - DCHECK((pos == 0 && size() == 0) || (pos < size())); - counter_ = pos; - } - - int64_t counter_{0}; -}; - -class GetNeighborsIter final : public Iterator { - public: - explicit GetNeighborsIter(std::shared_ptr value, bool checkMemory = false); - - std::unique_ptr copy() const override { - auto copy = std::make_unique(*this); - copy->reset(); - return copy; - } - - bool valid() const override; - - void next() override; - - void clear() override { - valid_ = false; - dsIndices_.clear(); - reset(); - } - - void erase() override; - - void unstableErase() override { - erase(); - } - - // erase [first, last) - void eraseRange(size_t first, size_t last) override { - for (std::size_t i = 0; valid() && i < last; ++i) { - if (i >= first || i < last) { - erase(); - } else { - next(); - } - } - doReset(0); - } - - void select(std::size_t offset, std::size_t count) override { - for (std::size_t i = 0; valid(); ++i) { - if (i < offset || i > (offset + count - 1)) { - erase(); - } else { - next(); - } - } - doReset(0); - } - - void sample(int64_t count) override; - - // num of edges - size_t size() const override; - - // num of vertices - size_t numRows() const; - - const Value& getColumn(const std::string& col) const override; - - const Value& getColumn(int32_t index) const override; - - StatusOr getColumnIndex(const std::string& col) const override; - - const Value& getTagProp(const std::string& tag, const std::string& prop) const override; - - const Value& getEdgeProp(const std::string& edge, const std::string& prop) const override; - - Value getVertex(const std::string& name = "") override; - - Value getEdge() const override; - - // getVertices and getEdges arg batch interface use for subgraph - // Its unique based on the plan - List getVertices(); - - // return start vids - std::vector vids(); - - // Its unique based on the GN interface dedup - List getEdges(); - - // only return currentEdge, not currentRow, for test - const Row* row() const override { - return currentEdge_; - } - - Row moveRow() override { - return std::move(*currentEdge_); - } - - private: - void doReset(size_t pos) override { - UNUSED(pos); - valid_ = false; - bitIdx_ = -1; - goToFirstEdge(); - } - - inline const std::string& currentEdgeName() const { - DCHECK(currentDs_->tagEdgeNameIndices.find(colIdx_) != currentDs_->tagEdgeNameIndices.end()); - return currentDs_->tagEdgeNameIndices.find(colIdx_)->second; - } - - bool colValid() { - return !noEdge_ && valid(); - } - - // move to next List of Edge data - void nextCol(); - - void clearEdges(); - - struct PropIndex { - size_t colIdx; - std::vector propList; - std::unordered_map propIndices; - }; - - struct DataSetIndex { - const DataSet* ds; - // | _vid | _stats | _tag:t1:p1:p2 | _edge:e1:p1:p2 | - // -> {_vid : 0, _stats : 1, _tag:t1:p1:p2 : 2, _edge:d1:p1:p2 : 3} - std::unordered_map colIndices; - // | _vid | _stats | _tag:t1:p1:p2 | _edge:e1:p1:p2 | - // -> {2 : t1, 3 : e1} - std::unordered_map tagEdgeNameIndices; - // _tag:t1:p1:p2 -> {t1 : [column_idx, [p1, p2], {p1 : 0, p2 : 1}]} - std::unordered_map tagPropsMap; - // _edge:e1:p1:p2 -> {e1 : [column_idx, [p1, p2], {p1 : 0, p2 : 1}]} - std::unordered_map edgePropsMap; - - int64_t colLowerBound{-1}; - int64_t colUpperBound{-1}; - }; - - Status processList(std::shared_ptr value); - - void goToFirstEdge(); - - StatusOr buildIndex(DataSetIndex* dsIndex); - - Status buildPropIndex(const std::string& props, - size_t columnId, - bool isEdge, - DataSetIndex* dsIndex); - - StatusOr makeDataSetIndex(const DataSet& ds); - - FRIEND_TEST(IteratorTest, TestHead); - - bool valid_{false}; - std::vector dsIndices_; - - std::vector::iterator currentDs_; - - std::vector::const_iterator currentRow_; - std::vector::const_iterator rowsUpperBound_; - - int64_t colIdx_{-1}; - const List* currentCol_{nullptr}; - - bool noEdge_{false}; - int64_t edgeIdx_{-1}; - int64_t edgeIdxUpperBound_{-1}; - const List* currentEdge_{nullptr}; - - boost::dynamic_bitset<> bitset_; - int64_t bitIdx_{-1}; - Value prevVertex_; -}; - -class SequentialIter : public Iterator { - public: - explicit SequentialIter(std::shared_ptr value, bool checkMemory = false); - explicit SequentialIter(const SequentialIter& iter); - - // Union multiple sequential iterators - explicit SequentialIter(std::vector> inputList); - // Union two sequential iterators. - SequentialIter(std::unique_ptr left, std::unique_ptr right); - - std::unique_ptr copy() const override { - auto copy = std::make_unique(*this); - copy->reset(); - return copy; - } - - bool valid() const override; - - void next() override; - - void erase() override; - - void unstableErase() override; - - void eraseRange(size_t first, size_t last) override; - - void select(std::size_t offset, std::size_t count) override { - auto size = this->size(); - if (size <= static_cast(offset)) { - clear(); - } else if (size > static_cast(offset + count)) { - eraseRange(0, offset); - eraseRange(count, size - offset); - } else if (size > static_cast(offset) && size <= static_cast(offset + count)) { - eraseRange(0, offset); - } - } - - void sample(int64_t count) override { - DCHECK_GE(count, 0); - algorithm::ReservoirSampling sampler(count); - for (auto& row : *rows_) { - sampler.sampling(std::move(row)); - } - *rows_ = std::move(sampler).samples(); - iter_ = rows_->begin(); - } - - void clear() override { - rows_->clear(); - reset(); - } - - std::vector::iterator begin() { - return CHECK_NOTNULL(rows_)->begin(); - } - - std::vector::iterator end() { - return CHECK_NOTNULL(rows_)->end(); - } - - const std::unordered_map& getColIndices() const { - return colIndices_; - } - - size_t size() const override { - return rows_->size(); - } - - const Value& getColumn(const std::string& col) const override; - - const Value& getColumn(int32_t index) const override; - - StatusOr getColumnIndex(const std::string& col) const override; - - Value getVertex(const std::string& name = "") override; - - Value getEdge() const override; - - Row moveRow() override { - return std::move(*iter_); - } - - const Row* row() const override { - return &*iter_; - } - - protected: - // Notice: We only use this interface when return results to client. - friend class DataCollectExecutor; - friend class AppendVerticesExecutor; - friend class TraverseExecutor; - friend class ShortestPathExecutor; - - void doReset(size_t pos) override; - - std::vector::iterator iter_; - std::vector* rows_{nullptr}; - - private: - void init(std::vector>&& iterators); - - std::unordered_map colIndices_; -}; - -class PropIter final : public SequentialIter { - public: - explicit PropIter(std::shared_ptr value, bool checkMemory = false); - explicit PropIter(const PropIter& iter); - - std::unique_ptr copy() const override { - auto copy = std::make_unique(*this); - copy->reset(); - return copy; - } - - const std::unordered_map& getColIndices() const { - return dsIndex_.colIndices; - } - - const Value& getColumn(const std::string& col) const override; - - const Value& getColumn(int32_t index) const override; - - StatusOr getColumnIndex(const std::string& col) const override; - - Value getVertex(const std::string& name = "") override; - - Value getEdge() const override; - - List getVertices(); - - List getEdges(); - - const Value& getProp(const std::string& name, const std::string& prop) const; - - const Value& getTagProp(const std::string& tag, const std::string& prop) const override { - return getProp(tag, prop); - } - - const Value& getEdgeProp(const std::string& edge, const std::string& prop) const override { - return getProp(edge, prop); - } - - private: - Status makeDataSetIndex(const DataSet& ds); - - Status buildPropIndex(const std::string& props, size_t columnIdx); - - struct DataSetIndex { - const DataSet* ds; - // vertex | _vid | tag1.prop1 | tag1.prop2 | tag2,prop1 | tag2,prop2 | ... - // |_vid : 0 | tag1.prop1 : 1 | tag1.prop2 : 2 | tag2.prop1 : 3 |... - // edge |_src | _type| _ranking | _dst | edge1.prop1 | edge1.prop2 |... - // |_src : 0 | _type : 1| _ranking : 2 | _dst : 3| edge1.prop1 : - // 4|... - std::unordered_map colIndices; - // {tag1 : {prop1 : 1, prop2 : 2} - // {edge1 : {prop1 : 4, prop2 : 5} - std::unordered_map> propsMap; - }; - - private: - DataSetIndex dsIndex_; -}; - -std::ostream& operator<<(std::ostream& os, Iterator::Kind kind); -} // namespace graph -} // namespace nebula +#include "graph/context/iterator/DefaultIter.h" +#include "graph/context/iterator/GetNbrsRespDataSetIter.h" +#include "graph/context/iterator/GetNeighborsIter.h" +#include "graph/context/iterator/Iterator.h" +#include "graph/context/iterator/PropIter.h" +#include "graph/context/iterator/SequentialIter.h" #endif // GRAPH_CONTEXT_ITERATOR_H_ diff --git a/src/graph/context/iterator/DefaultIter.h b/src/graph/context/iterator/DefaultIter.h new file mode 100644 index 00000000000..6844f614eae --- /dev/null +++ b/src/graph/context/iterator/DefaultIter.h @@ -0,0 +1,98 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#ifndef GRAPH_CONTEXT_ITERATOR_DEFAULTITER_H_ +#define GRAPH_CONTEXT_ITERATOR_DEFAULTITER_H_ + +#include "graph/context/iterator/Iterator.h" + +namespace nebula { +namespace graph { + +class DefaultIter final : public Iterator { + public: + explicit DefaultIter(std::shared_ptr value, bool checkMemory = false) + : Iterator(value, Kind::kDefault, checkMemory) {} + + std::unique_ptr copy() const override { + return std::make_unique(*this); + } + + bool valid() const override { + return Iterator::valid() && !(counter_ > 0); + } + + void next() override { + numRowsModN_++; + counter_++; + } + + void erase() override { + counter_--; + } + + void unstableErase() override { + DLOG(ERROR) << "Unimplemented default iterator."; + counter_--; + } + + void eraseRange(size_t, size_t) override { + return; + } + + void select(std::size_t, std::size_t) override { + DLOG(FATAL) << "Unimplemented method for default iterator."; + } + + void sample(int64_t) override { + DLOG(FATAL) << "Unimplemented default iterator."; + } + + void clear() override { + reset(); + } + + size_t size() const override { + return 1; + } + + const Value& getColumn(const std::string& /* col */) const override { + DLOG(FATAL) << "This method should not be invoked"; + return Value::kEmpty; + } + + const Value& getColumn(int32_t) const override { + DLOG(FATAL) << "This method should not be invoked"; + return Value::kEmpty; + } + + StatusOr getColumnIndex(const std::string&) const override { + DLOG(FATAL) << "This method should not be invoked"; + return Status::Error("Unimplemented method"); + } + + const Row* row() const override { + DLOG(FATAL) << "This method should not be invoked"; + return nullptr; + } + + Row moveRow() override { + DLOG(FATAL) << "This method should not be invoked"; + return Row{}; + } + + private: + void doReset(size_t pos) override { + DCHECK((pos == 0 && size() == 0) || (pos < size())); + counter_ = pos; + } + + int64_t counter_{0}; +}; + +} // namespace graph +} // namespace nebula + +#endif // GRAPH_CONTEXT_ITERATOR_DEFAULTITER_H_ diff --git a/src/graph/context/Iterator.cpp b/src/graph/context/iterator/GetNeighborsIter.cpp similarity index 60% rename from src/graph/context/Iterator.cpp rename to src/graph/context/iterator/GetNeighborsIter.cpp index b195007d501..fabecd02794 100644 --- a/src/graph/context/Iterator.cpp +++ b/src/graph/context/iterator/GetNeighborsIter.cpp @@ -3,38 +3,16 @@ * This source code is licensed under Apache 2.0 License. */ -#include "graph/context/Iterator.h" - -#include +#include "graph/context/iterator/GetNeighborsIter.h" +#include "common/algorithm/ReservoirSampling.h" #include "common/datatypes/Edge.h" #include "common/datatypes/Vertex.h" -#include "common/memory/MemoryUtils.h" #include "graph/util/SchemaUtil.h" -#include "interface/gen-cpp2/common_types.h" - -DECLARE_int32(num_rows_to_check_memory); -DECLARE_double(system_memory_high_watermark_ratio); namespace nebula { namespace graph { -bool Iterator::hitsSysMemoryHighWatermark() const { - if (checkMemory_) { - if (numRowsModN_ >= FLAGS_num_rows_to_check_memory) { - numRowsModN_ -= FLAGS_num_rows_to_check_memory; - } - if (UNLIKELY(numRowsModN_ == 0)) { - if (memory::MemoryUtils::kHitMemoryHighWatermark.load()) { - throw std::runtime_error( - folly::sformat("Used memory hits the high watermark({}) of total system memory.", - FLAGS_system_memory_high_watermark_ratio)); - } - } - } - return false; -} - GetNeighborsIter::GetNeighborsIter(std::shared_ptr value, bool checkMemory) : Iterator(value, Kind::kGetNeighbors, checkMemory) { if (value == nullptr) { @@ -623,386 +601,5 @@ void GetNeighborsIter::clearEdges() { } } -SequentialIter::SequentialIter(const SequentialIter& iter) - : Iterator(iter.valuePtr(), Kind::kSequential) { - auto valuePtr = iter.valuePtr(); - auto& ds = valuePtr->mutableDataSet(); - iter_ = ds.rows.begin(); - rows_ = &ds.rows; - colIndices_ = iter.getColIndices(); -} - -SequentialIter::SequentialIter(std::shared_ptr value, bool checkMemory) - : Iterator(value, Kind::kSequential, checkMemory) { - DCHECK(value->isDataSet()); - auto& ds = value->mutableDataSet(); - iter_ = ds.rows.begin(); - rows_ = &ds.rows; - for (size_t i = 0; i < ds.colNames.size(); ++i) { - colIndices_.emplace(ds.colNames[i], i); - } -} - -SequentialIter::SequentialIter(std::unique_ptr left, std::unique_ptr right) - : Iterator(left->valuePtr(), Kind::kSequential) { - std::vector> iterators; - iterators.emplace_back(std::move(left)); - iterators.emplace_back(std::move(right)); - init(std::move(iterators)); -} - -SequentialIter::SequentialIter(std::vector> inputList) - : Iterator(inputList.front()->valuePtr(), Kind::kSequential, inputList.front()->checkMemory()) { - init(std::move(inputList)); -} - -void SequentialIter::init(std::vector>&& iterators) { - DCHECK(!iterators.empty()); - const auto& firstIter = iterators.front(); - DCHECK(firstIter->isSequentialIter()); - colIndices_ = static_cast(firstIter.get())->getColIndices(); - DataSet ds; - for (auto& iter : iterators) { - DCHECK(iter->isSequentialIter()); - auto inputIter = static_cast(iter.get()); - ds.rows.insert(ds.rows.end(), - std::make_move_iterator(inputIter->begin()), - std::make_move_iterator(inputIter->end())); - } - value_ = std::make_shared(std::move(ds)); - rows_ = &value_->mutableDataSet().rows; - iter_ = rows_->begin(); -} - -bool SequentialIter::valid() const { - return Iterator::valid() && iter_ < rows_->end(); -} - -void SequentialIter::next() { - if (valid()) { - ++numRowsModN_; - ++iter_; - } -} - -void SequentialIter::erase() { - iter_ = rows_->erase(iter_); -} - -void SequentialIter::unstableErase() { - std::swap(rows_->back(), *iter_); - rows_->pop_back(); -} - -void SequentialIter::eraseRange(size_t first, size_t last) { - if (first >= last || first >= size()) { - return; - } - if (last > size()) { - rows_->erase(rows_->begin() + first, rows_->end()); - } else { - rows_->erase(rows_->begin() + first, rows_->begin() + last); - } - reset(); -} - -void SequentialIter::doReset(size_t pos) { - DCHECK((pos == 0 && size() == 0) || (pos < size())); - iter_ = rows_->begin() + pos; -} - -const Value& SequentialIter::getColumn(const std::string& col) const { - if (!valid()) { - return Value::kNullValue; - } - auto& row = *iter_; - auto index = colIndices_.find(col); - if (index == colIndices_.end()) { - return Value::kNullValue; - } - - DCHECK_LT(index->second, row.values.size()) << "index: " << index->second << " row" << row; - return row.values[index->second]; -} - -const Value& SequentialIter::getColumn(int32_t index) const { - return getColumnByIndex(index, iter_); -} - -StatusOr SequentialIter::getColumnIndex(const std::string& col) const { - auto index = colIndices_.find(col); - if (index == colIndices_.end()) { - return Status::Error("Don't exist column `%s'.", col.c_str()); - } - return index->second; -} - -Value SequentialIter::getVertex(const std::string& name) { - return getColumn(name); -} - -Value SequentialIter::getEdge() const { - return getColumn("EDGE"); -} - -PropIter::PropIter(const PropIter& iter) : SequentialIter(iter) { - dsIndex_ = iter.dsIndex_; - kind_ = Kind::kProp; -} - -PropIter::PropIter(std::shared_ptr value, bool checkMemory) - : SequentialIter(value, checkMemory) { - DCHECK(value->isDataSet()); - auto& ds = value->getDataSet(); - auto status = makeDataSetIndex(ds); - if (UNLIKELY(!status.ok())) { - DLOG(FATAL) << status; - clear(); - return; - } - kind_ = Kind::kProp; -} - -Status PropIter::makeDataSetIndex(const DataSet& ds) { - dsIndex_.ds = &ds; - auto& colNames = ds.colNames; - for (size_t i = 0; i < colNames.size(); ++i) { - dsIndex_.colIndices.emplace(colNames[i], i); - auto& colName = colNames[i]; - if (colName.find(".") != std::string::npos) { - NG_RETURN_IF_ERROR(buildPropIndex(colName, i)); - } - } - return Status::OK(); -} - -Status PropIter::buildPropIndex(const std::string& props, size_t columnId) { - std::vector pieces; - folly::split(".", props, pieces); - if (UNLIKELY(pieces.size() != 2)) { - return Status::Error("Bad column name format: %s", props.c_str()); - } - std::string name = pieces[0]; - auto& propsMap = dsIndex_.propsMap; - if (propsMap.find(name) != propsMap.end()) { - propsMap[name].emplace(pieces[1], columnId); - } else { - std::unordered_map propIndices; - propIndices.emplace(pieces[1], columnId); - propsMap.emplace(name, std::move(propIndices)); - } - return Status::OK(); -} - -const Value& PropIter::getColumn(const std::string& col) const { - if (!valid()) { - return Value::kNullValue; - } - - auto index = dsIndex_.colIndices.find(col); - if (index == dsIndex_.colIndices.end()) { - return Value::kNullValue; - } - auto& row = *iter_; - DCHECK_LT(index->second, row.values.size()); - return row.values[index->second]; -} - -StatusOr PropIter::getColumnIndex(const std::string& col) const { - auto index = dsIndex_.colIndices.find(col); - if (index == dsIndex_.colIndices.end()) { - return Status::Error("Don't exist column `%s'.", col.c_str()); - } - return index->second; -} - -const Value& PropIter::getProp(const std::string& name, const std::string& prop) const { - if (!valid()) { - return Value::kNullValue; - } - auto& propsMap = dsIndex_.propsMap; - size_t colId = 0; - auto& row = *iter_; - if (name == "*") { - for (auto& index : propsMap) { - auto propIndex = index.second.find(prop); - if (propIndex == index.second.end()) { - continue; - } - colId = propIndex->second; - DCHECK_GT(row.size(), colId); - auto& val = row[colId]; - if (val.empty()) { - continue; - } else { - return val; - } - } - return Value::kNullValue; - } else { - auto index = propsMap.find(name); - if (index == propsMap.end()) { - return Value::kEmpty; - } - auto propIndex = index->second.find(prop); - if (propIndex == index->second.end()) { - return Value::kNullValue; - } - colId = propIndex->second; - DCHECK_GT(row.size(), colId); - return row[colId]; - } -} - -Value PropIter::getVertex(const std::string& name) { - UNUSED(name); - if (!valid()) { - return Value::kNullValue; - } - - auto vidVal = getColumn(nebula::kVid); - if (!SchemaUtil::isValidVid(vidVal)) { - return Value::kNullValue; - } - Vertex vertex; - vertex.vid = vidVal; - auto& tagPropsMap = dsIndex_.propsMap; - bool isVertexProps = true; - auto& row = *iter_; - // tagPropsMap -> > - for (auto& tagProp : tagPropsMap) { - // propIndex -> std::unordered_map - for (auto& propIndex : tagProp.second) { - if (row[propIndex.second].empty()) { - // Not current vertex's prop - isVertexProps = false; - break; - } - } - if (!isVertexProps) { - isVertexProps = true; - continue; - } - Tag tag; - tag.name = tagProp.first; - for (auto& propIndex : tagProp.second) { - if (propIndex.first == nebula::kTag) { // "_tag" - continue; - } else { - tag.props.emplace(propIndex.first, row[propIndex.second]); - } - } - vertex.tags.emplace_back(std::move(tag)); - } - return Value(std::move(vertex)); -} - -Value PropIter::getEdge() const { - if (!valid()) { - return Value::kNullValue; - } - Edge edge; - auto& edgePropsMap = dsIndex_.propsMap; - bool isEdgeProps = true; - auto& row = *iter_; - for (auto& edgeProp : edgePropsMap) { - for (auto& propIndex : edgeProp.second) { - if (row[propIndex.second].empty()) { - // Not current edge's prop - isEdgeProps = false; - break; - } - } - if (!isEdgeProps) { - isEdgeProps = true; - continue; - } - auto edgeName = edgeProp.first; - edge.name = edgeProp.first; - - auto type = getEdgeProp(edgeName, kType); - if (!type.isInt()) { - return Value::kNullBadType; - } - edge.type = type.getInt(); - - auto& srcVal = getEdgeProp(edgeName, kSrc); - if (!SchemaUtil::isValidVid(srcVal)) { - return Value::kNullBadType; - } - edge.src = srcVal; - - auto& dstVal = getEdgeProp(edgeName, kDst); - if (!SchemaUtil::isValidVid(dstVal)) { - return Value::kNullBadType; - } - edge.dst = dstVal; - - auto rank = getEdgeProp(edgeName, kRank); - if (!rank.isInt()) { - return Value::kNullBadType; - } - edge.ranking = rank.getInt(); - - for (auto& propIndex : edgeProp.second) { - if (propIndex.first == kSrc || propIndex.first == kDst || propIndex.first == kType || - propIndex.first == kRank) { - continue; - } - edge.props.emplace(propIndex.first, row[propIndex.second]); - } - return Value(std::move(edge)); - } - return Value::kNullValue; -} - -List PropIter::getVertices() { - DCHECK(iter_ == rows_->begin()); - List vertices; - for (; valid(); next()) { - vertices.values.emplace_back(getVertex()); - } - reset(); - return vertices; -} - -List PropIter::getEdges() { - DCHECK(iter_ == rows_->begin()); - List edges; - edges.values.reserve(size()); - for (; valid(); next()) { - auto edge = getEdge(); - if (edge.isEdge()) { - const_cast(edge.getEdge()).format(); - } - edges.values.emplace_back(std::move(edge)); - } - reset(); - return edges; -} - -const Value& PropIter::getColumn(int32_t index) const { - return getColumnByIndex(index, iter_); -} - -std::ostream& operator<<(std::ostream& os, Iterator::Kind kind) { - switch (kind) { - case Iterator::Kind::kDefault: - os << "default"; - break; - case Iterator::Kind::kSequential: - os << "sequential"; - break; - case Iterator::Kind::kGetNeighbors: - os << "get neighbors"; - break; - case Iterator::Kind::kProp: - os << "Prop"; - break; - } - os << " iterator"; - return os; -} - } // namespace graph } // namespace nebula diff --git a/src/graph/context/iterator/GetNeighborsIter.h b/src/graph/context/iterator/GetNeighborsIter.h new file mode 100644 index 00000000000..40c434b8d3c --- /dev/null +++ b/src/graph/context/iterator/GetNeighborsIter.h @@ -0,0 +1,193 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#ifndef GRAPH_CONTEXT_ITERATOR_GETNEIGHBORSITER_H_ +#define GRAPH_CONTEXT_ITERATOR_GETNEIGHBORSITER_H_ + +#include + +#include + +#include "graph/context/iterator/Iterator.h" + +namespace nebula { +namespace graph { + +class GetNeighborsIter final : public Iterator { + public: + explicit GetNeighborsIter(std::shared_ptr value, bool checkMemory = false); + + std::unique_ptr copy() const override { + auto copy = std::make_unique(*this); + copy->reset(); + return copy; + } + + bool valid() const override; + + void next() override; + + void clear() override { + valid_ = false; + dsIndices_.clear(); + reset(); + } + + void erase() override; + + void unstableErase() override { + erase(); + } + + // erase [first, last) + void eraseRange(size_t first, size_t last) override { + for (std::size_t i = 0; valid() && i < last; ++i) { + if (i >= first || i < last) { + erase(); + } else { + next(); + } + } + doReset(0); + } + + void select(std::size_t offset, std::size_t count) override { + for (std::size_t i = 0; valid(); ++i) { + if (i < offset || i > (offset + count - 1)) { + erase(); + } else { + next(); + } + } + doReset(0); + } + + void sample(int64_t count) override; + + // num of edges + size_t size() const override; + + // num of vertices + size_t numRows() const; + + const Value& getColumn(const std::string& col) const override; + + const Value& getColumn(int32_t index) const override; + + StatusOr getColumnIndex(const std::string& col) const override; + + const Value& getTagProp(const std::string& tag, const std::string& prop) const override; + + const Value& getEdgeProp(const std::string& edge, const std::string& prop) const override; + + Value getVertex(const std::string& name = "") override; + + Value getEdge() const override; + + // getVertices and getEdges arg batch interface use for subgraph + // Its unique based on the plan + List getVertices(); + + // return start vids + std::vector vids(); + + // Its unique based on the GN interface dedup + List getEdges(); + + // only return currentEdge, not currentRow, for test + const Row* row() const override { + return currentEdge_; + } + + Row moveRow() override { + return std::move(*currentEdge_); + } + + private: + void doReset(size_t pos) override { + UNUSED(pos); + valid_ = false; + bitIdx_ = -1; + goToFirstEdge(); + } + + // The current edge name has the direction symbol indicated by `-/+`, such as `-like` + inline const std::string& currentEdgeName() const { + DCHECK(currentDs_->tagEdgeNameIndices.find(colIdx_) != currentDs_->tagEdgeNameIndices.end()); + return currentDs_->tagEdgeNameIndices.find(colIdx_)->second; + } + + bool colValid() { + return !noEdge_ && valid(); + } + + // move to next List of Edge data + void nextCol(); + + void clearEdges(); + + struct PropIndex { + size_t colIdx; + std::vector propList; + std::unordered_map propIndices; + }; + + struct DataSetIndex { + const DataSet* ds; + // | _vid | _stats | _tag:t1:p1:p2 | _edge:e1:p1:p2 | + // -> {_vid : 0, _stats : 1, _tag:t1:p1:p2 : 2, _edge:d1:p1:p2 : 3} + std::unordered_map colIndices; + // | _vid | _stats | _tag:t1:p1:p2 | _edge:e1:p1:p2 | + // -> {2 : t1, 3 : e1} + std::unordered_map tagEdgeNameIndices; + // _tag:t1:p1:p2 -> {t1 : [column_idx, [p1, p2], {p1 : 0, p2 : 1}]} + std::unordered_map tagPropsMap; + // _edge:e1:p1:p2 -> {e1 : [column_idx, [p1, p2], {p1 : 0, p2 : 1}]} + std::unordered_map edgePropsMap; + + int64_t colLowerBound{-1}; + int64_t colUpperBound{-1}; + }; + + Status processList(std::shared_ptr value); + + void goToFirstEdge(); + + StatusOr buildIndex(DataSetIndex* dsIndex); + + Status buildPropIndex(const std::string& props, + size_t columnId, + bool isEdge, + DataSetIndex* dsIndex); + + StatusOr makeDataSetIndex(const DataSet& ds); + + FRIEND_TEST(IteratorTest, TestHead); + + bool valid_{false}; + std::vector dsIndices_; + + std::vector::iterator currentDs_; + + std::vector::const_iterator currentRow_; + std::vector::const_iterator rowsUpperBound_; + + int64_t colIdx_{-1}; + const List* currentCol_{nullptr}; + + bool noEdge_{false}; + int64_t edgeIdx_{-1}; + int64_t edgeIdxUpperBound_{-1}; + const List* currentEdge_{nullptr}; + + boost::dynamic_bitset<> bitset_; + int64_t bitIdx_{-1}; + Value prevVertex_; +}; + +} // namespace graph +} // namespace nebula + +#endif // GRAPH_CONTEXT_ITERATOR_GETNEIGHBORSITER_H_ diff --git a/src/graph/context/iterator/Iterator.cpp b/src/graph/context/iterator/Iterator.cpp new file mode 100644 index 00000000000..342939adc20 --- /dev/null +++ b/src/graph/context/iterator/Iterator.cpp @@ -0,0 +1,52 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "graph/context/iterator/Iterator.h" + +#include "common/memory/MemoryUtils.h" + +DECLARE_int32(num_rows_to_check_memory); +DECLARE_double(system_memory_high_watermark_ratio); + +namespace nebula { +namespace graph { + +bool Iterator::hitsSysMemoryHighWatermark() const { + if (checkMemory_) { + if (numRowsModN_ >= FLAGS_num_rows_to_check_memory) { + numRowsModN_ -= FLAGS_num_rows_to_check_memory; + } + if (UNLIKELY(numRowsModN_ == 0)) { + if (memory::MemoryUtils::kHitMemoryHighWatermark.load()) { + throw std::runtime_error( + folly::sformat("Used memory hits the high watermark({}) of total system memory.", + FLAGS_system_memory_high_watermark_ratio)); + } + } + } + return false; +} + +std::ostream& operator<<(std::ostream& os, Iterator::Kind kind) { + switch (kind) { + case Iterator::Kind::kDefault: + os << "default"; + break; + case Iterator::Kind::kSequential: + os << "sequential"; + break; + case Iterator::Kind::kGetNeighbors: + os << "get neighbors"; + break; + case Iterator::Kind::kProp: + os << "Prop"; + break; + } + os << " iterator"; + return os; +} + +} // namespace graph +} // namespace nebula diff --git a/src/graph/context/iterator/Iterator.h b/src/graph/context/iterator/Iterator.h new file mode 100644 index 00000000000..f11a48766cc --- /dev/null +++ b/src/graph/context/iterator/Iterator.h @@ -0,0 +1,173 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#ifndef GRAPH_CONTEXT_ITERATOR_ITERATOR_H_ +#define GRAPH_CONTEXT_ITERATOR_ITERATOR_H_ + +#include "common/base/StatusOr.h" +#include "common/datatypes/DataSet.h" +#include "common/datatypes/Value.h" + +namespace nebula { +namespace graph { + +class Iterator { + public: + template + using RowsType = std::vector; + template + using RowsIter = typename RowsType::iterator; + + // Warning this will break the origin order of elements! + template + static RowsIter eraseBySwap(RowsType& rows, RowsIter i) { + DCHECK(!rows.empty()); + std::swap(rows.back(), *i); + rows.pop_back(); + return i; + } + + enum class Kind : uint8_t { + kDefault, + kGetNeighbors, + kSequential, + kProp, + }; + + Iterator(std::shared_ptr value, Kind kind, bool checkMemory = false) + : checkMemory_(checkMemory), kind_(kind), numRowsModN_(0), value_(value) {} + + virtual ~Iterator() = default; + + Kind kind() const { + return kind_; + } + + virtual std::unique_ptr copy() const = 0; + + virtual bool valid() const { + return !hitsSysMemoryHighWatermark(); + } + + virtual void next() = 0; + + // erase current iter + virtual void erase() = 0; + + // Warning this will break the origin order of elements! + virtual void unstableErase() = 0; + + // remain the select data in range + virtual void select(std::size_t offset, std::size_t count) = 0; + + // Sample the elements + virtual void sample(int64_t count) = 0; + + virtual const Row* row() const = 0; + + virtual Row moveRow() = 0; + + // erase range, no include last position, if last > size(), erase to the end + // position + virtual void eraseRange(size_t first, size_t last) = 0; + + // Reset iterator position to `pos' from begin. Must be sure that the `pos' + // position is lower than `size()' before resetting + void reset(size_t pos = 0) { + numRowsModN_ = 0; + doReset(pos); + } + + virtual void clear() = 0; + + void operator++() { + next(); + } + + virtual std::shared_ptr valuePtr() const { + return value_; + } + + virtual size_t size() const = 0; + + bool empty() const { + return size() == 0; + } + + bool isDefaultIter() const { + return kind_ == Kind::kDefault; + } + + bool isGetNeighborsIter() const { + return kind_ == Kind::kGetNeighbors; + } + + bool isSequentialIter() const { + return kind_ == Kind::kSequential; + } + + bool isPropIter() const { + return kind_ == Kind::kProp; + } + + // The derived class should rewrite get prop if the Value is kind of dataset. + virtual const Value& getColumn(const std::string& col) const = 0; + + virtual const Value& getColumn(int32_t index) const = 0; + + // Get index of the column in tuple + virtual StatusOr getColumnIndex(const std::string& col) const = 0; + + template + const Value& getColumnByIndex(int32_t index, Iter iter) const { + int32_t size = static_cast(iter->size()); + if ((index > 0 && index >= size) || (index < 0 && -index > size)) { + return Value::kNullBadType; + } + return iter->operator[]((size + index) % size); + } + + virtual const Value& getTagProp(const std::string&, const std::string&) const { + DLOG(FATAL) << "Shouldn't call the unimplemented method"; + return Value::kEmpty; + } + + virtual const Value& getEdgeProp(const std::string&, const std::string&) const { + DLOG(FATAL) << "Shouldn't call the unimplemented method"; + return Value::kEmpty; + } + + virtual Value getVertex(const std::string& name = "") { + UNUSED(name); + return Value(); + } + + virtual Value getEdge() const { + return Value(); + } + + bool checkMemory() const { + return checkMemory_; + } + void setCheckMemory(bool checkMemory) { + checkMemory_ = checkMemory; + } + + protected: + virtual void doReset(size_t pos) = 0; + bool hitsSysMemoryHighWatermark() const; + + bool checkMemory_{false}; + Kind kind_; + mutable int64_t numRowsModN_{0}; + std::shared_ptr value_; +}; + +std::ostream& operator<<(std::ostream& os, Iterator::Kind kind); + +} // namespace graph +} // namespace nebula + +#endif // GRAPH_CONTEXT_ITERATOR_ITERATOR_H_ diff --git a/src/graph/context/iterator/PropIter.cpp b/src/graph/context/iterator/PropIter.cpp new file mode 100644 index 00000000000..5daa5807ee4 --- /dev/null +++ b/src/graph/context/iterator/PropIter.cpp @@ -0,0 +1,257 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "graph/context/iterator/PropIter.h" + +#include "common/datatypes/Edge.h" +#include "common/datatypes/Vertex.h" +#include "graph/util/SchemaUtil.h" + +namespace nebula { +namespace graph { + +PropIter::PropIter(const PropIter& iter) : SequentialIter(iter) { + dsIndex_ = iter.dsIndex_; + kind_ = Kind::kProp; +} + +PropIter::PropIter(std::shared_ptr value, bool checkMemory) + : SequentialIter(value, checkMemory) { + DCHECK(value->isDataSet()); + auto& ds = value->getDataSet(); + auto status = makeDataSetIndex(ds); + if (UNLIKELY(!status.ok())) { + DLOG(FATAL) << status; + clear(); + return; + } + kind_ = Kind::kProp; +} + +Status PropIter::makeDataSetIndex(const DataSet& ds) { + dsIndex_.ds = &ds; + auto& colNames = ds.colNames; + for (size_t i = 0; i < colNames.size(); ++i) { + dsIndex_.colIndices.emplace(colNames[i], i); + auto& colName = colNames[i]; + if (colName.find(".") != std::string::npos) { + NG_RETURN_IF_ERROR(buildPropIndex(colName, i)); + } + } + return Status::OK(); +} + +Status PropIter::buildPropIndex(const std::string& props, size_t columnId) { + std::vector pieces; + folly::split(".", props, pieces); + if (UNLIKELY(pieces.size() != 2)) { + return Status::Error("Bad column name format: %s", props.c_str()); + } + std::string name = pieces[0]; + auto& propsMap = dsIndex_.propsMap; + if (propsMap.find(name) != propsMap.end()) { + propsMap[name].emplace(pieces[1], columnId); + } else { + std::unordered_map propIndices; + propIndices.emplace(pieces[1], columnId); + propsMap.emplace(name, std::move(propIndices)); + } + return Status::OK(); +} + +const Value& PropIter::getColumn(const std::string& col) const { + if (!valid()) { + return Value::kNullValue; + } + + auto index = dsIndex_.colIndices.find(col); + if (index == dsIndex_.colIndices.end()) { + return Value::kNullValue; + } + auto& row = *iter_; + DCHECK_LT(index->second, row.values.size()); + return row.values[index->second]; +} + +StatusOr PropIter::getColumnIndex(const std::string& col) const { + auto index = dsIndex_.colIndices.find(col); + if (index == dsIndex_.colIndices.end()) { + return Status::Error("Don't exist column `%s'.", col.c_str()); + } + return index->second; +} + +const Value& PropIter::getProp(const std::string& name, const std::string& prop) const { + if (!valid()) { + return Value::kNullValue; + } + auto& propsMap = dsIndex_.propsMap; + size_t colId = 0; + auto& row = *iter_; + if (name == "*") { + for (auto& index : propsMap) { + auto propIndex = index.second.find(prop); + if (propIndex == index.second.end()) { + continue; + } + colId = propIndex->second; + DCHECK_GT(row.size(), colId); + auto& val = row[colId]; + if (val.empty()) { + continue; + } else { + return val; + } + } + return Value::kNullValue; + } else { + auto index = propsMap.find(name); + if (index == propsMap.end()) { + return Value::kEmpty; + } + auto propIndex = index->second.find(prop); + if (propIndex == index->second.end()) { + return Value::kNullValue; + } + colId = propIndex->second; + DCHECK_GT(row.size(), colId); + return row[colId]; + } +} + +Value PropIter::getVertex(const std::string& name) { + UNUSED(name); + if (!valid()) { + return Value::kNullValue; + } + + auto vidVal = getColumn(nebula::kVid); + if (!SchemaUtil::isValidVid(vidVal)) { + return Value::kNullValue; + } + Vertex vertex; + vertex.vid = vidVal; + auto& tagPropsMap = dsIndex_.propsMap; + bool isVertexProps = true; + auto& row = *iter_; + // tagPropsMap -> > + for (auto& tagProp : tagPropsMap) { + // propIndex -> std::unordered_map + for (auto& propIndex : tagProp.second) { + if (row[propIndex.second].empty()) { + // Not current vertex's prop + isVertexProps = false; + break; + } + } + if (!isVertexProps) { + isVertexProps = true; + continue; + } + Tag tag; + tag.name = tagProp.first; + for (auto& propIndex : tagProp.second) { + if (propIndex.first == nebula::kTag) { // "_tag" + continue; + } else { + tag.props.emplace(propIndex.first, row[propIndex.second]); + } + } + vertex.tags.emplace_back(std::move(tag)); + } + return Value(std::move(vertex)); +} + +Value PropIter::getEdge() const { + if (!valid()) { + return Value::kNullValue; + } + Edge edge; + auto& edgePropsMap = dsIndex_.propsMap; + bool isEdgeProps = true; + auto& row = *iter_; + for (auto& edgeProp : edgePropsMap) { + for (auto& propIndex : edgeProp.second) { + if (row[propIndex.second].empty()) { + // Not current edge's prop + isEdgeProps = false; + break; + } + } + if (!isEdgeProps) { + isEdgeProps = true; + continue; + } + auto edgeName = edgeProp.first; + edge.name = edgeProp.first; + + auto type = getEdgeProp(edgeName, kType); + if (!type.isInt()) { + return Value::kNullBadType; + } + edge.type = type.getInt(); + + auto& srcVal = getEdgeProp(edgeName, kSrc); + if (!SchemaUtil::isValidVid(srcVal)) { + return Value::kNullBadType; + } + edge.src = srcVal; + + auto& dstVal = getEdgeProp(edgeName, kDst); + if (!SchemaUtil::isValidVid(dstVal)) { + return Value::kNullBadType; + } + edge.dst = dstVal; + + auto rank = getEdgeProp(edgeName, kRank); + if (!rank.isInt()) { + return Value::kNullBadType; + } + edge.ranking = rank.getInt(); + + for (auto& propIndex : edgeProp.second) { + if (propIndex.first == kSrc || propIndex.first == kDst || propIndex.first == kType || + propIndex.first == kRank) { + continue; + } + DCHECK_LT(propIndex.second, row.size()); + edge.props.emplace(propIndex.first, row[propIndex.second]); + } + return Value(std::move(edge)); + } + return Value::kNullValue; +} + +List PropIter::getVertices() { + DCHECK(iter_ == rows_->begin()); + List vertices; + for (; valid(); next()) { + vertices.values.emplace_back(getVertex()); + } + reset(); + return vertices; +} + +List PropIter::getEdges() { + DCHECK(iter_ == rows_->begin()); + List edges; + edges.values.reserve(size()); + for (; valid(); next()) { + auto edge = getEdge(); + if (edge.isEdge()) { + const_cast(edge.getEdge()).format(); + } + edges.values.emplace_back(std::move(edge)); + } + reset(); + return edges; +} + +const Value& PropIter::getColumn(int32_t index) const { + return getColumnByIndex(index, iter_); +} + +} // namespace graph +} // namespace nebula diff --git a/src/graph/context/iterator/PropIter.h b/src/graph/context/iterator/PropIter.h new file mode 100644 index 00000000000..618be2cecd9 --- /dev/null +++ b/src/graph/context/iterator/PropIter.h @@ -0,0 +1,78 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#ifndef GRAPH_CONTEXT_ITERATOR_PROPITER_H_ +#define GRAPH_CONTEXT_ITERATOR_PROPITER_H_ + +#include "graph/context/iterator/SequentialIter.h" + +namespace nebula { +namespace graph { + +class PropIter final : public SequentialIter { + public: + explicit PropIter(std::shared_ptr value, bool checkMemory = false); + explicit PropIter(const PropIter& iter); + + std::unique_ptr copy() const override { + auto copy = std::make_unique(*this); + copy->reset(); + return copy; + } + + const std::unordered_map& getColIndices() const { + return dsIndex_.colIndices; + } + + const Value& getColumn(const std::string& col) const override; + + const Value& getColumn(int32_t index) const override; + + StatusOr getColumnIndex(const std::string& col) const override; + + Value getVertex(const std::string& name = "") override; + + Value getEdge() const override; + + List getVertices(); + + List getEdges(); + + const Value& getProp(const std::string& name, const std::string& prop) const; + + const Value& getTagProp(const std::string& tag, const std::string& prop) const override { + return getProp(tag, prop); + } + + const Value& getEdgeProp(const std::string& edge, const std::string& prop) const override { + return getProp(edge, prop); + } + + private: + Status makeDataSetIndex(const DataSet& ds); + + Status buildPropIndex(const std::string& props, size_t columnIdx); + + struct DataSetIndex { + const DataSet* ds; + // vertex | _vid | tag1.prop1 | tag1.prop2 | tag2,prop1 | tag2,prop2 | ... + // |_vid : 0 | tag1.prop1 : 1 | tag1.prop2 : 2 | tag2.prop1 : 3 |... + // edge |_src | _type| _ranking | _dst | edge1.prop1 | edge1.prop2 |... + // |_src : 0 | _type : 1| _ranking : 2 | _dst : 3| edge1.prop1 : + // 4|... + std::unordered_map colIndices; + // {tag1 : {prop1 : 1, prop2 : 2} + // {edge1 : {prop1 : 4, prop2 : 5} + std::unordered_map> propsMap; + }; + + private: + DataSetIndex dsIndex_; +}; + +} // namespace graph +} // namespace nebula + +#endif // GRAPH_CONTEXT_ITERATOR_PROPITER_H_ diff --git a/src/graph/context/iterator/SequentialIter.cpp b/src/graph/context/iterator/SequentialIter.cpp new file mode 100644 index 00000000000..02e665c4219 --- /dev/null +++ b/src/graph/context/iterator/SequentialIter.cpp @@ -0,0 +1,148 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "graph/context/iterator/SequentialIter.h" + +#include "common/algorithm/ReservoirSampling.h" +#include "common/datatypes/Edge.h" +#include "common/datatypes/Vertex.h" + +namespace nebula { +namespace graph { + +SequentialIter::SequentialIter(const SequentialIter& iter) + : Iterator(iter.valuePtr(), Kind::kSequential) { + auto valuePtr = iter.valuePtr(); + auto& ds = valuePtr->mutableDataSet(); + iter_ = ds.rows.begin(); + rows_ = &ds.rows; + colIndices_ = iter.getColIndices(); +} + +SequentialIter::SequentialIter(std::shared_ptr value, bool checkMemory) + : Iterator(value, Kind::kSequential, checkMemory) { + DCHECK(value->isDataSet()); + auto& ds = value->mutableDataSet(); + iter_ = ds.rows.begin(); + rows_ = &ds.rows; + for (size_t i = 0; i < ds.colNames.size(); ++i) { + colIndices_.emplace(ds.colNames[i], i); + } +} + +SequentialIter::SequentialIter(std::unique_ptr left, std::unique_ptr right) + : Iterator(left->valuePtr(), Kind::kSequential) { + std::vector> iterators; + iterators.emplace_back(std::move(left)); + iterators.emplace_back(std::move(right)); + init(std::move(iterators)); +} + +SequentialIter::SequentialIter(std::vector> inputList) + : Iterator(inputList.front()->valuePtr(), Kind::kSequential, inputList.front()->checkMemory()) { + init(std::move(inputList)); +} + +void SequentialIter::init(std::vector>&& iterators) { + DCHECK(!iterators.empty()); + const auto& firstIter = iterators.front(); + DCHECK(firstIter->isSequentialIter()); + colIndices_ = static_cast(firstIter.get())->getColIndices(); + DataSet ds; + for (auto& iter : iterators) { + DCHECK(iter->isSequentialIter()); + auto inputIter = static_cast(iter.get()); + ds.rows.insert(ds.rows.end(), + std::make_move_iterator(inputIter->begin()), + std::make_move_iterator(inputIter->end())); + } + value_ = std::make_shared(std::move(ds)); + rows_ = &value_->mutableDataSet().rows; + iter_ = rows_->begin(); +} + +bool SequentialIter::valid() const { + return Iterator::valid() && iter_ < rows_->end(); +} + +void SequentialIter::next() { + if (valid()) { + ++numRowsModN_; + ++iter_; + } +} + +void SequentialIter::erase() { + iter_ = rows_->erase(iter_); +} + +void SequentialIter::unstableErase() { + std::swap(rows_->back(), *iter_); + rows_->pop_back(); +} + +void SequentialIter::eraseRange(size_t first, size_t last) { + if (first >= last || first >= size()) { + return; + } + if (last > size()) { + rows_->erase(rows_->begin() + first, rows_->end()); + } else { + rows_->erase(rows_->begin() + first, rows_->begin() + last); + } + reset(); +} + +void SequentialIter::doReset(size_t pos) { + DCHECK((pos == 0 && size() == 0) || (pos < size())); + iter_ = rows_->begin() + pos; +} + +const Value& SequentialIter::getColumn(const std::string& col) const { + if (!valid()) { + return Value::kNullValue; + } + auto& row = *iter_; + auto index = colIndices_.find(col); + if (index == colIndices_.end()) { + return Value::kNullValue; + } + + DCHECK_LT(index->second, row.values.size()) << "index: " << index->second << " row" << row; + return row.values[index->second]; +} + +const Value& SequentialIter::getColumn(int32_t index) const { + return getColumnByIndex(index, iter_); +} + +StatusOr SequentialIter::getColumnIndex(const std::string& col) const { + auto index = colIndices_.find(col); + if (index == colIndices_.end()) { + return Status::Error("Don't exist column `%s'.", col.c_str()); + } + return index->second; +} + +Value SequentialIter::getVertex(const std::string& name) { + return getColumn(name); +} + +Value SequentialIter::getEdge() const { + return getColumn("EDGE"); +} + +void SequentialIter::sample(int64_t count) { + DCHECK_GE(count, 0); + algorithm::ReservoirSampling sampler(count); + for (auto& row : *rows_) { + sampler.sampling(std::move(row)); + } + *rows_ = std::move(sampler).samples(); + iter_ = rows_->begin(); +} + +} // namespace graph +} // namespace nebula diff --git a/src/graph/context/iterator/SequentialIter.h b/src/graph/context/iterator/SequentialIter.h new file mode 100644 index 00000000000..40c2f628197 --- /dev/null +++ b/src/graph/context/iterator/SequentialIter.h @@ -0,0 +1,114 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#ifndef GRAPH_CONTEXT_ITERATOR_SEQUENTIALITER_H_ +#define GRAPH_CONTEXT_ITERATOR_SEQUENTIALITER_H_ + +#include "graph/context/iterator/Iterator.h" + +namespace nebula { +namespace graph { + +class SequentialIter : public Iterator { + public: + explicit SequentialIter(std::shared_ptr value, bool checkMemory = false); + explicit SequentialIter(const SequentialIter& iter); + + // Union multiple sequential iterators + explicit SequentialIter(std::vector> inputList); + // Union two sequential iterators. + SequentialIter(std::unique_ptr left, std::unique_ptr right); + + std::unique_ptr copy() const override { + auto copy = std::make_unique(*this); + copy->reset(); + return copy; + } + + bool valid() const override; + + void next() override; + + void erase() override; + + void unstableErase() override; + + void eraseRange(size_t first, size_t last) override; + + void select(std::size_t offset, std::size_t count) override { + auto size = this->size(); + if (size <= static_cast(offset)) { + clear(); + } else if (size > static_cast(offset + count)) { + eraseRange(0, offset); + eraseRange(count, size - offset); + } else if (size > static_cast(offset) && size <= static_cast(offset + count)) { + eraseRange(0, offset); + } + } + + void sample(int64_t count) override; + + void clear() override { + rows_->clear(); + reset(); + } + + std::vector::iterator begin() { + return CHECK_NOTNULL(rows_)->begin(); + } + + std::vector::iterator end() { + return CHECK_NOTNULL(rows_)->end(); + } + + const std::unordered_map& getColIndices() const { + return colIndices_; + } + + size_t size() const override { + return rows_->size(); + } + + const Value& getColumn(const std::string& col) const override; + + const Value& getColumn(int32_t index) const override; + + StatusOr getColumnIndex(const std::string& col) const override; + + Value getVertex(const std::string& name = "") override; + + Value getEdge() const override; + + Row moveRow() override { + return std::move(*iter_); + } + + const Row* row() const override { + return &*iter_; + } + + protected: + // Notice: We only use this interface when return results to client. + friend class DataCollectExecutor; + friend class AppendVerticesExecutor; + friend class TraverseExecutor; + friend class ShortestPathExecutor; + + void doReset(size_t pos) override; + + std::vector::iterator iter_; + std::vector* rows_{nullptr}; + + private: + void init(std::vector>&& iterators); + + std::unordered_map colIndices_; +}; + +} // namespace graph +} // namespace nebula + +#endif // GRAPH_CONTEXT_ITERATOR_SEQUENTIALITER_H_ diff --git a/src/graph/gc/GC.cpp b/src/graph/gc/GC.cpp index 0d6b2899c52..644927cfb33 100644 --- a/src/graph/gc/GC.cpp +++ b/src/graph/gc/GC.cpp @@ -4,10 +4,12 @@ #include "graph/gc/GC.h" +#include "common/memory/MemoryTracker.h" #include "graph/service/GraphFlags.h" namespace nebula { namespace graph { + GC& GC::instance() { static GC gc; return gc; @@ -33,5 +35,6 @@ void GC::periodicTask() { queue_.try_dequeue(); } } + } // namespace graph } // namespace nebula