diff --git a/src/grpc/proxy_flow.cc b/src/grpc/proxy_flow.cc index 797dee355..ef7f8ac22 100644 --- a/src/grpc/proxy_flow.cc +++ b/src/grpc/proxy_flow.cc @@ -128,12 +128,20 @@ namespace grpc { // transition to "DownstreamFinish" in case of error. namespace { + +const char kGrpcEncoding[] = "grpc-encoding"; +const char kGrpcAcceptEncoding[] = "grpc-accept-encoding"; + Status ProcessDownstreamHeaders( const std::multimap &headers, ::grpc::ClientContext *context) { static grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; for (const auto &it : headers) { + if (it.first == kGrpcEncoding || it.first == kGrpcAcceptEncoding) { + // GRPC lib will add this header, so not adding it to client_context_ + continue; + } // GRPC runtime libraries use "-bin" suffix to detect binary headers and // properly apply base64 encoding & decoding as headers are sent and // received. So we decode here before passing it to GRPC runtime. diff --git a/src/nginx/grpc_server_call.cc b/src/nginx/grpc_server_call.cc index 8ad492370..0b21958a8 100644 --- a/src/nginx/grpc_server_call.cc +++ b/src/nginx/grpc_server_call.cc @@ -26,17 +26,21 @@ #include "src/nginx/grpc_server_call.h" +#include #include #include #include "contrib/endpoints/include/api_manager/utils/status.h" #include "grpc++/support/byte_buffer.h" +#include "grpc/compression.h" #include "src/nginx/error.h" #include "src/nginx/grpc_finish.h" #include "src/nginx/module.h" #include "src/nginx/util.h" extern "C" { +#include "src/core/lib/compression/message_compress.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/http/v2/ngx_http_v2_module.h" } @@ -108,6 +112,31 @@ ngx_int_t ngx_esp_write_output(ngx_http_request_t *r, ngx_chain_t *out, return NGX_AGAIN; } +u_char kGrpcEncoding[] = "grpc-encoding"; + +grpc_compression_algorithm GetCompressionAlgorithm(ngx_http_request_t *r) { + auto header = + ngx_esp_find_headers_in(r, kGrpcEncoding, sizeof(kGrpcEncoding) - 1); + + if (header == nullptr) { + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "GetCompressionAlgorithm: algorithm not in header"); + + return grpc_compression_algorithm::GRPC_COMPRESS_NONE; + } + + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "GetCompressionAlgorithm: algorithm=%V", &header->value); + + grpc_compression_algorithm algorithm = + grpc_compression_algorithm::GRPC_COMPRESS_NONE; + grpc_compression_algorithm_parse( + reinterpret_cast(header->value.data), header->value.len, + &algorithm); + + return algorithm; +} + } // namespace NgxEspGrpcServerCall::NgxEspGrpcServerCall(ngx_http_request_t *r, @@ -488,6 +517,7 @@ void NgxEspGrpcServerCall::RunPendingRead() { } bool NgxEspGrpcServerCall::TryReadDownstreamMessage() { + static grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; // From http://www.grpc.io/docs/guides/wire.html, a GRPC message is: // * A one-byte compressed-flag // * A four-byte message length @@ -508,7 +538,7 @@ bool NgxEspGrpcServerCall::TryReadDownstreamMessage() { return false; } - // TODO: Implement compressed-flag + uint8_t compressed_flag = GetByte(downstream_slices_, 0); // Decode the length. Note that this is in network byte order. uint32_t msglen = 0; @@ -561,11 +591,43 @@ bool NgxEspGrpcServerCall::TryReadDownstreamMessage() { // the 'slices' vector (which will drop those reference counts as // the vector is destroyed). std::vector<::grpc::Slice> slices; - slices.reserve(it - downstream_slices_.begin()); - std::transform(downstream_slices_.begin(), it, std::back_inserter(slices), - [](gpr_slice &slice) { - return ::grpc::Slice(slice, ::grpc::Slice::STEAL_REF); - }); + + if (compressed_flag == 1) { + gpr_slice_buffer input; + gpr_slice_buffer_init(&input); + gpr_slice_buffer_addn(&input, downstream_slices_.data(), + it - downstream_slices_.begin()); + + gpr_slice_buffer output; + gpr_slice_buffer_init(&output); + + if (grpc_msg_decompress(&exec_ctx, GetCompressionAlgorithm(r_), &input, + &output) != 1) { + gpr_slice_buffer_destroy(&input); + gpr_slice_buffer_destroy(&output); + CompletePendingRead(false, + utils::Status(google::protobuf::util::error::INTERNAL, + "Failed to decompress GRPC message", + utils::Status::INTERNAL)); + + return true; + } + + slices.reserve(output.count); + std::transform(output.slices, output.slices + output.count, + std::back_inserter(slices), [](gpr_slice &slice) { + return ::grpc::Slice(slice, ::grpc::Slice::ADD_REF); + }); + + gpr_slice_buffer_destroy(&input); + gpr_slice_buffer_destroy(&output); + } else { + slices.reserve(it - downstream_slices_.begin()); + std::transform(downstream_slices_.begin(), it, std::back_inserter(slices), + [](gpr_slice &slice) { + return ::grpc::Slice(slice, ::grpc::Slice::STEAL_REF); + }); + } // Write the message byte buffer (giving the ByteBuffer its own // reference counts). diff --git a/src/nginx/t/BUILD b/src/nginx/t/BUILD index 10dc45c52..b829dffd4 100644 --- a/src/nginx/t/BUILD +++ b/src/nginx/t/BUILD @@ -103,6 +103,7 @@ nginx_suite( "grpc_auth_pkey.t", "grpc_call_flow_control.t", "grpc_cloud_trace.t", + "grpc_compression.t", "grpc_config_addr.t", "grpc_downstream_flow_control.t", "grpc_errors.t", diff --git a/src/nginx/t/grpc_compression.t b/src/nginx/t/grpc_compression.t new file mode 100644 index 000000000..3781fe444 --- /dev/null +++ b/src/nginx/t/grpc_compression.t @@ -0,0 +1,175 @@ +# Copyright (C) Extensible Service Proxy Authors +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. +# +################################################################################ +# +use strict; +use warnings; + +################################################################################ + +use src::nginx::t::ApiManager; # Must be first (sets up import path to the Nginx test module) +use src::nginx::t::HttpServer; +use src::nginx::t::ServiceControl; +use Test::Nginx; # Imports Nginx's test module +use Test::More; # And the test framework + +################################################################################ + +# Port assignments +my $ServiceControlPort = ApiManager::pick_port(); +my $Http2NginxPort = ApiManager::pick_port(); +my $GrpcBackendPort = ApiManager::pick_port(); +my $GrpcFallbackPort = ApiManager::pick_port(); + +my $t = Test::Nginx->new()->has(qw/http proxy/)->plan(9); +$t->write_file('service.pb.txt', + ApiManager::get_grpc_test_service_config($GrpcBackendPort) . + ApiManager::read_test_file('testdata/logs_metrics.pb.txt') . <<"EOF"); +control { + environment: "http://127.0.0.1:${ServiceControlPort}" +} +EOF + +ApiManager::write_file_expand($t, 'nginx.conf', <<"EOF"); +%%TEST_GLOBALS%% +daemon off; +events { + worker_connections 32; +} +http { + %%TEST_GLOBALS_HTTP%% + server { + listen 127.0.0.1:${Http2NginxPort} http2; + server_name localhost; + location / { + endpoints { + api service.pb.txt; + %%TEST_CONFIG%% + on; + } + grpc_pass 127.0.0.2:${GrpcFallbackPort}; + } + } +} +EOF + +my $report_done = 'report_done'; + +$t->run_daemon(\&service_control, $t, $ServiceControlPort, 'servicecontrol.log', $report_done); +$t->run_daemon(\&ApiManager::grpc_test_server, $t, "127.0.0.1:${GrpcBackendPort}"); +is($t->waitforsocket("127.0.0.1:${ServiceControlPort}"), 1, 'Service control socket ready.'); +is($t->waitforsocket("127.0.0.1:${GrpcBackendPort}"), 1, 'GRPC test server socket ready.'); +$t->run(); +is($t->waitforsocket("127.0.0.1:${Http2NginxPort}"), 1, 'Nginx socket ready.'); + +################################################################################ +my $test_results = &ApiManager::run_grpc_test($t, <<"EOF"); +server_addr: "127.0.0.1:${Http2NginxPort}" +plans { + echo { + call_config { + api_key: "this-is-an-api-key" + compression: GZIP + } + request { + text: "This text must be long enough for GRPC library compress it. ______________________________________________________________" + } + } +} +EOF + +is($t->waitforfile("$t->{_testdir}/${report_done}"), 1, 'Report body file ready.'); +$t->stop_daemons(); + +my $test_results_expected = <<'EOF'; +results { + echo { + text: "This text must be long enough for GRPC library compress it. ______________________________________________________________" + } +} +EOF +is($test_results, $test_results_expected, 'Client tests completed as expected.'); + +my @servicecontrol_requests = ApiManager::read_http_stream($t, 'servicecontrol.log'); +is(scalar @servicecontrol_requests, 2, 'Service control was called twice'); + +# :check +my $r = shift @servicecontrol_requests; +like($r->{uri}, qr/:check$/, 'First call was a :check'); + +# :report +$r = shift @servicecontrol_requests; +like($r->{uri}, qr/:report$/, 'Second call was a :report'); + +my $report_body = ServiceControl::convert_proto($r->{body}, 'report_request', 'json'); +my $expected_report_body = ServiceControl::gen_report_body({ + 'serviceName' => 'endpoints-grpc-test.cloudendpointsapis.com', + 'api_method' => 'test.grpc.Test.Echo', + 'url' => '/test.grpc.Test/Echo', + 'protocol' => 'grpc', + 'api_key' => 'this-is-an-api-key', + 'api_name' => 'test.grpc.Test', + 'producer_project_id' => 'endpoints-grpc-test', + 'location' => 'us-central1', + 'http_method' => 'POST', + 'log_message' => 'Method: test.grpc.Test.Echo', + 'response_code' => '200', + 'request_size' => ($^O eq 'darwin' ? 371 : 373), + 'request_bytes' => ($^O eq 'darwin' ? 371 : 373), + 'streaming_request_message_counts' => 1, + 'streaming_response_message_counts' => 1, + }); +ok(ServiceControl::compare_http2_report_json($report_body, $expected_report_body), 'Report body is received.'); + +################################################################################ + +sub service_control { + my ($t, $port, $file, $done) = @_; + my $server = HttpServer->new($port, $t->testdir() . '/' . $file) + or die "Can't create test server socket: $!\n"; + + $server->on_sub('POST', '/v1/services/endpoints-grpc-test.cloudendpointsapis.com:check', sub { + my ($headers, $body, $client) = @_; + print $client <<'EOF'; +HTTP/1.1 200 OK +Connection: close + +EOF + }); + + $server->on_sub('POST', '/v1/services/endpoints-grpc-test.cloudendpointsapis.com:report', sub { + my ($headers, $body, $client) = @_; + print $client <<'EOF'; +HTTP/1.1 200 OK +Connection: close + +EOF + $t->write_file($done, ':report done'); + }); + + $server->run(); +} + +################################################################################ diff --git a/test/grpc/client-test-lib.cc b/test/grpc/client-test-lib.cc index f638b2296..4140de695 100644 --- a/test/grpc/client-test-lib.cc +++ b/test/grpc/client-test-lib.cc @@ -106,6 +106,10 @@ void SetCallConfig(const CallConfig &call_config, ClientContext *ctx) { for (const auto &it : call_config.metadata()) { ctx->AddMetadata(it.first, it.second); } + if (call_config.compression()) { + ctx->set_compression_algorithm( + static_cast(call_config.compression())); + } } template diff --git a/test/grpc/grpc-test.proto b/test/grpc/grpc-test.proto index c9ada43fc..6591996ed 100644 --- a/test/grpc/grpc-test.proto +++ b/test/grpc/grpc-test.proto @@ -74,6 +74,14 @@ message CallConfig { // Additional metadata to send to server map metadata = 4; + + enum CompressionAlgorithm { + NONE = 0; + DEFLATE = 1; + GZIP = 2; + } + // Compression algorithm the request + CompressionAlgorithm compression = 5; } // The outcome of a GRPC call.