Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 142 additions & 6 deletions test/integration/listener_lds_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -725,11 +725,11 @@ struct PerConnection {
FakeRawConnectionPtr upstream_conn_;
};

class ListenerFilterIntegrationTest : public testing::TestWithParam<Network::Address::IpVersion>,
public BaseIntegrationTest {
class ListenerFilterIntegrationTest : public BaseIntegrationTest,
public Grpc::GrpcClientIntegrationParamTest {
public:
ListenerFilterIntegrationTest()
: BaseIntegrationTest(GetParam(), ConfigHelper::baseConfig() + R"EOF(
: BaseIntegrationTest(ipVersion(), ConfigHelper::baseConfig() + R"EOF(
filter_chains:
- filters:
- name: envoy.filters.network.tcp_proxy
Expand All @@ -738,6 +738,51 @@ class ListenerFilterIntegrationTest : public testing::TestWithParam<Network::Add
stat_prefix: tcp_stats
cluster: cluster_0
)EOF") {}

void createLdsStream() {
AssertionResult result =
fake_upstreams_[1]->waitForHttpConnection(*dispatcher_, lds_connection_);
EXPECT_TRUE(result);

auto result2 = lds_connection_->waitForNewStream(*dispatcher_, lds_stream_);
EXPECT_TRUE(result2);
lds_stream_->startGrpcStream();
}

void sendLdsResponse(const std::vector<envoy::config::listener::v3::Listener>& listener_configs,
const std::string& version) {
envoy::service::discovery::v3::DiscoveryResponse response;
response.set_version_info(version);
response.set_type_url(Config::TypeUrl::get().Listener);
for (const auto& listener_config : listener_configs) {
response.add_resources()->PackFrom(listener_config);
}
ASSERT_NE(nullptr, lds_stream_);
lds_stream_->sendGrpcMessage(response);
}

void sendLdsResponse(const std::vector<std::string>& listener_configs,
const std::string& version) {
std::vector<envoy::config::listener::v3::Listener> proto_configs;
proto_configs.reserve(listener_configs.size());
for (const auto& listener_blob : listener_configs) {
proto_configs.emplace_back(
TestUtility::parseYaml<envoy::config::listener::v3::Listener>(listener_blob));
}
sendLdsResponse(proto_configs, version);
}

void createUpstreams() override {
BaseIntegrationTest::createUpstreams();
if (!use_lds_) {
// Create the LDS upstream (fake_upstreams_[1]).
addFakeUpstream(Http::CodecType::HTTP2);
}
}

envoy::config::listener::v3::Listener listener_config_;
FakeHttpConnectionPtr lds_connection_;
FakeStreamPtr lds_stream_{};
};

TEST_P(ListenerFilterIntegrationTest, InspectDataFilterDrainData) {
Expand Down Expand Up @@ -902,9 +947,100 @@ TEST_P(ListenerFilterIntegrationTest, MixNoInspectDataFilterAndInspectDataFilter
tcp_client->close();
}

INSTANTIATE_TEST_SUITE_P(IpVersions, ListenerFilterIntegrationTest,
testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),
TestUtility::ipTestParamsToString);
// Only update the order of listener filters, ensure the listener filters
// was update.
TEST_P(ListenerFilterIntegrationTest, UpdateListenerFilterOrder) {
// Add two listener filters. The first filter will peek 5 bytes data,
// the second filter will drain 2 bytes data. Expect the upstream will
// receive 3 bytes data.
config_helper_.addListenerFilter(R"EOF(
name: inspect_data1
typed_config:
"@type": type.googleapis.com/test.integration.filters.InspectDataListenerFilterConfig
max_read_bytes: 2
close_connection: false
drain: true
)EOF");
config_helper_.addListenerFilter(R"EOF(
name: inspect_data2
typed_config:
"@type": type.googleapis.com/test.integration.filters.InspectDataListenerFilterConfig
max_read_bytes: 5
close_connection: false
)EOF");
config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
// Add the static cluster to serve LDS.
auto* lds_cluster = bootstrap.mutable_static_resources()->add_clusters();
lds_cluster->MergeFrom(bootstrap.static_resources().clusters()[0]);
lds_cluster->set_name("lds_cluster");
ConfigHelper::setHttp2(*lds_cluster);
});
config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
listener_config_.Swap(bootstrap.mutable_static_resources()->mutable_listeners(0));
listener_config_.set_name("test_listener");
listener_config_.set_continue_on_listener_filters_timeout(false);
listener_config_.mutable_listener_filters_timeout()->MergeFrom(
ProtobufUtil::TimeUtil::MillisecondsToDuration(1000));
ENVOY_LOG_MISC(debug, "listener config: {}", listener_config_.DebugString());
bootstrap.mutable_static_resources()->mutable_listeners()->Clear();
auto* lds_config_source = bootstrap.mutable_dynamic_resources()->mutable_lds_config();
lds_config_source->set_resource_api_version(envoy::config::core::v3::ApiVersion::V3);
auto* lds_api_config_source = lds_config_source->mutable_api_config_source();
lds_api_config_source->set_api_type(envoy::config::core::v3::ApiConfigSource::GRPC);
lds_api_config_source->set_transport_api_version(envoy::config::core::v3::V3);
envoy::config::core::v3::GrpcService* grpc_service = lds_api_config_source->add_grpc_services();
setGrpcService(*grpc_service, "lds_cluster", fake_upstreams_[1]->localAddress());
});
on_server_init_function_ = [&]() {
createLdsStream();
sendLdsResponse({MessageUtil::getYamlStringFromMessage(listener_config_)}, "1");
};
use_lds_ = false;
initialize();
test_server_->waitForCounterGe("listener_manager.lds.update_success", 1);
test_server_->waitUntilListenersReady();
// NOTE: The line above doesn't tell you if listener is up and listening.
test_server_->waitForCounterGe("listener_manager.listener_create_success", 1);
// Workers not started, the LDS added test_listener is in active_listeners_ list.
EXPECT_EQ(test_server_->server().listenerManager().listeners().size(), 1);
registerTestServerPorts({"test_listener"});

std::string data = "hello";
std::string data_after_drain = data.substr(2, std::string::npos);
IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("test_listener"));
ASSERT_TRUE(tcp_client->write(data));
FakeRawConnectionPtr fake_upstream_connection;
ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
ASSERT_TRUE(fake_upstream_connection->waitForData(data.size() - 2, &data_after_drain));
tcp_client->close();

// Switch the order of two listener filters. The first filter will drain 2 bytes data.
// Then the second filter expects 5 bytes data, since the client only send 5 bytes data
// and 2 bytes was drained. Then only 3 bytes data available. In the end, the listener
// filter will timeout.
listener_config_.mutable_listener_filters()->SwapElements(0, 1);
ENVOY_LOG_MISC(debug, "listener config: {}", listener_config_.DebugString());
sendLdsResponse({MessageUtil::getYamlStringFromMessage(listener_config_)}, "2");
test_server_->waitForCounterGe("listener_manager.listener_create_success", 2);
test_server_->waitForCounterEq("listener_manager.listener_in_place_updated", 1);

IntegrationTcpClientPtr tcp_client2 = makeTcpConnection(lookupPort("test_listener"));
ASSERT_TRUE(tcp_client2->write(data));
tcp_client2->waitForDisconnect();

// Then ensure the whole listener works as expect with enough data.
std::string long_data = "helloworld";
std::string long_data_after_drain = long_data.substr(2, std::string::npos);
IntegrationTcpClientPtr tcp_client3 = makeTcpConnection(lookupPort("test_listener"));
ASSERT_TRUE(tcp_client3->write(long_data));
FakeRawConnectionPtr fake_upstream_connection2;
ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection2));
ASSERT_TRUE(fake_upstream_connection2->waitForData(long_data.size() - 2, &long_data_after_drain));
tcp_client3->close();
}

INSTANTIATE_TEST_SUITE_P(IpVersionsAndGrpcTypes, ListenerFilterIntegrationTest,
GRPC_CLIENT_INTEGRATION_PARAMS);

class RebalancerTest : public testing::TestWithParam<Network::Address::IpVersion>,
public BaseIntegrationTest {
Expand Down