Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions source/common/http/conn_pool_grid.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,21 @@ ConnectivityGrid::WrapperCallbacks::WrapperCallbacks(ConnectivityGrid& grid,
: grid_(grid), decoder_(decoder), inner_callbacks_(callbacks),
next_attempt_timer_(
grid_.dispatcher_.createTimer([this]() -> void { tryAnotherConnection(); })),
current_(pool_it) {
newStream();
}
current_(pool_it) {}

// TODO(#15649) add trace logging.
ConnectivityGrid::WrapperCallbacks::ConnectionAttemptCallbacks::ConnectionAttemptCallbacks(
WrapperCallbacks& parent, PoolIterator it)
: parent_(parent), pool_it_(it), cancellable_(pool().newStream(parent_.decoder_, *this)) {}
: parent_(parent), pool_it_(it), cancellable_(nullptr) {}

bool ConnectivityGrid::WrapperCallbacks::ConnectionAttemptCallbacks::newStream() {
auto* cancellable = pool().newStream(parent_.decoder_, *this);
if (cancellable == nullptr) {
return true;
}
cancellable_ = cancellable;
return false;
}

void ConnectivityGrid::WrapperCallbacks::ConnectionAttemptCallbacks::onPoolFailure(
ConnectionPool::PoolFailureReason reason, absl::string_view transport_failure_reason,
Expand Down Expand Up @@ -64,14 +71,16 @@ void ConnectivityGrid::WrapperCallbacks::deleteThis() {
removeFromList(grid_.wrapped_callbacks_);
}

void ConnectivityGrid::WrapperCallbacks::newStream() {
bool ConnectivityGrid::WrapperCallbacks::newStream() {
ENVOY_LOG(trace, "{} pool attempting to create a new stream to host '{}'.",
describePool(**current_), grid_.host_->hostname());
auto attempt = std::make_unique<ConnectionAttemptCallbacks>(*this, current_);
LinkedList::moveIntoList(std::move(attempt), connection_attempts_);
if (!next_attempt_timer_->enabled()) {
next_attempt_timer_->enableTimer(grid_.next_attempt_duration_);
}
// Note that in the case of immediate attempt/failure, newStream will delete this.
return connection_attempts_.front()->newStream();
}

void ConnectivityGrid::WrapperCallbacks::ConnectionAttemptCallbacks::onPoolReady(
Expand Down Expand Up @@ -175,6 +184,12 @@ ConnectionPool::Cancellable* ConnectivityGrid::newStream(Http::ResponseDecoder&
std::make_unique<WrapperCallbacks>(*this, decoder, pools_.begin(), callbacks);
ConnectionPool::Cancellable* ret = wrapped_callback.get();
LinkedList::moveIntoList(std::move(wrapped_callback), wrapped_callbacks_);
// Note that in the case of immediate attempt/failure, newStream will delete this.
if (wrapped_callbacks_.front()->newStream()) {
// If newStream succeeds, return nullptr as the caller has received their
// callback and does not need a cancellable handle.
return nullptr;
}
return ret;
}

Expand Down
12 changes: 8 additions & 4 deletions source/common/http/conn_pool_grid.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class ConnectivityGrid : public ConnectionPool::Instance,
public:
ConnectionAttemptCallbacks(WrapperCallbacks& parent, PoolIterator it);

// Returns true if a stream is immediately created, false if it is pending.
bool newStream();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be more readable to have an enum instead of a bool here. But you can leave it like this if you like it better. I got confused trying to read the code before seeing this comment.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good - will do that in a follow-up!


// ConnectionPool::Callbacks
void onPoolFailure(ConnectionPool::PoolFailureReason reason,
absl::string_view transport_failure_reason,
Expand All @@ -61,8 +64,9 @@ class ConnectivityGrid : public ConnectionPool::Instance,
// ConnectionPool::Cancellable
void cancel(Envoy::ConnectionPool::CancelPolicy cancel_policy) override;

// Attempt to create a new stream for pool();
void newStream();
// Attempt to create a new stream for pool(). Returns true if the stream has
// been created.
bool newStream();

// Removes this from the owning list, deleting it.
void deleteThis();
Expand Down Expand Up @@ -128,8 +132,8 @@ class ConnectivityGrid : public ConnectionPool::Instance,
Random::RandomGenerator& random_generator_;
Upstream::HostConstSharedPtr host_;
Upstream::ResourcePriority priority_;
const Network::ConnectionSocket::OptionsSharedPtr& options_;
const Network::TransportSocketOptionsSharedPtr& transport_socket_options_;
const Network::ConnectionSocket::OptionsSharedPtr options_;
const Network::TransportSocketOptionsSharedPtr transport_socket_options_;
Upstream::ClusterConnectivityState& state_;
std::chrono::milliseconds next_attempt_duration_;
TimeSource& time_source_;
Expand Down
40 changes: 36 additions & 4 deletions test/common/http/conn_pool_grid_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,17 @@ class ConnectivityGridForTest : public ConnectivityGrid {
.WillByDefault(
Invoke([&](Http::ResponseDecoder&,
ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* {
callbacks_.push_back(&callbacks);
return &cancel_;
if (immediate_success_) {
callbacks.onPoolReady(*encoder_, host(), *info_, absl::nullopt);
return nullptr;
} else if (immediate_failure_) {
callbacks.onPoolFailure(ConnectionPool::PoolFailureReason::LocalConnectionFailure,
"reason", host());
return nullptr;
} else {
callbacks_.push_back(&callbacks);
return &cancel_;
}
}));
if (pools_.size() == 1) {
EXPECT_CALL(*first(), protocolDescription())
Expand Down Expand Up @@ -74,9 +83,13 @@ class ConnectivityGridForTest : public ConnectivityGrid {

ConnectionPool::Callbacks* callbacks(int index = 0) { return callbacks_[index]; }

StreamInfo::MockStreamInfo* info_;
NiceMock<MockRequestEncoder>* encoder_;
void setDestroying() { destroying_ = true; }
std::vector<ConnectionPool::Callbacks*> callbacks_;
NiceMock<Envoy::ConnectionPool::MockCancellable> cancel_;
bool immediate_success_{};
bool immediate_failure_{};
};

namespace {
Expand All @@ -88,7 +101,10 @@ class ConnectivityGridTest : public Event::TestUsingSimulatedTime, public testin
grid_(dispatcher_, random_,
Upstream::makeTestHost(cluster_, "hostname", "tcp://127.0.0.1:9000", simTime()),
Upstream::ResourcePriority::Default, socket_options_, transport_socket_options_,
state_, simTime(), std::chrono::milliseconds(300), options_) {}
state_, simTime(), std::chrono::milliseconds(300), options_) {
grid_.info_ = &info_;
grid_.encoder_ = &encoder_;
}

const Network::ConnectionSocket::OptionsSharedPtr socket_options_;
const Network::TransportSocketOptionsSharedPtr transport_socket_options_;
Expand All @@ -111,7 +127,7 @@ class ConnectivityGridTest : public Event::TestUsingSimulatedTime, public testin
TEST_F(ConnectivityGridTest, Success) {
EXPECT_EQ(grid_.first(), nullptr);

grid_.newStream(decoder_, callbacks_);
EXPECT_NE(grid_.newStream(decoder_, callbacks_), nullptr);
EXPECT_NE(grid_.first(), nullptr);

// onPoolReady should be passed from the pool back to the original caller.
Expand All @@ -120,6 +136,15 @@ TEST_F(ConnectivityGridTest, Success) {
grid_.callbacks()->onPoolReady(encoder_, host_, info_, absl::nullopt);
}

// Test the first pool successfully connecting under the stack of newStream.
TEST_F(ConnectivityGridTest, ImmediateSuccess) {
grid_.immediate_success_ = true;

EXPECT_CALL(callbacks_.pool_ready_, ready());
EXPECT_EQ(grid_.newStream(decoder_, callbacks_), nullptr);
EXPECT_NE(grid_.first(), nullptr);
}

// Test the first pool failing and the second connecting.
TEST_F(ConnectivityGridTest, FailureThenSuccessSerial) {
EXPECT_EQ(grid_.first(), nullptr);
Expand Down Expand Up @@ -230,6 +255,13 @@ TEST_F(ConnectivityGridTest, FailureThenSuccessForMultipleConnectionsSerial) {
cancel2->cancel(Envoy::ConnectionPool::CancelPolicy::CloseExcess);
}

// Test double failure under the stack of newStream.
TEST_F(ConnectivityGridTest, ImmediateDoubleFailure) {
grid_.immediate_failure_ = true;
EXPECT_CALL(callbacks_.pool_failure_, ready());
EXPECT_EQ(grid_.newStream(decoder_, callbacks_), nullptr);
}

// Test both connections happening in parallel and both failing.
TEST_F(ConnectivityGridTest, TimeoutDoubleFailureParallel) {
EXPECT_EQ(grid_.first(), nullptr);
Expand Down