Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ When decompression is *applied*:

- The *content-encoding* header is modified to remove the decompression that was applied.

- *x-envoy-decompressor-<decompressor_name>-<compressed/uncompressed>-bytes* trailers are added to
the request/response to relay information about decompression.

.. _decompressor-statistics:

Using different decompressors for requests and responses
Expand Down
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ New Features
* access log: added support for nested objects in :ref:`JSON logging mode <config_access_log_format_dictionaries>`.
* build: enable building envoy :ref:`arm64 images <arm_binaries>` by buildx tool in x86 CI platform.
* cluster: added new :ref:`connection_pool_per_downstream_connection <envoy_v3_api_field_config.cluster.v3.Cluster.connection_pool_per_downstream_connection>` flag, which enable creation of a new connection pool for each downstream connection.
* decompressor filter: reports compressed and uncompressed bytes in trailers.
* dns_filter: added support for answering :ref:`service record<envoy_v3_api_msg_data.dns.v3.DnsTable.DnsService>` queries.
* dynamic_forward_proxy: added :ref:`use_tcp_for_dns_lookups<envoy_v3_api_field_extensions.common.dynamic_forward_proxy.v3.DnsCacheConfig.use_tcp_for_dns_lookups>` option to use TCP for DNS lookups in order to match the DNS options for :ref:`Clusters<envoy_v3_api_msg_config.cluster.v3.Cluster>`.
* ext_authz filter: added support for emitting dynamic metadata for both :ref:`HTTP <config_http_filters_ext_authz_dynamic_metadata>` and :ref:`network <config_network_filters_ext_authz_dynamic_metadata>` filters.
Expand Down
84 changes: 62 additions & 22 deletions source/extensions/filters/http/decompressor/decompressor_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ DecompressorFilterConfig::DecompressorFilterConfig(
: stats_prefix_(fmt::format("{}decompressor.{}.{}", stats_prefix,
proto_config.decompressor_library().name(),
decompressor_factory->statsPrefix())),
trailers_prefix_(fmt::format("{}-decompressor-{}",
ThreadSafeSingleton<Http::PrefixValue>::get().prefix(),
proto_config.decompressor_library().name())),
decompressor_stats_prefix_(stats_prefix_ + "decompressor_library"),
decompressor_factory_(std::move(decompressor_factory)),
request_direction_config_(proto_config.request_direction_config(), stats_prefix_, scope,
Expand Down Expand Up @@ -56,7 +59,10 @@ DecompressorFilterConfig::ResponseDirectionConfig::ResponseDirectionConfig(
: DirectionConfig(proto_config.common_config(), stats_prefix + "response.", scope, runtime) {}

DecompressorFilter::DecompressorFilter(DecompressorFilterConfigSharedPtr config)
: config_(std::move(config)) {}
: config_(std::move(config)), request_byte_tracker_(config_->trailersCompressedBytesString(),
config_->trailersUncompressedBytesString()),
response_byte_tracker_(config_->trailersCompressedBytesString(),
config_->trailersUncompressedBytesString()) {}

Http::FilterHeadersStatus DecompressorFilter::decodeHeaders(Http::RequestHeaderMap& headers,
bool end_stream) {
Expand All @@ -82,9 +88,24 @@ Http::FilterHeadersStatus DecompressorFilter::decodeHeaders(Http::RequestHeaderM
*decoder_callbacks_, headers);
};

Http::FilterDataStatus DecompressorFilter::decodeData(Buffer::Instance& data, bool) {
return maybeDecompress(config_->requestDirectionConfig(), request_decompressor_,
*decoder_callbacks_, data);
Http::FilterDataStatus DecompressorFilter::decodeData(Buffer::Instance& data, bool end_stream) {
if (request_decompressor_) {
HeaderMapOptRef trailers;
if (end_stream) {
trailers = HeaderMapOptRef(std::ref(decoder_callbacks_->addDecodedTrailers()));
}
decompress(config_->requestDirectionConfig(), request_decompressor_, *decoder_callbacks_, data,
request_byte_tracker_, trailers);
}
return Http::FilterDataStatus::Continue;
}

Http::FilterTrailersStatus DecompressorFilter::decodeTrailers(Http::RequestTrailerMap& trailers) {
// Only report if the filter has actually decompressed.
if (request_decompressor_) {
request_byte_tracker_.reportTotalBytes(trailers);
}
return Http::FilterTrailersStatus::Continue;
}

Http::FilterHeadersStatus DecompressorFilter::encodeHeaders(Http::ResponseHeaderMap& headers,
Expand All @@ -99,29 +120,48 @@ Http::FilterHeadersStatus DecompressorFilter::encodeHeaders(Http::ResponseHeader
*encoder_callbacks_, headers);
}

Http::FilterDataStatus DecompressorFilter::encodeData(Buffer::Instance& data, bool) {
return maybeDecompress(config_->responseDirectionConfig(), response_decompressor_,
*encoder_callbacks_, data);
Http::FilterDataStatus DecompressorFilter::encodeData(Buffer::Instance& data, bool end_stream) {
if (response_decompressor_) {
HeaderMapOptRef trailers;
if (end_stream) {
trailers = HeaderMapOptRef(std::ref(encoder_callbacks_->addEncodedTrailers()));
}
decompress(config_->responseDirectionConfig(), response_decompressor_, *encoder_callbacks_,
data, response_byte_tracker_, trailers);
}
return Http::FilterDataStatus::Continue;
}

Http::FilterDataStatus DecompressorFilter::maybeDecompress(
Http::FilterTrailersStatus DecompressorFilter::encodeTrailers(Http::ResponseTrailerMap& trailers) {
// Only report if the filter has actually decompressed.
if (response_decompressor_) {
response_byte_tracker_.reportTotalBytes(trailers);
}
return Http::FilterTrailersStatus::Continue;
}

void DecompressorFilter::decompress(
const DecompressorFilterConfig::DirectionConfig& direction_config,
const Compression::Decompressor::DecompressorPtr& decompressor,
Http::StreamFilterCallbacks& callbacks, Buffer::Instance& input_buffer) const {
if (decompressor) {
Buffer::OwnedImpl output_buffer;
decompressor->decompress(input_buffer, output_buffer);

// Report decompression via stats and logging before modifying the input buffer.
direction_config.stats().total_compressed_bytes_.add(input_buffer.length());
direction_config.stats().total_uncompressed_bytes_.add(output_buffer.length());
ENVOY_STREAM_LOG(debug, "{} data decompressed from {} bytes to {} bytes", callbacks,
direction_config.logString(), input_buffer.length(), output_buffer.length());

input_buffer.drain(input_buffer.length());
input_buffer.add(output_buffer);
Http::StreamFilterCallbacks& callbacks, Buffer::Instance& input_buffer,
ByteTracker& byte_tracker, HeaderMapOptRef trailers) const {
ASSERT(decompressor);
Buffer::OwnedImpl output_buffer;
decompressor->decompress(input_buffer, output_buffer);

// Report decompression via stats and logging before modifying the input buffer.
byte_tracker.chargeBytes(input_buffer.length(), output_buffer.length());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking desired behavior: Do we want to only increment byte counts if we actually do decompression (versus regardless of if we do decompression)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean if decompress had an issue? Or not doing decompression at all?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The latter (i.e., if the response is not compressed)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My opinion is that the trailers should only be added if the stream was actually decompressed. So this is correct. But you did make me realize that the decode/encodeTrailers calls were not gated and that was wrong. Fixed in: 540a5ae

direction_config.stats().total_compressed_bytes_.add(input_buffer.length());
direction_config.stats().total_uncompressed_bytes_.add(output_buffer.length());
ENVOY_STREAM_LOG(debug, "{} data decompressed from {} bytes to {} bytes", callbacks,
direction_config.logString(), input_buffer.length(), output_buffer.length());

input_buffer.drain(input_buffer.length());
input_buffer.add(output_buffer);

if (trailers.has_value()) {
byte_tracker.reportTotalBytes(trailers.value().get());
}
return Http::FilterDataStatus::Continue;
}

template <>
Expand Down
42 changes: 39 additions & 3 deletions source/extensions/filters/http/decompressor/decompressor_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,18 @@ class DecompressorFilterConfig {
const std::string& contentEncoding() { return decompressor_factory_->contentEncoding(); }
const RequestDirectionConfig& requestDirectionConfig() { return request_direction_config_; }
const ResponseDirectionConfig& responseDirectionConfig() { return response_direction_config_; }
const Http::LowerCaseString& trailersCompressedBytesString() const {
CONSTRUCT_ON_FIRST_USE(Http::LowerCaseString, Http::LowerCaseString(fmt::format(
"{}-compressed-bytes", trailers_prefix_)));
}
const Http::LowerCaseString& trailersUncompressedBytesString() const {
CONSTRUCT_ON_FIRST_USE(Http::LowerCaseString, Http::LowerCaseString(fmt::format(
"{}-uncompressed-bytes", trailers_prefix_)));
}

private:
const std::string stats_prefix_;
const std::string trailers_prefix_;
const std::string decompressor_stats_prefix_;
const Compression::Decompressor::DecompressorFactoryPtr decompressor_factory_;
const RequestDirectionConfig request_direction_config_;
Expand All @@ -122,12 +131,36 @@ class DecompressorFilter : public Http::PassThroughFilter,
// Http::StreamDecoderFilter
Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap&, bool) override;
Http::FilterDataStatus decodeData(Buffer::Instance&, bool) override;
Http::FilterTrailersStatus decodeTrailers(Http::RequestTrailerMap&) override;

// Http::StreamEncoderFilter
Http::FilterHeadersStatus encodeHeaders(Http::ResponseHeaderMap&, bool) override;
Http::FilterDataStatus encodeData(Buffer::Instance&, bool) override;
Http::FilterTrailersStatus encodeTrailers(Http::ResponseTrailerMap&) override;

private:
struct ByteTracker {
ByteTracker(const Http::LowerCaseString& compressed_bytes_trailer,
const Http::LowerCaseString& uncompressed_bytes_trailer)
: compressed_bytes_trailer_(compressed_bytes_trailer),
uncompressed_bytes_trailer_(uncompressed_bytes_trailer) {}
void chargeBytes(uint64_t compressed_bytes, uint64_t uncompressed_bytes) {
total_compressed_bytes_ += compressed_bytes;
total_uncompressed_bytes_ += uncompressed_bytes;
}
void reportTotalBytes(Http::HeaderMap& trailers) const {
trailers.addReferenceKey(compressed_bytes_trailer_, total_compressed_bytes_);
trailers.addReferenceKey(uncompressed_bytes_trailer_, total_uncompressed_bytes_);
}

private:
const Http::LowerCaseString& compressed_bytes_trailer_;
const Http::LowerCaseString& uncompressed_bytes_trailer_;
uint64_t total_compressed_bytes_{};
uint64_t total_uncompressed_bytes_{};
};
using ByteTrackerOptConstRef = absl::optional<std::reference_wrapper<const ByteTracker>>;

template <class HeaderType>
Http::FilterHeadersStatus
maybeInitDecompress(const DecompressorFilterConfig::DirectionConfig& direction_config,
Expand All @@ -153,10 +186,11 @@ class DecompressorFilter : public Http::PassThroughFilter,
return Http::FilterHeadersStatus::Continue;
}

Http::FilterDataStatus
maybeDecompress(const DecompressorFilterConfig::DirectionConfig& direction_config,
using HeaderMapOptRef = absl::optional<std::reference_wrapper<Http::HeaderMap>>;
void decompress(const DecompressorFilterConfig::DirectionConfig& direction_config,
const Compression::Decompressor::DecompressorPtr& decompressor,
Http::StreamFilterCallbacks& callbacks, Buffer::Instance& input_buffer) const;
Http::StreamFilterCallbacks& callbacks, Buffer::Instance& input_buffer,
ByteTracker& byte_tracker, HeaderMapOptRef trailers) const;

// TODO(junr03): These can be shared between compressor and decompressor.
template <Http::CustomInlineHeaderRegistry::Type Type>
Expand Down Expand Up @@ -201,6 +235,8 @@ class DecompressorFilter : public Http::PassThroughFilter,
DecompressorFilterConfigSharedPtr config_;
Compression::Decompressor::DecompressorPtr request_decompressor_{};
Compression::Decompressor::DecompressorPtr response_decompressor_{};
ByteTracker request_byte_tracker_;
ByteTracker response_byte_tracker_;
};

} // namespace Decompressor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class DecompressorIntegrationTest : public testing::TestWithParam<Network::Addre
void TearDown() override { cleanupUpstreamAndDownstream(); }

void initializeFilter(const std::string& config) {
setUpstreamProtocol(FakeHttpConnection::Type::HTTP2);
config_helper_.addFilter(config);
HttpIntegrationTest::initialize();
codec_client_ = makeHttpConnection(lookupPort("http"));
Expand Down Expand Up @@ -93,13 +94,22 @@ TEST_P(DecompressorIntegrationTest, BidirectionalDecompression) {
// Assert that the total bytes received upstream equal the sum of the uncompressed byte buffers
// sent.
EXPECT_TRUE(upstream_request_->complete());
EXPECT_EQ("chunked", upstream_request_->headers().TransferEncoding()->value().getStringView());
EXPECT_EQ("gzip", upstream_request_->headers()
.get(Http::LowerCaseString("accept-encoding"))
->value()
.getStringView());
EXPECT_EQ(nullptr, upstream_request_->headers().get(Http::LowerCaseString("content-encoding")));
EXPECT_EQ(uncompressed_request_length, upstream_request_->bodyLength());
EXPECT_EQ(std::to_string(compressed_request_length),
upstream_request_->trailers()
->get(Http::LowerCaseString("x-envoy-decompressor-testlib-compressed-bytes"))
->value()
.getStringView());
EXPECT_EQ(std::to_string(uncompressed_request_length),
upstream_request_->trailers()
->get(Http::LowerCaseString("x-envoy-decompressor-testlib-uncompressed-bytes"))
->value()
.getStringView());

// Verify stats
test_server_->waitForCounterEq("http.config_test.decompressor.testlib.gzip.request.decompressed",
Expand Down Expand Up @@ -141,6 +151,16 @@ TEST_P(DecompressorIntegrationTest, BidirectionalDecompression) {
EXPECT_TRUE(response->complete());
EXPECT_EQ("200", response->headers().Status()->value().getStringView());
EXPECT_EQ(uncompressed_response_length, response->body().length());
EXPECT_EQ(std::to_string(compressed_response_length),
response->trailers()
->get(Http::LowerCaseString("x-envoy-decompressor-testlib-compressed-bytes"))
->value()
.getStringView());
EXPECT_EQ(std::to_string(uncompressed_response_length),
response->trailers()
->get(Http::LowerCaseString("x-envoy-decompressor-testlib-uncompressed-bytes"))
->value()
.getStringView());

// Verify stats
test_server_->waitForCounterEq("http.config_test.decompressor.testlib.gzip.response.decompressed",
Expand Down Expand Up @@ -203,12 +223,16 @@ TEST_P(DecompressorIntegrationTest, BidirectionalDecompressionError) {
ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_));

EXPECT_TRUE(upstream_request_->complete());
EXPECT_EQ("chunked", upstream_request_->headers().TransferEncoding()->value().getStringView());
EXPECT_EQ("gzip", upstream_request_->headers()
.get(Http::LowerCaseString("accept-encoding"))
->value()
.getStringView());
EXPECT_EQ(nullptr, upstream_request_->headers().get(Http::LowerCaseString("content-encoding")));
EXPECT_EQ(std::to_string(compressed_request_length),
upstream_request_->trailers()
->get(Http::LowerCaseString("x-envoy-decompressor-testlib-compressed-bytes"))
->value()
.getStringView());

// Verify stats. While the stream was decompressed, there should be a decompression failure.
test_server_->waitForCounterEq("http.config_test.decompressor.testlib.gzip.request.decompressed",
Expand Down Expand Up @@ -244,6 +268,11 @@ TEST_P(DecompressorIntegrationTest, BidirectionalDecompressionError) {

EXPECT_TRUE(response->complete());
EXPECT_EQ("200", response->headers().Status()->value().getStringView());
EXPECT_EQ(std::to_string(compressed_response_length),
response->trailers()
->get(Http::LowerCaseString("x-envoy-decompressor-testlib-compressed-bytes"))
->value()
.getStringView());

// Verify stats. While the stream was decompressed, there should be a decompression failure.
test_server_->waitForCounterEq("http.config_test.decompressor.testlib.gzip.response.decompressed",
Expand Down
Loading