Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,19 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// The filter will send the "request_headers" and "response_headers" messages by default.
// In addition, if the "processing mode" is set , the "request_body" and "response_body"
// messages will be sent if the corresponding fields of the "processing_mode" are
// set to BUFFERED, and trailers will be sent if the corresponding fields are set
// to SEND. The other body processing modes are not
// set to BUFFERED or STREAMED, and trailers will be sent if the corresponding fields are set
// to SEND. The BUFFERED_PARTIAL body processing mode is not
// implemented yet. The filter will also respond to "immediate_response" messages
// at any point in the stream.

// As designed, the filter supports up to six different processing steps, which are in the
// process of being implemented:
//
// * Request headers: IMPLEMENTED
// * Request body: Only BUFFERED mode is implemented
// * Request body: BUFFERED_PARTIAL processing mode is not yet implemented
// * Request trailers: IMPLEMENTED
// * Response headers: IMPLEMENTED
// * Response body: Only BUFFERED mode is implemented
// * Response body: BUFFERED_PARTIAL processing mode is not yet implemented
// * Response trailers: IMPLEMENTED

// The filter communicates with an external gRPC service that can use it to do a variety of things
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

108 changes: 96 additions & 12 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
ENVOY_LOG(debug, "Sending headers message");
stream_->send(std::move(req), false);
stats_.stream_msgs_sent_.inc();
state.setPaused(true);
return FilterHeadersStatus::StopIteration;
}

Expand All @@ -105,18 +106,15 @@ FilterDataStatus Filter::onData(ProcessorState& state, Buffer::Instance& data, b
if (end_stream) {
state.setCompleteBodyAvailable(true);
}

if (state.bodyReplaced()) {
ENVOY_LOG(trace, "Clearing body chunk because CONTINUE_AND_REPLACE was returned");
data.drain(data.length());
return FilterDataStatus::Continue;
}

if (processing_complete_) {
ENVOY_LOG(trace, "Continuing (processing complete)");
return FilterDataStatus::Continue;
}

bool just_added_trailers = false;
Http::HeaderMap* new_trailers = nullptr;
if (end_stream && state.sendTrailers()) {
Expand All @@ -129,19 +127,36 @@ FilterDataStatus Filter::onData(ProcessorState& state, Buffer::Instance& data, b
state.setTrailersAvailable(true);
just_added_trailers = true;
}

if (state.callbackState() == ProcessorState::CallbackState::HeadersCallback) {
ENVOY_LOG(trace, "Header processing still in progress -- holding body data");
// We don't know what to do with the body until the response comes back.
// We must buffer it in case we need it when that happens.
if (end_stream) {
state.setPaused(true);
return FilterDataStatus::StopIterationAndBuffer;
} else {
// Raise a watermark to prevent a buffer overflow until the response comes back.
state.setPaused(true);
state.requestWatermark();
return FilterDataStatus::StopIterationAndWatermark;
}
}
if (state.callbackState() == ProcessorState::CallbackState::StreamedBodyCallbackFinishing) {
// We were previously streaming the body, but there are more chunks waiting
// to be processed, so we can't send the body yet.
// Move the data for our chunk into a queue so that we can re-inject it later
// when the processor returns. See the comments below for more details on how
// this works in general.
ENVOY_LOG(trace, "Enqueuing data while we wait for processing to finish");
state.enqueueStreamingChunk(data, end_stream, false);
if (end_stream) {
// But we need to buffer the last chunk because it's our last chance to do stuff
state.setPaused(true);
return FilterDataStatus::StopIterationNoBuffer;
} else {
return FilterDataStatus::Continue;
}
}

FilterDataStatus result;
switch (state.bodyMode()) {
Expand All @@ -160,19 +175,60 @@ FilterDataStatus Filter::onData(ProcessorState& state, Buffer::Instance& data, b
// The body has been buffered and we need to send the buffer
ENVOY_LOG(debug, "Sending request body message");
state.addBufferedData(data);
sendBodyChunk(state, *state.bufferedData(), true);
sendBodyChunk(state, *state.bufferedData(),
ProcessorState::CallbackState::BufferedBodyCallback, true);
// Since we just just moved the data into the buffer, return NoBuffer
// so that we do not buffer this chunk twice.
state.setPaused(true);
result = FilterDataStatus::StopIterationNoBuffer;
break;
}

ENVOY_LOG(trace, "onData: Buffering");
state.setPaused(true);
result = FilterDataStatus::StopIterationAndBuffer;
break;
case ProcessingMode::STREAMED: {
// STREAMED body mode works as follows:
//
// 1) As data callbacks come in to the filter, it "moves" the data into a new buffer, which it
// dispatches via gRPC message to the external processor, and then keeps in a queue. It
// may request a watermark if the queue is higher than the buffer limit to prevent running
// out of memory.
// 2) As a result, filters farther down the chain see empty buffers in some data callbacks.
// 3) When a response comes back from the external processor, it injects the processor's result
// into the filter chain using "inject**codedData". (The processor may respond indicating that
// there is no change, which means that the original buffer stored in the queue is what gets
// injected.)
//
// This way, we pipeline data from the proxy to the external processor, and give the processor
// the ability to modify each chunk, in order. Doing this any other way would have required
// substantial changes to the filter manager. See
// https://github.com/envoyproxy/envoy/issues/16760 for a discussion.
switch (openStream()) {
case StreamOpenState::Error:
return FilterDataStatus::StopIterationNoBuffer;
case StreamOpenState::IgnoreError:
return FilterDataStatus::Continue;
case StreamOpenState::Ok:
// Fall through
break;
}
// Send the chunk on the gRPC stream
sendBodyChunk(state, data, ProcessorState::CallbackState::StreamedBodyCallback, end_stream);
// Move the data to the queue and optionally raise the watermark.
state.enqueueStreamingChunk(data, end_stream, true);

// At this point we will continue, but with no data, because that will come later
if (end_stream) {
// But we need to buffer the last chunk because it's our last chance to do stuff
state.setPaused(true);
result = FilterDataStatus::StopIterationNoBuffer;
} else {
result = FilterDataStatus::Continue;
}
break;
}
case ProcessingMode::BUFFERED_PARTIAL:
case ProcessingMode::STREAMED:
ENVOY_LOG(debug, "Ignoring unimplemented request body processing mode");
result = FilterDataStatus::Continue;
break;
Expand All @@ -181,7 +237,6 @@ FilterDataStatus Filter::onData(ProcessorState& state, Buffer::Instance& data, b
result = FilterDataStatus::Continue;
break;
}

if (just_added_trailers) {
// If we get here, then we need to send the trailers message now
switch (openStream()) {
Expand All @@ -193,8 +248,8 @@ FilterDataStatus Filter::onData(ProcessorState& state, Buffer::Instance& data, b
// Fall through
break;
}

sendTrailers(state, *new_trailers);
state.setPaused(true);
return FilterDataStatus::StopIterationAndBuffer;
}
return result;
Expand All @@ -221,14 +276,16 @@ FilterTrailersStatus Filter::onTrailers(ProcessorState& state, Http::HeaderMap&
if (state.callbackState() == ProcessorState::CallbackState::HeadersCallback ||
state.callbackState() == ProcessorState::CallbackState::BufferedBodyCallback) {
ENVOY_LOG(trace, "Previous callback still executing -- holding header iteration");
state.setPaused(true);
return FilterTrailersStatus::StopIteration;
}

if (!body_delivered && state.bodyMode() == ProcessingMode::BUFFERED) {
// We would like to process the body in a buffered way, but until now the complete
// body has not arrived. With the arrival of trailers, we now know that the body
// has arrived.
sendBufferedData(state, true);
sendBufferedData(state, ProcessorState::CallbackState::BufferedBodyCallback, true);
state.setPaused(true);
return FilterTrailersStatus::StopIteration;
}

Expand All @@ -248,6 +305,7 @@ FilterTrailersStatus Filter::onTrailers(ProcessorState& state, Http::HeaderMap&
}

sendTrailers(state, trailers);
state.setPaused(true);
return FilterTrailersStatus::StopIteration;
}

Expand Down Expand Up @@ -288,9 +346,10 @@ FilterTrailersStatus Filter::encodeTrailers(ResponseTrailerMap& trailers) {
return status;
}

void Filter::sendBodyChunk(ProcessorState& state, const Buffer::Instance& data, bool end_stream) {
void Filter::sendBodyChunk(ProcessorState& state, const Buffer::Instance& data,
ProcessorState::CallbackState new_state, bool end_stream) {
ENVOY_LOG(debug, "Sending a body chunk of {} bytes", data.length());
state.setCallbackState(ProcessorState::CallbackState::BufferedBodyCallback);
state.setCallbackState(new_state);
state.startMessageTimer(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout());
ProcessingRequest req;
auto* body_req = state.mutableBody(req);
Expand All @@ -313,6 +372,7 @@ void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers

void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {
if (processing_complete_) {
ENVOY_LOG(debug, "Ignoring stream message received after processing complete");
// Ignore additional messages after we decided we were done with the stream
return;
}
Expand All @@ -329,6 +389,7 @@ void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {
encoding_state_.setProcessingMode(response->mode_override());
}

ENVOY_LOG(debug, "Received {} response", responseCaseToString(response->response_case()));
switch (response->response_case()) {
case ProcessingResponse::ResponseCase::kRequestHeaders:
message_handled = decoding_state_.handleHeadersResponse(response->request_headers());
Expand Down Expand Up @@ -357,6 +418,8 @@ void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {
break;
default:
// Any other message is considered spurious
ENVOY_LOG(debug, "Received unknown stream message {} -- ignoring and marking spurious",
response->response_case());
break;
}

Expand Down Expand Up @@ -463,6 +526,27 @@ void Filter::sendImmediateResponse(const ImmediateResponse& response) {
mutate_headers, grpc_status, response.details());
}

std::string responseCaseToString(const ProcessingResponse::ResponseCase response_case) {
switch (response_case) {
case ProcessingResponse::ResponseCase::kRequestHeaders:
return "request headers";
case ProcessingResponse::ResponseCase::kResponseHeaders:
return "response headers";
case ProcessingResponse::ResponseCase::kRequestBody:
return "request body";
case ProcessingResponse::ResponseCase::kResponseBody:
return "response body";
case ProcessingResponse::ResponseCase::kRequestTrailers:
return "request trailers";
case ProcessingResponse::ResponseCase::kResponseTrailers:
return "response trailers";
case ProcessingResponse::ResponseCase::kImmediateResponse:
return "immediate response";
default:
return "unknown";
}
}

} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
Expand Down
11 changes: 8 additions & 3 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,12 @@ class Filter : public Logger::Loggable<Logger::Id::filter>,

void onMessageTimeout();

void sendBufferedData(ProcessorState& state, bool end_stream) {
sendBodyChunk(state, *state.bufferedData(), end_stream);
void sendBufferedData(ProcessorState& state, ProcessorState::CallbackState new_state,
bool end_stream) {
sendBodyChunk(state, *state.bufferedData(), new_state, end_stream);
}
void sendBodyChunk(ProcessorState& state, const Buffer::Instance& data,
ProcessorState::CallbackState new_state, bool end_stream);

void sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers);

Expand All @@ -130,7 +133,6 @@ class Filter : public Logger::Loggable<Logger::Id::filter>,
void cleanUpTimers();
void clearAsyncState();
void sendImmediateResponse(const envoy::service::ext_proc::v3alpha::ImmediateResponse& response);
void sendBodyChunk(ProcessorState& state, const Buffer::Instance& data, bool end_stream);

Http::FilterHeadersStatus onHeaders(ProcessorState& state,
Http::RequestOrResponseHeaderMap& headers, bool end_stream);
Expand Down Expand Up @@ -159,6 +161,9 @@ class Filter : public Logger::Loggable<Logger::Id::filter>,
bool sent_immediate_response_ = false;
};

extern std::string responseCaseToString(
const envoy::service::ext_proc::v3alpha::ProcessingResponse::ResponseCase response_case);

} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
Expand Down
Loading