Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions contrib/endpoints/src/grpc/transcoding/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ cc_library(
"message_stream.h",
],
deps = [
":transcoder_input_stream",
"//external:protobuf",
],
)
Expand Down Expand Up @@ -125,6 +126,7 @@ cc_library(
"message_reader.h",
],
deps = [
":transcoder_input_stream",
"//external:protobuf",
],
)
Expand All @@ -144,6 +146,17 @@ cc_library(
],
)

cc_library(
name = "transcoder_input_stream",
srcs = [
"transcoder_input_stream.h",
],
visibility = ["//visibility:public"],
deps = [
"@protobuf_git//:protobuf",
],
)

cc_library(
name = "transcoding",
srcs = [
Expand Down Expand Up @@ -223,6 +236,7 @@ cc_library(
srcs = ["test_common.cc"],
hdrs = ["test_common.h"],
deps = [
":transcoder_input_stream",
"//external:googletest",
"//external:protobuf",
"//external:service_config",
Expand Down
10 changes: 3 additions & 7 deletions contrib/endpoints/src/grpc/transcoding/message_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

#include <memory>

#include "google/protobuf/io/zero_copy_stream.h"
#include "google/protobuf/io/zero_copy_stream_impl.h"

namespace google {
Expand All @@ -29,7 +28,7 @@ namespace transcoding {
namespace pb = ::google::protobuf;
namespace pbio = ::google::protobuf::io;

MessageReader::MessageReader(pbio::ZeroCopyInputStream* in)
MessageReader::MessageReader(TranscoderInputStream* in)
: in_(in),
current_message_size_(0),
have_current_message_size_(false),
Expand Down Expand Up @@ -99,7 +98,7 @@ std::unique_ptr<pbio::ZeroCopyInputStream> MessageReader::NextMessage() {
// Check if we have the current message size. If not try to read it.
if (!have_current_message_size_) {
const size_t kDelimiterSize = 5;
if (in_->ByteCount() < static_cast<pb::int64>(kDelimiterSize)) {
if (in_->BytesAvailable() < static_cast<pb::int64>(kDelimiterSize)) {
// We don't have 5 bytes available to read the length of the message.
// Find out whether the stream is finished and return false.
finished_ = IsStreamFinished(in_);
Expand All @@ -117,10 +116,7 @@ std::unique_ptr<pbio::ZeroCopyInputStream> MessageReader::NextMessage() {
have_current_message_size_ = true;
}

// We interpret ZeroCopyInputStream::ByteCount() as the number of bytes
// available for reading at the moment. Check if we have the full message
// available to read.
if (in_->ByteCount() < static_cast<pb::int64>(current_message_size_)) {
if (in_->BytesAvailable() < static_cast<pb::int64>(current_message_size_)) {
// We don't have a full message
return std::unique_ptr<pbio::ZeroCopyInputStream>();
}
Expand Down
11 changes: 3 additions & 8 deletions contrib/endpoints/src/grpc/transcoding/message_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#include <memory>

#include "google/protobuf/io/zero_copy_stream.h"
#include "contrib/endpoints/src/grpc/transcoding/transcoder_input_stream.h"
#include "google/protobuf/stubs/status.h"

namespace google {
Expand Down Expand Up @@ -49,11 +49,6 @@ namespace transcoding {
// }
// }
//
// NOTE: MesssageReader assumes that ZeroCopyInputStream::ByteCount() returns
// the number of bytes available to read at the moment. That's what
// MessageReader uses to determine whether there is a complete message
// available or not.
//
// NOTE: MessageReader is unable to recognize the case when there is an
// incomplete message at the end of the input. The callers will need to
// detect it and act appropriately.
Expand All @@ -64,7 +59,7 @@ namespace transcoding {
//
class MessageReader {
public:
MessageReader(::google::protobuf::io::ZeroCopyInputStream* in);
MessageReader(TranscoderInputStream* in);

// If a full message is available, NextMessage() returns a ZeroCopyInputStream
// over the message. Otherwise returns nullptr - this might be temporary, the
Expand All @@ -82,7 +77,7 @@ class MessageReader {
bool Finished() const { return finished_; }

private:
::google::protobuf::io::ZeroCopyInputStream* in_;
TranscoderInputStream* in_;
// The size of the current message.
unsigned int current_message_size_;
// Whether we have read the current message size or not
Expand Down
30 changes: 12 additions & 18 deletions contrib/endpoints/src/grpc/transcoding/message_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <memory>
#include <string>

#include "google/protobuf/io/zero_copy_stream.h"
#include "google/protobuf/io/zero_copy_stream_impl_lite.h"

namespace google {
Expand All @@ -32,12 +31,12 @@ namespace pbio = ::google::protobuf::io;
namespace {

// a ZeroCopyInputStream implementation over a MessageStream implementation
class ZeroCopyStreamOverMessageStream : public pbio::ZeroCopyInputStream {
class InputStreamOverMessageStream : public TranscoderInputStream {
public:
// src - the underlying MessageStream. ZeroCopyStreamOverMessageStream doesn't
// src - the underlying MessageStream. InputStreamOverMessageStream doesn't
// maintain the ownership of src, the caller must make sure it exists
// throughtout the lifetime of ZeroCopyStreamOverMessageStream.
ZeroCopyStreamOverMessageStream(MessageStream* src)
// throughtout the lifetime of InputStreamOverMessageStream.
InputStreamOverMessageStream(MessageStream* src)
: src_(src), message_(), position_(0) {}

// ZeroCopyInputStream implementation
Expand Down Expand Up @@ -72,19 +71,15 @@ class ZeroCopyStreamOverMessageStream : public pbio::ZeroCopyInputStream {

bool Skip(int) { return false; } // Not implemented (no need)

::google::protobuf::int64 ByteCount() const {
// NOTE: we are changing the ByteCount() interpretation. In our case
// ByteCount() returns the number of bytes available for reading at this
// moment. In the original interpretation it is supposed to be the number
// of bytes read so far.
// We need this such that the consumers are able to read the gRPC delimited
// message stream only if there is a full message available.
google::protobuf::int64 ByteCount() const { return 0; } // Not implemented

int64_t BytesAvailable() const {
if (position_ >= message_.size()) {
// If the current message is all done, try to read the next message
// to make sure we return the correct byte count.
const_cast<ZeroCopyStreamOverMessageStream*>(this)->ReadNextMessage();
const_cast<InputStreamOverMessageStream*>(this)->ReadNextMessage();
}
return static_cast<::google::protobuf::int64>(message_.size() - position_);
return static_cast<int64_t>(message_.size() - position_);
}

private:
Expand All @@ -109,10 +104,9 @@ class ZeroCopyStreamOverMessageStream : public pbio::ZeroCopyInputStream {

} // namespace

std::unique_ptr<::google::protobuf::io::ZeroCopyInputStream>
MessageStream::CreateZeroCopyInputStream() {
return std::unique_ptr<::google::protobuf::io::ZeroCopyInputStream>(
new ZeroCopyStreamOverMessageStream(this));
std::unique_ptr<TranscoderInputStream> MessageStream::CreateInputStream() {
return std::unique_ptr<TranscoderInputStream>(
new InputStreamOverMessageStream(this));
}

} // namespace transcoding
Expand Down
4 changes: 2 additions & 2 deletions contrib/endpoints/src/grpc/transcoding/message_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <memory>
#include <string>

#include "contrib/endpoints/src/grpc/transcoding/transcoder_input_stream.h"
#include "google/protobuf/io/zero_copy_stream.h"
#include "google/protobuf/stubs/status.h"

Expand Down Expand Up @@ -73,8 +74,7 @@ class MessageStream {
// Virtual destructor
virtual ~MessageStream() {}
// Creates ZeroCopyInputStream implementation based on this stream
std::unique_ptr<::google::protobuf::io::ZeroCopyInputStream>
CreateZeroCopyInputStream();
std::unique_ptr<TranscoderInputStream> CreateInputStream();
};

} // namespace transcoding
Expand Down
58 changes: 29 additions & 29 deletions contrib/endpoints/src/grpc/transcoding/message_stream_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ class ZeroCopyInputStreamOverMessageStreamTest : public ::testing::Test {

bool Test(const Messages& messages) {
TestMessageStream test_message_stream;
auto zero_copy_stream = test_message_stream.CreateZeroCopyInputStream();
auto input_stream = test_message_stream.CreateInputStream();

const void* data = nullptr;
int size = 0;

// Check that Next() returns true and a 0-sized buffer meaning that
// nothing is available at the moment.
if (!zero_copy_stream->Next(&data, &size)) {
if (!input_stream->Next(&data, &size)) {
ADD_FAILURE() << "The stream finished unexpectedly" << std::endl;
return false;
}
Expand All @@ -91,13 +91,13 @@ class ZeroCopyInputStreamOverMessageStreamTest : public ::testing::Test {
test_message_stream.AddMessage(message);

// message.size() bytes must be available for reading
if (static_cast<int>(message.size()) != zero_copy_stream->ByteCount()) {
EXPECT_EQ(message.size(), zero_copy_stream->ByteCount());
if (static_cast<int>(message.size()) != input_stream->BytesAvailable()) {
EXPECT_EQ(message.size(), input_stream->BytesAvailable());
return false;
}

// Now try to read & match the message
if (!zero_copy_stream->Next(&data, &size)) {
if (!input_stream->Next(&data, &size)) {
ADD_FAILURE() << "The stream finished unexpectedly" << std::endl;
return false;
}
Expand All @@ -120,16 +120,16 @@ class ZeroCopyInputStreamOverMessageStreamTest : public ::testing::Test {
// Not a valid test case
continue;
}
zero_copy_stream->BackUp(backup_size);
input_stream->BackUp(backup_size);

// backup_size bytes must be available for reading again
if (static_cast<int>(backup_size) != zero_copy_stream->ByteCount()) {
EXPECT_EQ(message.size(), zero_copy_stream->ByteCount());
if (static_cast<int>(backup_size) != input_stream->BytesAvailable()) {
EXPECT_EQ(message.size(), input_stream->BytesAvailable());
return false;
}

// Now Next() must return the backed up data again.
if (!zero_copy_stream->Next(&data, &size)) {
if (!input_stream->Next(&data, &size)) {
ADD_FAILURE() << "The stream finished unexpectedly" << std::endl;
return false;
}
Expand All @@ -143,7 +143,7 @@ class ZeroCopyInputStreamOverMessageStreamTest : public ::testing::Test {
}

// At this point no data should be available
if (!zero_copy_stream->Next(&data, &size)) {
if (!input_stream->Next(&data, &size)) {
ADD_FAILURE() << "The stream finished unexpectedly" << std::endl;
return false;
}
Expand All @@ -156,7 +156,7 @@ class ZeroCopyInputStreamOverMessageStreamTest : public ::testing::Test {
// Now finish the MessageStream & make sure the ZeroCopyInputStream has
// ended.
test_message_stream.Finish();
if (zero_copy_stream->Next(&data, &size)) {
if (input_stream->Next(&data, &size)) {
ADD_FAILURE() << "The stream still hasn't finished" << std::endl;
return false;
}
Expand Down Expand Up @@ -201,14 +201,14 @@ TEST_F(ZeroCopyInputStreamOverMessageStreamTest, DifferenteSizesOneStream) {

TEST_F(ZeroCopyInputStreamOverMessageStreamTest, DirectTest) {
TestMessageStream test_message_stream;
auto zero_copy_stream = test_message_stream.CreateZeroCopyInputStream();
auto input_stream = test_message_stream.CreateInputStream();

const void* data = nullptr;
int size = 0;

// Check that Next() returns true and a 0-sized buffer meaning that
// nothing is available at the moment.
EXPECT_TRUE(zero_copy_stream->Next(&data, &size));
EXPECT_TRUE(input_stream->Next(&data, &size));
EXPECT_EQ(0, size);

// Test messages
Expand All @@ -221,37 +221,37 @@ TEST_F(ZeroCopyInputStreamOverMessageStreamTest, DirectTest) {
test_message_stream.AddMessage(message1);

// message1 is available for reading
EXPECT_EQ(message1.size(), zero_copy_stream->ByteCount());
EXPECT_TRUE(zero_copy_stream->Next(&data, &size));
EXPECT_EQ(message1.size(), input_stream->BytesAvailable());
EXPECT_TRUE(input_stream->Next(&data, &size));
EXPECT_EQ(message1, std::string(reinterpret_cast<const char*>(data), size));

// Back up a bit
zero_copy_stream->BackUp(5);
input_stream->BackUp(5);

// Now read the backed up data again
EXPECT_EQ(5, zero_copy_stream->ByteCount());
EXPECT_TRUE(zero_copy_stream->Next(&data, &size));
EXPECT_EQ(5, input_stream->BytesAvailable());
EXPECT_TRUE(input_stream->Next(&data, &size));
EXPECT_EQ(message1.substr(message1.size() - 5),
std::string(reinterpret_cast<const char*>(data), size));

// Add message2 to the MessageStream
test_message_stream.AddMessage(message2);

// message2 is available for reading
EXPECT_EQ(message2.size(), zero_copy_stream->ByteCount());
EXPECT_TRUE(zero_copy_stream->Next(&data, &size));
EXPECT_EQ(message2.size(), input_stream->BytesAvailable());
EXPECT_TRUE(input_stream->Next(&data, &size));
EXPECT_EQ(message2, std::string(reinterpret_cast<const char*>(data), size));

// Back up all of message2
zero_copy_stream->BackUp(message2.size());
input_stream->BackUp(message2.size());

// Now read message2 again
EXPECT_EQ(message2.size(), zero_copy_stream->ByteCount());
EXPECT_TRUE(zero_copy_stream->Next(&data, &size));
EXPECT_EQ(message2.size(), input_stream->BytesAvailable());
EXPECT_TRUE(input_stream->Next(&data, &size));
EXPECT_EQ(message2, std::string(reinterpret_cast<const char*>(data), size));

// At this point no data should be available
EXPECT_TRUE(zero_copy_stream->Next(&data, &size));
EXPECT_TRUE(input_stream->Next(&data, &size));
EXPECT_EQ(0, size);

// Add both message3 & message4 & finish the MessageStream afterwards
Expand All @@ -260,16 +260,16 @@ TEST_F(ZeroCopyInputStreamOverMessageStreamTest, DirectTest) {
test_message_stream.Finish();

// Read & match both message3 & message4
EXPECT_EQ(message3.size(), zero_copy_stream->ByteCount());
EXPECT_TRUE(zero_copy_stream->Next(&data, &size));
EXPECT_EQ(message3.size(), input_stream->BytesAvailable());
EXPECT_TRUE(input_stream->Next(&data, &size));
EXPECT_EQ(message3, std::string(reinterpret_cast<const char*>(data), size));

EXPECT_EQ(message4.size(), zero_copy_stream->ByteCount());
EXPECT_TRUE(zero_copy_stream->Next(&data, &size));
EXPECT_EQ(message4.size(), input_stream->BytesAvailable());
EXPECT_TRUE(input_stream->Next(&data, &size));
EXPECT_EQ(message4, std::string(reinterpret_cast<const char*>(data), size));

// All done!
EXPECT_FALSE(zero_copy_stream->Next(&data, &size));
EXPECT_FALSE(input_stream->Next(&data, &size));
}

} // namespace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

#include <string>

#include "google/protobuf/io/zero_copy_stream.h"
#include "google/protobuf/io/zero_copy_stream_impl_lite.h"
#include "google/protobuf/stubs/status.h"
#include "google/protobuf/util/json_util.h"
Expand All @@ -31,7 +30,7 @@ namespace transcoding {

ResponseToJsonTranslator::ResponseToJsonTranslator(
::google::protobuf::util::TypeResolver* type_resolver, std::string type_url,
bool streaming, ::google::protobuf::io::ZeroCopyInputStream* in)
bool streaming, TranscoderInputStream* in)
: type_resolver_(type_resolver),
type_url_(std::move(type_url)),
streaming_(streaming),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ class ResponseToJsonTranslator : public MessageStream {
// format (http://www.grpc.io/docs/guides/wire.html)
ResponseToJsonTranslator(
::google::protobuf::util::TypeResolver* type_resolver,
std::string type_url, bool streaming,
::google::protobuf::io::ZeroCopyInputStream* in);
std::string type_url, bool streaming, TranscoderInputStream* in);

// MessageStream implementation
bool NextMessage(std::string* message);
Expand Down
2 changes: 1 addition & 1 deletion contrib/endpoints/src/grpc/transcoding/test_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void TestZeroCopyInputStream::BackUp(int count) {
position_ -= count;
}

pb::int64 TestZeroCopyInputStream::ByteCount() const {
int64_t TestZeroCopyInputStream::BytesAvailable() const {
auto total = current_.size() - position_;
for (auto chunk : chunks_) {
total += chunk.size();
Expand Down
Loading