Skip to content

Commit

Permalink
feat: server stream supports requests without parameters (#15026)
Browse files Browse the repository at this point in the history
* optimize:  support http1 automatic keepalive setting

* optimize:  support http1 automatic keepalive setting

* optimize:  support http1 automatic keepalive setting

* optimize:  support http1 automatic keepalive setting

* feat:  server stream supports requests without parameters

* feat:  server stream supports requests without parameters

* fix ut

* fix ut

* fix ut

* fix ut
  • Loading branch information
funky-eyes authored Dec 31, 2024
1 parent 0fd3706 commit 1da23ea
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import org.apache.dubbo.common.utils.ReflectUtils;

import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -48,6 +50,8 @@ public class ReflectionMethodDescriptor implements MethodDescriptor {
private final Method method;
private final boolean generic;
private final RpcType rpcType;
private Class<?>[] actualRequestTypes;
private Class<?> actualResponseType;

public ReflectionMethodDescriptor(Method method) {
this.method = method;
Expand Down Expand Up @@ -82,13 +86,25 @@ private RpcType determineRpcType() {
if (parameterClasses.length > 2) {
return RpcType.UNARY;
}
Type[] genericParameterTypes = method.getGenericParameterTypes();
if (parameterClasses.length == 1 && isStreamType(parameterClasses[0]) && isStreamType(returnClass)) {
this.actualRequestTypes = new Class<?>[] {
obtainActualTypeInStreamObserver(
((ParameterizedType) method.getGenericReturnType()).getActualTypeArguments()[0])
};
actualResponseType = obtainActualTypeInStreamObserver(
((ParameterizedType) genericParameterTypes[0]).getActualTypeArguments()[0]);
return RpcType.BI_STREAM;
}
if (parameterClasses.length == 2
boolean returnIsVoid = returnClass.getName().equals(void.class.getName());
if (returnIsVoid && parameterClasses.length == 1 && isStreamType(parameterClasses[0])) {
actualRequestTypes = Collections.emptyList().toArray(new Class<?>[0]);
return RpcType.SERVER_STREAM;
}
if (returnIsVoid
&& parameterClasses.length == 2
&& !isStreamType(parameterClasses[0])
&& isStreamType(parameterClasses[1])
&& returnClass.getName().equals(void.class.getName())) {
&& isStreamType(parameterClasses[1])) {
return RpcType.SERVER_STREAM;
}
if (Arrays.stream(parameterClasses).anyMatch(this::isStreamType) || isStreamType(returnClass)) {
Expand All @@ -103,6 +119,13 @@ private boolean isStreamType(Class<?> classType) {
return StreamObserver.class.isAssignableFrom(classType);
}

private static Class<?> obtainActualTypeInStreamObserver(Type typeInStreamObserver) {
return (Class<?>)
(typeInStreamObserver instanceof ParameterizedType
? ((ParameterizedType) typeInStreamObserver).getRawType()
: typeInStreamObserver);
}

@Override
public String getMethodName() {
return methodName;
Expand Down Expand Up @@ -156,6 +179,14 @@ public Object getAttribute(String key) {
return this.attributeMap.get(key);
}

public Class<?>[] getActualRequestTypes() {
return actualRequestTypes;
}

public Class<?> getActualResponseType() {
return actualResponseType;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,25 @@ public void onCompleted() {
System.out.println("Call sayHelloServerStream");
greeterService.sayHelloServerStream(buildRequest("triple"), responseObserver);

StreamObserver<HelloReply> sayHelloServerStreamNoParameterResponseObserver = new StreamObserver<HelloReply>() {
@Override
public void onNext(HelloReply reply) {
System.out.println("sayHelloServerStreamNoParameter onNext: " + reply.getMessage());
}

@Override
public void onError(Throwable t) {
System.out.println("sayHelloServerStreamNoParameter onError: " + t.getMessage());
}

@Override
public void onCompleted() {
System.out.println("sayHelloServerStreamNoParameter onCompleted");
}
};

greeterService.sayHelloServerStreamNoParameter(sayHelloServerStreamNoParameterResponseObserver);

StreamObserver<HelloReply> biResponseObserver = new StreamObserver<HelloReply>() {
@Override
public void onNext(HelloReply reply) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public interface GreeterService {
*/
void sayHelloServerStream(HelloRequest request, StreamObserver<HelloReply> responseObserver);

void sayHelloServerStreamNoParameter(StreamObserver<HelloReply> responseObserver);

/**
* Sends greetings with bi streaming
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ public void sayHelloServerStream(HelloRequest request, StreamObserver<HelloReply
responseObserver.onCompleted();
}

@Override
public void sayHelloServerStreamNoParameter(StreamObserver<HelloReply> responseObserver) {
LOGGER.info("Received sayHelloServerStreamNoParameter request");
for (int i = 1; i < 6; i++) {
LOGGER.info("sayHelloServerStreamNoParameter onNext: {} times", i);
responseObserver.onNext(toReply("Hello " + ' ' + i + " times"));
}
LOGGER.info("sayHelloServerStreamNoParameter onCompleted");
responseObserver.onCompleted();
}

@Override
public StreamObserver<HelloRequest> sayHelloBiStream(StreamObserver<HelloReply> responseObserver) {
LOGGER.info("Received sayHelloBiStream request");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
import org.apache.dubbo.rpc.model.ReflectionMethodDescriptor;
import org.apache.dubbo.rpc.model.StubMethodDescriptor;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class MethodMetadata {

private final Class<?>[] actualRequestTypes;
Expand Down Expand Up @@ -64,19 +61,9 @@ private static MethodMetadata doResolveReflection(ReflectionMethodDescriptor met
switch (method.getRpcType()) {
case CLIENT_STREAM:
case BI_STREAM:
actualRequestTypes = new Class<?>[] {
obtainActualTypeInStreamObserver(
((ParameterizedType) method.getMethod().getGenericReturnType()).getActualTypeArguments()[0])
};
actualResponseType = obtainActualTypeInStreamObserver(
((ParameterizedType) method.getMethod().getGenericParameterTypes()[0])
.getActualTypeArguments()[0]);
return new MethodMetadata(actualRequestTypes, actualResponseType);
case SERVER_STREAM:
actualRequestTypes = new Class[] {method.getMethod().getParameterTypes()[0]};
actualResponseType = obtainActualTypeInStreamObserver(
((ParameterizedType) method.getMethod().getGenericParameterTypes()[1])
.getActualTypeArguments()[0]);
actualRequestTypes = method.getActualRequestTypes();
actualResponseType = method.getActualResponseType();
return new MethodMetadata(actualRequestTypes, actualResponseType);
case UNARY:
actualRequestTypes = method.getParameterClasses();
Expand All @@ -85,11 +72,4 @@ private static MethodMetadata doResolveReflection(ReflectionMethodDescriptor met
}
throw new IllegalStateException("Can not reach here");
}

static Class<?> obtainActualTypeInStreamObserver(Type typeInStreamObserver) {
return (Class<?>)
(typeInStreamObserver instanceof ParameterizedType
? ((ParameterizedType) typeInStreamObserver).getRawType()
: typeInStreamObserver);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ public void onReturn(Object value) {}

@Override
public void onMessage(Object message) {
Class<?>[] params = invocation.getParameterTypes();
if (params.length == 1) {
if (params[0].isInstance(responseObserver)) {
invocation.setArguments(new Object[] {responseObserver});
return;
}
}
if (message instanceof Object[]) {
message = ((Object[]) message)[0];
}
Expand Down

0 comments on commit 1da23ea

Please sign in to comment.