diff --git a/test/integration/listener_lds_integration_test.cc b/test/integration/listener_lds_integration_test.cc index 007170da81157..7226abde41ba1 100644 --- a/test/integration/listener_lds_integration_test.cc +++ b/test/integration/listener_lds_integration_test.cc @@ -725,11 +725,11 @@ struct PerConnection { FakeRawConnectionPtr upstream_conn_; }; -class ListenerFilterIntegrationTest : public testing::TestWithParam, - public BaseIntegrationTest { +class ListenerFilterIntegrationTest : public BaseIntegrationTest, + public Grpc::GrpcClientIntegrationParamTest { public: ListenerFilterIntegrationTest() - : BaseIntegrationTest(GetParam(), ConfigHelper::baseConfig() + R"EOF( + : BaseIntegrationTest(ipVersion(), ConfigHelper::baseConfig() + R"EOF( filter_chains: - filters: - name: envoy.filters.network.tcp_proxy @@ -738,6 +738,51 @@ class ListenerFilterIntegrationTest : public testing::TestWithParamwaitForHttpConnection(*dispatcher_, lds_connection_); + EXPECT_TRUE(result); + + auto result2 = lds_connection_->waitForNewStream(*dispatcher_, lds_stream_); + EXPECT_TRUE(result2); + lds_stream_->startGrpcStream(); + } + + void sendLdsResponse(const std::vector& listener_configs, + const std::string& version) { + envoy::service::discovery::v3::DiscoveryResponse response; + response.set_version_info(version); + response.set_type_url(Config::TypeUrl::get().Listener); + for (const auto& listener_config : listener_configs) { + response.add_resources()->PackFrom(listener_config); + } + ASSERT_NE(nullptr, lds_stream_); + lds_stream_->sendGrpcMessage(response); + } + + void sendLdsResponse(const std::vector& listener_configs, + const std::string& version) { + std::vector proto_configs; + proto_configs.reserve(listener_configs.size()); + for (const auto& listener_blob : listener_configs) { + proto_configs.emplace_back( + TestUtility::parseYaml(listener_blob)); + } + sendLdsResponse(proto_configs, version); + } + + void createUpstreams() override { + BaseIntegrationTest::createUpstreams(); + if (!use_lds_) { + // Create the LDS upstream (fake_upstreams_[1]). + addFakeUpstream(Http::CodecType::HTTP2); + } + } + + envoy::config::listener::v3::Listener listener_config_; + FakeHttpConnectionPtr lds_connection_; + FakeStreamPtr lds_stream_{}; }; TEST_P(ListenerFilterIntegrationTest, InspectDataFilterDrainData) { @@ -902,9 +947,100 @@ TEST_P(ListenerFilterIntegrationTest, MixNoInspectDataFilterAndInspectDataFilter tcp_client->close(); } -INSTANTIATE_TEST_SUITE_P(IpVersions, ListenerFilterIntegrationTest, - testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), - TestUtility::ipTestParamsToString); +// Only update the order of listener filters, ensure the listener filters +// was update. +TEST_P(ListenerFilterIntegrationTest, UpdateListenerFilterOrder) { + // Add two listener filters. The first filter will peek 5 bytes data, + // the second filter will drain 2 bytes data. Expect the upstream will + // receive 3 bytes data. + config_helper_.addListenerFilter(R"EOF( + name: inspect_data1 + typed_config: + "@type": type.googleapis.com/test.integration.filters.InspectDataListenerFilterConfig + max_read_bytes: 2 + close_connection: false + drain: true + )EOF"); + config_helper_.addListenerFilter(R"EOF( + name: inspect_data2 + typed_config: + "@type": type.googleapis.com/test.integration.filters.InspectDataListenerFilterConfig + max_read_bytes: 5 + close_connection: false + )EOF"); + config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + // Add the static cluster to serve LDS. + auto* lds_cluster = bootstrap.mutable_static_resources()->add_clusters(); + lds_cluster->MergeFrom(bootstrap.static_resources().clusters()[0]); + lds_cluster->set_name("lds_cluster"); + ConfigHelper::setHttp2(*lds_cluster); + }); + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + listener_config_.Swap(bootstrap.mutable_static_resources()->mutable_listeners(0)); + listener_config_.set_name("test_listener"); + listener_config_.set_continue_on_listener_filters_timeout(false); + listener_config_.mutable_listener_filters_timeout()->MergeFrom( + ProtobufUtil::TimeUtil::MillisecondsToDuration(1000)); + ENVOY_LOG_MISC(debug, "listener config: {}", listener_config_.DebugString()); + bootstrap.mutable_static_resources()->mutable_listeners()->Clear(); + auto* lds_config_source = bootstrap.mutable_dynamic_resources()->mutable_lds_config(); + lds_config_source->set_resource_api_version(envoy::config::core::v3::ApiVersion::V3); + auto* lds_api_config_source = lds_config_source->mutable_api_config_source(); + lds_api_config_source->set_api_type(envoy::config::core::v3::ApiConfigSource::GRPC); + lds_api_config_source->set_transport_api_version(envoy::config::core::v3::V3); + envoy::config::core::v3::GrpcService* grpc_service = lds_api_config_source->add_grpc_services(); + setGrpcService(*grpc_service, "lds_cluster", fake_upstreams_[1]->localAddress()); + }); + on_server_init_function_ = [&]() { + createLdsStream(); + sendLdsResponse({MessageUtil::getYamlStringFromMessage(listener_config_)}, "1"); + }; + use_lds_ = false; + initialize(); + test_server_->waitForCounterGe("listener_manager.lds.update_success", 1); + test_server_->waitUntilListenersReady(); + // NOTE: The line above doesn't tell you if listener is up and listening. + test_server_->waitForCounterGe("listener_manager.listener_create_success", 1); + // Workers not started, the LDS added test_listener is in active_listeners_ list. + EXPECT_EQ(test_server_->server().listenerManager().listeners().size(), 1); + registerTestServerPorts({"test_listener"}); + + std::string data = "hello"; + std::string data_after_drain = data.substr(2, std::string::npos); + IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("test_listener")); + ASSERT_TRUE(tcp_client->write(data)); + FakeRawConnectionPtr fake_upstream_connection; + ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection)); + ASSERT_TRUE(fake_upstream_connection->waitForData(data.size() - 2, &data_after_drain)); + tcp_client->close(); + + // Switch the order of two listener filters. The first filter will drain 2 bytes data. + // Then the second filter expects 5 bytes data, since the client only send 5 bytes data + // and 2 bytes was drained. Then only 3 bytes data available. In the end, the listener + // filter will timeout. + listener_config_.mutable_listener_filters()->SwapElements(0, 1); + ENVOY_LOG_MISC(debug, "listener config: {}", listener_config_.DebugString()); + sendLdsResponse({MessageUtil::getYamlStringFromMessage(listener_config_)}, "2"); + test_server_->waitForCounterGe("listener_manager.listener_create_success", 2); + test_server_->waitForCounterEq("listener_manager.listener_in_place_updated", 1); + + IntegrationTcpClientPtr tcp_client2 = makeTcpConnection(lookupPort("test_listener")); + ASSERT_TRUE(tcp_client2->write(data)); + tcp_client2->waitForDisconnect(); + + // Then ensure the whole listener works as expect with enough data. + std::string long_data = "helloworld"; + std::string long_data_after_drain = long_data.substr(2, std::string::npos); + IntegrationTcpClientPtr tcp_client3 = makeTcpConnection(lookupPort("test_listener")); + ASSERT_TRUE(tcp_client3->write(long_data)); + FakeRawConnectionPtr fake_upstream_connection2; + ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection2)); + ASSERT_TRUE(fake_upstream_connection2->waitForData(long_data.size() - 2, &long_data_after_drain)); + tcp_client3->close(); +} + +INSTANTIATE_TEST_SUITE_P(IpVersionsAndGrpcTypes, ListenerFilterIntegrationTest, + GRPC_CLIENT_INTEGRATION_PARAMS); class RebalancerTest : public testing::TestWithParam, public BaseIntegrationTest {