Skip to content

Commit

Permalink
[#11815] YSQL: Move write operation buffering into separate file
Browse files Browse the repository at this point in the history
Summary: To simplify the code of `PgSession` class and simplify further implementation of write operation buffering subsystem improvements (issue #11628) code related to buffering is moved into separate class `PgOperationBuffer`. Current functionality of buffering subsystem is preserved (in general).

Test Plan: Jenkins

Reviewers: nli, pjain

Reviewed By: pjain

Differential Revision: https://phabricator.dev.yugabyte.com/D16083
  • Loading branch information
d-uspenskiy committed Mar 25, 2022
1 parent 0295e25 commit 7898813
Show file tree
Hide file tree
Showing 9 changed files with 510 additions and 300 deletions.
2 changes: 2 additions & 0 deletions src/yb/yql/pggate/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ set(PGGATE_SRCS
pggate_thread_local_vars.cc
pg_client.cc
pg_op.cc
pg_operation_buffer.cc
pg_perform_future.cc
pg_sample.cc
pg_session.cc
pg_statement.cc
Expand Down
2 changes: 2 additions & 0 deletions src/yb/yql/pggate/pg_gate_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ class PgTableDesc;
using PgTableDescPtr = scoped_refptr<PgTableDesc>;

class PgsqlOp;
class PgsqlWriteOp;
using PgsqlOpPtr = std::shared_ptr<PgsqlOp>;
using PgsqlWriteOpPtr = std::shared_ptr<PgsqlWriteOp>;
using PgsqlOps = std::vector<PgsqlOpPtr>;

YB_STRONGLY_TYPED_BOOL(Commit);
Expand Down
2 changes: 0 additions & 2 deletions src/yb/yql/pggate/pg_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,6 @@ class PgsqlWriteOp : public PgsqlOp {
HybridTime write_time_;
};

using PgsqlWriteOpPtr = std::shared_ptr<PgsqlWriteOp>;

CHECKED_STATUS ReviewResponsePagingState(const PgTableDesc& table, PgsqlReadOp* op);

} // namespace pggate
Expand Down
304 changes: 304 additions & 0 deletions src/yb/yql/pggate/pg_operation_buffer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,304 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/yql/pggate/pg_operation_buffer.h"

#include <string>
#include <ostream>
#include <unordered_set>
#include <utility>
#include <vector>

#include "yb/common/constants.h"
#include "yb/common/pgsql_protocol.pb.h"
#include "yb/common/ql_expr.h"
#include "yb/common/ql_value.h"
#include "yb/common/schema.h"

#include "yb/docdb/doc_key.h"
#include "yb/docdb/primitive_value.h"
#include "yb/docdb/value_type.h"

#include "yb/gutil/casts.h"
#include "yb/gutil/port.h"

#include "yb/util/lw_function.h"
#include "yb/util/status.h"

#include "yb/yql/pggate/pg_op.h"
#include "yb/yql/pggate/pg_tabledesc.h"
#include "yb/yql/pggate/pggate_flags.h"

namespace yb {
namespace pggate {
namespace {

docdb::PrimitiveValue NullValue(SortingType sorting) {
using SortingType = SortingType;

return docdb::PrimitiveValue(
sorting == SortingType::kAscendingNullsLast || sorting == SortingType::kDescendingNullsLast
? docdb::ValueType::kNullHigh
: docdb::ValueType::kNullLow);
}

void InitKeyColumnPrimitiveValues(
const google::protobuf::RepeatedPtrField<PgsqlExpressionPB> &column_values,
const Schema &schema,
size_t start_idx,
vector<docdb::PrimitiveValue> *components) {
size_t column_idx = start_idx;
for (const auto& column_value : column_values) {
const auto sorting_type = schema.column(column_idx).sorting_type();
if (column_value.has_value()) {
const auto& value = column_value.value();
components->push_back(
IsNull(value)
? NullValue(sorting_type)
: docdb::PrimitiveValue::FromQLValuePB(value, sorting_type));
} else {
// TODO(neil) The current setup only works for CQL as it assumes primary key value must not
// be dependent on any column values. This needs to be fixed as PostgreSQL expression might
// require a read from a table.
//
// Use regular executor for now.
QLExprExecutor executor;
QLExprResult result;
auto s = executor.EvalExpr(column_value, nullptr, result.Writer());

components->push_back(docdb::PrimitiveValue::FromQLValuePB(result.Value(), sorting_type));
}
++column_idx;
}
}

// Represents row id (ybctid) from the DocDB's point of view.
class RowIdentifier {
public:
RowIdentifier(const Schema& schema, const PgsqlWriteRequestPB& request)
: table_id_(&request.table_id()) {
if (request.has_ybctid_column_value()) {
ybctid_ = &request.ybctid_column_value().value().binary_value();
} else {
vector<docdb::PrimitiveValue> hashed_components;
vector<docdb::PrimitiveValue> range_components;
InitKeyColumnPrimitiveValues(request.partition_column_values(),
schema,
0 /* start_idx */,
&hashed_components);
InitKeyColumnPrimitiveValues(request.range_column_values(),
schema,
schema.num_hash_key_columns(),
&range_components);
if (hashed_components.empty()) {
ybctid_holder_ = docdb::DocKey(std::move(range_components)).Encode().ToStringBuffer();
} else {
ybctid_holder_ = docdb::DocKey(request.hash_code(),
std::move(hashed_components),
std::move(range_components)).Encode().ToStringBuffer();
}
ybctid_ = nullptr;
}
}

inline const string& ybctid() const {
return ybctid_ ? *ybctid_ : ybctid_holder_;
}

inline const string& table_id() const {
return *table_id_;
}

private:
const std::string* table_id_;
const std::string* ybctid_;
std::string ybctid_holder_;
};


bool operator==(const RowIdentifier& k1, const RowIdentifier& k2) {
return k1.table_id() == k2.table_id() && k1.ybctid() == k2.ybctid();
}

size_t hash_value(const RowIdentifier& key) {
size_t hash = 0;
boost::hash_combine(hash, key.table_id());
boost::hash_combine(hash, key.ybctid());
return hash;
}

inline bool IsTableUsedByRequest(const PgsqlReadRequestPB& request, const std::string& table_id) {
return request.table_id() == table_id ||
(request.has_index_request() && IsTableUsedByRequest(request.index_request(), table_id));
}

} // namespace

void BufferableOperations::Add(PgsqlOpPtr op, const PgObjectId& relation) {
operations.push_back(std::move(op));
relations.push_back(relation);
}

void BufferableOperations::Swap(BufferableOperations* rhs) {
operations.swap(rhs->operations);
relations.swap(rhs->relations);
}

void BufferableOperations::Clear() {
operations.clear();
relations.clear();
}

void BufferableOperations::Reserve(size_t capacity) {
operations.reserve(capacity);
relations.reserve(capacity);
}

bool BufferableOperations::empty() const {
return operations.empty();
}

size_t BufferableOperations::size() const {
return operations.size();
}

class PgOperationBuffer::Impl {
public:
explicit Impl(const Flusher& flusher)
: flusher_(flusher) {
}

CHECKED_STATUS Add(const PgTableDesc& table, PgsqlWriteOpPtr op, bool transactional) {
// Check for buffered operation related to same row.
// If multiple operations are performed in context of single RPC second operation will not
// see the results of first operation on DocDB side.
// Multiple operations on same row must be performed in context of different RPC.
// Flush is required in this case.
RowIdentifier row_id(table.schema(), op->write_request());
if (PREDICT_FALSE(!keys_.insert(row_id).second)) {
RETURN_NOT_OK(Flush());
keys_.insert(row_id);
}
auto& target = (transactional ? txn_ops_ : ops_);
if (target.empty()) {
target.Reserve(FLAGS_ysql_session_max_batch_size);
}
target.Add(std::move(op), table.id());
return keys_.size() >= FLAGS_ysql_session_max_batch_size ? Flush() : Status::OK();
}

CHECKED_STATUS Flush() {
return DoFlush(make_lw_function([this](BufferableOperations ops, bool txn) {
return VERIFY_RESULT(flusher_(std::move(ops), txn)).Get();
}));
}

Result<BufferableOperations> FlushTake(
const PgTableDesc& table, const PgsqlOp& op, bool transactional) {
BufferableOperations result;
if (IsFullFlushRequired(table.schema(), op)) {
RETURN_NOT_OK(Flush());
} else {
RETURN_NOT_OK(DoFlush(make_lw_function(
[this, transactional, &result](BufferableOperations ops, bool txn) -> Status {
if (txn == transactional) {
ops.Swap(&result);
return Status::OK();
}
return VERIFY_RESULT(flusher_(std::move(ops), txn)).Get();
})));
}
return result;
}

size_t Size() const {
return keys_.size();
}

void Clear() {
VLOG_IF(1, !keys_.empty()) << "Dropping " << keys_.size() << " pending operations";
ops_.Clear();
txn_ops_.Clear();
keys_.clear();
}

private:
using SyncFlusher = LWFunction<Status(BufferableOperations, bool)>;

CHECKED_STATUS DoFlush(const SyncFlusher& flusher) {
BufferableOperations ops;
BufferableOperations txn_ops;
ops_.Swap(&ops);
txn_ops_.Swap(&txn_ops);
keys_.clear();

if (!ops.empty()) {
RETURN_NOT_OK(flusher(std::move(ops), false /* transactional */));
}
if (!txn_ops.empty()) {
RETURN_NOT_OK(flusher(std::move(txn_ops), true /* transactional */));
}
return Status::OK();
}

bool IsFullFlushRequired(const Schema& schema, const PgsqlOp& op) const {
return op.is_read()
? IsSameTableUsedByBufferedOperations(down_cast<const PgsqlReadOp&>(op).read_request())
: keys_.find(RowIdentifier(
schema, down_cast<const PgsqlWriteOp&>(op).write_request())) != keys_.end();
}

bool IsSameTableUsedByBufferedOperations(const PgsqlReadRequestPB& request) const {
for (const auto& k : keys_) {
if (IsTableUsedByRequest(request, k.table_id())) {
return true;
}
}
return false;
}

const Flusher flusher_;
BufferableOperations ops_;
BufferableOperations txn_ops_;
std::unordered_set<RowIdentifier, boost::hash<RowIdentifier>> keys_;
};

PgOperationBuffer::PgOperationBuffer(const Flusher& flusher)
: impl_(new Impl(flusher)) {
}

PgOperationBuffer::~PgOperationBuffer() = default;

Status PgOperationBuffer::Add(const PgTableDesc& table, PgsqlWriteOpPtr op, bool transactional) {
return impl_->Add(table, std::move(op), transactional);
}

CHECKED_STATUS PgOperationBuffer::Flush() {
return impl_->Flush();
}

Result<BufferableOperations> PgOperationBuffer::FlushTake(
const PgTableDesc& table, const PgsqlOp& op, bool transactional) {
return impl_->FlushTake(table, op, transactional);
}

size_t PgOperationBuffer::Size() const {
return impl_->Size();
}

void PgOperationBuffer::Clear() {
impl_->Clear();
}

} // namespace pggate
} // namespace yb
65 changes: 65 additions & 0 deletions src/yb/yql/pggate/pg_operation_buffer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#ifndef YB_YQL_PGGATE_PG_OPERATION_BUFFER_H_
#define YB_YQL_PGGATE_PG_OPERATION_BUFFER_H_

#include <functional>
#include <memory>

#include "yb/common/common_fwd.h"
#include "yb/common/pg_types.h"

#include "yb/util/result.h"
#include "yb/util/status_fwd.h"

#include "yb/yql/pggate/pg_gate_fwd.h"
#include "yb/yql/pggate/pg_perform_future.h"

namespace yb {
namespace pggate {

struct BufferableOperations {
PgsqlOps operations;
PgObjectIds relations;

void Add(PgsqlOpPtr op, const PgObjectId& relation);
void Swap(BufferableOperations* rhs);
void Clear();
void Reserve(size_t capacity);
bool empty() const;
size_t size() const;
};

class PgOperationBuffer {
public:
using Flusher = std::function<Result<PerformFuture>(BufferableOperations, bool)>;

explicit PgOperationBuffer(const Flusher& flusher);
~PgOperationBuffer();
CHECKED_STATUS Add(const PgTableDesc& table, PgsqlWriteOpPtr op, bool transactional);
CHECKED_STATUS Flush();
Result<BufferableOperations> FlushTake(
const PgTableDesc& table, const PgsqlOp& op, bool transactional);
size_t Size() const;
void Clear();

private:
class Impl;
std::unique_ptr<Impl> impl_;
};

} // namespace pggate
} // namespace yb

#endif // YB_YQL_PGGATE_PG_OPERATION_BUFFER_H_
Loading

0 comments on commit 7898813

Please sign in to comment.