Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/grpc/proxy_flow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,20 @@ namespace grpc {
// transition to "DownstreamFinish" in case of error.

namespace {

const char kGrpcEncoding[] = "grpc-encoding";
const char kGrpcAcceptEncoding[] = "grpc-accept-encoding";

Status ProcessDownstreamHeaders(
const std::multimap<std::string, std::string> &headers,
::grpc::ClientContext *context) {
static grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;

for (const auto &it : headers) {
if (it.first == kGrpcEncoding || it.first == kGrpcAcceptEncoding) {
// GRPC lib will add this header, so not adding it to client_context_
continue;
}
// GRPC runtime libraries use "-bin" suffix to detect binary headers and
// properly apply base64 encoding & decoding as headers are sent and
// received. So we decode here before passing it to GRPC runtime.
Expand Down
74 changes: 68 additions & 6 deletions src/nginx/grpc_server_call.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,21 @@

#include "src/nginx/grpc_server_call.h"

#include <grpc/impl/codegen/gpr_types.h>
#include <cassert>
#include <utility>

#include "contrib/endpoints/include/api_manager/utils/status.h"
#include "grpc++/support/byte_buffer.h"
#include "grpc/compression.h"
#include "src/nginx/error.h"
#include "src/nginx/grpc_finish.h"
#include "src/nginx/module.h"
#include "src/nginx/util.h"

extern "C" {
#include "src/core/lib/compression/message_compress.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/http/v2/ngx_http_v2_module.h"
}

Expand Down Expand Up @@ -108,6 +112,31 @@ ngx_int_t ngx_esp_write_output(ngx_http_request_t *r, ngx_chain_t *out,
return NGX_AGAIN;
}

u_char kGrpcEncoding[] = "grpc-encoding";

grpc_compression_algorithm GetCompressionAlgorithm(ngx_http_request_t *r) {
auto header =
ngx_esp_find_headers_in(r, kGrpcEncoding, sizeof(kGrpcEncoding) - 1);

if (header == nullptr) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"GetCompressionAlgorithm: algorithm not in header");

return grpc_compression_algorithm::GRPC_COMPRESS_NONE;
}

ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"GetCompressionAlgorithm: algorithm=%V", &header->value);

grpc_compression_algorithm algorithm =
grpc_compression_algorithm::GRPC_COMPRESS_NONE;
grpc_compression_algorithm_parse(
reinterpret_cast<const char *>(header->value.data), header->value.len,
&algorithm);

return algorithm;
}

} // namespace

NgxEspGrpcServerCall::NgxEspGrpcServerCall(ngx_http_request_t *r,
Expand Down Expand Up @@ -488,6 +517,7 @@ void NgxEspGrpcServerCall::RunPendingRead() {
}

bool NgxEspGrpcServerCall::TryReadDownstreamMessage() {
static grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
// From http://www.grpc.io/docs/guides/wire.html, a GRPC message is:
// * A one-byte compressed-flag
// * A four-byte message length
Expand All @@ -508,7 +538,7 @@ bool NgxEspGrpcServerCall::TryReadDownstreamMessage() {
return false;
}

// TODO: Implement compressed-flag
uint8_t compressed_flag = GetByte(downstream_slices_, 0);

// Decode the length. Note that this is in network byte order.
uint32_t msglen = 0;
Expand Down Expand Up @@ -561,11 +591,43 @@ bool NgxEspGrpcServerCall::TryReadDownstreamMessage() {
// the 'slices' vector (which will drop those reference counts as
// the vector is destroyed).
std::vector<::grpc::Slice> slices;
slices.reserve(it - downstream_slices_.begin());
std::transform(downstream_slices_.begin(), it, std::back_inserter(slices),
[](gpr_slice &slice) {
return ::grpc::Slice(slice, ::grpc::Slice::STEAL_REF);
});

if (compressed_flag == 1) {
gpr_slice_buffer input;
gpr_slice_buffer_init(&input);
gpr_slice_buffer_addn(&input, downstream_slices_.data(),
it - downstream_slices_.begin());

gpr_slice_buffer output;
gpr_slice_buffer_init(&output);

if (grpc_msg_decompress(&exec_ctx, GetCompressionAlgorithm(r_), &input,
&output) != 1) {
gpr_slice_buffer_destroy(&input);
gpr_slice_buffer_destroy(&output);
CompletePendingRead(false,
utils::Status(google::protobuf::util::error::INTERNAL,
"Failed to decompress GRPC message",
utils::Status::INTERNAL));

return true;
}

slices.reserve(output.count);
std::transform(output.slices, output.slices + output.count,
std::back_inserter(slices), [](gpr_slice &slice) {
return ::grpc::Slice(slice, ::grpc::Slice::ADD_REF);
});

gpr_slice_buffer_destroy(&input);
gpr_slice_buffer_destroy(&output);
} else {
slices.reserve(it - downstream_slices_.begin());
std::transform(downstream_slices_.begin(), it, std::back_inserter(slices),
[](gpr_slice &slice) {
return ::grpc::Slice(slice, ::grpc::Slice::STEAL_REF);
});
}

// Write the message byte buffer (giving the ByteBuffer its own
// reference counts).
Expand Down
1 change: 1 addition & 0 deletions src/nginx/t/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ nginx_suite(
"grpc_auth_pkey.t",
"grpc_call_flow_control.t",
"grpc_cloud_trace.t",
"grpc_compression.t",
"grpc_config_addr.t",
"grpc_downstream_flow_control.t",
"grpc_errors.t",
Expand Down
175 changes: 175 additions & 0 deletions src/nginx/t/grpc_compression.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
# Copyright (C) Extensible Service Proxy Authors
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
################################################################################
#
use strict;
use warnings;

################################################################################

use src::nginx::t::ApiManager; # Must be first (sets up import path to the Nginx test module)
use src::nginx::t::HttpServer;
use src::nginx::t::ServiceControl;
use Test::Nginx; # Imports Nginx's test module
use Test::More; # And the test framework

################################################################################

# Port assignments
my $ServiceControlPort = ApiManager::pick_port();
my $Http2NginxPort = ApiManager::pick_port();
my $GrpcBackendPort = ApiManager::pick_port();
my $GrpcFallbackPort = ApiManager::pick_port();

my $t = Test::Nginx->new()->has(qw/http proxy/)->plan(9);
$t->write_file('service.pb.txt',
ApiManager::get_grpc_test_service_config($GrpcBackendPort) .
ApiManager::read_test_file('testdata/logs_metrics.pb.txt') . <<"EOF");
control {
environment: "http://127.0.0.1:${ServiceControlPort}"
}
EOF

ApiManager::write_file_expand($t, 'nginx.conf', <<"EOF");
%%TEST_GLOBALS%%
daemon off;
events {
worker_connections 32;
}
http {
%%TEST_GLOBALS_HTTP%%
server {
listen 127.0.0.1:${Http2NginxPort} http2;
server_name localhost;
location / {
endpoints {
api service.pb.txt;
%%TEST_CONFIG%%
on;
}
grpc_pass 127.0.0.2:${GrpcFallbackPort};
}
}
}
EOF

my $report_done = 'report_done';

$t->run_daemon(\&service_control, $t, $ServiceControlPort, 'servicecontrol.log', $report_done);
$t->run_daemon(\&ApiManager::grpc_test_server, $t, "127.0.0.1:${GrpcBackendPort}");
is($t->waitforsocket("127.0.0.1:${ServiceControlPort}"), 1, 'Service control socket ready.');
is($t->waitforsocket("127.0.0.1:${GrpcBackendPort}"), 1, 'GRPC test server socket ready.');
$t->run();
is($t->waitforsocket("127.0.0.1:${Http2NginxPort}"), 1, 'Nginx socket ready.');

################################################################################
my $test_results = &ApiManager::run_grpc_test($t, <<"EOF");
server_addr: "127.0.0.1:${Http2NginxPort}"
plans {
echo {
call_config {
api_key: "this-is-an-api-key"
compression: GZIP
}
request {
text: "This text must be long enough for GRPC library compress it. ______________________________________________________________"
}
}
}
EOF

is($t->waitforfile("$t->{_testdir}/${report_done}"), 1, 'Report body file ready.');
$t->stop_daemons();

my $test_results_expected = <<'EOF';
results {
echo {
text: "This text must be long enough for GRPC library compress it. ______________________________________________________________"
}
}
EOF
is($test_results, $test_results_expected, 'Client tests completed as expected.');

my @servicecontrol_requests = ApiManager::read_http_stream($t, 'servicecontrol.log');
is(scalar @servicecontrol_requests, 2, 'Service control was called twice');

# :check
my $r = shift @servicecontrol_requests;
like($r->{uri}, qr/:check$/, 'First call was a :check');

# :report
$r = shift @servicecontrol_requests;
like($r->{uri}, qr/:report$/, 'Second call was a :report');

my $report_body = ServiceControl::convert_proto($r->{body}, 'report_request', 'json');
my $expected_report_body = ServiceControl::gen_report_body({
'serviceName' => 'endpoints-grpc-test.cloudendpointsapis.com',
'api_method' => 'test.grpc.Test.Echo',
'url' => '/test.grpc.Test/Echo',
'protocol' => 'grpc',
'api_key' => 'this-is-an-api-key',
'api_name' => 'test.grpc.Test',
'producer_project_id' => 'endpoints-grpc-test',
'location' => 'us-central1',
'http_method' => 'POST',
'log_message' => 'Method: test.grpc.Test.Echo',
'response_code' => '200',
'request_size' => ($^O eq 'darwin' ? 371 : 373),
'request_bytes' => ($^O eq 'darwin' ? 371 : 373),
'streaming_request_message_counts' => 1,
'streaming_response_message_counts' => 1,
});
ok(ServiceControl::compare_http2_report_json($report_body, $expected_report_body), 'Report body is received.');

################################################################################

sub service_control {
my ($t, $port, $file, $done) = @_;
my $server = HttpServer->new($port, $t->testdir() . '/' . $file)
or die "Can't create test server socket: $!\n";

$server->on_sub('POST', '/v1/services/endpoints-grpc-test.cloudendpointsapis.com:check', sub {
my ($headers, $body, $client) = @_;
print $client <<'EOF';
HTTP/1.1 200 OK
Connection: close

EOF
});

$server->on_sub('POST', '/v1/services/endpoints-grpc-test.cloudendpointsapis.com:report', sub {
my ($headers, $body, $client) = @_;
print $client <<'EOF';
HTTP/1.1 200 OK
Connection: close

EOF
$t->write_file($done, ':report done');
});

$server->run();
}

################################################################################
4 changes: 4 additions & 0 deletions test/grpc/client-test-lib.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ void SetCallConfig(const CallConfig &call_config, ClientContext *ctx) {
for (const auto &it : call_config.metadata()) {
ctx->AddMetadata(it.first, it.second);
}
if (call_config.compression()) {
ctx->set_compression_algorithm(
static_cast<grpc_compression_algorithm>(call_config.compression()));
}
}

template <class T1, class T2>
Expand Down
8 changes: 8 additions & 0 deletions test/grpc/grpc-test.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ message CallConfig {

// Additional metadata to send to server
map<string, bytes> metadata = 4;

enum CompressionAlgorithm {
NONE = 0;
DEFLATE = 1;
GZIP = 2;
}
// Compression algorithm the request
CompressionAlgorithm compression = 5;
}

// The outcome of a GRPC call.
Expand Down