-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathProtobufCodecLite.hpp
157 lines (125 loc) · 5.13 KB
/
ProtobufCodecLite.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#pragma once
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <string_view>
#include <type_traits>

#include <youth/core/DateTime.hpp>
#include <youth/core/Object.h>
#include <youth/net/Callbacks.h>
// Forward declaration so this header can name google::protobuf::Message
// in pointer/reference positions without including the protobuf headers.
namespace google::protobuf {
class Message;
} // namespace google::protobuf
namespace youth {
namespace rpc {
// Shared-ownership handle to a protobuf message, held through the base
// google::protobuf::Message type.
using MessagePtr = std::shared_ptr<google::protobuf::Message>;
/// Codec that sends and receives protobuf messages over a TCP connection.
///
/// Only declarations are visible in this header; the framing itself is
/// implemented elsewhere. Judging from the constants below, a frame
/// presumably consists of a 4-byte length header, the tag, the serialized
/// payload and a 4-byte checksum -- TODO confirm against the implementation.
class ProtobufCodecLite : core::noncopyable
{
public:
    // `static constexpr` members are implicitly inline in C++17, so they can
    // be ODR-used (e.g. bound to a const reference) without a separate
    // out-of-class definition.

    /// Size in bytes of the length header (one int32_t).
    static constexpr int kHeaderLen = sizeof(int32_t);
    /// Size in bytes of the checksum field (one int32_t).
    static constexpr int kChecksumLen = sizeof(int32_t);
    /// Upper bound on a message length: 64 MiB.
    /// NOTE(review): the previous comment referred to kDefaultTotalBytesLimit,
    /// presumably the one in protobuf's io/coded_stream.h -- confirm it still
    /// matches the protobuf version in use.
    static constexpr int kMaxMessageLen = 64 * 1024 * 1024;

    /// Result codes returned by parse() and handed to the ErrorCallback.
    enum ErrorCode {
        kNoError = 0,
        kInvalidLength,
        kCheckSumError,
        kInvalidNameLen,
        kUnknownMessageType,
        kParseError
    };

    /// Invoked with the raw bytes of a message.
    /// Return false to stop parsing the protobuf message.
    using RawMessageCallback
        = std::function<bool(const net::TcpConnectionPtr &, std::string_view, core::DateTime)>;
    /// Invoked with a message, the connection it belongs to and a timestamp.
    using ProtobufMessageCallback
        = std::function<void(const net::TcpConnectionPtr &, const MessagePtr &, core::DateTime)>;
    /// Invoked with the connection, its buffer, a timestamp and the error code.
    using ErrorCallback = std::function<
        void(const net::TcpConnectionPtr &, net::Buffer *, core::DateTime, ErrorCode)>;

    /// @param prototype  Message prototype; stored as a raw pointer and never
    ///                   deleted by the codec, so it must outlive the codec.
    /// @param tagArg     Tag string; copied into the codec.
    /// @param messageCb  Callback for protobuf messages.
    /// @param rawCb      Optional raw-bytes callback (empty by default).
    /// @param errorCb    Error callback; defaults to defaultErrorCallback.
    ProtobufCodecLite(const ::google::protobuf::Message *prototype,
                      std::string_view tagArg,
                      const ProtobufMessageCallback &messageCb,
                      const RawMessageCallback &rawCb = RawMessageCallback(),
                      const ErrorCallback &errorCb = defaultErrorCallback)
        : m_prototype(prototype)
        , m_tag(tagArg)
        , m_messageCallback(messageCb)
        , m_rawCb(rawCb)
        , m_errorCallback(errorCb)
        // Smallest acceptable length: the tag plus the checksum field.
        , kMinMessageLen(static_cast<int>(tagArg.size() + kChecksumLen))
    {}

    virtual ~ProtobufCodecLite() = default;

    /// Returns the tag this codec was constructed with.
    const std::string &tag() const { return m_tag; }

    /// Sends `message` on `conn`.
    void send(const net::TcpConnectionPtr &conn, const ::google::protobuf::Message &message);
    /// Entry point for incoming data on `conn`; `buf` holds the received bytes.
    void onMessage(const net::TcpConnectionPtr &conn, net::Buffer *buf, core::DateTime receiveTime);

    /// Customization point: decode `buf` into `message`; returns success.
    virtual bool parseFromBuffer(std::string_view buf, google::protobuf::Message *message);
    /// Customization point: encode `message` into `buf`.
    /// NOTE(review): the int result is presumably a byte count -- confirm.
    virtual int serializeToBuffer(const google::protobuf::Message &message, net::Buffer *buf);

    /// Maps an ErrorCode to a human-readable string.
    static const std::string &errorCodeToString(ErrorCode errorCode);

    // public for unit tests
    /// Parses `len` bytes at `buf` into `message` and reports the outcome.
    ErrorCode parse(const char *buf, int len, ::google::protobuf::Message *message);
    /// Writes `message` into `buf` (presumably expected to be empty, per the name).
    void fillEmptyBuffer(net::Buffer *buf, const google::protobuf::Message &message);

    /// Computes the checksum of `len` bytes at `buf`.
    static int32_t checksum(const void *buf, int len);
    /// Checks the checksum of the `len` bytes at `buf`.
    static bool validateChecksum(const char *buf, int len);
    /// Reads an int32_t from `buf`.
    static int32_t asInt32(const char *buf);
    /// Error handler used when the caller does not supply one.
    static void defaultErrorCallback(const net::TcpConnectionPtr &,
                                     net::Buffer *,
                                     core::DateTime,
                                     ErrorCode);

private:
    const ::google::protobuf::Message *m_prototype; // not owned
    const std::string m_tag;
    ProtobufMessageCallback m_messageCallback;
    RawMessageCallback m_rawCb;
    ErrorCallback m_errorCallback;
    const int kMinMessageLen; // tag length + kChecksumLen, fixed at construction
};
// TAG must be a variable with external linkage, not a string literal
template<typename MSG, const char *TAG, typename CODEC = ProtobufCodecLite>
class ProtobufCodecLiteT
{
static_assert(std::is_base_of<ProtobufCodecLite, CODEC>::value,
"CODEC should be derived from ProtobufCodecLite");
public:
using ConcreteMessagePtr = std::shared_ptr<MSG>;
using ProtobufMessageCallback
= std::function<void(const net::TcpConnectionPtr &, const ConcreteMessagePtr &, DateTime)>;
using RawMessageCallback = ProtobufCodecLite::RawMessageCallback;
using ErrorCallback = ProtobufCodecLite::ErrorCallback;
explicit ProtobufCodecLiteT(
const ProtobufMessageCallback &messageCb,
const RawMessageCallback &rawCb = RawMessageCallback(),
const ErrorCallback &errorCb = ProtobufCodecLite::defaultErrorCallback)
: m_messageCallback(messageCb)
, m_codec(&MSG::default_instance(),
TAG,
std::bind(&ProtobufCodecLiteT::onRpcMessage,
this,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3),
rawCb,
errorCb)
{}
~ProtobufCodecLiteT() {}
const std::string &tag() const { return m_codec.tag(); }
void send(const net::TcpConnectionPtr &conn, const MSG &message)
{
m_codec.send(conn, message);
}
void onMessage(const net::TcpConnectionPtr &conn, net::Buffer *buf, DateTime receiveTime)
{
m_codec.onMessage(conn, buf, receiveTime);
}
// internal
void onRpcMessage(const net::TcpConnectionPtr &conn,
const MessagePtr &message,
DateTime receiveTime)
{
m_messageCallback(conn, std::static_pointer_cast<MSG>(message), receiveTime);
}
void fillEmptyBuffer(net::Buffer *buf, const MSG &message)
{
m_codec.fillEmptyBuffer(buf, message);
}
private:
ProtobufMessageCallback m_messageCallback;
CODEC m_codec;
};
} // namespace rpc
} // namespace youth