diff --git a/api/envoy/config/tap/v3/common.proto b/api/envoy/config/tap/v3/common.proto index 1884bd57d3d17..126993d0f7b42 100644 --- a/api/envoy/config/tap/v3/common.proto +++ b/api/envoy/config/tap/v3/common.proto @@ -4,6 +4,7 @@ package envoy.config.tap.v3; import "envoy/config/common/matcher/v3/matcher.proto"; import "envoy/config/core/v3/base.proto"; +import "envoy/config/core/v3/extension.proto"; import "envoy/config/core/v3/grpc_service.proto"; import "envoy/config/route/v3/route_components.proto"; @@ -183,7 +184,7 @@ message OutputConfig { } // Tap output sink configuration. -// [#next-free-field: 6] +// [#next-free-field: 7] message OutputSink { option (udpa.annotations.versioning).previous_message_type = "envoy.service.tap.v2alpha.OutputSink"; @@ -259,6 +260,9 @@ message OutputSink { // been configured to receive tap configuration from some other source (e.g., static // file, XDS, etc.) configuring the buffered admin output type will fail. BufferedAdminSink buffered_admin = 5; + + // Tap output filter will be defined by an extension type + core.v3.TypedExtensionConfig custom_sink = 6; } } diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 7a71d5669308f..d0b8ca262f429 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -31,5 +31,9 @@ removed_config_or_runtime: # *Normally occurs at the end of the* :ref:`deprecation period ` new_features: +- area: tap + change: | + added :ref:`custom_sink ` type to enable writing tap data + out to a custom sink extension. deprecated: diff --git a/source/extensions/common/tap/BUILD b/source/extensions/common/tap/BUILD index 452fa65c61611..68e8617581a86 100644 --- a/source/extensions/common/tap/BUILD +++ b/source/extensions/common/tap/BUILD @@ -28,6 +28,7 @@ envoy_cc_library( ":tap_interface", "//source/common/common:assert_lib", "//source/common/common:hex_lib", + "//source/common/config:utility_lib", "//source/extensions/common/matcher:matcher_lib", "@envoy_api//envoy/config/tap/v3:pkg_cc_proto", "@envoy_api//envoy/data/tap/v3:pkg_cc_proto", diff --git a/source/extensions/common/tap/tap.h b/source/extensions/common/tap/tap.h index 107152f9d08bc..9f3a86966eab2 100644 --- a/source/extensions/common/tap/tap.h +++ b/source/extensions/common/tap/tap.h @@ -76,6 +76,27 @@ class Sink { }; using SinkPtr = std::unique_ptr; +using SinkContext = + absl::variant, + std::reference_wrapper>; + +/** + * Abstract tap sink factory. Produces a factory that can instantiate SinkPtr objects + */ +class TapSinkFactory : public Config::TypedFactory { +public: + ~TapSinkFactory() override = default; + std::string category() const override { return "envoy.tap.sinks"; } + + /** + * Create a Sink that can be used for writing out data produced by the tap filter. + * @param config supplies the protobuf configuration for the sink factory + * @param cluster_manager is a ClusterManager from the HTTP/transport socket context + */ + virtual SinkPtr createSinkPtr(const Protobuf::Message& config, SinkContext context) PURE; +}; + +using TapSinkFactoryPtr = std::unique_ptr; /** * Generic configuration for a tap extension (filter, transport socket, etc.). diff --git a/source/extensions/common/tap/tap_config_base.cc b/source/extensions/common/tap/tap_config_base.cc index 5d91f81bf9279..7f75f4636f777 100644 --- a/source/extensions/common/tap/tap_config_base.cc +++ b/source/extensions/common/tap/tap_config_base.cc @@ -3,9 +3,11 @@ #include "envoy/config/tap/v3/common.pb.h" #include "envoy/data/tap/v3/common.pb.h" #include "envoy/data/tap/v3/wrapper.pb.h" +#include "envoy/server/transport_socket_config.h" #include "source/common/common/assert.h" #include "source/common/common/fmt.h" +#include "source/common/config/utility.h" #include "source/common/protobuf/utility.h" #include "source/extensions/common/matcher/matcher.h" @@ -45,12 +47,13 @@ bool Utility::addBufferToProtoBytes(envoy::data::tap::v3::Body& output_body, } TapConfigBaseImpl::TapConfigBaseImpl(const envoy::config::tap::v3::TapConfig& proto_config, - Common::Tap::Sink* admin_streamer) + Common::Tap::Sink* admin_streamer, SinkContext context) : max_buffered_rx_bytes_(PROTOBUF_GET_WRAPPED_OR_DEFAULT( proto_config.output_config(), max_buffered_rx_bytes, DefaultMaxBufferedBytes)), max_buffered_tx_bytes_(PROTOBUF_GET_WRAPPED_OR_DEFAULT( proto_config.output_config(), max_buffered_tx_bytes, DefaultMaxBufferedBytes)), streaming_(proto_config.output_config().streaming()) { + using ProtoOutputSink = envoy::config::tap::v3::OutputSink; auto& sinks = proto_config.output_config().sinks(); ASSERT(sinks.size() == 1); @@ -86,6 +89,33 @@ TapConfigBaseImpl::TapConfigBaseImpl(const envoy::config::tap::v3::TapConfig& pr sink_ = std::make_unique(sinks[0].file_per_tap()); sink_to_use_ = sink_.get(); break; + case ProtoOutputSink::OutputSinkTypeCase::kCustomSink: { + TapSinkFactory& tap_sink_factory = + Envoy::Config::Utility::getAndCheckFactory(sinks[0].custom_sink()); + + // extract message validation visitor from the context and use it to define config + ProtobufTypes::MessagePtr config; + using TsfContextRef = + std::reference_wrapper; + using HttpContextRef = std::reference_wrapper; + if (absl::holds_alternative(context)) { + Server::Configuration::TransportSocketFactoryContext& tsf_context = + absl::get(context).get(); + config = Config::Utility::translateAnyToFactoryConfig(sinks[0].custom_sink().typed_config(), + tsf_context.messageValidationVisitor(), + tap_sink_factory); + } else { + Server::Configuration::FactoryContext& http_context = + absl::get(context).get(); + config = Config::Utility::translateAnyToFactoryConfig( + sinks[0].custom_sink().typed_config(), + http_context.messageValidationContext().staticValidationVisitor(), tap_sink_factory); + } + + sink_ = tap_sink_factory.createSinkPtr(*config, context); + sink_to_use_ = sink_.get(); + break; + } case envoy::config::tap::v3::OutputSink::OutputSinkTypeCase::kStreamingGrpc: PANIC("not implemented"); case envoy::config::tap::v3::OutputSink::OutputSinkTypeCase::OUTPUT_SINK_TYPE_NOT_SET: diff --git a/source/extensions/common/tap/tap_config_base.h b/source/extensions/common/tap/tap_config_base.h index 77a997929b745..aca6eb1cf485e 100644 --- a/source/extensions/common/tap/tap_config_base.h +++ b/source/extensions/common/tap/tap_config_base.h @@ -103,7 +103,7 @@ class TapConfigBaseImpl : public virtual TapConfig { protected: TapConfigBaseImpl(const envoy::config::tap::v3::TapConfig& proto_config, - Common::Tap::Sink* admin_streamer); + Common::Tap::Sink* admin_streamer, SinkContext context); private: // This is the default setting for both RX/TX max buffered bytes. (This means that per tap, the diff --git a/source/extensions/filters/http/tap/config.cc b/source/extensions/filters/http/tap/config.cc index 5c573924f768e..c9051d7c68b62 100644 --- a/source/extensions/filters/http/tap/config.cc +++ b/source/extensions/filters/http/tap/config.cc @@ -15,21 +15,27 @@ namespace TapFilter { class HttpTapConfigFactoryImpl : public Extensions::Common::Tap::TapConfigFactory { public: + HttpTapConfigFactoryImpl(Server::Configuration::FactoryContext& context) + : factory_context_(context) {} // TapConfigFactory Extensions::Common::Tap::TapConfigSharedPtr createConfigFromProto(const envoy::config::tap::v3::TapConfig& proto_config, Extensions::Common::Tap::Sink* admin_streamer) override { - return std::make_shared(std::move(proto_config), admin_streamer); + return std::make_shared(std::move(proto_config), admin_streamer, + factory_context_); } + +private: + Server::Configuration::FactoryContext& factory_context_; }; Http::FilterFactoryCb TapFilterFactory::createFilterFactoryFromProtoTyped( const envoy::extensions::filters::http::tap::v3::Tap& proto_config, const std::string& stats_prefix, Server::Configuration::FactoryContext& context) { - FilterConfigSharedPtr filter_config( - new FilterConfigImpl(proto_config, stats_prefix, std::make_unique(), - context.scope(), context.admin(), context.singletonManager(), - context.threadLocal(), context.mainThreadDispatcher())); + FilterConfigSharedPtr filter_config(new FilterConfigImpl( + proto_config, stats_prefix, std::make_unique(context), + context.scope(), context.admin(), context.singletonManager(), context.threadLocal(), + context.mainThreadDispatcher())); return [filter_config](Http::FilterChainFactoryCallbacks& callbacks) -> void { auto filter = std::make_shared(filter_config); callbacks.addStreamFilter(filter); diff --git a/source/extensions/filters/http/tap/tap_config_impl.cc b/source/extensions/filters/http/tap/tap_config_impl.cc index b3d844c7a48e1..2aa2653a4ffe8 100644 --- a/source/extensions/filters/http/tap/tap_config_impl.cc +++ b/source/extensions/filters/http/tap/tap_config_impl.cc @@ -29,8 +29,9 @@ fillHeaderList(Protobuf::RepeatedPtrField* } // namespace HttpTapConfigImpl::HttpTapConfigImpl(const envoy::config::tap::v3::TapConfig& proto_config, - Common::Tap::Sink* admin_streamer) - : TapCommon::TapConfigBaseImpl(std::move(proto_config), admin_streamer) {} + Common::Tap::Sink* admin_streamer, + Server::Configuration::FactoryContext& context) + : TapCommon::TapConfigBaseImpl(std::move(proto_config), admin_streamer, context) {} HttpPerRequestTapperPtr HttpTapConfigImpl::createPerRequestTapper(uint64_t stream_id) { return std::make_unique(shared_from_this(), stream_id); diff --git a/source/extensions/filters/http/tap/tap_config_impl.h b/source/extensions/filters/http/tap/tap_config_impl.h index d79ef4cc4842a..5938d38076fc7 100644 --- a/source/extensions/filters/http/tap/tap_config_impl.h +++ b/source/extensions/filters/http/tap/tap_config_impl.h @@ -19,7 +19,8 @@ class HttpTapConfigImpl : public Extensions::Common::Tap::TapConfigBaseImpl, public std::enable_shared_from_this { public: HttpTapConfigImpl(const envoy::config::tap::v3::TapConfig& proto_config, - Extensions::Common::Tap::Sink* admin_streamer); + Extensions::Common::Tap::Sink* admin_streamer, + Server::Configuration::FactoryContext& context); // TapFilter::HttpTapConfig HttpPerRequestTapperPtr createPerRequestTapper(uint64_t stream_id) override; diff --git a/source/extensions/transport_sockets/tap/config.cc b/source/extensions/transport_sockets/tap/config.cc index 3b565bee92e96..c8267771e854a 100644 --- a/source/extensions/transport_sockets/tap/config.cc +++ b/source/extensions/transport_sockets/tap/config.cc @@ -17,18 +17,21 @@ namespace Tap { class SocketTapConfigFactoryImpl : public Extensions::Common::Tap::TapConfigFactory { public: - SocketTapConfigFactoryImpl(TimeSource& time_source) : time_source_(time_source) {} + SocketTapConfigFactoryImpl(TimeSource& time_source, + Server::Configuration::TransportSocketFactoryContext& context) + : time_source_(time_source), factory_context_(context) {} // TapConfigFactory Extensions::Common::Tap::TapConfigSharedPtr createConfigFromProto(const envoy::config::tap::v3::TapConfig& proto_config, Extensions::Common::Tap::Sink* admin_streamer) override { return std::make_shared(std::move(proto_config), admin_streamer, - time_source_); + time_source_, factory_context_); } private: TimeSource& time_source_; + Server::Configuration::TransportSocketFactoryContext& factory_context_; }; Network::UpstreamTransportSocketFactoryPtr @@ -49,7 +52,7 @@ UpstreamTapSocketConfigFactory::createTransportSocketFactory( return std::make_unique( outer_config, std::make_unique( - server_context.mainThreadDispatcher().timeSource()), + server_context.mainThreadDispatcher().timeSource(), context), server_context.admin(), server_context.singletonManager(), server_context.threadLocal(), server_context.mainThreadDispatcher(), std::move(inner_transport_factory)); } @@ -72,7 +75,7 @@ DownstreamTapSocketConfigFactory::createTransportSocketFactory( return std::make_unique( outer_config, std::make_unique( - server_context.mainThreadDispatcher().timeSource()), + server_context.mainThreadDispatcher().timeSource(), context), server_context.admin(), server_context.singletonManager(), server_context.threadLocal(), server_context.mainThreadDispatcher(), std::move(inner_transport_factory)); } diff --git a/source/extensions/transport_sockets/tap/tap_config_impl.h b/source/extensions/transport_sockets/tap/tap_config_impl.h index cf715c3df9733..38556025c1408 100644 --- a/source/extensions/transport_sockets/tap/tap_config_impl.h +++ b/source/extensions/transport_sockets/tap/tap_config_impl.h @@ -3,6 +3,7 @@ #include "envoy/config/tap/v3/common.pb.h" #include "envoy/data/tap/v3/transport.pb.h" #include "envoy/event/timer.h" +#include "envoy/server/transport_socket_config.h" #include "source/extensions/common/tap/tap_config_base.h" #include "source/extensions/transport_sockets/tap/tap_config.h" @@ -51,8 +52,10 @@ class SocketTapConfigImpl : public Extensions::Common::Tap::TapConfigBaseImpl, public std::enable_shared_from_this { public: SocketTapConfigImpl(const envoy::config::tap::v3::TapConfig& proto_config, - Extensions::Common::Tap::Sink* admin_streamer, TimeSource& time_system) - : Extensions::Common::Tap::TapConfigBaseImpl(std::move(proto_config), admin_streamer), + Extensions::Common::Tap::Sink* admin_streamer, TimeSource& time_system, + Server::Configuration::TransportSocketFactoryContext& context) + : Extensions::Common::Tap::TapConfigBaseImpl(std::move(proto_config), admin_streamer, + context), time_source_(time_system) {} // SocketTapConfig diff --git a/test/extensions/common/tap/BUILD b/test/extensions/common/tap/BUILD index f65ad733eaf55..dea02d4f722a2 100644 --- a/test/extensions/common/tap/BUILD +++ b/test/extensions/common/tap/BUILD @@ -26,9 +26,12 @@ envoy_cc_test( srcs = envoy_select_admin_functionality(["admin_test.cc"]), deps = [ "//source/extensions/common/tap:admin", + "//source/extensions/common/tap:tap_config_base", "//test/mocks/server:admin_mocks", "//test/mocks/server:admin_stream_mocks", + "//test/mocks/server:server_mocks", "//test/test_common:logging_lib", + "//test/test_common:registry_lib", "@envoy_api//envoy/config/tap/v3:pkg_cc_proto", ], ) diff --git a/test/extensions/common/tap/admin_test.cc b/test/extensions/common/tap/admin_test.cc index 2a98078939717..f25c56a4340b2 100644 --- a/test/extensions/common/tap/admin_test.cc +++ b/test/extensions/common/tap/admin_test.cc @@ -5,10 +5,13 @@ #include "source/extensions/common/tap/admin.h" #include "source/extensions/common/tap/tap.h" +#include "source/extensions/common/tap/tap_config_base.h" #include "test/mocks/server/admin.h" #include "test/mocks/server/admin_stream.h" +#include "test/mocks/server/mocks.h" #include "test/test_common/logging.h" +#include "test/test_common/registry.h" #include "gtest/gtest.h" @@ -20,6 +23,7 @@ using ::testing::_; using ::testing::AtLeast; using ::testing::Between; using ::testing::DoAll; +using ::testing::Invoke; using ::testing::Return; using ::testing::ReturnRef; using ::testing::SaveArg; @@ -146,6 +150,93 @@ config_id: test_config_id StrictMock sink_; }; +using Extensions::Common::Tap::TapSinkFactory; +class MockTapSinkFactory : public TapSinkFactory { +public: + MockTapSinkFactory() {} + ~MockTapSinkFactory() override{}; + + MOCK_METHOD(SinkPtr, createSinkPtr, (const Protobuf::Message& config, SinkContext), (override)); + + MOCK_METHOD(std::string, name, (), (const, override)); + MOCK_METHOD(ProtobufTypes::MessagePtr, createEmptyConfigProto, (), (override)); +}; + +class TestConfigImpl : public TapConfigBaseImpl { +public: + TestConfigImpl(const envoy::config::tap::v3::TapConfig& proto_config, + Extensions::Common::Tap::Sink* admin_streamer, SinkContext context) + : TapConfigBaseImpl(std::move(proto_config), admin_streamer, context) {} +}; + +TEST(TypedExtensionConfigTest, AddTestConfigHttpContext) { + + const std::string tap_config_yaml = + R"EOF( + match: + any_match: true + output_config: + sinks: + - format: PROTO_BINARY + custom_sink: + name: custom_sink + typed_config: + "@type": type.googleapis.cm/google.protobuf.StringValue +)EOF"; + envoy::config::tap::v3::TapConfig tap_config; + TestUtility::loadFromYaml(tap_config_yaml, tap_config); + + MockTapSinkFactory factory_impl; + EXPECT_CALL(factory_impl, name).Times(AtLeast(1)); + EXPECT_CALL(factory_impl, createEmptyConfigProto) + .WillRepeatedly(Invoke([]() -> ProtobufTypes::MessagePtr { + return std::make_unique(); + })); + EXPECT_CALL( + factory_impl, + createSinkPtr( + _, + testing::VariantWith>(_))); + Registry::InjectFactory factory(factory_impl); + + NiceMock factory_context; + TestConfigImpl(tap_config, NULL, factory_context); +} + +TEST(TypedExtensionConfigTest, AddTestConfigTransportSocketContext) { + + const std::string tap_config_yaml = + R"EOF( + match: + any_match: true + output_config: + sinks: + - format: PROTO_BINARY + custom_sink: + name: custom_sink + typed_config: + "@type": type.googleapis.cm/google.protobuf.StringValue +)EOF"; + envoy::config::tap::v3::TapConfig tap_config; + TestUtility::loadFromYaml(tap_config_yaml, tap_config); + + MockTapSinkFactory factory_impl; + EXPECT_CALL(factory_impl, name).Times(AtLeast(1)); + EXPECT_CALL(factory_impl, createEmptyConfigProto) + .WillRepeatedly(Invoke([]() -> ProtobufTypes::MessagePtr { + return std::make_unique(); + })); + EXPECT_CALL( + factory_impl, + createSinkPtr( + _, testing::VariantWith< + std::reference_wrapper>(_))); + Registry::InjectFactory factory(factory_impl); + + NiceMock factory_context; + TestConfigImpl(tap_config, NULL, factory_context); +} + // Make sure warn if using a pipe address for the admin handler. TEST_F(AdminHandlerTest, AdminWithPipeSocket) { EXPECT_LOG_CONTAINS(