diff --git a/source/extensions/common/tap/admin.cc b/source/extensions/common/tap/admin.cc index f26334a0246ab..56face12bc0b6 100644 --- a/source/extensions/common/tap/admin.cc +++ b/source/extensions/common/tap/admin.cc @@ -100,11 +100,13 @@ void AdminHandler::unregisterConfig(ExtensionConfig& config) { } void AdminHandler::AdminPerTapSinkHandle::submitTrace( - const TraceWrapperSharedPtr& trace, envoy::service::tap::v2alpha::OutputSink::Format format) { + TraceWrapperPtr&& trace, envoy::service::tap::v2alpha::OutputSink::Format format) { ENVOY_LOG(debug, "admin submitting buffered trace to main thread"); + // Convert to a shared_ptr, so we can send it to the main thread. + std::shared_ptr shared_trace{std::move(trace)}; // The handle can be destroyed before the cross thread post is complete. Thus, we capture a // reference to our parent. - parent_.main_thread_dispatcher_.post([& parent = parent_, trace, format]() { + parent_.main_thread_dispatcher_.post([& parent = parent_, trace = shared_trace, format]() { if (!parent.attached_request_.has_value()) { return; } diff --git a/source/extensions/common/tap/admin.h b/source/extensions/common/tap/admin.h index 81a8c9bb93e5d..be611ed0d4a6c 100644 --- a/source/extensions/common/tap/admin.h +++ b/source/extensions/common/tap/admin.h @@ -59,7 +59,7 @@ class AdminHandler : public Singleton::Instance, AdminPerTapSinkHandle(AdminHandler& parent) : parent_(parent) {} // Extensions::Common::Tap::PerTapSinkHandle - void submitTrace(const TraceWrapperSharedPtr& trace, + void submitTrace(TraceWrapperPtr&& trace, envoy::service::tap::v2alpha::OutputSink::Format format) override; AdminHandler& parent_; diff --git a/source/extensions/common/tap/tap.h b/source/extensions/common/tap/tap.h index 6ef7e26957e99..30505f2d750d1 100644 --- a/source/extensions/common/tap/tap.h +++ b/source/extensions/common/tap/tap.h @@ -14,9 +14,9 @@ namespace Extensions { namespace Common { namespace Tap { -using TraceWrapperSharedPtr = std::shared_ptr; -inline TraceWrapperSharedPtr makeTraceWrapper() { - return std::make_shared(); +using TraceWrapperPtr = std::unique_ptr; +inline TraceWrapperPtr makeTraceWrapper() { + return std::make_unique(); } /** @@ -33,7 +33,7 @@ class PerTapSinkHandle { * @param trace supplies the trace to send. * @param format supplies the output format to use. */ - virtual void submitTrace(const TraceWrapperSharedPtr& trace, + virtual void submitTrace(TraceWrapperPtr&& trace, envoy::service::tap::v2alpha::OutputSink::Format format) PURE; }; @@ -51,7 +51,7 @@ class PerTapSinkHandleManager { /** * Submit a buffered or streamed trace segment to all managed per-tap sink handles. */ - virtual void submitTrace(const TraceWrapperSharedPtr& trace) PURE; + virtual void submitTrace(TraceWrapperPtr&& trace) PURE; }; using PerTapSinkHandleManagerPtr = std::unique_ptr; diff --git a/source/extensions/common/tap/tap_config_base.cc b/source/extensions/common/tap/tap_config_base.cc index 0c139f008e9c0..85aa6bf1f69ce 100644 --- a/source/extensions/common/tap/tap_config_base.cc +++ b/source/extensions/common/tap/tap_config_base.cc @@ -139,14 +139,13 @@ void Utility::bodyBytesToString(envoy::data::tap::v2alpha::TraceWrapper& trace, } } -void TapConfigBaseImpl::PerTapSinkHandleManagerImpl::submitTrace( - const TraceWrapperSharedPtr& trace) { +void TapConfigBaseImpl::PerTapSinkHandleManagerImpl::submitTrace(TraceWrapperPtr&& trace) { Utility::bodyBytesToString(*trace, parent_.sink_format_); - handle_->submitTrace(trace, parent_.sink_format_); + handle_->submitTrace(std::move(trace), parent_.sink_format_); } void FilePerTapSink::FilePerTapSinkHandle::submitTrace( - const TraceWrapperSharedPtr& trace, envoy::service::tap::v2alpha::OutputSink::Format format) { + TraceWrapperPtr&& trace, envoy::service::tap::v2alpha::OutputSink::Format format) { if (!output_file_.is_open()) { std::string path = fmt::format("{}_{}", parent_.config_.path_prefix(), trace_id_); switch (format) { diff --git a/source/extensions/common/tap/tap_config_base.h b/source/extensions/common/tap/tap_config_base.h index f2adc02c2a730..36b6f8fca5567 100644 --- a/source/extensions/common/tap/tap_config_base.h +++ b/source/extensions/common/tap/tap_config_base.h @@ -74,7 +74,7 @@ class TapConfigBaseImpl : public virtual TapConfig { : parent_(parent), handle_(parent.sink_to_use_->createPerTapSinkHandle(trace_id)) {} // PerTapSinkHandleManager - void submitTrace(const TraceWrapperSharedPtr& trace) override; + void submitTrace(TraceWrapperPtr&& trace) override; private: TapConfigBaseImpl& parent_; @@ -129,7 +129,7 @@ class FilePerTapSink : public Sink { : parent_(parent), trace_id_(trace_id) {} // PerTapSinkHandle - void submitTrace(const TraceWrapperSharedPtr& trace, + void submitTrace(TraceWrapperPtr&& trace, envoy::service::tap::v2alpha::OutputSink::Format format) override; FilePerTapSink& parent_; diff --git a/source/extensions/filters/http/tap/tap_config_impl.cc b/source/extensions/filters/http/tap/tap_config_impl.cc index 68106ab0c6eaf..295c76d319aec 100644 --- a/source/extensions/filters/http/tap/tap_config_impl.cc +++ b/source/extensions/filters/http/tap/tap_config_impl.cc @@ -32,11 +32,11 @@ HttpPerRequestTapperPtr HttpTapConfigImpl::createPerRequestTapper(uint64_t strea } void HttpPerRequestTapperImpl::streamRequestHeaders() { - TapCommon::TraceWrapperSharedPtr trace = makeTraceSegment(); + TapCommon::TraceWrapperPtr trace = makeTraceSegment(); request_headers_->iterate( fillHeaderList, trace->mutable_http_streamed_trace_segment()->mutable_request_headers()->mutable_headers()); - sink_handle_->submitTrace(trace); + sink_handle_->submitTrace(std::move(trace)); } void HttpPerRequestTapperImpl::onRequestHeaders(const Http::HeaderMap& headers) { @@ -51,54 +51,24 @@ void HttpPerRequestTapperImpl::onRequestHeaders(const Http::HeaderMap& headers) void HttpPerRequestTapperImpl::streamBufferedRequestBody() { if (buffered_streamed_request_body_ != nullptr) { - sink_handle_->submitTrace(buffered_streamed_request_body_); + sink_handle_->submitTrace(std::move(buffered_streamed_request_body_)); buffered_streamed_request_body_.reset(); } } void HttpPerRequestTapperImpl::onRequestBody(const Buffer::Instance& data) { - // TODO(mattklein123): Body matching. - if (config_->streaming()) { - const auto match_status = config_->rootMatcher().matchStatus(statuses_); - // Without body matching, we must have already started tracing or have not yet matched. - ASSERT(started_streaming_trace_ || !match_status.matches_); - - if (started_streaming_trace_) { - // If we have already started streaming, flush a body segment now. - TapCommon::TraceWrapperSharedPtr trace = makeTraceSegment(); - TapCommon::Utility::addBufferToProtoBytes( - *trace->mutable_http_streamed_trace_segment()->mutable_request_body_chunk(), - config_->maxBufferedRxBytes(), data, 0, data.length()); - sink_handle_->submitTrace(trace); - } else if (match_status.might_change_status_) { - // If we might still match, start buffering the request body up to our limit. - if (buffered_streamed_request_body_ == nullptr) { - buffered_streamed_request_body_ = makeTraceSegment(); - } - auto& body = *buffered_streamed_request_body_->mutable_http_streamed_trace_segment() - ->mutable_request_body_chunk(); - ASSERT(body.as_bytes().size() <= config_->maxBufferedRxBytes()); - TapCommon::Utility::addBufferToProtoBytes( - body, config_->maxBufferedRxBytes() - body.as_bytes().size(), data, 0, data.length()); - } - } else { - // If we are not streaming, buffer the request body up to our limit. - makeBufferedFullTraceIfNeeded(); - auto& body = - *buffered_full_trace_->mutable_http_buffered_trace()->mutable_request()->mutable_body(); - ASSERT(body.as_bytes().size() <= config_->maxBufferedRxBytes()); - TapCommon::Utility::addBufferToProtoBytes( - body, config_->maxBufferedRxBytes() - body.as_bytes().size(), data, 0, data.length()); - } + onBody(data, buffered_streamed_request_body_, config_->maxBufferedRxBytes(), + &envoy::data::tap::v2alpha::HttpStreamedTraceSegment::mutable_request_body_chunk, + &envoy::data::tap::v2alpha::HttpBufferedTrace::mutable_request); } void HttpPerRequestTapperImpl::streamRequestTrailers() { if (request_trailers_ != nullptr) { - TapCommon::TraceWrapperSharedPtr trace = makeTraceSegment(); + TapCommon::TraceWrapperPtr trace = makeTraceSegment(); request_trailers_->iterate(fillHeaderList, trace->mutable_http_streamed_trace_segment() ->mutable_request_trailers() ->mutable_headers()); - sink_handle_->submitTrace(trace); + sink_handle_->submitTrace(std::move(trace)); } } @@ -118,11 +88,11 @@ void HttpPerRequestTapperImpl::onRequestTrailers(const Http::HeaderMap& trailers } void HttpPerRequestTapperImpl::streamResponseHeaders() { - TapCommon::TraceWrapperSharedPtr trace = makeTraceSegment(); + TapCommon::TraceWrapperPtr trace = makeTraceSegment(); response_headers_->iterate( fillHeaderList, trace->mutable_http_streamed_trace_segment()->mutable_response_headers()->mutable_headers()); - sink_handle_->submitTrace(trace); + sink_handle_->submitTrace(std::move(trace)); } void HttpPerRequestTapperImpl::onResponseHeaders(const Http::HeaderMap& headers) { @@ -143,45 +113,15 @@ void HttpPerRequestTapperImpl::onResponseHeaders(const Http::HeaderMap& headers) void HttpPerRequestTapperImpl::streamBufferedResponseBody() { if (buffered_streamed_response_body_ != nullptr) { - sink_handle_->submitTrace(buffered_streamed_response_body_); + sink_handle_->submitTrace(std::move(buffered_streamed_response_body_)); buffered_streamed_response_body_.reset(); } } void HttpPerRequestTapperImpl::onResponseBody(const Buffer::Instance& data) { - // TODO(mattklein123): Body matching. - if (config_->streaming()) { - const auto match_status = config_->rootMatcher().matchStatus(statuses_); - // Without body matching, we must have already started tracing or have not yet matched. - ASSERT(started_streaming_trace_ || !match_status.matches_); - - if (started_streaming_trace_) { - // If we have already started streaming, flush a body segment now. - TapCommon::TraceWrapperSharedPtr trace = makeTraceSegment(); - TapCommon::Utility::addBufferToProtoBytes( - *trace->mutable_http_streamed_trace_segment()->mutable_response_body_chunk(), - config_->maxBufferedTxBytes(), data, 0, data.length()); - sink_handle_->submitTrace(trace); - } else if (match_status.might_change_status_) { - // If we might still match, start buffering the request body up to our limit. - if (buffered_streamed_response_body_ == nullptr) { - buffered_streamed_response_body_ = makeTraceSegment(); - } - auto& body = *buffered_streamed_response_body_->mutable_http_streamed_trace_segment() - ->mutable_response_body_chunk(); - ASSERT(body.as_bytes().size() <= config_->maxBufferedTxBytes()); - TapCommon::Utility::addBufferToProtoBytes( - body, config_->maxBufferedTxBytes() - body.as_bytes().size(), data, 0, data.length()); - } - } else { - // If we are not streaming, buffer the response body up to our limit. - makeBufferedFullTraceIfNeeded(); - auto& body = - *buffered_full_trace_->mutable_http_buffered_trace()->mutable_response()->mutable_body(); - ASSERT(body.as_bytes().size() <= config_->maxBufferedTxBytes()); - TapCommon::Utility::addBufferToProtoBytes( - body, config_->maxBufferedTxBytes() - body.as_bytes().size(), data, 0, data.length()); - } + onBody(data, buffered_streamed_response_body_, config_->maxBufferedTxBytes(), + &envoy::data::tap::v2alpha::HttpStreamedTraceSegment::mutable_response_body_chunk, + &envoy::data::tap::v2alpha::HttpBufferedTrace::mutable_response); } void HttpPerRequestTapperImpl::onResponseTrailers(const Http::HeaderMap& trailers) { @@ -198,11 +138,11 @@ void HttpPerRequestTapperImpl::onResponseTrailers(const Http::HeaderMap& trailer streamBufferedResponseBody(); } - TapCommon::TraceWrapperSharedPtr trace = makeTraceSegment(); + TapCommon::TraceWrapperPtr trace = makeTraceSegment(); trailers.iterate(fillHeaderList, trace->mutable_http_streamed_trace_segment() ->mutable_response_trailers() ->mutable_headers()); - sink_handle_->submitTrace(trace); + sink_handle_->submitTrace(std::move(trace)); } } @@ -227,10 +167,50 @@ bool HttpPerRequestTapperImpl::onDestroyLog() { } ENVOY_LOG(debug, "submitting buffered trace sink"); - sink_handle_->submitTrace(buffered_full_trace_); + // move is safe as onDestroyLog is the last method called. + sink_handle_->submitTrace(std::move(buffered_full_trace_)); return true; } +void HttpPerRequestTapperImpl::onBody( + const Buffer::Instance& data, Extensions::Common::Tap::TraceWrapperPtr& buffered_streamed_body, + uint32_t maxBufferedBytes, MutableBodyChunk mutable_body_chunk, + MutableMessage mutable_message) { + // TODO(mattklein123): Body matching. + if (config_->streaming()) { + const auto match_status = config_->rootMatcher().matchStatus(statuses_); + // Without body matching, we must have already started tracing or have not yet matched. + ASSERT(started_streaming_trace_ || !match_status.matches_); + + if (started_streaming_trace_) { + // If we have already started streaming, flush a body segment now. + TapCommon::TraceWrapperPtr trace = makeTraceSegment(); + TapCommon::Utility::addBufferToProtoBytes( + *(trace->mutable_http_streamed_trace_segment()->*mutable_body_chunk)(), maxBufferedBytes, + data, 0, data.length()); + sink_handle_->submitTrace(std::move(trace)); + } else if (match_status.might_change_status_) { + // If we might still match, start buffering the body up to our limit. + if (buffered_streamed_body == nullptr) { + buffered_streamed_body = makeTraceSegment(); + } + auto& body = + *(buffered_streamed_body->mutable_http_streamed_trace_segment()->*mutable_body_chunk)(); + ASSERT(body.as_bytes().size() <= maxBufferedBytes); + TapCommon::Utility::addBufferToProtoBytes(body, maxBufferedBytes - body.as_bytes().size(), + data, 0, data.length()); + } + } else { + // If we are not streaming, buffer the body up to our limit. + makeBufferedFullTraceIfNeeded(); + auto& body = + *(buffered_full_trace_->mutable_http_buffered_trace()->*mutable_message)()->mutable_body(); + ASSERT(body.as_bytes().size() <= maxBufferedBytes); + TapCommon::Utility::addBufferToProtoBytes(body, maxBufferedBytes - body.as_bytes().size(), data, + 0, data.length()); + } +} + } // namespace TapFilter } // namespace HttpFilters } // namespace Extensions diff --git a/source/extensions/filters/http/tap/tap_config_impl.h b/source/extensions/filters/http/tap/tap_config_impl.h index 8977282836ba1..9846b6e529510 100644 --- a/source/extensions/filters/http/tap/tap_config_impl.h +++ b/source/extensions/filters/http/tap/tap_config_impl.h @@ -42,15 +42,24 @@ class HttpPerRequestTapperImpl : public HttpPerRequestTapper, Logger::Loggablemutable_http_streamed_trace_segment()->set_trace_id(stream_id_); return segment; } @@ -71,9 +80,9 @@ class HttpPerRequestTapperImpl : public HttpPerRequestTapper, Logger::LoggablecreateMatchStatusVector()) { config_->rootMatcher().onNewStream(statuses_); if (config_->streaming() && config_->rootMatcher().matchStatus(statuses_).matches_) { - TapCommon::TraceWrapperSharedPtr trace = makeTraceSegment(); + TapCommon::TraceWrapperPtr trace = makeTraceSegment(); fillConnectionInfo(*trace->mutable_socket_streamed_trace_segment()->mutable_connection()); - sink_handle_->submitTrace(trace); + sink_handle_->submitTrace(std::move(trace)); } } @@ -36,15 +36,15 @@ void PerSocketTapperImpl::closeSocket(Network::ConnectionEvent) { } if (config_->streaming()) { - TapCommon::TraceWrapperSharedPtr trace = makeTraceSegment(); + TapCommon::TraceWrapperPtr trace = makeTraceSegment(); auto& event = *trace->mutable_socket_streamed_trace_segment()->mutable_event(); initEvent(event); event.mutable_closed(); - sink_handle_->submitTrace(trace); + sink_handle_->submitTrace(std::move(trace)); } else { makeBufferedTraceIfNeeded(); fillConnectionInfo(*buffered_trace_->mutable_socket_buffered_trace()->mutable_connection()); - sink_handle_->submitTrace(buffered_trace_); + sink_handle_->submitTrace(std::move(buffered_trace_)); } // Here we explicitly reset the sink_handle_ to release any sink resources and force a flush @@ -67,13 +67,13 @@ void PerSocketTapperImpl::onRead(const Buffer::Instance& data, uint32_t bytes_re } if (config_->streaming()) { - TapCommon::TraceWrapperSharedPtr trace = makeTraceSegment(); + TapCommon::TraceWrapperPtr trace = makeTraceSegment(); auto& event = *trace->mutable_socket_streamed_trace_segment()->mutable_event(); initEvent(event); TapCommon::Utility::addBufferToProtoBytes(*event.mutable_read()->mutable_data(), config_->maxBufferedRxBytes(), data, data.length() - bytes_read, bytes_read); - sink_handle_->submitTrace(trace); + sink_handle_->submitTrace(std::move(trace)); } else { if (buffered_trace_ != nullptr && buffered_trace_->socket_buffered_trace().read_truncated()) { return; @@ -99,14 +99,14 @@ void PerSocketTapperImpl::onWrite(const Buffer::Instance& data, uint32_t bytes_w } if (config_->streaming()) { - TapCommon::TraceWrapperSharedPtr trace = makeTraceSegment(); + TapCommon::TraceWrapperPtr trace = makeTraceSegment(); auto& event = *trace->mutable_socket_streamed_trace_segment()->mutable_event(); initEvent(event); TapCommon::Utility::addBufferToProtoBytes(*event.mutable_write()->mutable_data(), config_->maxBufferedTxBytes(), data, 0, bytes_written); event.mutable_write()->set_end_stream(end_stream); - sink_handle_->submitTrace(trace); + sink_handle_->submitTrace(std::move(trace)); } else { if (buffered_trace_ != nullptr && buffered_trace_->socket_buffered_trace().write_truncated()) { return; diff --git a/source/extensions/transport_sockets/tap/tap_config_impl.h b/source/extensions/transport_sockets/tap/tap_config_impl.h index 45af950cf189d..dc16dc2c96d28 100644 --- a/source/extensions/transport_sockets/tap/tap_config_impl.h +++ b/source/extensions/transport_sockets/tap/tap_config_impl.h @@ -28,9 +28,8 @@ class PerSocketTapperImpl : public PerSocketTapper { buffered_trace_->mutable_socket_buffered_trace()->set_trace_id(connection_.id()); } } - Extensions::Common::Tap::TraceWrapperSharedPtr makeTraceSegment() { - Extensions::Common::Tap::TraceWrapperSharedPtr trace = - Extensions::Common::Tap::makeTraceWrapper(); + Extensions::Common::Tap::TraceWrapperPtr makeTraceSegment() { + Extensions::Common::Tap::TraceWrapperPtr trace = Extensions::Common::Tap::makeTraceWrapper(); trace->mutable_socket_streamed_trace_segment()->set_trace_id(connection_.id()); return trace; } @@ -40,7 +39,7 @@ class PerSocketTapperImpl : public PerSocketTapper { const Network::Connection& connection_; Extensions::Common::Tap::Matcher::MatchStatusVector statuses_; // Must be a shared_ptr because of submitTrace(). - Extensions::Common::Tap::TraceWrapperSharedPtr buffered_trace_; + Extensions::Common::Tap::TraceWrapperPtr buffered_trace_; uint32_t rx_bytes_buffered_{}; uint32_t tx_bytes_buffered_{}; }; diff --git a/test/extensions/common/tap/common.h b/test/extensions/common/tap/common.h index f1c504735ea5b..17196c877f3e3 100644 --- a/test/extensions/common/tap/common.h +++ b/test/extensions/common/tap/common.h @@ -39,7 +39,7 @@ class MockPerTapSinkHandleManager : public PerTapSinkHandleManager { MockPerTapSinkHandleManager(); ~MockPerTapSinkHandleManager(); - void submitTrace(const TraceWrapperSharedPtr& trace) override { submitTrace_(*trace); } + void submitTrace(TraceWrapperPtr&& trace) override { submitTrace_(*trace); } MOCK_METHOD1(submitTrace_, void(const envoy::data::tap::v2alpha::TraceWrapper& trace)); };