Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// The filter will send the "request_headers" and "response_headers" messages by default.
// In addition, if the "processing mode" is set , the "request_body" and "response_body"
// messages will be sent if the corresponding fields of the "processing_mode" are
// set to BUFFERED, and trailers will be sent if the corresponding fields are set
// to SEND. The other body processing modes are not
// set to BUFFERED or STREAMED, and trailers will be sent if the corresponding fields are set
// to SEND. The BUFFERED_PARTIAL body processing mode is not
// implemented yet. The filter will also respond to "immediate_response" messages
// at any point in the stream.

// As designed, the filter supports up to six different processing steps, which are in the
// process of being implemented:
// * Request headers: IMPLEMENTED
// * Request body: Only BUFFERED mode is implemented
// * Request body: BUFFERED_PARTIAL processing mode is not yet implemented
// * Request trailers: IMPLEMENTED
// * Response headers: IMPLEMENTED
// * Response body: Only BUFFERED mode is implemented
// * Response body: BUFFERED_PARTIAL processing mode is not yet implemented
// * Response trailers: IMPLEMENTED

// The filter communicates with an external gRPC service that can use it to do a variety of things
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 70 additions & 5 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
ENVOY_LOG(debug, "Sending headers message");
stream_->send(std::move(req), false);
stats_.stream_msgs_sent_.inc();
state.setPaused(true);
return FilterHeadersStatus::StopIteration;
}

Expand Down Expand Up @@ -135,14 +136,33 @@ FilterDataStatus Filter::onData(ProcessorState& state, Buffer::Instance& data, b
// We don't know what to do with the body until the response comes back.
// We must buffer it in case we need it when that happens.
if (end_stream) {
state.setPaused(true);
return FilterDataStatus::StopIterationAndBuffer;
} else {
// Raise a watermark to prevent a buffer overflow until the response comes back.
state.setPaused(true);
state.requestWatermark();
return FilterDataStatus::StopIterationAndWatermark;
}
}

if (state.callbackState() == ProcessorState::CallbackState::StreamedBodyCallbackFinishing) {
// We were previously streaming the body, but there are more chunks waiting
// to be processed, so we can't send the body yet.
// Move the data for our chunk into a queue so that we can re-inject it later
// when the processor returns. Raise the watermark in an idempotent way
// if the queue is too large.
ENVOY_LOG(trace, "Enqueuing data while we wait for processing to finish");
state.enqueueStreamingChunk(data, end_stream, false);
if (end_stream) {
// But we need to buffer the last chunk because it's our last chance to do stuff
state.setPaused(true);
return FilterDataStatus::StopIterationNoBuffer;
} else {
return FilterDataStatus::Continue;
}
}

FilterDataStatus result;
switch (state.bodyMode()) {
case ProcessingMode::BUFFERED:
Expand All @@ -160,22 +180,52 @@ FilterDataStatus Filter::onData(ProcessorState& state, Buffer::Instance& data, b
// The body has been buffered and we need to send the buffer
ENVOY_LOG(debug, "Sending request body message");
state.addBufferedData(data);
sendBodyChunk(state, *state.bufferedData(), true);
sendBodyChunk(state, *state.bufferedData(),
ProcessorState::CallbackState::BufferedBodyCallback, true);
// Since we just just moved the data into the buffer, return NoBuffer
// so that we do not buffer this chunk twice.
state.setPaused(true);
result = FilterDataStatus::StopIterationNoBuffer;
break;
}

ENVOY_LOG(trace, "onData: Buffering");
state.setPaused(true);
result = FilterDataStatus::StopIterationAndBuffer;
break;

case ProcessingMode::STREAMED: {
switch (openStream()) {
case StreamOpenState::Error:
return FilterDataStatus::StopIterationNoBuffer;
case StreamOpenState::IgnoreError:
return FilterDataStatus::Continue;
case StreamOpenState::Ok:
// Fall through
break;
}

// Send the chunk on the gRPC stream
sendBodyChunk(state, data, ProcessorState::CallbackState::StreamedBodyCallback, end_stream);
// Move the data to the queue and optionally raise the watermark.
state.enqueueStreamingChunk(data, end_stream, true);

// At this point we will continue, but with no data, because that will come later
Comment thread
gbrail marked this conversation as resolved.
if (end_stream) {
// But we need to buffer the last chunk because it's our last chance to do stuff
state.setPaused(true);
result = FilterDataStatus::StopIterationNoBuffer;
} else {
result = FilterDataStatus::Continue;
}
break;
}

case ProcessingMode::BUFFERED_PARTIAL:
case ProcessingMode::STREAMED:
ENVOY_LOG(debug, "Ignoring unimplemented request body processing mode");
result = FilterDataStatus::Continue;
break;

case ProcessingMode::NONE:
default:
result = FilterDataStatus::Continue;
Expand All @@ -195,6 +245,7 @@ FilterDataStatus Filter::onData(ProcessorState& state, Buffer::Instance& data, b
}

sendTrailers(state, *new_trailers);
state.setPaused(true);
return FilterDataStatus::StopIterationAndBuffer;
}
return result;
Expand All @@ -221,14 +272,16 @@ FilterTrailersStatus Filter::onTrailers(ProcessorState& state, Http::HeaderMap&
if (state.callbackState() == ProcessorState::CallbackState::HeadersCallback ||
state.callbackState() == ProcessorState::CallbackState::BufferedBodyCallback) {
ENVOY_LOG(trace, "Previous callback still executing -- holding header iteration");
state.setPaused(true);
return FilterTrailersStatus::StopIteration;
}

if (!body_delivered && state.bodyMode() == ProcessingMode::BUFFERED) {
// We would like to process the body in a buffered way, but until now the complete
// body has not arrived. With the arrival of trailers, we now know that the body
// has arrived.
sendBufferedData(state, true);
sendBufferedData(state, ProcessorState::CallbackState::BufferedBodyCallback, true);
state.setPaused(true);
return FilterTrailersStatus::StopIteration;
}

Expand All @@ -248,6 +301,7 @@ FilterTrailersStatus Filter::onTrailers(ProcessorState& state, Http::HeaderMap&
}

sendTrailers(state, trailers);
state.setPaused(true);
return FilterTrailersStatus::StopIteration;
}

Expand Down Expand Up @@ -288,9 +342,10 @@ FilterTrailersStatus Filter::encodeTrailers(ResponseTrailerMap& trailers) {
return status;
}

void Filter::sendBodyChunk(ProcessorState& state, const Buffer::Instance& data, bool end_stream) {
void Filter::sendBodyChunk(ProcessorState& state, const Buffer::Instance& data,
ProcessorState::CallbackState new_state, bool end_stream) {
ENVOY_LOG(debug, "Sending a body chunk of {} bytes", data.length());
state.setCallbackState(ProcessorState::CallbackState::BufferedBodyCallback);
state.setCallbackState(new_state);
state.startMessageTimer(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout());
ProcessingRequest req;
auto* body_req = state.mutableBody(req);
Expand All @@ -313,6 +368,7 @@ void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers

void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {
if (processing_complete_) {
ENVOY_LOG(debug, "Ignoring stream message received after processing complete");
// Ignore additional messages after we decided we were done with the stream
return;
}
Expand All @@ -331,24 +387,31 @@ void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {

switch (response->response_case()) {
case ProcessingResponse::ResponseCase::kRequestHeaders:
ENVOY_LOG(debug, "Received RequestHeaders response");
message_handled = decoding_state_.handleHeadersResponse(response->request_headers());
break;
case ProcessingResponse::ResponseCase::kResponseHeaders:
ENVOY_LOG(debug, "Received ResponseHeaders response");
message_handled = encoding_state_.handleHeadersResponse(response->response_headers());
break;
case ProcessingResponse::ResponseCase::kRequestBody:
ENVOY_LOG(debug, "Received RequestBody response");
message_handled = decoding_state_.handleBodyResponse(response->request_body());
break;
case ProcessingResponse::ResponseCase::kResponseBody:
ENVOY_LOG(debug, "Received ResponseBody response");
message_handled = encoding_state_.handleBodyResponse(response->response_body());
break;
case ProcessingResponse::ResponseCase::kRequestTrailers:
ENVOY_LOG(debug, "Received RequestTrailers response");
Comment thread
gbrail marked this conversation as resolved.
Outdated
message_handled = decoding_state_.handleTrailersResponse(response->request_trailers());
break;
case ProcessingResponse::ResponseCase::kResponseTrailers:
ENVOY_LOG(debug, "Received responseTrailers response");
message_handled = encoding_state_.handleTrailersResponse(response->response_trailers());
break;
case ProcessingResponse::ResponseCase::kImmediateResponse:
ENVOY_LOG(debug, "Received ImmediateResponse response");
// We won't be sending anything more to the stream after we
// receive this message.
processing_complete_ = true;
Expand All @@ -357,6 +420,8 @@ void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {
break;
default:
// Any other message is considered spurious
ENVOY_LOG(debug, "Received unknown stream message {} -- ignoring and marking spurious",
response->response_case());
break;
}

Expand Down
8 changes: 5 additions & 3 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,12 @@ class Filter : public Logger::Loggable<Logger::Id::filter>,

void onMessageTimeout();

void sendBufferedData(ProcessorState& state, bool end_stream) {
sendBodyChunk(state, *state.bufferedData(), end_stream);
void sendBufferedData(ProcessorState& state, ProcessorState::CallbackState new_state,
bool end_stream) {
sendBodyChunk(state, *state.bufferedData(), new_state, end_stream);
}
void sendBodyChunk(ProcessorState& state, const Buffer::Instance& data,
ProcessorState::CallbackState new_state, bool end_stream);

void sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers);

Expand All @@ -130,7 +133,6 @@ class Filter : public Logger::Loggable<Logger::Id::filter>,
void cleanUpTimers();
void clearAsyncState();
void sendImmediateResponse(const envoy::service::ext_proc::v3alpha::ImmediateResponse& response);
void sendBodyChunk(ProcessorState& state, const Buffer::Instance& data, bool end_stream);

Http::FilterHeadersStatus onHeaders(ProcessorState& state,
Http::RequestOrResponseHeaderMap& headers, bool end_stream);
Expand Down
Loading