Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,15 @@
import com.tencent.trpc.proto.http.common.RpcServerContextWithHttp;
import com.tencent.trpc.proto.http.common.TrpcServletRequestWrapper;
import com.tencent.trpc.proto.http.common.TrpcServletResponseWrapper;
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -65,34 +69,41 @@ public abstract class AbstractHttpExecutor {

protected void execute(HttpServletRequest request, HttpServletResponse response,
RpcMethodInfoAndInvoker methodInfoAndInvoker) {

AtomicBoolean responded = new AtomicBoolean(false);
try {

DefRequest rpcRequest = buildDefRequest(request, response, methodInfoAndInvoker);

CountDownLatch countDownLatch = new CountDownLatch(1);
CompletableFuture<Void> completionFuture = new CompletableFuture<>();

// use a thread pool for asynchronous processing
invokeRpcRequest(methodInfoAndInvoker.getInvoker(), rpcRequest, countDownLatch);
invokeRpcRequest(methodInfoAndInvoker.getInvoker(), rpcRequest, completionFuture, responded);

// If the request carries a timeout, use this timeout to wait for the request to be processed.
// If not carried, use the default timeout.
long requestTimeout = rpcRequest.getMeta().getTimeout();
if (requestTimeout <= 0) {
requestTimeout = methodInfoAndInvoker.getInvoker().getConfig().getRequestTimeout();
}
if (requestTimeout > 0 && !countDownLatch.await(requestTimeout, TimeUnit.MILLISECONDS)) {
throw TRpcException.newFrameException(ErrorCode.TRPC_SERVER_TIMEOUT_ERR,
"wait http request execute timeout");
if (requestTimeout > 0) {
try {
completionFuture.get(requestTimeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException ex) {
if (responded.compareAndSet(false, true)) {
doErrorReply(request, response,
TRpcException.newFrameException(ErrorCode.TRPC_SERVER_TIMEOUT_ERR,
"wait http request execute timeout"));
}
}
} else {
countDownLatch.await();
completionFuture.get();
}

} catch (Exception ex) {
logger.error("dispatch request [{}] error", request, ex);
doErrorReply(request, response, ex);
if (responded.compareAndSet(false, true)) {
doErrorReply(request, response, ex);
}
}

}

/**
Expand All @@ -107,55 +118,83 @@ protected void execute(HttpServletRequest request, HttpServletResponse response,
/**
* Request processing
*
* @param countDownLatch latch used to wait for the request processing
* @param invoker the invoker
* @param rpcRequest the rpc request
* @param completionFuture the completion future
* @param responded the responded flag
*/
private void invokeRpcRequest(ProviderInvoker<?> invoker, DefRequest rpcRequest, CountDownLatch countDownLatch) {
private void invokeRpcRequest(ProviderInvoker<?> invoker, DefRequest rpcRequest,
CompletableFuture<Void> completionFuture,
AtomicBoolean responded) {

WorkerPool workerPool = invoker.getConfig().getWorkerPoolObj();

if (null == workerPool) {
logger.error("dispatch rpcRequest [{}] error, workerPool is empty", rpcRequest);
throw TRpcException.newFrameException(ErrorCode.TRPC_SERVER_NOSERVICE_ERR,
"not found service, workerPool is empty");
completionFuture.completeExceptionally(TRpcException.newFrameException(ErrorCode.TRPC_SERVER_NOSERVICE_ERR,
"not found service, workerPool is empty"));
return;
}

workerPool.execute(() -> {

// Get the original http response
HttpServletResponse response = getOriginalResponse(rpcRequest);

// Invoke the routing implementation method to handle the request.
CompletionStage<Response> future = invoker.invoke(rpcRequest);
future.whenComplete((result, t) -> {
try {
// Throw the call exception, which will be handled uniformly by the exception handling program.
if (t != null) {
throw t;
}

// Throw a business logic exception, which will be handled uniformly
// by the exception handling program.
Throwable ex = result.getException();
if (ex != null) {
throw ex;
try {
// Get the original http response
HttpServletResponse response = getOriginalResponse(rpcRequest);
// Invoke the routing implementation method to handle the request.
CompletionStage<Response> rpcFuture = invoker.invoke(rpcRequest);

rpcFuture.whenComplete((result, throwable) -> {
try {
if (responded.get()) {
return;
}

// Throw the call exception, which will be handled uniformly by the exception handling program.
if (throwable != null) {
throw throwable;
}

// Throw a business logic exception, which will be handled uniformly
// by the exception handling program.
if (result.getException() != null) {
throw result.getException();
}

// normal response
if (responded.compareAndSet(false, true)) {
response.setStatus(HttpStatus.SC_OK);
httpCodec.writeHttpResponse(response, result);
response.flushBuffer();
}

completionFuture.complete(null);
} catch (Throwable t) {
handleError(t, rpcRequest, response, responded, completionFuture);
}
});

// normal response
response.setStatus(HttpStatus.SC_OK);
httpCodec.writeHttpResponse(response, result);
response.flushBuffer();
} catch (Throwable e) {
HttpServletRequest request = getOriginalRequest(rpcRequest);
logger.warn("reply message error, channel: [{}], msg:[{}]", request.getRemoteAddr(), request, e);
httpErrorReply(request, response,
ErrorResponse.create(request, HttpStatus.SC_SERVICE_UNAVAILABLE, e));
} finally {
countDownLatch.countDown();
}
});
} catch (Exception e) {
handleError(e, rpcRequest, getOriginalResponse(rpcRequest), responded, completionFuture);
}
});
}

/**
* Handle error
*/
private void handleError(Throwable t, DefRequest rpcRequest, HttpServletResponse response,
AtomicBoolean responded, CompletableFuture<Void> completionFuture) {
try {
if (responded.compareAndSet(false, true)) {
HttpServletRequest request = getOriginalRequest(rpcRequest);
logger.warn("reply message error, channel: [{}], msg:[{}]", request.getRemoteAddr(), request, t);
httpErrorReply(request, response, ErrorResponse.create(request, HttpStatus.SC_SERVICE_UNAVAILABLE, t));
}
} finally {
completionFuture.completeExceptionally(t);
}
}

/**
* Build the context request.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,32 @@


import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.powermock.api.mockito.PowerMockito.doReturn;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;

import com.tencent.trpc.core.rpc.RpcInvocation;
import com.tencent.trpc.core.rpc.common.RpcMethodInfo;
import com.tencent.trpc.core.rpc.def.DefRequest;
import com.tencent.trpc.proto.http.common.ErrorResponse;
import com.tencent.trpc.proto.http.common.HttpConstants;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
Expand All @@ -48,4 +64,120 @@
methodInfo);
assertEquals(rpcInvocation.getFunc(), "/trpc.demo.server/hello");
}

@Test
public void handleError_shouldCallHttpErrorReply_whenRespondedIsFalse() throws Exception {
// Arrange
HttpServletRequest request = mock(HttpServletRequest.class);
HttpServletResponse response = mock(HttpServletResponse.class);

Check warning on line 72 in trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/server/AbstractHttpExecutorTest.java

View workflow job for this annotation

GitHub Actions / checkstyle

[checkstyle] reported by reviewdog 🐶 变量'response'声明及第一次使用距离5行(最多:3 行)。若需要存储该变量的值,请将其声明为final的(方法调用前声明以避免副作用影响原值)。 Raw Output: /github/workspace/./trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/server/AbstractHttpExecutorTest.java:72:9: warning: 变量'response'声明及第一次使用距离5行(最多:3 行)。若需要存储该变量的值,请将其声明为final的(方法调用前声明以避免副作用影响原值)。 (com.puppycrawl.tools.checkstyle.checks.coding.VariableDeclarationUsageDistanceCheck)
DefRequest rpcRequest = mock(DefRequest.class);
AtomicBoolean responded = new AtomicBoolean(false);

Check warning on line 74 in trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/server/AbstractHttpExecutorTest.java

View workflow job for this annotation

GitHub Actions / checkstyle

[checkstyle] reported by reviewdog 🐶 变量'responded'声明及第一次使用距离5行(最多:3 行)。若需要存储该变量的值,请将其声明为final的(方法调用前声明以避免副作用影响原值)。 Raw Output: /github/workspace/./trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/server/AbstractHttpExecutorTest.java:74:9: warning: 变量'responded'声明及第一次使用距离5行(最多:3 行)。若需要存储该变量的值,请将其声明为final的(方法调用前声明以避免副作用影响原值)。 (com.puppycrawl.tools.checkstyle.checks.coding.VariableDeclarationUsageDistanceCheck)
CompletableFuture<Void> completionFuture = new CompletableFuture<>();

Check warning on line 75 in trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/server/AbstractHttpExecutorTest.java

View workflow job for this annotation

GitHub Actions / checkstyle

[checkstyle] reported by reviewdog 🐶 变量'completionFuture'声明及第一次使用距离5行(最多:3 行)。若需要存储该变量的值,请将其声明为final的(方法调用前声明以避免副作用影响原值)。 Raw Output: /github/workspace/./trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/server/AbstractHttpExecutorTest.java:75:9: warning: 变量'completionFuture'声明及第一次使用距离5行(最多:3 行)。若需要存储该变量的值,请将其声明为final的(方法调用前声明以避免副作用影响原值)。 (com.puppycrawl.tools.checkstyle.checks.coding.VariableDeclarationUsageDistanceCheck)
Throwable throwable = new RuntimeException("test error");

Check warning on line 76 in trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/server/AbstractHttpExecutorTest.java

View workflow job for this annotation

GitHub Actions / checkstyle

[checkstyle] reported by reviewdog 🐶 变量'throwable'声明及第一次使用距离5行(最多:3 行)。若需要存储该变量的值,请将其声明为final的(方法调用前声明以避免副作用影响原值)。 Raw Output: /github/workspace/./trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/server/AbstractHttpExecutorTest.java:76:9: warning: 变量'throwable'声明及第一次使用距离5行(最多:3 行)。若需要存储该变量的值,请将其声明为final的(方法调用前声明以避免副作用影响原值)。 (com.puppycrawl.tools.checkstyle.checks.coding.VariableDeclarationUsageDistanceCheck)

AbstractHttpExecutor abstractHttpExecutor = mock(AbstractHttpExecutor.class);

Check warning on line 78 in trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/server/AbstractHttpExecutorTest.java

View workflow job for this annotation

GitHub Actions / checkstyle

[checkstyle] reported by reviewdog 🐶 变量'abstractHttpExecutor'声明及第一次使用距离4行(最多:3 行)。若需要存储该变量的值,请将其声明为final的(方法调用前声明以避免副作用影响原值)。 Raw Output: /github/workspace/./trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/server/AbstractHttpExecutorTest.java:78:9: warning: 变量'abstractHttpExecutor'声明及第一次使用距离4行(最多:3 行)。若需要存储该变量的值,请将其声明为final的(方法调用前声明以避免副作用影响原值)。 (com.puppycrawl.tools.checkstyle.checks.coding.VariableDeclarationUsageDistanceCheck)

// Mock getOriginalRequest method
Map<String, Object> attachments = new HashMap<>();
attachments.put(HttpConstants.TRPC_ATTACH_SERVLET_REQUEST, request);
when(rpcRequest.getAttachments()).thenReturn(attachments);
when(request.getRemoteAddr()).thenReturn("127.0.0.1");

// Mock httpErrorReply method
PowerMockito.doNothing().when(abstractHttpExecutor, "httpErrorReply", any(HttpServletRequest.class),
any(HttpServletResponse.class), any(ErrorResponse.class));

// Call real method
when(abstractHttpExecutor, "handleError", throwable, rpcRequest, response, responded, completionFuture)
.thenCallRealMethod();

// Act
Whitebox.invokeMethod(abstractHttpExecutor, "handleError", throwable, rpcRequest, response,
responded, completionFuture);

// Assert
assertTrue("responded should be set to true", responded.get());
assertTrue("completionFuture should be completed exceptionally", completionFuture.isCompletedExceptionally());

// Verify httpErrorReply was called
PowerMockito.verifyPrivate(abstractHttpExecutor, times(1))
.invoke("httpErrorReply", eq(request), eq(response), any(ErrorResponse.class));
}

@Test
public void handleError_shouldNotCallHttpErrorReply_whenRespondedIsTrue() throws Exception {
// Arrange
HttpServletRequest request = mock(HttpServletRequest.class);
HttpServletResponse response = mock(HttpServletResponse.class);
DefRequest rpcRequest = mock(DefRequest.class);
AtomicBoolean responded = new AtomicBoolean(true); // Already responded
CompletableFuture<Void> completionFuture = new CompletableFuture<>();
Throwable throwable = new RuntimeException("test error");

AbstractHttpExecutor abstractHttpExecutor = mock(AbstractHttpExecutor.class);

// Mock httpErrorReply method
PowerMockito.doNothing().when(abstractHttpExecutor, "httpErrorReply", any(HttpServletRequest.class),
any(HttpServletResponse.class), any(ErrorResponse.class));

// Call real method
when(abstractHttpExecutor, "handleError", throwable, rpcRequest, response, responded, completionFuture)
.thenCallRealMethod();

// Act
Whitebox.invokeMethod(abstractHttpExecutor, "handleError", throwable, rpcRequest, response,
responded, completionFuture);

// Assert
assertTrue("responded should remain true", responded.get());
assertTrue("completionFuture should be completed exceptionally", completionFuture.isCompletedExceptionally());

// Verify httpErrorReply was NOT called
PowerMockito.verifyPrivate(abstractHttpExecutor, never())
.invoke("httpErrorReply", any(HttpServletRequest.class), any(HttpServletResponse.class),
any(ErrorResponse.class));
}

@Test
public void handleError_shouldCompleteExceptionally_evenWhenHttpErrorReplyThrows() throws Exception {
// Arrange
HttpServletRequest request = mock(HttpServletRequest.class);
HttpServletResponse response = mock(HttpServletResponse.class);

Check warning on line 145 in trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/server/AbstractHttpExecutorTest.java

View workflow job for this annotation

GitHub Actions / checkstyle

[checkstyle] reported by reviewdog 🐶 变量'response'声明及第一次使用距离5行(最多:3 行)。若需要存储该变量的值,请将其声明为final的(方法调用前声明以避免副作用影响原值)。 Raw Output: /github/workspace/./trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/server/AbstractHttpExecutorTest.java:145:9: warning: 变量'response'声明及第一次使用距离5行(最多:3 行)。若需要存储该变量的值,请将其声明为final的(方法调用前声明以避免副作用影响原值)。 (com.puppycrawl.tools.checkstyle.checks.coding.VariableDeclarationUsageDistanceCheck)
DefRequest rpcRequest = mock(DefRequest.class);
AtomicBoolean responded = new AtomicBoolean(false);

Check warning on line 147 in trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/server/AbstractHttpExecutorTest.java

View workflow job for this annotation

GitHub Actions / checkstyle

[checkstyle] reported by reviewdog 🐶 变量'responded'声明及第一次使用距离5行(最多:3 行)。若需要存储该变量的值,请将其声明为final的(方法调用前声明以避免副作用影响原值)。 Raw Output: /github/workspace/./trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/server/AbstractHttpExecutorTest.java:147:9: warning: 变量'responded'声明及第一次使用距离5行(最多:3 行)。若需要存储该变量的值,请将其声明为final的(方法调用前声明以避免副作用影响原值)。 (com.puppycrawl.tools.checkstyle.checks.coding.VariableDeclarationUsageDistanceCheck)
CompletableFuture<Void> completionFuture = new CompletableFuture<>();

Check warning on line 148 in trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/server/AbstractHttpExecutorTest.java

View workflow job for this annotation

GitHub Actions / checkstyle

[checkstyle] reported by reviewdog 🐶 变量'completionFuture'声明及第一次使用距离5行(最多:3 行)。若需要存储该变量的值,请将其声明为final的(方法调用前声明以避免副作用影响原值)。 Raw Output: /github/workspace/./trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/server/AbstractHttpExecutorTest.java:148:9: warning: 变量'completionFuture'声明及第一次使用距离5行(最多:3 行)。若需要存储该变量的值,请将其声明为final的(方法调用前声明以避免副作用影响原值)。 (com.puppycrawl.tools.checkstyle.checks.coding.VariableDeclarationUsageDistanceCheck)
Throwable throwable = new RuntimeException("test error");

Check warning on line 149 in trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/server/AbstractHttpExecutorTest.java

View workflow job for this annotation

GitHub Actions / checkstyle

[checkstyle] reported by reviewdog 🐶 变量'throwable'声明及第一次使用距离5行(最多:3 行)。若需要存储该变量的值,请将其声明为final的(方法调用前声明以避免副作用影响原值)。 Raw Output: /github/workspace/./trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/server/AbstractHttpExecutorTest.java:149:9: warning: 变量'throwable'声明及第一次使用距离5行(最多:3 行)。若需要存储该变量的值,请将其声明为final的(方法调用前声明以避免副作用影响原值)。 (com.puppycrawl.tools.checkstyle.checks.coding.VariableDeclarationUsageDistanceCheck)

AbstractHttpExecutor abstractHttpExecutor = mock(AbstractHttpExecutor.class);

Check warning on line 151 in trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/server/AbstractHttpExecutorTest.java

View workflow job for this annotation

GitHub Actions / checkstyle

[checkstyle] reported by reviewdog 🐶 变量'abstractHttpExecutor'声明及第一次使用距离4行(最多:3 行)。若需要存储该变量的值,请将其声明为final的(方法调用前声明以避免副作用影响原值)。 Raw Output: /github/workspace/./trpc-proto/trpc-proto-http/src/test/java/com/tencent/trpc/proto/http/server/AbstractHttpExecutorTest.java:151:9: warning: 变量'abstractHttpExecutor'声明及第一次使用距离4行(最多:3 行)。若需要存储该变量的值,请将其声明为final的(方法调用前声明以避免副作用影响原值)。 (com.puppycrawl.tools.checkstyle.checks.coding.VariableDeclarationUsageDistanceCheck)

// Mock getOriginalRequest method
Map<String, Object> attachments = new HashMap<>();
attachments.put(HttpConstants.TRPC_ATTACH_SERVLET_REQUEST, request);
when(rpcRequest.getAttachments()).thenReturn(attachments);
when(request.getRemoteAddr()).thenReturn("127.0.0.1");

// Mock httpErrorReply method to throw exception
PowerMockito.doThrow(new IOException("response error")).when(abstractHttpExecutor, "httpErrorReply",
any(HttpServletRequest.class), any(HttpServletResponse.class), any(ErrorResponse.class));

// Call real method
when(abstractHttpExecutor, "handleError", throwable, rpcRequest, response, responded, completionFuture)
.thenCallRealMethod();

// Act
Whitebox.invokeMethod(abstractHttpExecutor, "handleError", throwable, rpcRequest, response,
responded, completionFuture);

// Assert
assertTrue("responded should be set to true", responded.get());
assertTrue("completionFuture should be completed exceptionally", completionFuture.isCompletedExceptionally());

try {
completionFuture.get();
fail("Should have thrown exception");
} catch (ExecutionException e) {
assertEquals("Should complete with original throwable", throwable, e.getCause());
}
}

}
Loading