Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 44 additions & 28 deletions source/extensions/filters/http/cache/cache_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ Http::FilterHeadersStatus CacheFilter::encodeHeaders(Http::ResponseHeaderMap& he

if (filter_state_ == FilterState::ValidatingCachedResponse && isResponseNotModified(headers)) {
processSuccessfulValidation(headers);
// Stop the encoding stream until the cached response is fetched & added to the encoding stream.
return Http::FilterHeadersStatus::StopIteration;
// Continue encoding the headers but do not end the stream as the response body is yet to be
// injected.
return Http::FilterHeadersStatus::ContinueAndDontEndStream;
Comment on lines +93 to +95
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this correctly handle the case where the cached response is a header only response?

}

// Either a cache miss or a cache entry that is no longer valid.
Expand All @@ -113,10 +114,6 @@ Http::FilterDataStatus CacheFilter::encodeData(Buffer::Instance& data, bool end_
// cached response was found and is being added to the encoding stream -- ignore it.
return Http::FilterDataStatus::Continue;
}
if (filter_state_ == FilterState::EncodeServingFromCache) {
// Stop the encoding stream until the cached response is fetched & added to the encoding stream.
return Http::FilterDataStatus::StopIterationAndBuffer;
}
Comment on lines -116 to -119
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this removed? Can this no longer happen? Should the enum be removed?

if (insert_) {
ENVOY_STREAM_LOG(debug, "CacheFilter::encodeData inserting body", *encoder_callbacks_);
// TODO(toddmgreer): Wait for the cache if necessary.
Expand All @@ -126,6 +123,17 @@ Http::FilterDataStatus CacheFilter::encodeData(Buffer::Instance& data, bool end_
return Http::FilterDataStatus::Continue;
}

void CacheFilter::onAboveWriteBufferHighWatermark() { ++high_watermark_calls_; }

void CacheFilter::onBelowWriteBufferLowWatermark() {
ASSERT(high_watermark_calls_ > 0);
--high_watermark_calls_;
if (!remaining_ranges_.empty()) {
// Fetching the cached response body was stopped, continue if possible.
maybeGetBody();
}
}
Comment on lines +128 to +135
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there cases in which we stream through a router response and this might get triggered? I don't see any guards in maybeGetBody() to confirm that we are actually doing a fetch?


void CacheFilter::getHeaders(Http::RequestHeaderMap& request_headers) {
ASSERT(lookup_, "CacheFilter is trying to call getHeaders with no LookupContext");

Expand Down Expand Up @@ -165,9 +173,21 @@ void CacheFilter::getHeaders(Http::RequestHeaderMap& request_headers) {
});
}

void CacheFilter::getBody() {
void CacheFilter::maybeGetBody() {
ASSERT(lookup_, "CacheFilter is trying to call getBody with no LookupContext");
ASSERT(!remaining_ranges_.empty(), "No reason to call getBody when there's no body to get.");

if (high_watermark_calls_ > 0 || ongoing_fetch_) {
return;
}
ongoing_fetch_ = true;

// Make sure we are not fetching a chunk of data larger than the encoding buffer limit.
uint64_t begin = remaining_ranges_[0].begin();
uint64_t end = std::min(remaining_ranges_[0].end(),
remaining_ranges_[0].begin() + encoder_callbacks_->encoderBufferLimit());
AdjustedByteRange range_to_fetch = {begin, end};

// If the cache posts a callback to the dispatcher then the CacheFilter is destroyed for any
// reason (e.g client disconnected and HTTP stream terminated), then there is no guarantee that
// the posted callback will run before the filter is deleted. Hence, a weak_ptr to the CacheFilter
Expand All @@ -177,8 +197,8 @@ void CacheFilter::getBody() {

// The dispatcher needs to be captured because there's no guarantee that
// decoder_callbacks_->dispatcher() is thread-safe.
lookup_->getBody(remaining_ranges_[0], [self, &dispatcher = decoder_callbacks_->dispatcher()](
Buffer::InstancePtr&& body) {
lookup_->getBody(range_to_fetch, [self, &dispatcher = decoder_callbacks_->dispatcher()](
Buffer::InstancePtr&& body) {
// The callback is posted to the dispatcher to make sure it is called on the worker thread.
// The lambda passed to dispatcher.post() needs to be copyable as it will be used to
// initialize a std::function. Therefore, it cannot capture anything non-copyable.
Expand Down Expand Up @@ -302,6 +322,8 @@ void CacheFilter::onBody(Buffer::InstancePtr&& body) {
"bogus callback.");
ASSERT(body, "Cache said it had a body, but isn't giving it to us.");

ongoing_fetch_ = false;

const uint64_t bytes_from_cache = body->length();
if (bytes_from_cache < remaining_ranges_[0].length()) {
remaining_ranges_[0].trimFront(bytes_from_cache);
Expand All @@ -318,14 +340,12 @@ void CacheFilter::onBody(Buffer::InstancePtr&& body) {

filter_state_ == FilterState::DecodeServingFromCache
? decoder_callbacks_->encodeData(*body, end_stream)
: encoder_callbacks_->addEncodedData(*body, true);
: encoder_callbacks_->injectEncodedDataToFilterChain(*body, end_stream);

if (!remaining_ranges_.empty()) {
getBody();
maybeGetBody();
} else if (response_has_trailers_) {
getTrailers();
} else {
finalizeEncodingCachedResponse();
}
}

Expand All @@ -339,10 +359,12 @@ void CacheFilter::onTrailers(Http::ResponseTrailerMapPtr&& trailers) {
if (filter_state_ == FilterState::DecodeServingFromCache) {
decoder_callbacks_->encodeTrailers(std::move(trailers));
} else {
Http::ResponseTrailerMap& response_trailers = encoder_callbacks_->addEncodedTrailers();
response_trailers = std::move(*trailers);
// The current API does not support this.
// TODO(yosrym93): When trailers support is implemented, a function in
// StreamEncoderFilterCallbacks will need to be implemented to inject trailers. See
// FilterHeadersStatus::ContinueAndDontEndStream docs in filter.h for more details.
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}
finalizeEncodingCachedResponse();
}

void CacheFilter::processSuccessfulValidation(Http::ResponseHeaderMap& response_headers) {
Expand Down Expand Up @@ -456,32 +478,26 @@ void CacheFilter::encodeCachedResponse() {
CacheResponseCodeDetails::get().ResponseFromCacheFilter);

// If the filter is encoding, 304 response headers and cached headers are merged in encodeHeaders.
// If the filter is decoding, we need to serve response headers from cache directly.
// If the filter is decoding, we need to serve cached response headers directly.
if (filter_state_ == FilterState::DecodeServingFromCache) {
decoder_callbacks_->encodeHeaders(std::move(lookup_result_->headers_), end_stream,
CacheResponseCodeDetails::get().ResponseFromCacheFilter);
}

// TODO(yosrym93): Make sure this is the right place to add the callbacks.
decoder_callbacks_->addDownstreamWatermarkCallbacks(*this);
Comment on lines +487 to +488
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would recommend initializing this as early as possible (in setEncoderFilterCallbacks)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume you really meant setEncoderFilterCallbacks rather than a typo here. Since I'm stepping up on the implementation code, I did get confused on why the registration is only provided available via StreamDecoderFilterCallbacks. What if a pure encoder filter (who doesn't have decoder_callbacks) wants to listen to downstream watermark changes? In other words, there's no way you could call: encoder_callbacks_->addDownstreamWatermarkCallbacks(*this).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's just an implementation oversight because no one has needed it before.


if (lookup_result_->content_length_ > 0) {
// No range has been added, so we add entire body to the response.
if (remaining_ranges_.empty()) {
// This is not a range request, encode the entire body.
remaining_ranges_.emplace_back(0, lookup_result_->content_length_);
}
getBody();
maybeGetBody();
} else if (response_has_trailers_) {
getTrailers();
}
}

void CacheFilter::finalizeEncodingCachedResponse() {
if (filter_state_ == FilterState::EncodeServingFromCache) {
// encodeHeaders returned StopIteration waiting for finishing encoding the cached response --
// continue encoding.
encoder_callbacks_->continueEncoding();
}
filter_state_ = FilterState::ResponseServedFromCache;
}

} // namespace Cache
} // namespace HttpFilters
} // namespace Extensions
Expand Down
26 changes: 14 additions & 12 deletions source/extensions/filters/http/cache/cache_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,34 @@ namespace Cache {
* A filter that caches responses and attempts to satisfy requests from cache.
*/
class CacheFilter : public Http::PassThroughFilter,
public Http::DownstreamWatermarkCallbacks,
public Logger::Loggable<Logger::Id::cache_filter>,
public std::enable_shared_from_this<CacheFilter> {
public:
CacheFilter(const envoy::extensions::filters::http::cache::v3alpha::CacheConfig& config,
const std::string& stats_prefix, Stats::Scope& scope, TimeSource& time_source,
HttpCache& http_cache);

// Http::StreamFilterBase
void onDestroy() override;

// Http::StreamDecoderFilter
Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers,
bool end_stream) override;

// Http::StreamEncoderFilter
Http::FilterHeadersStatus encodeHeaders(Http::ResponseHeaderMap& headers,
bool end_stream) override;
Http::FilterDataStatus encodeData(Buffer::Instance& buffer, bool end_stream) override;

// Http::DownstreamWatermarkCallbacks
void onAboveWriteBufferHighWatermark() override;
void onBelowWriteBufferLowWatermark() override;

private:
// Utility functions; make any necessary checks and call the corresponding lookup_ functions
void getHeaders(Http::RequestHeaderMap& request_headers);
void getBody();
void maybeGetBody();
void getTrailers();

// Callbacks for HttpCache to call when headers/body/trailers are ready.
Expand Down Expand Up @@ -71,19 +79,13 @@ class CacheFilter : public Http::PassThroughFilter,
// or during encoding if a cache entry was validated successfully.
void encodeCachedResponse();

// Precondition: finished adding a response from cache to the response encoding stream.
// Updates filter_state_ and continues the encoding stream if necessary.
void finalizeEncodingCachedResponse();

TimeSource& time_source_;
HttpCache& cache_;
LookupContextPtr lookup_;
InsertContextPtr insert_;
LookupResultPtr lookup_result_;

// Tracks what body bytes still need to be read from the cache. This is
// currently only one Range, but will expand when full range support is added. Initialized by
// onHeaders for Range Responses, otherwise initialized by encodeCachedResponse.
// Tracks what body bytes still need to be read from the cache.
std::vector<AdjustedByteRange> remaining_ranges_;

// TODO(#12901): The allow list could be constructed only once directly from the config, instead
Expand All @@ -100,6 +102,10 @@ class CacheFilter : public Http::PassThroughFilter,
// https://httpwg.org/specs/rfc7234.html#response.cacheability
bool request_allows_inserts_ = false;

// These are used to keep track of whether we should fetch more data from the cache.
int high_watermark_calls_ = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: unsigned

bool ongoing_fetch_ = false;

enum class FilterState {
Initial,

Expand All @@ -112,10 +118,6 @@ class CacheFilter : public Http::PassThroughFilter,
// A cached response was successfully validated and it is being added to the encoding stream
EncodeServingFromCache,

// The cached response was successfully added to the encoding stream (either during decoding or
// encoding).
ResponseServedFromCache,

// CacheFilter::onDestroy has been called, the filter will be destroyed soon. Any triggered
// callbacks should be ignored.
Destroyed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class CacheIntegrationTest : public Event::TestUsingSimulatedTime,

void initializeFilter(const std::string& config) {
config_helper_.addFilter(config);
config_helper_.setBufferLimits(buffer_limit_, buffer_limit_);
initialize();
codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http"))));
}
Expand All @@ -34,6 +35,7 @@ class CacheIntegrationTest : public Event::TestUsingSimulatedTime,
"@type": "type.googleapis.com/envoy.source.extensions.filters.http.cache.SimpleHttpCacheConfig"
)EOF"};
DateFormatter formatter_{"%a, %d %b %Y %H:%M:%S GMT"};
const uint64_t buffer_limit_ = 1024;
};

INSTANTIATE_TEST_SUITE_P(Protocols, CacheIntegrationTest,
Expand All @@ -53,7 +55,10 @@ TEST_P(CacheIntegrationTest, MissInsertHit) {
{":scheme", "http"},
{":authority", "MissInsertHit"}};

const std::string response_body(42, 'a');
// Use a body arbitrarily greater than the buffer limit to exercise fetching the cached body in
// several chunks.
const std::string response_body(buffer_limit_ * 3.5, 'a');

Http::TestResponseHeaderMapImpl response_headers = {
{":status", "200"},
{"date", formatter_.now(simTime())},
Expand Down Expand Up @@ -109,7 +114,10 @@ TEST_P(CacheIntegrationTest, ExpiredValidated) {
{":scheme", "http"},
{":authority", "ExpiredValidated"}};

const std::string response_body(42, 'a');
// Use a body arbitrarily greater than the buffer limit to exercise fetching the cached body in
// several chunks.
const std::string response_body(buffer_limit_ * 3.5, 'a');

Http::TestResponseHeaderMapImpl response_headers = {
{":status", "200"},
{"date", formatter_.now(simTime())},
Expand Down
Loading