-
Notifications
You must be signed in to change notification settings - Fork 5.3k
dubbo_proxy: Implement the routing of Dubbo requests #5973
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
61fb32a
95dc2b5
8232308
bd5a67d
48b834e
cc34c53
788706b
3c16f0f
f15e8d8
f3dfee0
2e071eb
a0c81f4
0837aba
2376ed9
379aeba
a9fc68b
d9eb2c1
dac7c18
5352fa0
c9f1567
4ecaa7a
8f66806
1fe2cd6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| load("//bazel:api_build_system.bzl", "api_proto_library_internal") | ||
|
|
||
| licenses(["notice"]) # Apache 2 | ||
|
|
||
| api_proto_library_internal( | ||
| name = "router", | ||
| srcs = ["router.proto"], | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| syntax = "proto3"; | ||
|
|
||
| package envoy.config.filter.dubbo.router.v2alpha1; | ||
|
|
||
| option java_outer_classname = "RouterProto"; | ||
| option java_multiple_files = true; | ||
| option java_package = "io.envoyproxy.envoy.config.filter.dubbo.router.v2alpha1"; | ||
| option go_package = "v2alpha1"; | ||
|
|
||
| // [#protodoc-title: Router] | ||
| // Dubbo router :ref:`configuration overview <config_dubbo_filters_router>`. | ||
|
|
||
| message Router { | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| #include "extensions/filters/network/dubbo_proxy/app_exception.h" | ||
|
|
||
| #include "common/buffer/buffer_impl.h" | ||
|
|
||
| #include "extensions/filters/network/dubbo_proxy/message.h" | ||
|
|
||
| namespace Envoy { | ||
| namespace Extensions { | ||
| namespace NetworkFilters { | ||
| namespace DubboProxy { | ||
|
|
||
| AppException::AppException(ResponseStatus status, const std::string& what) | ||
| : EnvoyException(what), status_(status), | ||
| response_type_(RpcResponseType::ResponseWithException) {} | ||
|
|
||
| AppException::ResponseType AppException::encode(MessageMetadata& metadata, | ||
| DubboProxy::Protocol& protocol, | ||
| Deserializer& deserializer, | ||
| Buffer::Instance& buffer) const { | ||
| ASSERT(buffer.length() == 0); | ||
|
|
||
| ENVOY_LOG(debug, "err {}", what()); | ||
|
|
||
| // Serialize the response content to get the serialized response length. | ||
| const std::string& response = what(); | ||
| size_t serialized_body_size = deserializer.serializeRpcResult(buffer, response, response_type_); | ||
|
||
|
|
||
| metadata.setResponseStatus(status_); | ||
| metadata.setMessageType(MessageType::Response); | ||
|
|
||
| Buffer::OwnedImpl protocol_buffer; | ||
| if (!protocol.encode(protocol_buffer, serialized_body_size, metadata)) { | ||
| throw EnvoyException("failed to encode local reply message"); | ||
| } | ||
|
|
||
| buffer.prepend(protocol_buffer); | ||
|
|
||
| return DirectResponse::ResponseType::Exception; | ||
| } | ||
|
|
||
| } // namespace DubboProxy | ||
| } // namespace NetworkFilters | ||
| } // namespace Extensions | ||
| } // namespace Envoy | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| #pragma once | ||
|
|
||
| #include "envoy/common/exception.h" | ||
|
|
||
| #include "extensions/filters/network/dubbo_proxy/deserializer.h" | ||
| #include "extensions/filters/network/dubbo_proxy/filters/filter.h" | ||
| #include "extensions/filters/network/dubbo_proxy/metadata.h" | ||
| #include "extensions/filters/network/dubbo_proxy/protocol.h" | ||
|
|
||
| namespace Envoy { | ||
| namespace Extensions { | ||
| namespace NetworkFilters { | ||
| namespace DubboProxy { | ||
|
|
||
| struct AppException : public EnvoyException, | ||
lizan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| public DubboFilters::DirectResponse, | ||
| Logger::Loggable<Logger::Id::dubbo> { | ||
| AppException(ResponseStatus status, const std::string& what); | ||
| AppException(const AppException& ex) = default; | ||
|
|
||
| using ResponseType = DubboFilters::DirectResponse::ResponseType; | ||
| ResponseType encode(MessageMetadata& metadata, Protocol& protocol, Deserializer& deserializer, | ||
| Buffer::Instance& buffer) const override; | ||
|
|
||
| const ResponseStatus status_; | ||
| const RpcResponseType response_type_; | ||
| }; | ||
|
|
||
| } // namespace DubboProxy | ||
| } // namespace NetworkFilters | ||
| } // namespace Extensions | ||
| } // namespace Envoy | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -107,6 +107,18 @@ class Deserializer { | |
| * @throws EnvoyException if the data is not valid for this serialization | ||
| */ | ||
| virtual RpcResultPtr deserializeRpcResult(Buffer::Instance& buffer, size_t body_size) PURE; | ||
|
|
||
| /** | ||
| * serialize result of an rpc call | ||
| * If successful, the output_buffer is written to the serialized data | ||
| * | ||
| * @param output_buffer store the serialized data | ||
| * @param content the rpc response content | ||
| * @param type the rpc response type | ||
| * @return size_t the length of the serialized content | ||
| */ | ||
| virtual size_t serializeRpcResult(Buffer::Instance& output_buffer, const std::string& content, | ||
|
||
| RpcResponseType type) PURE; | ||
| }; | ||
|
|
||
| typedef std::unique_ptr<Deserializer> DeserializerPtr; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -53,6 +53,36 @@ bool isValidResponseStatus(ResponseStatus status) { | |
| return true; | ||
| } | ||
|
|
||
| void parseRequestInfoFromBuffer(Buffer::Instance& data, MessageMetadataSharedPtr metadata) { | ||
| ASSERT(data.length() >= DubboProtocolImpl::MessageSize); | ||
| uint8_t flag = data.peekInt<uint8_t>(FlagOffset); | ||
| bool is_two_way = (flag & TwoWayMask) == TwoWayMask ? true : false; | ||
| SerializationType type = static_cast<SerializationType>(flag & SerializationTypeMask); | ||
| if (!isValidSerializationType(type)) { | ||
| throw EnvoyException( | ||
| fmt::format("invalid dubbo message serialization type {}", | ||
| static_cast<std::underlying_type<SerializationType>::type>(type))); | ||
| } | ||
|
|
||
| if (!is_two_way) { | ||
| metadata->setMessageType(MessageType::Oneway); | ||
| } | ||
|
|
||
| metadata->setSerializationType(type); | ||
| } | ||
|
|
||
| void parseResponseInfoFromBuffer(Buffer::Instance& buffer, MessageMetadataSharedPtr metadata) { | ||
| ASSERT(buffer.length() >= DubboProtocolImpl::MessageSize); | ||
| ResponseStatus status = static_cast<ResponseStatus>(buffer.peekInt<uint8_t>(StatusOffset)); | ||
| if (!isValidResponseStatus(status)) { | ||
| throw EnvoyException( | ||
| fmt::format("invalid dubbo message response status {}", | ||
| static_cast<std::underlying_type<ResponseStatus>::type>(status))); | ||
| } | ||
|
|
||
| metadata->setResponseStatus(status); | ||
| } | ||
|
|
||
| void RequestMessageImpl::fromBuffer(Buffer::Instance& data) { | ||
| ASSERT(data.length() >= DubboProtocolImpl::MessageSize); | ||
| uint8_t flag = data.peekInt<uint8_t>(FlagOffset); | ||
|
|
@@ -76,6 +106,8 @@ void ResponseMessageImpl::fromBuffer(Buffer::Instance& buffer) { | |
| } | ||
|
|
||
| bool DubboProtocolImpl::decode(Buffer::Instance& buffer, Protocol::Context* context) { | ||
| ASSERT(callbacks_); | ||
|
|
||
| if (buffer.length() < DubboProtocolImpl::MessageSize) { | ||
| return false; | ||
| } | ||
|
|
@@ -103,18 +135,80 @@ bool DubboProtocolImpl::decode(Buffer::Instance& buffer, Protocol::Context* cont | |
| std::make_unique<RequestMessageImpl>(request_id, body_size, is_event); | ||
| req->fromBuffer(buffer); | ||
| context->is_request_ = true; | ||
| callbacks_.onRequestMessage(std::move(req)); | ||
| callbacks_->onRequestMessage(std::move(req)); | ||
| } else { | ||
| ResponseMessageImplPtr res = | ||
| std::make_unique<ResponseMessageImpl>(request_id, body_size, is_event); | ||
| res->fromBuffer(buffer); | ||
| callbacks_.onResponseMessage(std::move(res)); | ||
| callbacks_->onResponseMessage(std::move(res)); | ||
| } | ||
|
|
||
| buffer.drain(MessageSize); | ||
| return true; | ||
| } | ||
|
|
||
| bool DubboProtocolImpl::decode(Buffer::Instance& buffer, Protocol::Context* context, | ||
| MessageMetadataSharedPtr metadata) { | ||
| if (!metadata) { | ||
| throw EnvoyException("invalid metadata parameter"); | ||
| } | ||
|
|
||
| if (buffer.length() < DubboProtocolImpl::MessageSize) { | ||
| return false; | ||
| } | ||
|
|
||
| uint16_t magic_number = buffer.peekBEInt<uint16_t>(); | ||
| if (magic_number != MagicNumber) { | ||
| throw EnvoyException(fmt::format("invalid dubbo message magic number {}", magic_number)); | ||
| } | ||
|
|
||
| uint8_t flag = buffer.peekInt<uint8_t>(FlagOffset); | ||
| MessageType type = | ||
| (flag & MessageTypeMask) == MessageTypeMask ? MessageType::Request : MessageType::Response; | ||
| bool is_event = (flag & EventMask) == EventMask ? true : false; | ||
| int64_t request_id = buffer.peekBEInt<int64_t>(RequestIDOffset); | ||
| int32_t body_size = buffer.peekBEInt<int32_t>(BodySizeOffset); | ||
|
|
||
| if (body_size > MaxBodySize || body_size <= 0) { | ||
| throw EnvoyException(fmt::format("invalid dubbo message size {}", body_size)); | ||
| } | ||
|
|
||
| metadata->setMessageType(type); | ||
| metadata->setRequestId(request_id); | ||
|
|
||
| if (type == MessageType::Request) { | ||
| parseRequestInfoFromBuffer(buffer, metadata); | ||
| } else { | ||
| parseResponseInfoFromBuffer(buffer, metadata); | ||
| } | ||
|
|
||
| context->header_size_ = DubboProtocolImpl::MessageSize; | ||
| context->body_size_ = body_size; | ||
| context->is_heartbeat_ = is_event; | ||
|
||
|
|
||
| return true; | ||
| } | ||
|
|
||
| bool DubboProtocolImpl::encode(Buffer::Instance& buffer, int32_t body_size, | ||
| const MessageMetadata& metadata) { | ||
| switch (metadata.message_type()) { | ||
| case MessageType::Response: { | ||
| ASSERT(metadata.response_status().has_value()); | ||
| buffer.writeBEInt<uint16_t>(MagicNumber); | ||
| buffer.writeByte(static_cast<uint8_t>(metadata.serialization_type())); | ||
| buffer.writeByte(static_cast<uint8_t>(metadata.response_status().value())); | ||
| buffer.writeBEInt<uint64_t>(metadata.request_id()); | ||
| buffer.writeBEInt<uint32_t>(body_size); | ||
| return true; | ||
| } | ||
| case MessageType::Request: { | ||
| NOT_IMPLEMENTED_GCOVR_EXCL_LINE; | ||
| } | ||
| default: | ||
| NOT_REACHED_GCOVR_EXCL_LINE; | ||
| } | ||
| } | ||
|
|
||
| class DubboProtocolConfigFactory : public ProtocolFactoryBase<DubboProtocolImpl> { | ||
| public: | ||
| DubboProtocolConfigFactory() : ProtocolFactoryBase(ProtocolType::Dubbo) {} | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.