Skip to content

Commit

Permalink
Merge pull request wso2#2100 from malakaganga/add_warnLogs
Browse files Browse the repository at this point in the history
Improve logs to detect worker pool exhaustion
  • Loading branch information
malakaganga authored May 19, 2023
2 parents c0698d9 + c798aa1 commit cd21eba
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,24 @@ public void run() {
cleanup();
return;
}
// Mark the start of the request at the beginning of the worker thread
response.getConnection().getContext().setAttribute(PassThroughConstants.CLIENT_WORKER_THREAD_STATUS,
PassThroughConstants.THREAD_STATUS_RUNNING);
Object queuedTime =
response.getConnection().getContext().getAttribute(PassThroughConstants.CLIENT_WORKER_SIDE_QUEUED_TIME);

Long expectedMaxQueueingTime = conf.getExpectedMaxQueueingTime();
if (queuedTime != null && expectedMaxQueueingTime != null) {
Long clientWorkerQueuedTime = System.currentTimeMillis() - (Long) queuedTime;
if (clientWorkerQueuedTime >= expectedMaxQueueingTime) {
log.warn("Client worker thread queued time exceeds the expected max queueing time. Expected max "
+ "queueing time : " + expectedMaxQueueingTime + "ms. Actual queued time : "
+ clientWorkerQueuedTime + "ms"+ ", Correlation Id : "
+ requestMessageContext.getProperty(CorrelationConstants.CORRELATION_ID));
}

}

if (responseMsgCtx.getProperty(PassThroughConstants.PASS_THROUGH_SOURCE_CONNECTION) != null) {
((NHttpServerConnection) responseMsgCtx.getProperty(PassThroughConstants.PASS_THROUGH_SOURCE_CONNECTION)).
getContext().setAttribute(PassThroughConstants.CLIENT_WORKER_START_TIME, System.currentTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,17 @@ public class PassThroughConstants {
public static final String RES_TO_CLIENT_BODY_WRITE_START_TIME = "RES_TO_CLIENT_BODY_WRITE_START_TIME";

public static final String SERVER_WORKER_INIT_TIME = "SERVER_WORKER_INIT_TIME";

public static final String SERVER_WORKER_THREAD_STATUS = "SERVER_WORKER_THREAD_STATUS";

public static final String CLIENT_WORKER_THREAD_STATUS = "CLIENT_WORKER_THREAD_STATUS";

public static final String SERVER_WORKER_SIDE_QUEUED_TIME = "SERVER_WORKER_SIDE_QUEUED_TIME";

public static final String CLIENT_WORKER_SIDE_QUEUED_TIME = "CLIENT_WORKER_SIDE_QUEUED_TIME";

public static final String THREAD_STATUS_RUNNING = "RUNNING";

public static final String SERVER_WORKER_START_TIME = "SERVER_WORKER_START_TIME";

public static final String CLIENT_WORKER_INIT_TIME = "CLIENT_WORKER_INIT_TIME";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ public class ServerWorker implements Runnable {

/** Weather we should do rest dispatching or not */
private boolean isRestDispatching = true;

private PassThroughConfiguration conf = PassThroughConfiguration.getInstance();

private OutputStream os; //only used for WSDL requests..

Expand Down Expand Up @@ -140,6 +142,23 @@ public ServerWorker(final SourceRequest request,

public void run() {
try {

// Mark the start of the request at the beginning of the worker thread
request.getConnection().getContext().setAttribute(PassThroughConstants.SERVER_WORKER_THREAD_STATUS,
PassThroughConstants.THREAD_STATUS_RUNNING);
Object queuedTime =
request.getConnection().getContext().getAttribute(PassThroughConstants.SERVER_WORKER_SIDE_QUEUED_TIME);

Long expectedMaxQueueingTime = conf.getExpectedMaxQueueingTime();
if (queuedTime != null && expectedMaxQueueingTime != null) {
Long serverWorkerQueuedTime = System.currentTimeMillis() - (Long) queuedTime;
if (serverWorkerQueuedTime >= expectedMaxQueueingTime) {
log.warn("Server worker thread queued time exceeds the expected max queueing time. Expected max " +
"queueing time : " + expectedMaxQueueingTime + "ms. Actual queued time : " +
serverWorkerQueuedTime + "ms" + ", Correlation Id : " + correlationId);
}

}
/* Remove correlation id MDC thread local value that can be persisting from the
previous usage of this thread */
MDC.remove(CorrelationConstants.CORRELATION_MDC_PROPERTY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.apache.synapse.transport.passthru;

import org.apache.axis2.transport.base.threads.WorkerPool;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.*;
Expand Down Expand Up @@ -63,6 +64,8 @@
*/
public class SourceHandler implements NHttpServerEventHandler {
private static Log log = LogFactory.getLog(SourceHandler.class);
private PassThroughConfiguration conf = PassThroughConfiguration.getInstance();

/** logger for correlation.log */
private static final Log correlationLog = LogFactory.getLog(PassThroughConstants.CORRELATION_LOGGER);
private static final Log transportLatencyLog = LogFactory.getLog(PassThroughConstants.TRANSPORT_LATENCY_LOGGER);
Expand Down Expand Up @@ -164,10 +167,20 @@ public void requestReceived(NHttpServerConnection conn) {
OutputStream os = getOutputStream(method, request);
Object correlationId = conn.getContext().getAttribute(CorrelationConstants.CORRELATION_ID);
if (correlationId != null) {
sourceConfiguration.getWorkerPool().execute(new ServerWorker(request, sourceConfiguration, os,
WorkerPool workerPool = sourceConfiguration.getWorkerPool();
workerPool.execute(new ServerWorker(request, sourceConfiguration, os,
System.currentTimeMillis(), correlationId.toString()));
if (workerPool.getActiveCount() >= conf.getWorkerPoolCoreSize()) {
conn.getContext().setAttribute(PassThroughConstants.SERVER_WORKER_SIDE_QUEUED_TIME,
System.currentTimeMillis());
}
} else {
sourceConfiguration.getWorkerPool().execute(new ServerWorker(request, sourceConfiguration, os));
WorkerPool workerPool = sourceConfiguration.getWorkerPool();
workerPool.execute(new ServerWorker(request, sourceConfiguration, os));
if (workerPool.getActiveCount() >= conf.getWorkerPoolCoreSize()) {
conn.getContext().setAttribute(PassThroughConstants.SERVER_WORKER_SIDE_QUEUED_TIME,
System.currentTimeMillis());
}
}
//increasing the input request metric
metrics.requestReceived();
Expand Down Expand Up @@ -571,6 +584,12 @@ public void timeout(NHttpServerConnection conn) {
ProtocolState state = SourceContext.getState(conn);
Map<String, String> logDetails = getLoggingInfo(conn, state);

if (!PassThroughConstants.THREAD_STATUS_RUNNING.equals(conn.getContext().getAttribute
(PassThroughConstants.SERVER_WORKER_THREAD_STATUS))) {
log.warn("Source Handler Socket Timeout occurred while the worker pool exhausted, " +
"INTERNAL_STATE = " + state + ", CORRELATION_ID = "
+ conn.getContext().getAttribute(CorrelationConstants.CORRELATION_ID));
}
if (state == ProtocolState.REQUEST_READY || state == ProtocolState.RESPONSE_DONE) {
if (log.isDebugEnabled()) {
log.debug(conn + ": Keep-Alive connection was time out: ");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.axis2.description.WSDL2Constants;
import org.apache.axis2.engine.MessageReceiver;
import org.apache.axis2.transport.base.MetricsCollector;
import org.apache.axis2.transport.base.threads.WorkerPool;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.ConnectionClosedException;
Expand Down Expand Up @@ -457,9 +458,13 @@ public void responseReceived(NHttpClientConnection conn) {
if (statusCode == HttpStatus.SC_ACCEPTED && handle202(requestMsgContext)) {
return;
}

targetConfiguration.getWorkerPool().execute(
WorkerPool workerPool = targetConfiguration.getWorkerPool();
workerPool.execute(
new ClientWorker(targetConfiguration, requestMsgContext, targetResponse));
if (workerPool.getActiveCount() >= conf.getWorkerPoolCoreSize()) {
conn.getContext().setAttribute(PassThroughConstants.CLIENT_WORKER_SIDE_QUEUED_TIME,
System.currentTimeMillis());
}

targetConfiguration.getMetrics().incrementMessagesReceived();

Expand Down Expand Up @@ -772,6 +777,12 @@ public void timeout(NHttpClientConnection conn) {
if (log.isDebugEnabled()) {
log.debug(getErrorMessage("Connection timeout", conn) + " "+ getConnectionLoggingInfo(conn));
}
if (!PassThroughConstants.THREAD_STATUS_RUNNING.equals(conn.getContext().getAttribute
(PassThroughConstants.CLIENT_WORKER_THREAD_STATUS))) {
log.warn("Target Handler Socket Timeout occurred while the worker pool exhausted, " +
"INTERNAL_STATE = " + state + ", CORRELATION_ID = "
+ conn.getContext().getAttribute(CorrelationConstants.CORRELATION_ID));
}

if (state != null &&
(state == ProtocolState.REQUEST_READY || state == ProtocolState.RESPONSE_DONE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,9 @@ public interface PassThroughConfigPNames {
* Defines the header name set for correlation logs
*/
public String CORRELATION_HEADER_NAME_PROPERTY = "correlation_header_name";

/**
* Defines max waiting time for a request to be queued for a worker thread
*/
public String EXPECTED_MAX_QUEUEING_TIME = "expected_max_queueing_time";
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class PassThroughConfiguration {
private static final int DEFAULT_MAX_ACTIVE_CON = -1;
private static final int DEFAULT_LISTENER_SHUTDOWN_WAIT_TIME = 0;
private static final int DEFAULT_CONNECTION_GRACE_TIME = 10000;

private static final String EXPECTED_MAX_QUEUEING_TIME_DEFAULT = "30000";
private Boolean isKeepAliveDisabled = null;

private Boolean isConsumeAndDiscard = true;
Expand Down Expand Up @@ -192,6 +194,20 @@ public String getCorrelationHeaderName() {
PassThroughConstants.CORRELATION_DEFAULT_HEADER);
}

public Long getExpectedMaxQueueingTime() {
String expectedMaxQueuingTime = getStringProperty(PassThroughConfigPNames.EXPECTED_MAX_QUEUEING_TIME,
EXPECTED_MAX_QUEUEING_TIME_DEFAULT);
Long convertedExpectedMaxQueuingTime;
try {
convertedExpectedMaxQueuingTime = Long.parseLong(expectedMaxQueuingTime);
} catch (NumberFormatException exception) {
log.warn("Invalid value for the expected max queuing time. Expected max queuing time should be a long value. " +
"Using the default value " + EXPECTED_MAX_QUEUEING_TIME_DEFAULT);
convertedExpectedMaxQueuingTime = Long.parseLong(EXPECTED_MAX_QUEUEING_TIME_DEFAULT);
}
return convertedExpectedMaxQueuingTime;
}

/**
* Loads the properties from a given property file path
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.nio.NHttpServerConnection;
import org.apache.synapse.transport.passthru.PassThroughConstants;
import org.apache.synapse.transport.passthru.SourceContext;

import java.io.IOException;
Expand Down Expand Up @@ -89,6 +90,8 @@ public void useConnection(NHttpServerConnection conn) {
public void releaseConnection(NHttpServerConnection conn) {
lock.lock();
try {
conn.getContext().removeAttribute(PassThroughConstants.CLIENT_WORKER_THREAD_STATUS);
conn.getContext().removeAttribute(PassThroughConstants.SERVER_WORKER_SIDE_QUEUED_TIME);
SourceContext.get(conn).reset();

if (busyConnections.remove(conn)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ public void releaseConnection(NHttpClientConnection conn) {
PassThroughConstants.CONNECTION_POOL);

TargetContext.get(conn).reset(false);
conn.getContext().removeAttribute(PassThroughConstants.CLIENT_WORKER_THREAD_STATUS);
conn.getContext().removeAttribute(PassThroughConstants.CLIENT_WORKER_SIDE_QUEUED_TIME);
//Set the event mask to Read since connection is released to the pool and should be ready to read
conn.requestInput();

Expand Down

0 comments on commit cd21eba

Please sign in to comment.