Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,29 @@ void ShadowRouterImpl::flushPendingCallbacks() {
pending_callbacks_.clear();
}

FilterStatus ShadowRouterImpl::runOrSave(std::function<FilterStatus()>&& cb,
const std::function<void()>& on_save) {
if (requestStarted()) {
return cb();
}

pending_callbacks_.push_back(std::move(cb));

if (on_save) {
on_save();
}

return FilterStatus::Continue;
}

FilterStatus ShadowRouterImpl::passthroughData(Buffer::Instance& data) {
if (requestStarted()) {
return ProtocolConverter::passthroughData(data);
}

auto copied = std::make_shared<Buffer::OwnedImpl>(data);
auto cb = [copied = std::move(copied), this]() mutable {
ProtocolConverter::passthroughData(*copied);
auto cb = [copied = std::move(copied), this]() mutable -> FilterStatus {
return ProtocolConverter::passthroughData(*copied);
};
pending_callbacks_.push_back(std::move(cb));

Expand All @@ -98,23 +113,16 @@ FilterStatus ShadowRouterImpl::structBegin(absl::string_view name) {
return ProtocolConverter::structBegin(name);
}

auto cb = [name_str = std::string(name), this]() {
ProtocolConverter::structBegin(absl::string_view(name_str));
auto cb = [name_str = std::string(name), this]() -> FilterStatus {
return ProtocolConverter::structBegin(absl::string_view(name_str));
};
pending_callbacks_.push_back(std::move(cb));

return FilterStatus::Continue;
}

FilterStatus ShadowRouterImpl::structEnd() {
if (requestStarted()) {
return ProtocolConverter::structEnd();
}

auto cb = [this]() { ProtocolConverter::structEnd(); };
pending_callbacks_.push_back(std::move(cb));

return FilterStatus::Continue;
return runOrSave([this]() -> FilterStatus { return ProtocolConverter::structEnd(); });
}

FilterStatus ShadowRouterImpl::fieldBegin(absl::string_view name, FieldType& field_type,
Expand All @@ -123,98 +131,55 @@ FilterStatus ShadowRouterImpl::fieldBegin(absl::string_view name, FieldType& fie
return ProtocolConverter::fieldBegin(name, field_type, field_id);
}

auto cb = [name_str = std::string(name), field_type, field_id, this]() mutable {
ProtocolConverter::fieldBegin(absl::string_view(name_str), field_type, field_id);
auto cb = [name_str = std::string(name), field_type, field_id, this]() mutable -> FilterStatus {
return ProtocolConverter::fieldBegin(absl::string_view(name_str), field_type, field_id);
};
pending_callbacks_.push_back(std::move(cb));

return FilterStatus::Continue;
}

FilterStatus ShadowRouterImpl::fieldEnd() {
if (requestStarted()) {
return ProtocolConverter::fieldEnd();
}

auto cb = [this]() { ProtocolConverter::fieldEnd(); };
pending_callbacks_.push_back(std::move(cb));

return FilterStatus::Continue;
return runOrSave([this]() -> FilterStatus { return ProtocolConverter::fieldEnd(); });
}

FilterStatus ShadowRouterImpl::boolValue(bool& value) {
if (requestStarted()) {
return ProtocolConverter::boolValue(value);
}

auto cb = [value, this]() mutable { ProtocolConverter::boolValue(value); };
pending_callbacks_.push_back(std::move(cb));

return FilterStatus::Continue;
return runOrSave(
[value, this]() mutable -> FilterStatus { return ProtocolConverter::boolValue(value); });
}

FilterStatus ShadowRouterImpl::byteValue(uint8_t& value) {
if (requestStarted()) {
return ProtocolConverter::byteValue(value);
}

auto cb = [value, this]() mutable { ProtocolConverter::byteValue(value); };
pending_callbacks_.push_back(std::move(cb));

return FilterStatus::Continue;
return runOrSave(
[value, this]() mutable -> FilterStatus { return ProtocolConverter::byteValue(value); });
}

FilterStatus ShadowRouterImpl::int16Value(int16_t& value) {
if (requestStarted()) {
return ProtocolConverter::int16Value(value);
}

auto cb = [value, this]() mutable { ProtocolConverter::int16Value(value); };
pending_callbacks_.push_back(std::move(cb));

return FilterStatus::Continue;
return runOrSave(
[value, this]() mutable -> FilterStatus { return ProtocolConverter::int16Value(value); });
}

FilterStatus ShadowRouterImpl::int32Value(int32_t& value) {
if (requestStarted()) {
return ProtocolConverter::int32Value(value);
}

auto cb = [value, this]() mutable { ProtocolConverter::int32Value(value); };
pending_callbacks_.push_back(std::move(cb));

return FilterStatus::Continue;
return runOrSave(
[value, this]() mutable -> FilterStatus { return ProtocolConverter::int32Value(value); });
}

FilterStatus ShadowRouterImpl::int64Value(int64_t& value) {
if (requestStarted()) {
return ProtocolConverter::int64Value(value);
}

auto cb = [value, this]() mutable { ProtocolConverter::int64Value(value); };
pending_callbacks_.push_back(std::move(cb));

return FilterStatus::Continue;
return runOrSave(
[value, this]() mutable -> FilterStatus { return ProtocolConverter::int64Value(value); });
}

FilterStatus ShadowRouterImpl::doubleValue(double& value) {
if (requestStarted()) {
return ProtocolConverter::doubleValue(value);
}

auto cb = [value, this]() mutable { ProtocolConverter::doubleValue(value); };
pending_callbacks_.push_back(std::move(cb));

return FilterStatus::Continue;
return runOrSave(
[value, this]() mutable -> FilterStatus { return ProtocolConverter::doubleValue(value); });
}

FilterStatus ShadowRouterImpl::stringValue(absl::string_view value) {
if (requestStarted()) {
return ProtocolConverter::stringValue(value);
}

auto cb = [value_str = std::string(value), this]() {
ProtocolConverter::stringValue(absl::string_view(value_str));
auto cb = [value_str = std::string(value), this]() -> FilterStatus {
return ProtocolConverter::stringValue(absl::string_view(value_str));
};
pending_callbacks_.push_back(std::move(cb));

Expand All @@ -223,75 +188,37 @@ FilterStatus ShadowRouterImpl::stringValue(absl::string_view value) {

FilterStatus ShadowRouterImpl::mapBegin(FieldType& key_type, FieldType& value_type,
uint32_t& size) {
if (requestStarted()) {
return runOrSave([key_type, value_type, size, this]() mutable -> FilterStatus {
return ProtocolConverter::mapBegin(key_type, value_type, size);
}

auto cb = [key_type, value_type, size, this]() mutable {
ProtocolConverter::mapBegin(key_type, value_type, size);
};
pending_callbacks_.push_back(std::move(cb));

return FilterStatus::Continue;
});
}

FilterStatus ShadowRouterImpl::mapEnd() {
if (requestStarted()) {
return ProtocolConverter::mapEnd();
}

auto cb = [this]() { ProtocolConverter::mapEnd(); };
pending_callbacks_.push_back(std::move(cb));

return FilterStatus::Continue;
return runOrSave([this]() -> FilterStatus { return ProtocolConverter::mapEnd(); });
}

FilterStatus ShadowRouterImpl::listBegin(FieldType& elem_type, uint32_t& size) {
if (requestStarted()) {
return runOrSave([elem_type, size, this]() mutable -> FilterStatus {
return ProtocolConverter::listBegin(elem_type, size);
}

auto cb = [elem_type, size, this]() mutable { ProtocolConverter::listBegin(elem_type, size); };
pending_callbacks_.push_back(std::move(cb));

return FilterStatus::Continue;
});
}

FilterStatus ShadowRouterImpl::listEnd() {
if (requestStarted()) {
return ProtocolConverter::listEnd();
}

auto cb = [this]() { ProtocolConverter::listEnd(); };
pending_callbacks_.push_back(std::move(cb));

return FilterStatus::Continue;
return runOrSave([this]() -> FilterStatus { return ProtocolConverter::listEnd(); });
}

FilterStatus ShadowRouterImpl::setBegin(FieldType& elem_type, uint32_t& size) {
if (requestStarted()) {
return runOrSave([elem_type, size, this]() mutable -> FilterStatus {
return ProtocolConverter::setBegin(elem_type, size);
}

auto cb = [elem_type, size, this]() mutable { ProtocolConverter::setBegin(elem_type, size); };
pending_callbacks_.push_back(std::move(cb));

return FilterStatus::Continue;
});
}

FilterStatus ShadowRouterImpl::setEnd() {
if (requestStarted()) {
return ProtocolConverter::setEnd();
}

auto cb = [this]() { ProtocolConverter::setEnd(); };
pending_callbacks_.push_back(std::move(cb));

return FilterStatus::Continue;
return runOrSave([this]() -> FilterStatus { return ProtocolConverter::setEnd(); });
}

FilterStatus ShadowRouterImpl::messageEnd() {
auto cb = [this]() {
auto cb = [this]() -> FilterStatus {
ASSERT(upstream_request_->conn_data_ != nullptr);

ProtocolConverter::messageEnd();
Expand All @@ -304,16 +231,11 @@ FilterStatus ShadowRouterImpl::messageEnd() {
if (metadata_->messageType() == MessageType::Oneway) {
upstream_request_->releaseConnection(false);
}
};

if (requestStarted()) {
cb();
} else {
request_ready_ = true;
pending_callbacks_.push_back(std::move(cb));
}
return FilterStatus::Continue;
};

return FilterStatus::Continue;
return runOrSave(std::move(cb), [this]() -> void { request_ready_ = true; });
}

bool ShadowRouterImpl::requestInProgress() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,14 @@ class ShadowRouterImpl : public ShadowRouterHandle,

private:
friend class ShadowWriterTest;
using ConverterCallback = std::function<FilterStatus()>;

void writeRequest();
bool requestInProgress();
bool requestStarted() const;
void flushPendingCallbacks();
FilterStatus runOrSave(std::function<FilterStatus()>&& cb,
const std::function<void()>& on_save = {});

ShadowWriterImpl& parent_;
const std::string cluster_name_;
Expand All @@ -222,7 +225,6 @@ class ShadowRouterImpl : public ShadowRouterHandle,
uint64_t response_size_{};
bool request_ready_ : 1;

using ConverterCallback = std::function<void()>;
std::list<ConverterCallback> pending_callbacks_;
bool removed_{};
bool deferred_deleting_{};
Expand Down