diff --git a/contrib/endpoints/src/grpc/transcoding/BUILD b/contrib/endpoints/src/grpc/transcoding/BUILD index 894fd27b960..dce09d218c4 100644 --- a/contrib/endpoints/src/grpc/transcoding/BUILD +++ b/contrib/endpoints/src/grpc/transcoding/BUILD @@ -67,6 +67,7 @@ cc_library( "message_stream.h", ], deps = [ + ":transcoder_input_stream", "//external:protobuf", ], ) @@ -125,6 +126,7 @@ cc_library( "message_reader.h", ], deps = [ + ":transcoder_input_stream", "//external:protobuf", ], ) @@ -144,6 +146,17 @@ cc_library( ], ) +cc_library( + name = "transcoder_input_stream", + srcs = [ + "transcoder_input_stream.h", + ], + visibility = ["//visibility:public"], + deps = [ + "@protobuf_git//:protobuf", + ], +) + cc_library( name = "transcoding", srcs = [ @@ -223,6 +236,7 @@ cc_library( srcs = ["test_common.cc"], hdrs = ["test_common.h"], deps = [ + ":transcoder_input_stream", "//external:googletest", "//external:protobuf", "//external:service_config", diff --git a/contrib/endpoints/src/grpc/transcoding/message_reader.cc b/contrib/endpoints/src/grpc/transcoding/message_reader.cc index e5e4aeeae14..23b6a0cbad3 100644 --- a/contrib/endpoints/src/grpc/transcoding/message_reader.cc +++ b/contrib/endpoints/src/grpc/transcoding/message_reader.cc @@ -18,7 +18,6 @@ #include -#include "google/protobuf/io/zero_copy_stream.h" #include "google/protobuf/io/zero_copy_stream_impl.h" namespace google { @@ -29,7 +28,7 @@ namespace transcoding { namespace pb = ::google::protobuf; namespace pbio = ::google::protobuf::io; -MessageReader::MessageReader(pbio::ZeroCopyInputStream* in) +MessageReader::MessageReader(TranscoderInputStream* in) : in_(in), current_message_size_(0), have_current_message_size_(false), @@ -99,7 +98,7 @@ std::unique_ptr MessageReader::NextMessage() { // Check if we have the current message size. If not try to read it. if (!have_current_message_size_) { const size_t kDelimiterSize = 5; - if (in_->ByteCount() < static_cast(kDelimiterSize)) { + if (in_->BytesAvailable() < static_cast(kDelimiterSize)) { // We don't have 5 bytes available to read the length of the message. // Find out whether the stream is finished and return false. finished_ = IsStreamFinished(in_); @@ -117,10 +116,7 @@ std::unique_ptr MessageReader::NextMessage() { have_current_message_size_ = true; } - // We interpret ZeroCopyInputStream::ByteCount() as the number of bytes - // available for reading at the moment. Check if we have the full message - // available to read. - if (in_->ByteCount() < static_cast(current_message_size_)) { + if (in_->BytesAvailable() < static_cast(current_message_size_)) { // We don't have a full message return std::unique_ptr(); } diff --git a/contrib/endpoints/src/grpc/transcoding/message_reader.h b/contrib/endpoints/src/grpc/transcoding/message_reader.h index df175561f47..f07408e23af 100644 --- a/contrib/endpoints/src/grpc/transcoding/message_reader.h +++ b/contrib/endpoints/src/grpc/transcoding/message_reader.h @@ -17,7 +17,7 @@ #include -#include "google/protobuf/io/zero_copy_stream.h" +#include "contrib/endpoints/src/grpc/transcoding/transcoder_input_stream.h" #include "google/protobuf/stubs/status.h" namespace google { @@ -49,11 +49,6 @@ namespace transcoding { // } // } // -// NOTE: MesssageReader assumes that ZeroCopyInputStream::ByteCount() returns -// the number of bytes available to read at the moment. That's what -// MessageReader uses to determine whether there is a complete message -// available or not. -// // NOTE: MessageReader is unable to recognize the case when there is an // incomplete message at the end of the input. The callers will need to // detect it and act appropriately. @@ -64,7 +59,7 @@ namespace transcoding { // class MessageReader { public: - MessageReader(::google::protobuf::io::ZeroCopyInputStream* in); + MessageReader(TranscoderInputStream* in); // If a full message is available, NextMessage() returns a ZeroCopyInputStream // over the message. Otherwise returns nullptr - this might be temporary, the @@ -82,7 +77,7 @@ class MessageReader { bool Finished() const { return finished_; } private: - ::google::protobuf::io::ZeroCopyInputStream* in_; + TranscoderInputStream* in_; // The size of the current message. unsigned int current_message_size_; // Whether we have read the current message size or not diff --git a/contrib/endpoints/src/grpc/transcoding/message_stream.cc b/contrib/endpoints/src/grpc/transcoding/message_stream.cc index 11af66bb170..26a09a23361 100644 --- a/contrib/endpoints/src/grpc/transcoding/message_stream.cc +++ b/contrib/endpoints/src/grpc/transcoding/message_stream.cc @@ -19,7 +19,6 @@ #include #include -#include "google/protobuf/io/zero_copy_stream.h" #include "google/protobuf/io/zero_copy_stream_impl_lite.h" namespace google { @@ -32,12 +31,12 @@ namespace pbio = ::google::protobuf::io; namespace { // a ZeroCopyInputStream implementation over a MessageStream implementation -class ZeroCopyStreamOverMessageStream : public pbio::ZeroCopyInputStream { +class InputStreamOverMessageStream : public TranscoderInputStream { public: - // src - the underlying MessageStream. ZeroCopyStreamOverMessageStream doesn't + // src - the underlying MessageStream. InputStreamOverMessageStream doesn't // maintain the ownership of src, the caller must make sure it exists - // throughtout the lifetime of ZeroCopyStreamOverMessageStream. - ZeroCopyStreamOverMessageStream(MessageStream* src) + // throughtout the lifetime of InputStreamOverMessageStream. + InputStreamOverMessageStream(MessageStream* src) : src_(src), message_(), position_(0) {} // ZeroCopyInputStream implementation @@ -72,19 +71,15 @@ class ZeroCopyStreamOverMessageStream : public pbio::ZeroCopyInputStream { bool Skip(int) { return false; } // Not implemented (no need) - ::google::protobuf::int64 ByteCount() const { - // NOTE: we are changing the ByteCount() interpretation. In our case - // ByteCount() returns the number of bytes available for reading at this - // moment. In the original interpretation it is supposed to be the number - // of bytes read so far. - // We need this such that the consumers are able to read the gRPC delimited - // message stream only if there is a full message available. + google::protobuf::int64 ByteCount() const { return 0; } // Not implemented + + int64_t BytesAvailable() const { if (position_ >= message_.size()) { // If the current message is all done, try to read the next message // to make sure we return the correct byte count. - const_cast(this)->ReadNextMessage(); + const_cast(this)->ReadNextMessage(); } - return static_cast<::google::protobuf::int64>(message_.size() - position_); + return static_cast(message_.size() - position_); } private: @@ -109,10 +104,9 @@ class ZeroCopyStreamOverMessageStream : public pbio::ZeroCopyInputStream { } // namespace -std::unique_ptr<::google::protobuf::io::ZeroCopyInputStream> -MessageStream::CreateZeroCopyInputStream() { - return std::unique_ptr<::google::protobuf::io::ZeroCopyInputStream>( - new ZeroCopyStreamOverMessageStream(this)); +std::unique_ptr MessageStream::CreateInputStream() { + return std::unique_ptr( + new InputStreamOverMessageStream(this)); } } // namespace transcoding diff --git a/contrib/endpoints/src/grpc/transcoding/message_stream.h b/contrib/endpoints/src/grpc/transcoding/message_stream.h index 435040332a7..aef90f524f6 100644 --- a/contrib/endpoints/src/grpc/transcoding/message_stream.h +++ b/contrib/endpoints/src/grpc/transcoding/message_stream.h @@ -18,6 +18,7 @@ #include #include +#include "contrib/endpoints/src/grpc/transcoding/transcoder_input_stream.h" #include "google/protobuf/io/zero_copy_stream.h" #include "google/protobuf/stubs/status.h" @@ -73,8 +74,7 @@ class MessageStream { // Virtual destructor virtual ~MessageStream() {} // Creates ZeroCopyInputStream implementation based on this stream - std::unique_ptr<::google::protobuf::io::ZeroCopyInputStream> - CreateZeroCopyInputStream(); + std::unique_ptr CreateInputStream(); }; } // namespace transcoding diff --git a/contrib/endpoints/src/grpc/transcoding/message_stream_test.cc b/contrib/endpoints/src/grpc/transcoding/message_stream_test.cc index 3699b499bfc..81276310f01 100644 --- a/contrib/endpoints/src/grpc/transcoding/message_stream_test.cc +++ b/contrib/endpoints/src/grpc/transcoding/message_stream_test.cc @@ -70,14 +70,14 @@ class ZeroCopyInputStreamOverMessageStreamTest : public ::testing::Test { bool Test(const Messages& messages) { TestMessageStream test_message_stream; - auto zero_copy_stream = test_message_stream.CreateZeroCopyInputStream(); + auto input_stream = test_message_stream.CreateInputStream(); const void* data = nullptr; int size = 0; // Check that Next() returns true and a 0-sized buffer meaning that // nothing is available at the moment. - if (!zero_copy_stream->Next(&data, &size)) { + if (!input_stream->Next(&data, &size)) { ADD_FAILURE() << "The stream finished unexpectedly" << std::endl; return false; } @@ -91,13 +91,13 @@ class ZeroCopyInputStreamOverMessageStreamTest : public ::testing::Test { test_message_stream.AddMessage(message); // message.size() bytes must be available for reading - if (static_cast(message.size()) != zero_copy_stream->ByteCount()) { - EXPECT_EQ(message.size(), zero_copy_stream->ByteCount()); + if (static_cast(message.size()) != input_stream->BytesAvailable()) { + EXPECT_EQ(message.size(), input_stream->BytesAvailable()); return false; } // Now try to read & match the message - if (!zero_copy_stream->Next(&data, &size)) { + if (!input_stream->Next(&data, &size)) { ADD_FAILURE() << "The stream finished unexpectedly" << std::endl; return false; } @@ -120,16 +120,16 @@ class ZeroCopyInputStreamOverMessageStreamTest : public ::testing::Test { // Not a valid test case continue; } - zero_copy_stream->BackUp(backup_size); + input_stream->BackUp(backup_size); // backup_size bytes must be available for reading again - if (static_cast(backup_size) != zero_copy_stream->ByteCount()) { - EXPECT_EQ(message.size(), zero_copy_stream->ByteCount()); + if (static_cast(backup_size) != input_stream->BytesAvailable()) { + EXPECT_EQ(message.size(), input_stream->BytesAvailable()); return false; } // Now Next() must return the backed up data again. - if (!zero_copy_stream->Next(&data, &size)) { + if (!input_stream->Next(&data, &size)) { ADD_FAILURE() << "The stream finished unexpectedly" << std::endl; return false; } @@ -143,7 +143,7 @@ class ZeroCopyInputStreamOverMessageStreamTest : public ::testing::Test { } // At this point no data should be available - if (!zero_copy_stream->Next(&data, &size)) { + if (!input_stream->Next(&data, &size)) { ADD_FAILURE() << "The stream finished unexpectedly" << std::endl; return false; } @@ -156,7 +156,7 @@ class ZeroCopyInputStreamOverMessageStreamTest : public ::testing::Test { // Now finish the MessageStream & make sure the ZeroCopyInputStream has // ended. test_message_stream.Finish(); - if (zero_copy_stream->Next(&data, &size)) { + if (input_stream->Next(&data, &size)) { ADD_FAILURE() << "The stream still hasn't finished" << std::endl; return false; } @@ -201,14 +201,14 @@ TEST_F(ZeroCopyInputStreamOverMessageStreamTest, DifferenteSizesOneStream) { TEST_F(ZeroCopyInputStreamOverMessageStreamTest, DirectTest) { TestMessageStream test_message_stream; - auto zero_copy_stream = test_message_stream.CreateZeroCopyInputStream(); + auto input_stream = test_message_stream.CreateInputStream(); const void* data = nullptr; int size = 0; // Check that Next() returns true and a 0-sized buffer meaning that // nothing is available at the moment. - EXPECT_TRUE(zero_copy_stream->Next(&data, &size)); + EXPECT_TRUE(input_stream->Next(&data, &size)); EXPECT_EQ(0, size); // Test messages @@ -221,16 +221,16 @@ TEST_F(ZeroCopyInputStreamOverMessageStreamTest, DirectTest) { test_message_stream.AddMessage(message1); // message1 is available for reading - EXPECT_EQ(message1.size(), zero_copy_stream->ByteCount()); - EXPECT_TRUE(zero_copy_stream->Next(&data, &size)); + EXPECT_EQ(message1.size(), input_stream->BytesAvailable()); + EXPECT_TRUE(input_stream->Next(&data, &size)); EXPECT_EQ(message1, std::string(reinterpret_cast(data), size)); // Back up a bit - zero_copy_stream->BackUp(5); + input_stream->BackUp(5); // Now read the backed up data again - EXPECT_EQ(5, zero_copy_stream->ByteCount()); - EXPECT_TRUE(zero_copy_stream->Next(&data, &size)); + EXPECT_EQ(5, input_stream->BytesAvailable()); + EXPECT_TRUE(input_stream->Next(&data, &size)); EXPECT_EQ(message1.substr(message1.size() - 5), std::string(reinterpret_cast(data), size)); @@ -238,20 +238,20 @@ TEST_F(ZeroCopyInputStreamOverMessageStreamTest, DirectTest) { test_message_stream.AddMessage(message2); // message2 is available for reading - EXPECT_EQ(message2.size(), zero_copy_stream->ByteCount()); - EXPECT_TRUE(zero_copy_stream->Next(&data, &size)); + EXPECT_EQ(message2.size(), input_stream->BytesAvailable()); + EXPECT_TRUE(input_stream->Next(&data, &size)); EXPECT_EQ(message2, std::string(reinterpret_cast(data), size)); // Back up all of message2 - zero_copy_stream->BackUp(message2.size()); + input_stream->BackUp(message2.size()); // Now read message2 again - EXPECT_EQ(message2.size(), zero_copy_stream->ByteCount()); - EXPECT_TRUE(zero_copy_stream->Next(&data, &size)); + EXPECT_EQ(message2.size(), input_stream->BytesAvailable()); + EXPECT_TRUE(input_stream->Next(&data, &size)); EXPECT_EQ(message2, std::string(reinterpret_cast(data), size)); // At this point no data should be available - EXPECT_TRUE(zero_copy_stream->Next(&data, &size)); + EXPECT_TRUE(input_stream->Next(&data, &size)); EXPECT_EQ(0, size); // Add both message3 & message4 & finish the MessageStream afterwards @@ -260,16 +260,16 @@ TEST_F(ZeroCopyInputStreamOverMessageStreamTest, DirectTest) { test_message_stream.Finish(); // Read & match both message3 & message4 - EXPECT_EQ(message3.size(), zero_copy_stream->ByteCount()); - EXPECT_TRUE(zero_copy_stream->Next(&data, &size)); + EXPECT_EQ(message3.size(), input_stream->BytesAvailable()); + EXPECT_TRUE(input_stream->Next(&data, &size)); EXPECT_EQ(message3, std::string(reinterpret_cast(data), size)); - EXPECT_EQ(message4.size(), zero_copy_stream->ByteCount()); - EXPECT_TRUE(zero_copy_stream->Next(&data, &size)); + EXPECT_EQ(message4.size(), input_stream->BytesAvailable()); + EXPECT_TRUE(input_stream->Next(&data, &size)); EXPECT_EQ(message4, std::string(reinterpret_cast(data), size)); // All done! - EXPECT_FALSE(zero_copy_stream->Next(&data, &size)); + EXPECT_FALSE(input_stream->Next(&data, &size)); } } // namespace diff --git a/contrib/endpoints/src/grpc/transcoding/response_to_json_translator.cc b/contrib/endpoints/src/grpc/transcoding/response_to_json_translator.cc index eb5e54f9bec..8354cc07f25 100644 --- a/contrib/endpoints/src/grpc/transcoding/response_to_json_translator.cc +++ b/contrib/endpoints/src/grpc/transcoding/response_to_json_translator.cc @@ -18,7 +18,6 @@ #include -#include "google/protobuf/io/zero_copy_stream.h" #include "google/protobuf/io/zero_copy_stream_impl_lite.h" #include "google/protobuf/stubs/status.h" #include "google/protobuf/util/json_util.h" @@ -31,7 +30,7 @@ namespace transcoding { ResponseToJsonTranslator::ResponseToJsonTranslator( ::google::protobuf::util::TypeResolver* type_resolver, std::string type_url, - bool streaming, ::google::protobuf::io::ZeroCopyInputStream* in) + bool streaming, TranscoderInputStream* in) : type_resolver_(type_resolver), type_url_(std::move(type_url)), streaming_(streaming), diff --git a/contrib/endpoints/src/grpc/transcoding/response_to_json_translator.h b/contrib/endpoints/src/grpc/transcoding/response_to_json_translator.h index e41791467f8..d674d71d258 100644 --- a/contrib/endpoints/src/grpc/transcoding/response_to_json_translator.h +++ b/contrib/endpoints/src/grpc/transcoding/response_to_json_translator.h @@ -66,8 +66,7 @@ class ResponseToJsonTranslator : public MessageStream { // format (http://www.grpc.io/docs/guides/wire.html) ResponseToJsonTranslator( ::google::protobuf::util::TypeResolver* type_resolver, - std::string type_url, bool streaming, - ::google::protobuf::io::ZeroCopyInputStream* in); + std::string type_url, bool streaming, TranscoderInputStream* in); // MessageStream implementation bool NextMessage(std::string* message); diff --git a/contrib/endpoints/src/grpc/transcoding/test_common.cc b/contrib/endpoints/src/grpc/transcoding/test_common.cc index 269bc1172d7..12a278dbfa3 100644 --- a/contrib/endpoints/src/grpc/transcoding/test_common.cc +++ b/contrib/endpoints/src/grpc/transcoding/test_common.cc @@ -81,7 +81,7 @@ void TestZeroCopyInputStream::BackUp(int count) { position_ -= count; } -pb::int64 TestZeroCopyInputStream::ByteCount() const { +int64_t TestZeroCopyInputStream::BytesAvailable() const { auto total = current_.size() - position_; for (auto chunk : chunks_) { total += chunk.size(); diff --git a/contrib/endpoints/src/grpc/transcoding/test_common.h b/contrib/endpoints/src/grpc/transcoding/test_common.h index bc452e052fc..b6f2532b867 100644 --- a/contrib/endpoints/src/grpc/transcoding/test_common.h +++ b/contrib/endpoints/src/grpc/transcoding/test_common.h @@ -20,6 +20,7 @@ #include #include +#include "contrib/endpoints/src/grpc/transcoding/transcoder_input_stream.h" #include "google/api/service.pb.h" #include "google/protobuf/io/zero_copy_stream.h" #include "google/protobuf/text_format.h" @@ -33,8 +34,7 @@ namespace testing { // An implementation of ZeroCopyInputStream for testing. // The tests define the chunks that TestZeroCopyInputStream produces. -class TestZeroCopyInputStream - : public ::google::protobuf::io::ZeroCopyInputStream { +class TestZeroCopyInputStream : public TranscoderInputStream { public: TestZeroCopyInputStream(); @@ -50,8 +50,9 @@ class TestZeroCopyInputStream // ZeroCopyInputStream methods bool Next(const void** data, int* size); void BackUp(int count); - ::google::protobuf::int64 ByteCount() const; - bool Skip(int) { return false; } // Not implemented + int64_t BytesAvailable() const; + ::google::protobuf::int64 ByteCount() const { return 0; } // Not implemented + bool Skip(int) { return false; } // Not implemented private: std::deque chunks_; diff --git a/contrib/endpoints/src/grpc/transcoding/transcoder.h b/contrib/endpoints/src/grpc/transcoding/transcoder.h index da58386089d..806c519f921 100644 --- a/contrib/endpoints/src/grpc/transcoding/transcoder.h +++ b/contrib/endpoints/src/grpc/transcoding/transcoder.h @@ -15,7 +15,7 @@ #ifndef GRPC_TRANSCODING_TRANSCODER_H_ #define GRPC_TRANSCODING_TRANSCODER_H_ -#include "google/protobuf/io/zero_copy_stream.h" +#include "contrib/endpoints/src/grpc/transcoding/transcoder_input_stream.h" #include "google/protobuf/stubs/status.h" namespace google { @@ -28,11 +28,11 @@ namespace transcoding { // - translated response stream, // - status of response translation. // -// NOTE: Transcoder uses ::google::protobuf::io::ZeroCopyInputStream for -// carrying the payloads both for input and output. It assumes the -// following interpretation of the ZeroCopyInputStream interface: +// NOTE: Transcoder uses TranscoderInputStream for carrying the payloads +// both for input and output. It assumes the following interpretation +// of the TranscoderInputStream interface: // -// bool ZeroCopyInputStream::Next(const void** data, int* size); +// bool TranscoderInputStream::Next(const void** data, int* size); // // Obtains a chunk of data from the stream. // @@ -52,7 +52,7 @@ namespace transcoding { // again later. // // -// void ZeroCopyInputStream::BackUp(int count); +// void TranscoderInputStream::BackUp(int count); // // Backs up a number of bytes, so that the next call to Next() returns // data again that was already returned by the last call to Next(). This @@ -72,12 +72,12 @@ namespace transcoding { // the same data again before producing new data. // // -// bool ZeroCopyInputStream::Skip(int count); +// bool TranscoderInputStream::Skip(int count); // // Not used and not implemented by the Transcoder. // // -// int64 ZeroCopyInputStream::ByteCount() const; +// int64_t TranscoderInputStream::BytesAvailable() const; // // Returns the number of bytes available for reading at this moment // @@ -133,7 +133,7 @@ namespace transcoding { class Transcoder { public: // ZeroCopyInputStream to read the transcoded request. - virtual ::google::protobuf::io::ZeroCopyInputStream* RequestOutput() = 0; + virtual TranscoderInputStream* RequestOutput() = 0; // The status of request transcoding virtual ::google::protobuf::util::Status RequestStatus() = 0; diff --git a/contrib/endpoints/src/grpc/transcoding/transcoder_factory.cc b/contrib/endpoints/src/grpc/transcoding/transcoder_factory.cc index 2dfcf511a17..f42b6d23108 100644 --- a/contrib/endpoints/src/grpc/transcoding/transcoder_factory.cc +++ b/contrib/endpoints/src/grpc/transcoding/transcoder_factory.cc @@ -52,29 +52,23 @@ class TranscoderImpl : public Transcoder { std::unique_ptr response_translator) : request_translator_(std::move(request_translator)), response_translator_(std::move(response_translator)), - request_zero_copy_stream_( - request_translator_->Output().CreateZeroCopyInputStream()), - response_zero_copy_stream_( - response_translator_->CreateZeroCopyInputStream()) {} + request_stream_(request_translator_->Output().CreateInputStream()), + response_stream_(response_translator_->CreateInputStream()) {} // Transcoder implementation - pbio::ZeroCopyInputStream* RequestOutput() { - return request_zero_copy_stream_.get(); - } + TranscoderInputStream* RequestOutput() { return request_stream_.get(); } pbutil::Status RequestStatus() { return request_translator_->Output().Status(); } - pbio::ZeroCopyInputStream* ResponseOutput() { - return response_zero_copy_stream_.get(); - } + pbio::ZeroCopyInputStream* ResponseOutput() { return response_stream_.get(); } pbutil::Status ResponseStatus() { return response_translator_->Status(); } private: std::unique_ptr request_translator_; std::unique_ptr response_translator_; - std::unique_ptr request_zero_copy_stream_; - std::unique_ptr response_zero_copy_stream_; + std::unique_ptr request_stream_; + std::unique_ptr response_stream_; }; // Converts MethodCallInfo into a RequestInfo structure needed by the @@ -132,7 +126,7 @@ TranscoderFactory::TranscoderFactory(const ::google::api::Service& service) pbutil::Status TranscoderFactory::Create( const MethodCallInfo& call_info, pbio::ZeroCopyInputStream* request_input, - pbio::ZeroCopyInputStream* response_input, + TranscoderInputStream* response_input, std::unique_ptr* transcoder) { // Convert MethodCallInfo into RequestInfo RequestInfo request_info; diff --git a/contrib/endpoints/src/grpc/transcoding/transcoder_factory.h b/contrib/endpoints/src/grpc/transcoding/transcoder_factory.h index b7c10865025..d80c93c7f86 100644 --- a/contrib/endpoints/src/grpc/transcoding/transcoder_factory.h +++ b/contrib/endpoints/src/grpc/transcoding/transcoder_factory.h @@ -19,6 +19,7 @@ #include "contrib/endpoints/include/api_manager/method_call_info.h" #include "contrib/endpoints/src/grpc/transcoding/transcoder.h" +#include "contrib/endpoints/src/grpc/transcoding/transcoder_input_stream.h" #include "contrib/endpoints/src/grpc/transcoding/type_helper.h" #include "google/api/service.pb.h" #include "google/protobuf/io/zero_copy_stream.h" @@ -71,7 +72,7 @@ class TranscoderFactory { ::google::protobuf::util::Status Create( const MethodCallInfo& call_info, ::google::protobuf::io::ZeroCopyInputStream* request_input, - ::google::protobuf::io::ZeroCopyInputStream* response_input, + TranscoderInputStream* response_input, std::unique_ptr* transcoder); private: diff --git a/contrib/endpoints/src/grpc/transcoding/transcoder_input_stream.h b/contrib/endpoints/src/grpc/transcoding/transcoder_input_stream.h new file mode 100644 index 00000000000..240067d600b --- /dev/null +++ b/contrib/endpoints/src/grpc/transcoding/transcoder_input_stream.h @@ -0,0 +1,35 @@ +/* Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef GRPC_TRANSCODING_TRANSCODER_INPUT_STREAM_H_ +#define GRPC_TRANSCODING_TRANSCODER_INPUT_STREAM_H_ + +#include "google/protobuf/io/zero_copy_stream.h" + +namespace google { +namespace api_manager { +namespace transcoding { + +class TranscoderInputStream + : public virtual google::protobuf::io::ZeroCopyInputStream { + public: + // returns the number of bytes available to read at the moment. + virtual int64_t BytesAvailable() const = 0; +}; + +} // namespace transcoding +} // namespace api_manager +} // namespace google + +#endif // API_MANAGER_TRANSCODER_H_ diff --git a/contrib/endpoints/src/grpc/transcoding/transcoder_test.cc b/contrib/endpoints/src/grpc/transcoding/transcoder_test.cc index eb2a38eaf5d..86fcc1911c1 100644 --- a/contrib/endpoints/src/grpc/transcoding/transcoder_test.cc +++ b/contrib/endpoints/src/grpc/transcoding/transcoder_test.cc @@ -139,7 +139,7 @@ class TranscoderTest : public ::testing::Test { } pbutil::Status Build(pbio::ZeroCopyInputStream *request_input, - pbio::ZeroCopyInputStream *response_input, + TranscoderInputStream *response_input, std::unique_ptr *transcoder) { MethodCallInfo call_info; call_info.method_info = method_info_.get();