Skip to content

Commit

Permalink
Set callback decode (#7429)
Browse files Browse the repository at this point in the history
  • Loading branch information
guohao authored Mar 23, 2021
1 parent 5af4004 commit 631aedc
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public Result invoke(Invocation inv) throws RpcException {
} catch (Throwable e) {
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
}
RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture()));
RpcContext.getContext().setFuture(new FutureAdapter<>(asyncResult.getResponseFuture()));

waitForResultIfSync(asyncResult, invocation);
return asyncResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.Executor;

import static org.apache.dubbo.rpc.Constants.CONSUMER_MODEL;
import static org.apache.dubbo.rpc.protocol.tri.TripleUtil.responseErr;

public class ClientStream extends AbstractStream implements Stream {
private static final GrpcStatus MISSING_RESP = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
Expand All @@ -62,9 +62,11 @@ public class ClientStream extends AbstractStream implements Stream {
private final String authority;
private final Request request;
private final RpcInvocation invocation;
private final Executor callback;

public ClientStream(URL url, ChannelHandlerContext ctx, boolean needWrap, Request request) {
public ClientStream(URL url, ChannelHandlerContext ctx, boolean needWrap, Request request, Executor callback) {
super(url, ctx, needWrap);
this.callback = callback;
if (needWrap) {
setSerializeType((String) ((RpcInvocation) (request.getData())).getObjectAttachment(Constants.SERIALIZATION_KEY));
}
Expand Down Expand Up @@ -106,7 +108,7 @@ public void write(Object obj, ChannelPromise promise) throws IOException {
.method(HttpMethod.POST.asciiName())
.path("/" + invocation.getObjectAttachment(CommonConstants.PATH_KEY) + "/" + invocation.getMethodName())
.set(HttpHeaderNames.CONTENT_TYPE, TripleConstant.CONTENT_PROTO)
.set(TripleConstant.TIMEOUT, invocation.get(CommonConstants.TIMEOUT_KEY) +"m")
.set(TripleConstant.TIMEOUT, invocation.get(CommonConstants.TIMEOUT_KEY) + "m")
.set(HttpHeaderNames.TE, HttpHeaderValues.TRAILERS);

final String version = invocation.getInvoker().getUrl().getVersion();
Expand Down Expand Up @@ -165,10 +167,10 @@ public void write(Object obj, ChannelPromise promise) throws IOException {
ClassLoadUtil.switchContextLoader(tccl);
}
final DefaultHttp2DataFrame data = new DefaultHttp2DataFrame(out, true);
streamChannel.write(data).addListener(f->{
if(f.isSuccess()){
streamChannel.write(data).addListener(f -> {
if (f.isSuccess()) {
promise.trySuccess();
}else{
} else {
promise.tryFailure(f.cause());
}
});
Expand All @@ -183,10 +185,7 @@ public void halfClose() {
onError(status);
return;
}
Http2Headers te = getTe();
if (te == null) {
te = getHeaders();
}
final Http2Headers te = getTe() == null ? getHeaders() : getTe();
final Integer code = te.getInt(TripleConstant.STATUS_KEY);
if (!GrpcStatus.Code.isOk(code)) {
final GrpcStatus status = GrpcStatus.fromCode(code)
Expand All @@ -196,38 +195,40 @@ public void halfClose() {
}
final InputStream data = getData();
if (data == null) {
responseErr(getCtx(), MISSING_RESP);
onError(MISSING_RESP);
return;
}
final Invocation invocation = (Invocation) (request.getData());
ServiceRepository repo = ApplicationModel.getServiceRepository();
MethodDescriptor methodDescriptor = repo.lookupMethod(invocation.getServiceName(), invocation.getMethodName());
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
try {
final Object resp;
final ConsumerModel model = getConsumerModel(invocation);
if (model != null) {
ClassLoadUtil.switchContextLoader(model.getClassLoader());
}
if (isNeedWrap()) {
final TripleWrapper.TripleResponseWrapper message = TripleUtil.unpack(data, TripleWrapper.TripleResponseWrapper.class);
resp = TripleUtil.unwrapResp(getUrl(), message, getMultipleSerialization());
} else {
resp = TripleUtil.unpack(data, methodDescriptor.getReturnClass());
callback.execute(() -> {
final Invocation invocation = (Invocation) (request.getData());
ServiceRepository repo = ApplicationModel.getServiceRepository();
MethodDescriptor methodDescriptor = repo.lookupMethod(invocation.getServiceName(), invocation.getMethodName());
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
try {
final Object resp;
final ConsumerModel model = getConsumerModel(invocation);
if (model != null) {
ClassLoadUtil.switchContextLoader(model.getClassLoader());
}
if (isNeedWrap()) {
final TripleWrapper.TripleResponseWrapper message = TripleUtil.unpack(data, TripleWrapper.TripleResponseWrapper.class);
resp = TripleUtil.unwrapResp(getUrl(), message, getMultipleSerialization());
} else {
resp = TripleUtil.unpack(data, methodDescriptor.getReturnClass());
}
Response response = new Response(request.getId(), request.getVersion());
final AppResponse result = new AppResponse(resp);
result.setObjectAttachments(parseHeadersToMap(te));
response.setResult(result);
DefaultFuture2.received(Connection.getConnectionFromChannel(getCtx().channel()), response);
} catch (Exception e) {
final GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
.withCause(e)
.withDescription("Failed to deserialize response");
onError(status);
} finally {
ClassLoadUtil.switchContextLoader(tccl);
}
Response response = new Response(request.getId(), request.getVersion());
final AppResponse result = new AppResponse(resp);
result.setObjectAttachments(parseHeadersToMap(te));
response.setResult(result);
DefaultFuture2.received(Connection.getConnectionFromChannel(getCtx().channel()), response);
} catch (Exception e) {
final GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
.withCause(e)
.withDescription("Failed to deserialize response");
onError(status);
} finally {
ClassLoadUtil.switchContextLoader(tccl);
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.netty.util.ReferenceCountUtil;

import java.io.IOException;
import java.util.concurrent.Executor;

public class TripleClientHandler extends ChannelDuplexHandler {

Expand Down Expand Up @@ -58,7 +59,8 @@ private void writeRequest(ChannelHandlerContext ctx, final Request req, ChannelP
final RpcInvocation inv = (RpcInvocation) req.getData();
final boolean needWrapper = TripleUtil.needWrapper(inv.getParameterTypes());
final URL url = inv.getInvoker().getUrl();
ClientStream clientStream = new ClientStream(url, ctx, needWrapper, req);
final Executor callback = (Executor) inv.getAttributes().remove("callback.executor");
ClientStream clientStream = new ClientStream(url, ctx, needWrapper, req,callback);
clientStream.write(req, promise);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ protected Result doInvoke(final Invocation invocation) throws Throwable {
FutureContext.getContext().setCompatibleFuture(respFuture);
AsyncRpcResult result = new AsyncRpcResult(respFuture, inv);
result.setExecutor(executor);
inv.put("callback.executor",executor );


if (!connection.isAvailable()) {
Expand Down

0 comments on commit 631aedc

Please sign in to comment.