dubbo_proxy: Implement the routing of Dubbo requests#5973
dubbo_proxy: Implement the routing of Dubbo requests#5973lizan merged 23 commits intoenvoyproxy:masterfrom
Conversation
There was a problem hiding this comment.
replace AppExceptionType to ResponseStatus?
Obviously, there is not much difference between the AppExceptionType and the ResponseStatus here except for the name.
There was a problem hiding this comment.
Sorry,it should use false here,we should not shut down downstream connection because of one upstream request error.
There was a problem hiding this comment.
The encoding for type has been moved to HessianUtils::writeInt.
There was a problem hiding this comment.
If the connection is closed remotely, how should the current ActiveMessage be destructed?
There was a problem hiding this comment.
If RemoteConnectionFailure mistakes before not yet received the response, will be called sendLocalReply, this function will close the downstream connection, will trigger a ConnectionManager: :onEvent function call, will eventually trigger ActiveMessage: :onReset function calls.
If you have received a response, it will call resetDownstreamConnection, will eventually trigger ActiveMessage: : onReset function calls.
There was a problem hiding this comment.
Where will close the Downstream connection? why do you close?
There was a problem hiding this comment.
I added a resetStream interface that notified downstream module to release the current stream when the upstream connection was closed,downstream connection will not be closed.
There was a problem hiding this comment.
resetStream may be called before a connection is established, such as when onDestroy calls resetStream after a connection fails.
There was a problem hiding this comment.
If conn_pool_handle_ is not nullptr, the connection has not been established, so conn_data_ must be nullptr, do not know if I understand correctly?
There was a problem hiding this comment.
This is true, but resetStream does not know when it is called, and the connection may not have been established or may have already been established, so you cannot use ASSERT to force check conn_data_ or conn_pool_handle_ in this function.
There was a problem hiding this comment.
Is conn_pool_handle_ and conn_data_ back nullptr at the same time?
There was a problem hiding this comment.
conn_pool_handle_ is the processing that the connection pool provides to the upper layer while the connection is being established, and is set to nullptr once the connection is established, the resetStream call may be made after the connection is established
There was a problem hiding this comment.
Why is this onResponseComplete called here, is the response not completed?
There was a problem hiding this comment.
Although upstreamData still needs more data, end_stream has been flagged for new data to arrive and the connection has been disconnected, so in the case of upstream_request_, the request has been completed, and both success and failure are forms of completion
|
/ wait |
There was a problem hiding this comment.
Where will close the Downstream connection? why do you close?
There was a problem hiding this comment.
replace const std::string& str to const absl::string_view& ?
There was a problem hiding this comment.
Why isn't onResponseComplete called here?
There was a problem hiding this comment.
Reset merely resets the upstream connection and does not indicate that the request has been completed.
|
/ wait |
There was a problem hiding this comment.
HessianUtils::writeString(output_buffer, content)?
There was a problem hiding this comment.
replace router_matcher to route_matcher?
There was a problem hiding this comment.
The router_matcher dependency has been removed and the router_matcher library name renamed to route_matcher.
There was a problem hiding this comment.
If the request is one_way, the current stream is released after the Filter ends the callback, and there is no need to call the resetStream function.
There was a problem hiding this comment.
Why is the end_stream equal to true here?
There was a problem hiding this comment.
Sorry, this is legacy code, I've deleted the end_stream parameter.
There was a problem hiding this comment.
What is the meaning of this end_stream?
There was a problem hiding this comment.
Sorry, this is legacy code, I've deleted the end_stream parameter.
|
/ wait |
There was a problem hiding this comment.
Where is the logic for heartbeat message processing?
There was a problem hiding this comment.
The heartbeat processing logic is in the Decoder and ConnectionManager, which is not included in this submission.
There was a problem hiding this comment.
Need to close the connection here?
There was a problem hiding this comment.
I change it to resetStream to end the current stream instead of closing the connection.
There was a problem hiding this comment.
How to continue to call the next filter callback?
There was a problem hiding this comment.
I've added the continue decoding logic.
|
/ wait |
There was a problem hiding this comment.
If the connection failure scenario comes here, you should not call the encodeData method.
There was a problem hiding this comment.
The transportEnd interface is not called if the connection fails. assuming that the encodeData function is called, there is a corresponding judgment here. (if (!conn_data_)).
There was a problem hiding this comment.
What is the case that conn_data_ will be nullptr?
There was a problem hiding this comment.
For example, before connection ready, the filter's callback interface transportEnd has been called, thus triggering encodeData function call. At this time, conn_data_ is nullptr.
There was a problem hiding this comment.
What is the purpose of buffered_request_body_?
There was a problem hiding this comment.
The goal is to cache the transfer data when the connection is not ready and the Filter's callback interface is complete.
|
/ wait |
|
Can improve the coverage of this module |
|
🔨 rebuilding |
|
/retest |
|
🔨 rebuilding |
|
/retest |
|
🔨 rebuilding |
This reverts commit d9eb2c1. Signed-off-by: leilei.gll <leilei.gll@alibaba-inc.com>
Signed-off-by: leilei.gll <leilei.gll@alibaba-inc.com>
|
/retest |
|
🔨 rebuilding |
|
/retest |
|
🔨 rebuilding |
| // Serialized response content. | ||
| serialized_size += HessianUtils::writeString(output_buffer, content); | ||
|
|
||
| ASSERT(output_buffer.length() - origin_length == serialized_size); |
There was a problem hiding this comment.
(output_buffer.length() - origin_length)
|
|
||
| void Router::UpstreamRequest::onPoolFailure(Tcp::ConnectionPool::PoolFailureReason reason, | ||
| Upstream::HostDescriptionConstSharedPtr host) { | ||
| ENVOY_LOG(warn, "dubbo upstream request: connection failure '{}'", host->address()->asString()); |
There was a problem hiding this comment.
If PoolFailureReason is Overflow, then host is equal to nullptr
| if (reason == Tcp::ConnectionPool::PoolFailureReason::Timeout || | ||
| reason == Tcp::ConnectionPool::PoolFailureReason::LocalConnectionFailure || | ||
| reason == Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure) { | ||
| parent_.callbacks_->continueDecoding(); |
There was a problem hiding this comment.
Other PoolFailureReason do not require continueDecoding?
There was a problem hiding this comment.
Is not needed, if the reason for failure is equal to overflow, the newConnection call will return nullptr, UpstreamRequest: :start call will return to Continue.
There was a problem hiding this comment.
if (reason != Tcp::ConnectionPool::PoolFailureReason::Overflow) {
parent_.callbacks_->continueDecoding();
}
better?
There was a problem hiding this comment.
Haha, I wrote it like this at first, but later I changed it because I thought it would be easier to understand.
| parent_.callbacks_->sendLocalReply( | ||
| AppException(ResponseStatus::ServerError, | ||
| fmt::format("dubbo upstream request: too many connections to '{}'", | ||
| upstream_host_->address()->asString())), |
|
@lizan Looking great, let's take a look. |
Signed-off-by: leilei.gll <leilei.gll@alibaba-inc.com>
| : EnvoyException(what), status_(status), | ||
| response_type_(RpcResponseType::ResponseWithException) {} | ||
|
|
||
| AppException::AppException(const AppException& ex) |
There was a problem hiding this comment.
Is this needed to be defined explicitly?
There was a problem hiding this comment.
Yes, i have modified it.
There was a problem hiding this comment.
I meant you can just use default, no?
https://en.cppreference.com/w/cpp/language/copy_constructor
There was a problem hiding this comment.
Sorry, i misunderstood, i have modified it to: AppException(const AppException& ex) = default.
| @@ -60,6 +51,8 @@ RpcResultPtr HessianDeserializerImpl::deserializeRpcResult(Buffer::Instance& buf | |||
| result = std::make_unique<RpcResultImpl>(true); | |||
| public: | ||
| DubboProtocolImpl(ProtocolCallbacks& callbacks) : callbacks_(callbacks) {} | ||
| DubboProtocolImpl() = default; | ||
| DubboProtocolImpl(ProtocolCallbacks* callbacks) : callbacks_(callbacks) {} |
| ASSERT(size == (str_max_length + values.size())); | ||
|
|
||
| size_t child_size = doWriteString( | ||
| instance, absl::string_view(str_view.data() + str_max_length, length - str_max_length)); |
| } | ||
|
|
||
| if (length < two_octet_max_lenth) { | ||
| uint8_t code = length / 256; // 0x30 + length / 0x100 must less than 0x34 |
|
|
||
| if (length < two_octet_max_lenth) { | ||
| uint8_t code = length / 256; // 0x30 + length / 0x100 must less than 0x34 | ||
| uint8_t remain = length % 256; |
| static std::chrono::milliseconds readDate(Buffer::Instance& buffer); | ||
| static std::string readByte(Buffer::Instance& buffer); | ||
|
|
||
| static size_t writeString(Buffer::Instance& buffer, const absl::string_view& str); |
| DubboFilters::FilterFactoryCb RouterFilterConfig::createFilterFactoryFromProtoTyped( | ||
| const envoy::config::filter::dubbo::router::v2alpha1::Router& proto_config, | ||
| const std::string& stat_prefix, Server::Configuration::FactoryContext& context) { | ||
| UNREFERENCED_PARAMETER(proto_config); |
Signed-off-by: leilei.gll <leilei.gll@alibaba-inc.com>
|
@lizan Sorry the coverage forget didn't check the header file |
Signed-off-by: leilei.gll <leilei.gll@alibaba-inc.com>
|
/retest |
|
🔨 rebuilding |
|
@zyfjeff I've improved. |
|
@lizan Is there any other code review opinion ? |
| : EnvoyException(what), status_(status), | ||
| response_type_(RpcResponseType::ResponseWithException) {} | ||
|
|
||
| AppException::AppException(const AppException& ex) |
There was a problem hiding this comment.
I meant you can just use default, no?
https://en.cppreference.com/w/cpp/language/copy_constructor
| } | ||
| } | ||
|
|
||
| size_t doWriteString(Buffer::Instance& instance, const absl::string_view& str_view) { |
There was a problem hiding this comment.
I have modified it to: size_t doWriteString(Buffer::Instance& instance, absl::string_view str_view).
| return result; | ||
| } | ||
|
|
||
| size_t HessianUtils::writeString(Buffer::Instance& buffer, const absl::string_view str) { |
There was a problem hiding this comment.
I have modified it to: size_t writeString(Buffer::Instance& buffer, absl::string_view str).
Signed-off-by: leilei.gll <leilei.gll@alibaba-inc.com>
* master: (59 commits) http fault: add response rate limit injection (envoyproxy#6267) xds: introduce initial_fetch_timeout option to limit initialization time (envoyproxy#6048) test: fix cpuset-threads tests (envoyproxy#6278) server: add an API for registering for notifications for server instance life… (envoyproxy#6254) remove remains of TestBase (envoyproxy#6286) dubbo_proxy: Implement the routing of Dubbo requests (envoyproxy#5973) Revert "stats: add new BoolIndicator stat type (envoyproxy#5813)" (envoyproxy#6280) runtime: codifying runtime guarded features (envoyproxy#6134) mysql_filter: fix integration test flakes (envoyproxy#6272) tls: update BoringSSL to debed9a4 (3683). (envoyproxy#6273) rewrite buffer implementation to eliminate evbuffer dependency (envoyproxy#5441) Remove the dependency from TimeSystem to libevent by using the Event::Scheduler abstraction as a delegate. (envoyproxy#6240) fuzz: fix use of literal in default initialization. (envoyproxy#6268) http: add HCM functionality required for rate limiting (envoyproxy#6242) Disable mysql_integration_test until it is deflaked. (envoyproxy#6250) test: use ipv6_only IPv6 addresses in custom cluster integration tests. (envoyproxy#6260) tracing: If parent span is propagated with empty string, it causes th… (envoyproxy#6263) upstream: fix oss-fuzz issue envoyproxy#11095. (envoyproxy#6220) Wire up panic mode subset to receive updates (envoyproxy#6221) docs: clarify xds docs with warming information (envoyproxy#6236) ...
Description: Implement the routing of Dubbo requests
Risk Level: low
Testing: unit test
Docs Changes: N/A
Release Notes: N/A
[Optional Fixes #Issue] #3998
[Optional Deprecated:]
this is only part of the code for DubboProxy, the rest of the code hasn't been committed yet, complete implementation: https://github.com/gengleilei/envoy/tree/feature-dubbo-router-develop/source/extensions/filters/network/dubbo_proxy