Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions source/extensions/filters/network/thrift_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ envoy_cc_library(
external_deps = ["abseil_optional"],
deps = [
":thrift_lib",
"//include/envoy/buffer:buffer_interface",
"//source/common/common:macros",
],
)
Expand All @@ -128,14 +129,18 @@ envoy_cc_library(
],
external_deps = ["abseil_optional"],
deps = [
":conn_state_lib",
":decoder_events_lib",
":metadata_lib",
":thrift_lib",
":thrift_object_interface",
":transport_interface",
"//include/envoy/buffer:buffer_interface",
"//include/envoy/registry",
"//source/common/common:assert_lib",
"//source/common/config:utility_lib",
"//source/common/singleton:const_singleton",
"//source/extensions/filters/network/thrift_proxy/filters:filter_interface",
],
)

Expand Down Expand Up @@ -221,6 +226,14 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "conn_state_lib",
hdrs = ["conn_state.h"],
deps = [
"//include/envoy/tcp:conn_pool_interface",
],
)

envoy_cc_library(
name = "thrift_lib",
hdrs = ["thrift.h"],
Expand All @@ -230,6 +243,27 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "thrift_object_interface",
hdrs = ["thrift_object.h"],
deps = [
"//include/envoy/buffer:buffer_interface",
],
)

envoy_cc_library(
name = "thrift_object_lib",
srcs = ["thrift_object_impl.cc"],
hdrs = ["thrift_object_impl.h"],
deps = [
":decoder_lib",
":thrift_lib",
":thrift_object_interface",
":unframed_transport_lib",
"//source/extensions/filters/network/thrift_proxy/filters:filter_interface",
],
)

envoy_cc_library(
name = "auto_transport_lib",
srcs = [
Expand Down
4 changes: 0 additions & 4 deletions source/extensions/filters/network/thrift_proxy/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,6 @@ void ConfigImpl::createFilterChain(ThriftFilters::FilterChainFactoryCallbacks& c
}
}

DecoderPtr ConfigImpl::createDecoder(DecoderCallbacks& callbacks) {
return std::make_unique<Decoder>(createTransport(), createProtocol(), callbacks);
}

TransportPtr ConfigImpl::createTransport() {
return NamedTransportConfigFactory::getFactory(transport_).createTransport();
}
Expand Down
6 changes: 2 additions & 4 deletions source/extensions/filters/network/thrift_proxy/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,11 @@ class ConfigImpl : public Config,
// Config
ThriftFilterStats& stats() override { return stats_; }
ThriftFilters::FilterChainFactory& filterFactory() override { return *this; }
DecoderPtr createDecoder(DecoderCallbacks& callbacks) override;
TransportPtr createTransport() override;
ProtocolPtr createProtocol() override;
Router::Config& routerConfig() override { return *this; }

private:
TransportPtr createTransport();
ProtocolPtr createProtocol();

Server::Configuration::FactoryContext& context_;
const std::string stats_prefix_;
ThriftFilterStats stats_;
Expand Down
43 changes: 27 additions & 16 deletions source/extensions/filters/network/thrift_proxy/conn_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ namespace NetworkFilters {
namespace ThriftProxy {

ConnectionManager::ConnectionManager(Config& config)
: config_(config), stats_(config_.stats()), decoder_(config_.createDecoder(*this)) {}
: config_(config), stats_(config_.stats()), transport_(config.createTransport()),
protocol_(config.createProtocol()),
decoder_(std::make_unique<Decoder>(*transport_, *protocol_, *this)) {}

ConnectionManager::~ConnectionManager() {}

Expand Down Expand Up @@ -67,22 +69,14 @@ void ConnectionManager::dispatch() {
}

void ConnectionManager::sendLocalReply(MessageMetadata& metadata, const DirectResponse& response) {
// Use the factory to get the concrete protocol from the decoder protocol (as opposed to
// potentially pre-detection auto protocol).
ProtocolType proto_type = decoder_->protocolType();
ProtocolPtr proto = NamedProtocolConfigFactory::getFactory(proto_type).createProtocol();
Buffer::OwnedImpl buffer;

response.encode(metadata, *proto, buffer);

// Same logic as protocol above.
TransportPtr transport =
NamedTransportConfigFactory::getFactory(decoder_->transportType()).createTransport();
response.encode(metadata, *protocol_, buffer);

Buffer::OwnedImpl response_buffer;

metadata.setProtocol(proto_type);
transport->encodeFrame(response_buffer, metadata, buffer);
metadata.setProtocol(protocol_->type());
transport_->encodeFrame(response_buffer, metadata, buffer);

read_callbacks_->connection().write(response_buffer, false);
}
Expand Down Expand Up @@ -230,12 +224,30 @@ FilterStatus ConnectionManager::ActiveRpc::transportEnd() {
break;
}

return decoder_filter_->transportEnd();
FilterStatus status = event_handler_->transportEnd();

if (metadata_->isProtocolUpgradeMessage()) {
ENVOY_CONN_LOG(error, "thrift: sending protocol upgrade response",
parent_.read_callbacks_->connection());
sendLocalReply(*parent_.protocol_->upgradeResponse(*upgrade_handler_));
}

return status;
}

FilterStatus ConnectionManager::ActiveRpc::messageBegin(MessageMetadataSharedPtr metadata) {
metadata_ = metadata;

if (metadata_->isProtocolUpgradeMessage()) {
ASSERT(parent_.protocol_->supportsUpgrade());

ENVOY_CONN_LOG(error, "thrift: decoding protocol upgrade request",
parent_.read_callbacks_->connection());
upgrade_handler_ = parent_.protocol_->upgradeRequestDecoder();
ASSERT(upgrade_handler_ != nullptr);
event_handler_ = upgrade_handler_.get();
}

return event_handler_->messageBegin(metadata);
}

Expand Down Expand Up @@ -282,11 +294,10 @@ void ConnectionManager::ActiveRpc::sendLocalReply(const DirectResponse& response
parent_.doDeferredRpcDestroy(*this);
}

void ConnectionManager::ActiveRpc::startUpstreamResponse(TransportType transport_type,
ProtocolType protocol_type) {
void ConnectionManager::ActiveRpc::startUpstreamResponse(Transport& transport, Protocol& protocol) {
ASSERT(response_decoder_ == nullptr);

response_decoder_ = std::make_unique<ResponseDecoder>(*this, transport_type, protocol_type);
response_decoder_ = std::make_unique<ResponseDecoder>(*this, transport, protocol);
}

bool ConnectionManager::ActiveRpc::upstreamData(Buffer::Instance& buffer) {
Expand Down
22 changes: 9 additions & 13 deletions source/extensions/filters/network/thrift_proxy/conn_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ class Config {

virtual ThriftFilters::FilterChainFactory& filterFactory() PURE;
virtual ThriftFilterStats& stats() PURE;
virtual DecoderPtr createDecoder(DecoderCallbacks& callbacks) PURE;
virtual TransportPtr createTransport() PURE;
virtual ProtocolPtr createProtocol() PURE;
virtual Router::Config& routerConfig() PURE;
};

Expand Down Expand Up @@ -74,18 +75,10 @@ class ConnectionManager : public Network::ReadFilter,
struct ActiveRpc;

struct ResponseDecoder : public DecoderCallbacks, public ProtocolConverter {
ResponseDecoder(ActiveRpc& parent, TransportType transport_type, ProtocolType protocol_type)
: parent_(parent),
decoder_(std::make_unique<Decoder>(
NamedTransportConfigFactory::getFactory(transport_type).createTransport(),
NamedProtocolConfigFactory::getFactory(protocol_type).createProtocol(), *this)),
ResponseDecoder(ActiveRpc& parent, Transport& transport, Protocol& protocol)
: parent_(parent), decoder_(std::make_unique<Decoder>(transport, protocol, *this)),
complete_(false), first_reply_field_(false) {
// Use the factory to get the concrete protocol from the decoder protocol (as opposed to
// potentially pre-detection auto protocol).
initProtocolConverter(
NamedProtocolConfigFactory::getFactory(parent_.parent_.decoder_->protocolType())
.createProtocol(),
parent_.response_buffer_);
initProtocolConverter(*parent_.parent_.protocol_, parent_.response_buffer_);
}

bool onData(Buffer::Instance& data);
Expand Down Expand Up @@ -149,7 +142,7 @@ class ConnectionManager : public Network::ReadFilter,
return parent_.decoder_->protocolType();
}
void sendLocalReply(const DirectResponse& response) override;
void startUpstreamResponse(TransportType transport_type, ProtocolType protocol_type) override;
void startUpstreamResponse(Transport& transport, Protocol& protocol) override;
bool upstreamData(Buffer::Instance& buffer) override;
void resetDownstreamConnection() override;

Expand All @@ -170,6 +163,7 @@ class ConnectionManager : public Network::ReadFilter,
uint64_t stream_id_;
MessageMetadataSharedPtr metadata_;
ThriftFilters::DecoderFilterSharedPtr decoder_filter_;
DecoderEventHandlerSharedPtr upgrade_handler_;
ResponseDecoderPtr response_decoder_;
absl::optional<Router::RouteConstSharedPtr> cached_route_;
Buffer::OwnedImpl response_buffer_;
Expand All @@ -188,6 +182,8 @@ class ConnectionManager : public Network::ReadFilter,

Network::ReadFilterCallbacks* read_callbacks_{};

TransportPtr transport_;
ProtocolPtr protocol_;
DecoderPtr decoder_;
std::list<ActiveRpcPtr> rpcs_;
Buffer::OwnedImpl request_buffer_;
Expand Down
48 changes: 48 additions & 0 deletions source/extensions/filters/network/thrift_proxy/conn_state.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#pragma once

#include "envoy/tcp/conn_pool.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace ThriftProxy {

/**
* ThriftConnectionState tracks thrift-related connection state for pooled connections.
*/
class ThriftConnectionState : public Tcp::ConnectionPool::ConnectionState {
public:
/**
* @return true if this upgrade has been attempted on this connection.
*/
bool upgradeAttempted() const { return upgrade_attempted_; }
/**
* @return true if this connection has been upgraded
*/
bool isUpgraded() const { return upgraded_; }

/**
* Marks the connection as successfully upgraded.
*/
void markUpgraded() {
upgrade_attempted_ = true;
upgraded_ = true;
}

/**
* Marks the connection as not upgraded.
*/
void markUpgradeFailed() {
upgrade_attempted_ = true;
upgraded_ = false;
}

private:
bool upgrade_attempted_{false};
bool upgraded_{false};
};

} // namespace ThriftProxy
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
35 changes: 19 additions & 16 deletions source/extensions/filters/network/thrift_proxy/decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,9 @@ ProtocolState DecoderStateMachine::popReturnState() {

ProtocolState DecoderStateMachine::run(Buffer::Instance& buffer) {
while (state_ != ProtocolState::Done) {
ENVOY_LOG(trace, "thrift: state {}, {} bytes available", ProtocolStateNameValues::name(state_),
buffer.length());

DecoderStatus s = handleState(buffer);
if (s.next_state_ == ProtocolState::WaitForData) {
return ProtocolState::WaitForData;
Expand All @@ -350,8 +353,8 @@ ProtocolState DecoderStateMachine::run(Buffer::Instance& buffer) {
return state_;
}

Decoder::Decoder(TransportPtr&& transport, ProtocolPtr&& protocol, DecoderCallbacks& callbacks)
: transport_(std::move(transport)), protocol_(std::move(protocol)), callbacks_(callbacks) {}
Decoder::Decoder(Transport& transport, Protocol& protocol, DecoderCallbacks& callbacks)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this change from TransportPtr&& transport to Transport& transport, what guarantees that the referenced transport will remain valid for the full life of the Decoder object?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's up to the constructor of the Decoder to make sure this is the case. ConnectionManager creates a TransportPtr, ProtocolPtr, and DecoderPtr when it's constructed. The TransportPtr and ProtocolPtr and never replaced so their lifecycle is the duration of the downstream connection.

Each Router has a TransportPtr and ProtocolPtr. A DecoderPtr is held indirectly via ThriftObjectPtr if protocol upgrade occurs for the upstream. Once upgrade is complete that ThriftObjectPtr and DecoderPtr are destroyed. If the request is terminated early the ThriftObjectPtr and DecoderPtr are destroyed before the TransportPtr or ProtocolPtr.

Finally the ConnectionManager::ActiveRpc holds a ResponseDecoder that has references to the Router's TransportPtr and ProtocolPtr. The destruction order there is ResponseDecoder before DecoderFilter so that's ok as well.

: transport_(transport), protocol_(protocol), callbacks_(callbacks) {}

void Decoder::complete() {
request_.reset();
Expand All @@ -377,22 +380,22 @@ FilterStatus Decoder::onData(Buffer::Instance& data, bool& buffer_underflow) {
metadata_ = std::make_shared<MessageMetadata>();
}

if (!transport_->decodeFrameStart(data, *metadata_)) {
ENVOY_LOG(debug, "thrift: need more data for {} transport start", transport_->name());
if (!transport_.decodeFrameStart(data, *metadata_)) {
ENVOY_LOG(debug, "thrift: need more data for {} transport start", transport_.name());
buffer_underflow = true;
return FilterStatus::Continue;
}
ENVOY_LOG(debug, "thrift: {} transport started", transport_->name());
ENVOY_LOG(debug, "thrift: {} transport started", transport_.name());

if (metadata_->hasProtocol()) {
if (protocol_->type() == ProtocolType::Auto) {
protocol_->setType(metadata_->protocol());
ENVOY_LOG(debug, "thrift: {} transport forced {} protocol", transport_->name(),
protocol_->name());
} else if (metadata_->protocol() != protocol_->type()) {
if (protocol_.type() == ProtocolType::Auto) {
protocol_.setType(metadata_->protocol());
ENVOY_LOG(debug, "thrift: {} transport forced {} protocol", transport_.name(),
protocol_.name());
} else if (metadata_->protocol() != protocol_.type()) {
throw EnvoyException(fmt::format("transport reports protocol {}, but configured for {}",
ProtocolNames::get().fromType(metadata_->protocol()),
ProtocolNames::get().fromType(protocol_->type())));
ProtocolNames::get().fromType(protocol_.type())));
}
}
if (metadata_->hasAppException()) {
Expand All @@ -406,7 +409,7 @@ FilterStatus Decoder::onData(Buffer::Instance& data, bool& buffer_underflow) {
request_ = std::make_unique<ActiveRequest>(callbacks_.newDecoderEventHandler());
frame_started_ = true;
state_machine_ =
std::make_unique<DecoderStateMachine>(*protocol_, metadata_, request_->handler_);
std::make_unique<DecoderStateMachine>(protocol_, metadata_, request_->handler_);

if (request_->handler_.transportBegin(metadata_) == FilterStatus::StopIteration) {
return FilterStatus::StopIteration;
Expand All @@ -415,7 +418,7 @@ FilterStatus Decoder::onData(Buffer::Instance& data, bool& buffer_underflow) {

ASSERT(state_machine_ != nullptr);

ENVOY_LOG(debug, "thrift: protocol {}, state {}, {} bytes available", protocol_->name(),
ENVOY_LOG(debug, "thrift: protocol {}, state {}, {} bytes available", protocol_.name(),
ProtocolStateNameValues::name(state_machine_->currentState()), data.length());

ProtocolState rv = state_machine_->run(data);
Expand All @@ -431,16 +434,16 @@ FilterStatus Decoder::onData(Buffer::Instance& data, bool& buffer_underflow) {
ASSERT(rv == ProtocolState::Done);

// Message complete, decode end of frame.
if (!transport_->decodeFrameEnd(data)) {
ENVOY_LOG(debug, "thrift: need more data for {} transport end", transport_->name());
if (!transport_.decodeFrameEnd(data)) {
ENVOY_LOG(debug, "thrift: need more data for {} transport end", transport_.name());
buffer_underflow = true;
return FilterStatus::Continue;
}

frame_ended_ = true;
metadata_.reset();

ENVOY_LOG(debug, "thrift: {} transport ended", transport_->name());
ENVOY_LOG(debug, "thrift: {} transport ended", transport_.name());
if (request_->handler_.transportEnd() == FilterStatus::StopIteration) {
return FilterStatus::StopIteration;
}
Expand Down
Loading