From 789881376aee2880d449ed071f900dc977cc4b60 Mon Sep 17 00:00:00 2001 From: Dmitry Uspenskiy <47734295+d-uspenskiy@users.noreply.github.com> Date: Wed, 23 Mar 2022 22:54:49 +0300 Subject: [PATCH] [#11815] YSQL: Move write operation buffering into separate file Summary: To simplify the code of `PgSession` class and simplify further implementation of write operation buffering subsystem improvements (issue #11628) code related to buffering is moved into separate class `PgOperationBuffer`. Current functionality of buffering subsystem is preserved (in general). Test Plan: Jenkins Reviewers: nli, pjain Reviewed By: pjain Differential Revision: https://phabricator.dev.yugabyte.com/D16083 --- src/yb/yql/pggate/CMakeLists.txt | 2 + src/yb/yql/pggate/pg_gate_fwd.h | 2 + src/yb/yql/pggate/pg_op.h | 2 - src/yb/yql/pggate/pg_operation_buffer.cc | 304 +++++++++++++++++++++++ src/yb/yql/pggate/pg_operation_buffer.h | 65 +++++ src/yb/yql/pggate/pg_perform_future.cc | 41 +++ src/yb/yql/pggate/pg_perform_future.h | 47 ++++ src/yb/yql/pggate/pg_session.cc | 265 +++----------------- src/yb/yql/pggate/pg_session.h | 82 +----- 9 files changed, 510 insertions(+), 300 deletions(-) create mode 100644 src/yb/yql/pggate/pg_operation_buffer.cc create mode 100644 src/yb/yql/pggate/pg_operation_buffer.h create mode 100644 src/yb/yql/pggate/pg_perform_future.cc create mode 100644 src/yb/yql/pggate/pg_perform_future.h diff --git a/src/yb/yql/pggate/CMakeLists.txt b/src/yb/yql/pggate/CMakeLists.txt index c1c1e9fcd3ae..52c6945afaad 100644 --- a/src/yb/yql/pggate/CMakeLists.txt +++ b/src/yb/yql/pggate/CMakeLists.txt @@ -28,6 +28,8 @@ set(PGGATE_SRCS pggate_thread_local_vars.cc pg_client.cc pg_op.cc + pg_operation_buffer.cc + pg_perform_future.cc pg_sample.cc pg_session.cc pg_statement.cc diff --git a/src/yb/yql/pggate/pg_gate_fwd.h b/src/yb/yql/pggate/pg_gate_fwd.h index c9a2b9482463..ba60dc653150 100644 --- a/src/yb/yql/pggate/pg_gate_fwd.h +++ b/src/yb/yql/pggate/pg_gate_fwd.h @@ -40,7 +40,9 @@ class PgTableDesc; using PgTableDescPtr = scoped_refptr; class PgsqlOp; +class PgsqlWriteOp; using PgsqlOpPtr = std::shared_ptr; +using PgsqlWriteOpPtr = std::shared_ptr; using PgsqlOps = std::vector; YB_STRONGLY_TYPED_BOOL(Commit); diff --git a/src/yb/yql/pggate/pg_op.h b/src/yb/yql/pggate/pg_op.h index f920d95a6017..430792b91f81 100644 --- a/src/yb/yql/pggate/pg_op.h +++ b/src/yb/yql/pggate/pg_op.h @@ -178,8 +178,6 @@ class PgsqlWriteOp : public PgsqlOp { HybridTime write_time_; }; -using PgsqlWriteOpPtr = std::shared_ptr; - CHECKED_STATUS ReviewResponsePagingState(const PgTableDesc& table, PgsqlReadOp* op); } // namespace pggate diff --git a/src/yb/yql/pggate/pg_operation_buffer.cc b/src/yb/yql/pggate/pg_operation_buffer.cc new file mode 100644 index 000000000000..0a7afb97a8cd --- /dev/null +++ b/src/yb/yql/pggate/pg_operation_buffer.cc @@ -0,0 +1,304 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#include "yb/yql/pggate/pg_operation_buffer.h" + +#include +#include +#include +#include +#include + +#include "yb/common/constants.h" +#include "yb/common/pgsql_protocol.pb.h" +#include "yb/common/ql_expr.h" +#include "yb/common/ql_value.h" +#include "yb/common/schema.h" + +#include "yb/docdb/doc_key.h" +#include "yb/docdb/primitive_value.h" +#include "yb/docdb/value_type.h" + +#include "yb/gutil/casts.h" +#include "yb/gutil/port.h" + +#include "yb/util/lw_function.h" +#include "yb/util/status.h" + +#include "yb/yql/pggate/pg_op.h" +#include "yb/yql/pggate/pg_tabledesc.h" +#include "yb/yql/pggate/pggate_flags.h" + +namespace yb { +namespace pggate { +namespace { + +docdb::PrimitiveValue NullValue(SortingType sorting) { + using SortingType = SortingType; + + return docdb::PrimitiveValue( + sorting == SortingType::kAscendingNullsLast || sorting == SortingType::kDescendingNullsLast + ? docdb::ValueType::kNullHigh + : docdb::ValueType::kNullLow); +} + +void InitKeyColumnPrimitiveValues( + const google::protobuf::RepeatedPtrField &column_values, + const Schema &schema, + size_t start_idx, + vector *components) { + size_t column_idx = start_idx; + for (const auto& column_value : column_values) { + const auto sorting_type = schema.column(column_idx).sorting_type(); + if (column_value.has_value()) { + const auto& value = column_value.value(); + components->push_back( + IsNull(value) + ? NullValue(sorting_type) + : docdb::PrimitiveValue::FromQLValuePB(value, sorting_type)); + } else { + // TODO(neil) The current setup only works for CQL as it assumes primary key value must not + // be dependent on any column values. This needs to be fixed as PostgreSQL expression might + // require a read from a table. + // + // Use regular executor for now. + QLExprExecutor executor; + QLExprResult result; + auto s = executor.EvalExpr(column_value, nullptr, result.Writer()); + + components->push_back(docdb::PrimitiveValue::FromQLValuePB(result.Value(), sorting_type)); + } + ++column_idx; + } +} + +// Represents row id (ybctid) from the DocDB's point of view. +class RowIdentifier { + public: + RowIdentifier(const Schema& schema, const PgsqlWriteRequestPB& request) + : table_id_(&request.table_id()) { + if (request.has_ybctid_column_value()) { + ybctid_ = &request.ybctid_column_value().value().binary_value(); + } else { + vector hashed_components; + vector range_components; + InitKeyColumnPrimitiveValues(request.partition_column_values(), + schema, + 0 /* start_idx */, + &hashed_components); + InitKeyColumnPrimitiveValues(request.range_column_values(), + schema, + schema.num_hash_key_columns(), + &range_components); + if (hashed_components.empty()) { + ybctid_holder_ = docdb::DocKey(std::move(range_components)).Encode().ToStringBuffer(); + } else { + ybctid_holder_ = docdb::DocKey(request.hash_code(), + std::move(hashed_components), + std::move(range_components)).Encode().ToStringBuffer(); + } + ybctid_ = nullptr; + } + } + + inline const string& ybctid() const { + return ybctid_ ? *ybctid_ : ybctid_holder_; + } + + inline const string& table_id() const { + return *table_id_; + } + + private: + const std::string* table_id_; + const std::string* ybctid_; + std::string ybctid_holder_; +}; + + +bool operator==(const RowIdentifier& k1, const RowIdentifier& k2) { + return k1.table_id() == k2.table_id() && k1.ybctid() == k2.ybctid(); +} + +size_t hash_value(const RowIdentifier& key) { + size_t hash = 0; + boost::hash_combine(hash, key.table_id()); + boost::hash_combine(hash, key.ybctid()); + return hash; +} + +inline bool IsTableUsedByRequest(const PgsqlReadRequestPB& request, const std::string& table_id) { + return request.table_id() == table_id || + (request.has_index_request() && IsTableUsedByRequest(request.index_request(), table_id)); +} + +} // namespace + +void BufferableOperations::Add(PgsqlOpPtr op, const PgObjectId& relation) { + operations.push_back(std::move(op)); + relations.push_back(relation); +} + +void BufferableOperations::Swap(BufferableOperations* rhs) { + operations.swap(rhs->operations); + relations.swap(rhs->relations); +} + +void BufferableOperations::Clear() { + operations.clear(); + relations.clear(); +} + +void BufferableOperations::Reserve(size_t capacity) { + operations.reserve(capacity); + relations.reserve(capacity); +} + +bool BufferableOperations::empty() const { + return operations.empty(); +} + +size_t BufferableOperations::size() const { + return operations.size(); +} + +class PgOperationBuffer::Impl { + public: + explicit Impl(const Flusher& flusher) + : flusher_(flusher) { + } + + CHECKED_STATUS Add(const PgTableDesc& table, PgsqlWriteOpPtr op, bool transactional) { + // Check for buffered operation related to same row. + // If multiple operations are performed in context of single RPC second operation will not + // see the results of first operation on DocDB side. + // Multiple operations on same row must be performed in context of different RPC. + // Flush is required in this case. + RowIdentifier row_id(table.schema(), op->write_request()); + if (PREDICT_FALSE(!keys_.insert(row_id).second)) { + RETURN_NOT_OK(Flush()); + keys_.insert(row_id); + } + auto& target = (transactional ? txn_ops_ : ops_); + if (target.empty()) { + target.Reserve(FLAGS_ysql_session_max_batch_size); + } + target.Add(std::move(op), table.id()); + return keys_.size() >= FLAGS_ysql_session_max_batch_size ? Flush() : Status::OK(); + } + + CHECKED_STATUS Flush() { + return DoFlush(make_lw_function([this](BufferableOperations ops, bool txn) { + return VERIFY_RESULT(flusher_(std::move(ops), txn)).Get(); + })); + } + + Result FlushTake( + const PgTableDesc& table, const PgsqlOp& op, bool transactional) { + BufferableOperations result; + if (IsFullFlushRequired(table.schema(), op)) { + RETURN_NOT_OK(Flush()); + } else { + RETURN_NOT_OK(DoFlush(make_lw_function( + [this, transactional, &result](BufferableOperations ops, bool txn) -> Status { + if (txn == transactional) { + ops.Swap(&result); + return Status::OK(); + } + return VERIFY_RESULT(flusher_(std::move(ops), txn)).Get(); + }))); + } + return result; + } + + size_t Size() const { + return keys_.size(); + } + + void Clear() { + VLOG_IF(1, !keys_.empty()) << "Dropping " << keys_.size() << " pending operations"; + ops_.Clear(); + txn_ops_.Clear(); + keys_.clear(); + } + + private: + using SyncFlusher = LWFunction; + + CHECKED_STATUS DoFlush(const SyncFlusher& flusher) { + BufferableOperations ops; + BufferableOperations txn_ops; + ops_.Swap(&ops); + txn_ops_.Swap(&txn_ops); + keys_.clear(); + + if (!ops.empty()) { + RETURN_NOT_OK(flusher(std::move(ops), false /* transactional */)); + } + if (!txn_ops.empty()) { + RETURN_NOT_OK(flusher(std::move(txn_ops), true /* transactional */)); + } + return Status::OK(); + } + + bool IsFullFlushRequired(const Schema& schema, const PgsqlOp& op) const { + return op.is_read() + ? IsSameTableUsedByBufferedOperations(down_cast(op).read_request()) + : keys_.find(RowIdentifier( + schema, down_cast(op).write_request())) != keys_.end(); + } + + bool IsSameTableUsedByBufferedOperations(const PgsqlReadRequestPB& request) const { + for (const auto& k : keys_) { + if (IsTableUsedByRequest(request, k.table_id())) { + return true; + } + } + return false; + } + + const Flusher flusher_; + BufferableOperations ops_; + BufferableOperations txn_ops_; + std::unordered_set> keys_; +}; + +PgOperationBuffer::PgOperationBuffer(const Flusher& flusher) + : impl_(new Impl(flusher)) { +} + +PgOperationBuffer::~PgOperationBuffer() = default; + +Status PgOperationBuffer::Add(const PgTableDesc& table, PgsqlWriteOpPtr op, bool transactional) { + return impl_->Add(table, std::move(op), transactional); +} + +CHECKED_STATUS PgOperationBuffer::Flush() { + return impl_->Flush(); +} + +Result PgOperationBuffer::FlushTake( + const PgTableDesc& table, const PgsqlOp& op, bool transactional) { + return impl_->FlushTake(table, op, transactional); +} + +size_t PgOperationBuffer::Size() const { + return impl_->Size(); +} + +void PgOperationBuffer::Clear() { + impl_->Clear(); +} + +} // namespace pggate +} // namespace yb diff --git a/src/yb/yql/pggate/pg_operation_buffer.h b/src/yb/yql/pggate/pg_operation_buffer.h new file mode 100644 index 000000000000..2cf43cb2e9f9 --- /dev/null +++ b/src/yb/yql/pggate/pg_operation_buffer.h @@ -0,0 +1,65 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#ifndef YB_YQL_PGGATE_PG_OPERATION_BUFFER_H_ +#define YB_YQL_PGGATE_PG_OPERATION_BUFFER_H_ + +#include +#include + +#include "yb/common/common_fwd.h" +#include "yb/common/pg_types.h" + +#include "yb/util/result.h" +#include "yb/util/status_fwd.h" + +#include "yb/yql/pggate/pg_gate_fwd.h" +#include "yb/yql/pggate/pg_perform_future.h" + +namespace yb { +namespace pggate { + +struct BufferableOperations { + PgsqlOps operations; + PgObjectIds relations; + + void Add(PgsqlOpPtr op, const PgObjectId& relation); + void Swap(BufferableOperations* rhs); + void Clear(); + void Reserve(size_t capacity); + bool empty() const; + size_t size() const; +}; + +class PgOperationBuffer { + public: + using Flusher = std::function(BufferableOperations, bool)>; + + explicit PgOperationBuffer(const Flusher& flusher); + ~PgOperationBuffer(); + CHECKED_STATUS Add(const PgTableDesc& table, PgsqlWriteOpPtr op, bool transactional); + CHECKED_STATUS Flush(); + Result FlushTake( + const PgTableDesc& table, const PgsqlOp& op, bool transactional); + size_t Size() const; + void Clear(); + + private: + class Impl; + std::unique_ptr impl_; +}; + +} // namespace pggate +} // namespace yb + +#endif // YB_YQL_PGGATE_PG_OPERATION_BUFFER_H_ diff --git a/src/yb/yql/pggate/pg_perform_future.cc b/src/yb/yql/pggate/pg_perform_future.cc new file mode 100644 index 000000000000..07ad7e156211 --- /dev/null +++ b/src/yb/yql/pggate/pg_perform_future.cc @@ -0,0 +1,41 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#include "yb/yql/pggate/pg_perform_future.h" + +#include + +#include "yb/yql/pggate/pg_session.h" + +namespace yb { +namespace pggate { + +PerformFuture::PerformFuture( + std::future future, PgSession* session, PgObjectIds relations) + : future_(std::move(future)), session_(session), relations_(std::move(relations)) { +} + +bool PerformFuture::Valid() const { + return session_ != nullptr; +} + +CHECKED_STATUS PerformFuture::Get() { + auto result = future_.get(); + auto session = session_; + session_ = nullptr; + session->TrySetCatalogReadPoint(result.catalog_read_time); + return session->PatchStatus(result.status, relations_); +} + +} // namespace pggate +} // namespace yb diff --git a/src/yb/yql/pggate/pg_perform_future.h b/src/yb/yql/pggate/pg_perform_future.h new file mode 100644 index 000000000000..1f621251b1bb --- /dev/null +++ b/src/yb/yql/pggate/pg_perform_future.h @@ -0,0 +1,47 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#ifndef YB_YQL_PGGATE_PG_PERFORM_FUTURE_H_ +#define YB_YQL_PGGATE_PG_PERFORM_FUTURE_H_ + +#include + +#include "yb/common/common_fwd.h" + +#include "yb/util/status_fwd.h" + +#include "yb/yql/pggate/pg_client.h" + +namespace yb { +namespace pggate { + +class PgSession; + +class PerformFuture { + public: + PerformFuture() = default; + PerformFuture(std::future future, PgSession* session, PgObjectIds relations); + + bool Valid() const; + CHECKED_STATUS Get(); + + private: + std::future future_; + PgSession* session_ = nullptr; + PgObjectIds relations_; +}; + +} // namespace pggate +} // namespace yb + +#endif // YB_YQL_PGGATE_PG_PERFORM_FUTURE_H_ diff --git a/src/yb/yql/pggate/pg_session.cc b/src/yb/yql/pggate/pg_session.cc index 052cb2b73494..f98bb15724df 100644 --- a/src/yb/yql/pggate/pg_session.cc +++ b/src/yb/yql/pggate/pg_session.cc @@ -15,14 +15,14 @@ #include "yb/yql/pggate/pg_session.h" +#include +#include +#include #include -#include - #include "yb/client/batcher.h" #include "yb/client/error.h" #include "yb/client/schema.h" -#include "yb/client/session.h" #include "yb/client/table.h" #include "yb/client/tablet_server.h" #include "yb/client/transaction.h" @@ -38,10 +38,6 @@ #include "yb/common/schema.h" #include "yb/common/transaction_error.h" -#include "yb/docdb/doc_key.h" -#include "yb/docdb/primitive_value.h" -#include "yb/docdb/value_type.h" - #include "yb/gutil/casts.h" #include "yb/tserver/pg_client.pb.h" @@ -78,7 +74,6 @@ using std::unique_ptr; using std::shared_ptr; using std::string; -using client::YBSession; using client::YBMetaDataCache; using client::YBSchema; using client::YBOperation; @@ -92,62 +87,6 @@ using yb::tserver::TServerSharedObject; namespace { -docdb::PrimitiveValue NullValue(SortingType sorting) { - using SortingType = SortingType; - - return docdb::PrimitiveValue( - sorting == SortingType::kAscendingNullsLast || sorting == SortingType::kDescendingNullsLast - ? docdb::ValueType::kNullHigh - : docdb::ValueType::kNullLow); -} - -void InitKeyColumnPrimitiveValues( - const google::protobuf::RepeatedPtrField &column_values, - const Schema &schema, - size_t start_idx, - vector *components) { - size_t column_idx = start_idx; - for (const auto& column_value : column_values) { - const auto sorting_type = schema.column(column_idx).sorting_type(); - if (column_value.has_value()) { - const auto& value = column_value.value(); - components->push_back( - IsNull(value) - ? NullValue(sorting_type) - : docdb::PrimitiveValue::FromQLValuePB(value, sorting_type)); - } else { - // TODO(neil) The current setup only works for CQL as it assumes primary key value must not - // be dependent on any column values. This needs to be fixed as PostgreSQL expression might - // require a read from a table. - // - // Use regular executor for now. - QLExprExecutor executor; - QLExprResult result; - auto s = executor.EvalExpr(column_value, nullptr, result.Writer()); - - components->push_back(docdb::PrimitiveValue::FromQLValuePB(result.Value(), sorting_type)); - } - ++column_idx; - } -} - -bool IsTableUsedByRequest(const PgsqlReadRequestPB& request, const string& table_id) { - return request.table_id() == table_id || - (request.has_index_request() && IsTableUsedByRequest(request.index_request(), table_id)); -} - -bool IsTableUsedByRequest(const PgsqlWriteRequestPB& request, const string& table_id) { - return request.table_id() == table_id; -} - -bool IsTableUsedByOperation(const PgsqlOp& op, const string& table_id) { - if (op.is_read()) { - return IsTableUsedByRequest(down_cast(op).read_request(), table_id); - } else { - return IsTableUsedByRequest(down_cast(op).write_request(), table_id); - } -} - struct PgForeignKeyReferenceLightweight { PgOid table_id; Slice ybctid; @@ -220,24 +159,11 @@ Result GetRequiredSessionType( : SessionType::kRegular; } -} // namespace - - -PerformFuture::PerformFuture( - std::future future, PgSession* session, PgObjectIds* relations) - : future_(std::move(future)), session_(session), relations_(std::move(*relations)) {} - -bool PerformFuture::Valid() const { - return session_ != nullptr; +bool Empty(const PgOperationBuffer& buffer) { + return !buffer.Size(); } -CHECKED_STATUS PerformFuture::Get() { - auto result = future_.get(); - auto session = session_; - session_ = nullptr; - session->TrySetCatalogReadPoint(result.catalog_read_time); - return session->PatchStatus(result.status, relations_); -} +} // namespace //-------------------------------------------------------------------------------------------------- // Class PgSession::RunHelper @@ -246,44 +172,28 @@ CHECKED_STATUS PerformFuture::Get() { class PgSession::RunHelper { public: RunHelper(PgSession* pg_session, SessionType session_type) - : pg_session_(*pg_session), - session_type_(session_type), - buffer_(IsTransactional(session_type) ? pg_session_.buffered_txn_ops_ - : pg_session_.buffered_ops_) { + : pg_session_(*pg_session), session_type_(session_type) { } CHECKED_STATUS Apply(const PgTableDesc& table, const PgsqlOpPtr& op, uint64_t* read_time, bool force_non_bufferable) { - auto& buffered_keys = pg_session_.buffered_keys_; + auto& buffer = pg_session_.buffer_; // Try buffering this operation if it is a write operation, buffering is enabled and no // operations have been already applied to current session (yb session does not exist). if (operations_.empty() && pg_session_.buffering_enabled_ && !force_non_bufferable && op->is_write()) { - const auto& wop = down_cast(*op).write_request(); - // Check for buffered operation related to same row. - // If multiple operations are performed in context of single RPC second operation will not - // see the results of first operation on DocDB side. - // Multiple operations on same row must be performed in context of different RPC. - // Flush is required in this case. - RowIdentifier row_id(table.schema(), wop); - if (PREDICT_FALSE(!buffered_keys.insert(row_id).second)) { - RETURN_NOT_OK(pg_session_.FlushBufferedOperations()); - buffered_keys.insert(row_id); - } - if (PREDICT_FALSE(yb_debug_log_docdb_requests)) { - LOG(INFO) << "Buffering operation: " << wop.ShortDebugString(); - } - buffer_.Add(op, table.id()); - // Flush buffers in case limit of operations in single RPC exceeded. - return PREDICT_TRUE(buffered_keys.size() < FLAGS_ysql_session_max_batch_size) - ? Status::OK() - : pg_session_.FlushBufferedOperations(); + if (PREDICT_FALSE(yb_debug_log_docdb_requests)) { + LOG(INFO) << "Buffering operation: " << op->ToString(); + } + return buffer.Add(table, + PgsqlWriteOpPtr(std::move(op), down_cast(op.get())), + IsTransactional()); } bool read_only = op->is_read(); // Flush all buffered operations (if any) before performing non-bufferable operation - if (!buffered_keys.empty()) { + if (!Empty(buffer)) { SCHECK(operations_.empty(), IllegalState, "Buffered operations must be flushed before applying first non-bufferable operation"); @@ -292,23 +202,10 @@ class PgSession::RunHelper { // Buffered operations must be flushed independently in this case. // Also operations for catalog session can be combined with buffered operations // as catalog session is used for read-only operations. - bool full_flush_required = (IsTransactional() && read_time && *read_time) || IsCatalog(); - // Check for buffered operation that affected same table as current operation. - for (auto i = buffered_keys.begin(); !full_flush_required && i != buffered_keys.end(); ++i) { - full_flush_required = IsTableUsedByOperation(*op, i->table_id()); - } - if (full_flush_required) { - RETURN_NOT_OK(pg_session_.FlushBufferedOperations()); + if ((IsTransactional() && read_time && *read_time) || IsCatalog()) { + RETURN_NOT_OK(buffer.Flush()); } else { - RETURN_NOT_OK(pg_session_.FlushBufferedOperationsImpl(make_lw_function( - [this](BufferableOperations ops, IsTransactionalSession transactional) -> Status { - if (transactional == IsTransactional()) { - // Save buffered operations for further applying before non-buffered operation. - operations_.Swap(&ops); - return Status::OK(); - } - return pg_session_.FlushOperations(std::move(ops), transactional); - }))); + operations_ = VERIFY_RESULT(buffer.FlushTake(table, *op, IsTransactional())); read_only = read_only && operations_.empty(); } } @@ -344,30 +241,24 @@ class PgSession::RunHelper { return PerformFuture(); } - auto promise = std::make_shared>(); - - pg_session_.Perform(&operations_.operations, IsCatalog(), [promise](PerformResult result) { - promise->set_value(result); - }); - return PerformFuture(promise->get_future(), &pg_session_, &operations_.relations); + return pg_session_.Perform(std::move(operations_), IsCatalog()); } private: - static bool IsTransactional(SessionType type) { + inline static bool IsTransactional(SessionType type) { return type == SessionType::kTransactional; } - bool IsTransactional() const { + inline bool IsTransactional() const { return IsTransactional(session_type_); } - bool IsCatalog() const { + inline bool IsCatalog() const { return session_type_ == SessionType::kCatalog; } PgSession& pg_session_; const SessionType session_type_; - BufferableOperations& buffer_; BufferableOperations operations_; }; @@ -388,55 +279,6 @@ size_t hash_value(const PgForeignKeyReference& key) { key.table_id, key.ybctid.c_str(), key.ybctid.c_str() + key.ybctid.length()); } -//-------------------------------------------------------------------------------------------------- -// Class RowIdentifier -//-------------------------------------------------------------------------------------------------- - -RowIdentifier::RowIdentifier(const Schema& schema, const PgsqlWriteRequestPB& request) - : table_id_(&request.table_id()) { - if (request.has_ybctid_column_value()) { - ybctid_ = &request.ybctid_column_value().value().binary_value(); - } else { - vector hashed_components; - vector range_components; - InitKeyColumnPrimitiveValues(request.partition_column_values(), - schema, - 0 /* start_idx */, - &hashed_components); - InitKeyColumnPrimitiveValues(request.range_column_values(), - schema, - schema.num_hash_key_columns(), - &range_components); - if (hashed_components.empty()) { - ybctid_holder_ = docdb::DocKey(std::move(range_components)).Encode().ToStringBuffer(); - } else { - ybctid_holder_ = docdb::DocKey(request.hash_code(), - std::move(hashed_components), - std::move(range_components)).Encode().ToStringBuffer(); - } - ybctid_ = nullptr; - } -} - -const string& RowIdentifier::ybctid() const { - return ybctid_ ? *ybctid_ : ybctid_holder_; -} - -const string& RowIdentifier::table_id() const { - return *table_id_; -} - -bool operator==(const RowIdentifier& k1, const RowIdentifier& k2) { - return k1.table_id() == k2.table_id() && k1.ybctid() == k2.ybctid(); -} - -size_t hash_value(const RowIdentifier& key) { - size_t hash = 0; - boost::hash_combine(hash, key.table_id()); - boost::hash_combine(hash, key.ybctid()); - return hash; -} - //-------------------------------------------------------------------------------------------------- // Class PgSession //-------------------------------------------------------------------------------------------------- @@ -451,12 +293,13 @@ PgSession::PgSession( : pg_client_(*pg_client), pg_txn_manager_(std::move(pg_txn_manager)), clock_(std::move(clock)), + buffer_(std::bind( + &PgSession::FlushOperations, this, std::placeholders::_1, std::placeholders::_2)), tserver_shared_object_(tserver_shared_object), pg_callbacks_(pg_callbacks) { } -PgSession::~PgSession() { -} +PgSession::~PgSession() = default; //-------------------------------------------------------------------------------------------------- @@ -605,9 +448,9 @@ void PgSession::InvalidateAllTablesCache() { Status PgSession::StartOperationsBuffering() { SCHECK(!buffering_enabled_, IllegalState, "Buffering has been already started"); - if (PREDICT_FALSE(!buffered_keys_.empty())) { + if (PREDICT_FALSE(!Empty(buffer_))) { LOG(DFATAL) << "Buffering hasn't been started yet but " - << buffered_keys_.size() + << buffer_.Size() << " buffered operations found"; } buffering_enabled_ = true; @@ -626,49 +469,22 @@ void PgSession::ResetOperationsBuffering() { } Status PgSession::FlushBufferedOperations() { - return FlushBufferedOperationsImpl(make_lw_function( - [this](BufferableOperations ops, IsTransactionalSession txn) { - return this->FlushOperations(std::move(ops), txn); - })); + return buffer_.Flush(); } void PgSession::DropBufferedOperations() { - VLOG_IF(1, !buffered_keys_.empty()) - << "Dropping " << buffered_keys_.size() << " pending operations"; - buffered_keys_.clear(); - buffered_ops_.Clear(); - buffered_txn_ops_.Clear(); + buffer_.Clear(); } PgIsolationLevel PgSession::GetIsolationLevel() { return pg_txn_manager_->GetPgIsolationLevel(); } -Status PgSession::FlushBufferedOperationsImpl(const Flusher& flusher) { - auto ops = std::move(buffered_ops_); - auto txn_ops = std::move(buffered_txn_ops_); - buffered_keys_.clear(); - buffered_ops_.Clear(); - buffered_txn_ops_.Clear(); - if (!ops.empty()) { - RETURN_NOT_OK(flusher(std::move(ops), IsTransactionalSession::kFalse)); - } - if (!txn_ops.empty()) { - SCHECK(!YBCIsInitDbModeEnvVarSet(), - IllegalState, - "No transactional operations are expected in the initdb mode"); - RETURN_NOT_OK(flusher(std::move(txn_ops), IsTransactionalSession::kTrue)); - } - return Status::OK(); -} - Result PgSession::IsInitDbDone() { return pg_client_.IsInitDbDone(); } -Status PgSession::FlushOperations(BufferableOperations ops, IsTransactionalSession transactional) { - DCHECK(ops.size() > 0 && ops.size() <= FLAGS_ysql_session_max_batch_size); - +Result PgSession::FlushOperations(BufferableOperations ops, bool transactional) { if (PREDICT_FALSE(yb_debug_log_docdb_requests)) { LOG(INFO) << "Flushing buffered operations, using " << (transactional ? "transactional" : "non-transactional") @@ -685,17 +501,11 @@ Status PgSession::FlushOperations(BufferableOperations ops, IsTransactionalSessi in_txn_limit_ = clock_->Now(); } - std::promise promise; - Perform( - &ops.operations, /* use_catalog_session */ false, [&promise](const PerformResult& result) { - promise.set_value(result); - }); - PerformFuture future(promise.get_future(), this, &ops.relations); - return future.Get(); + return Perform(std::move(ops), /* use_catalog_session */ false); } -void PgSession::Perform( - PgsqlOps* operations, bool use_catalog_session, const PerformCallback& callback) { +Result PgSession::Perform(BufferableOperations ops, bool use_catalog_session) { + DCHECK(!ops.empty()); tserver::PgPerformOptionsPB options; if (use_catalog_session) { @@ -716,7 +526,12 @@ void PgSession::Perform( } options.set_force_global_transaction(yb_force_global_transaction); - pg_client_.PerformAsync(&options, operations, callback); + auto promise = std::make_shared>(); + + pg_client_.PerformAsync(&options, &ops.operations, [promise](PerformResult result) { + promise->set_value(result); + }); + return PerformFuture(promise->get_future(), this, std::move(ops.relations)); } Result PgSession::GetSharedCatalogVersion() { @@ -862,7 +677,7 @@ Status PgSession::SetActiveSubTransaction(SubTransactionId id) { txn_priority_requirement = kHighestPriority; } RETURN_NOT_OK(pg_txn_manager_->CalculateIsolation( - IsReadOnlyOperation::kFalse, txn_priority_requirement)); + false /* read_only_op */, txn_priority_requirement)); options_ptr = &options; pg_txn_manager_->SetupPerformOptions(&options); } diff --git a/src/yb/yql/pggate/pg_session.h b/src/yb/yql/pggate/pg_session.h index a655d5ce1c62..894f1be74bc7 100644 --- a/src/yb/yql/pggate/pg_session.h +++ b/src/yb/yql/pggate/pg_session.h @@ -14,14 +14,15 @@ #ifndef YB_YQL_PGGATE_PG_SESSION_H_ #define YB_YQL_PGGATE_PG_SESSION_H_ -#include +#include +#include +#include +#include #include #include #include "yb/client/client_fwd.h" -#include "yb/client/session.h" -#include "yb/client/transaction.h" #include "yb/common/pg_types.h" #include "yb/common/transaction.h" @@ -39,6 +40,8 @@ #include "yb/yql/pggate/pg_client.h" #include "yb/yql/pggate/pg_gate_fwd.h" #include "yb/yql/pggate/pg_env.h" +#include "yb/yql/pggate/pg_operation_buffer.h" +#include "yb/yql/pggate/pg_perform_future.h" #include "yb/yql/pggate/pg_tabledesc.h" #include "yb/yql/pggate/pg_txn_manager.h" @@ -51,71 +54,12 @@ YB_STRONGLY_TYPED_BOOL(InvalidateOnPgClient); class PgTxnManager; class PgSession; -struct BufferableOperations { - PgsqlOps operations; - PgObjectIds relations; - - void Add(PgsqlOpPtr op, const PgObjectId& relation) { - operations.push_back(std::move(op)); - relations.push_back(relation); - } - - void Clear() { - operations.clear(); - relations.clear(); - } - - void Swap(BufferableOperations* rhs) { - operations.swap(rhs->operations); - relations.swap(rhs->relations); - } - - bool empty() const { - return operations.empty(); - } - - size_t size() const { - return operations.size(); - } -}; - struct PgForeignKeyReference { PgForeignKeyReference(PgOid table_id, std::string ybctid); PgOid table_id; std::string ybctid; }; -// Represents row id (ybctid) from the DocDB's point of view. -class RowIdentifier { - public: - RowIdentifier(const Schema& schema, const PgsqlWriteRequestPB& request); - inline const std::string& ybctid() const; - inline const std::string& table_id() const; - - private: - const std::string* table_id_; - const std::string* ybctid_; - std::string ybctid_holder_; -}; - -YB_STRONGLY_TYPED_BOOL(IsTransactionalSession); -YB_STRONGLY_TYPED_BOOL(IsReadOnlyOperation); -YB_STRONGLY_TYPED_BOOL(IsCatalogOperation); - -class PerformFuture { - public: - PerformFuture() = default; - PerformFuture(std::future future, PgSession* session, PgObjectIds* relations); - - bool Valid() const; - - CHECKED_STATUS Get(); - private: - std::future future_; - PgSession* session_ = nullptr; - PgObjectIds relations_; -}; - // This class is not thread-safe as it is mostly used by a single-threaded PostgreSQL backend // process. class PgSession : public RefCountedThreadSafe { @@ -321,17 +265,11 @@ class PgSession : public RefCountedThreadSafe { CHECKED_STATUS RollbackSubTransaction(SubTransactionId id); private: - using Flusher = LWFunction; - - CHECKED_STATUS FlushBufferedOperationsImpl(const Flusher& flusher); - CHECKED_STATUS FlushOperations(BufferableOperations ops, IsTransactionalSession transactional); + Result FlushOperations(BufferableOperations ops, bool transactional); class RunHelper; - // Flush buffered write operations from the given buffer. - Status FlushBufferedWriteOperations(BufferableOperations* write_ops, bool transactional); - - void Perform(PgsqlOps* operations, bool use_catalog_session, const PerformCallback& callback); + Result Perform(BufferableOperations ops, bool use_catalog_session); void UpdateInTxnLimit(uint64_t* read_time); @@ -359,9 +297,7 @@ class PgSession : public RefCountedThreadSafe { // Should write operations be buffered? bool buffering_enabled_ = false; - BufferableOperations buffered_ops_; - BufferableOperations buffered_txn_ops_; - std::unordered_set> buffered_keys_; + PgOperationBuffer buffer_; HybridTime in_txn_limit_;