Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,9 @@ public TimeoutContext withComputedServerSelectionTimeoutContext() {
}

public Timeout startWaitQueueTimeout(final StartTime checkoutStart) {
if (hasTimeoutMS()) {
return assertNotNull(timeout);
}
final long ms = getTimeoutSettings().getMaxWaitTimeMS();
return checkoutStart.timeoutAfterOrInfiniteIfNegative(ms, MILLISECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.mongodb.event.ConnectionPoolListener;
import com.mongodb.event.ConnectionPoolReadyEvent;
import com.mongodb.event.ConnectionReadyEvent;
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.VisibleForTesting;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.connection.SdamServerDescriptionManager.SdamIssue;
Expand Down Expand Up @@ -98,6 +99,7 @@
import static com.mongodb.event.ConnectionClosedEvent.Reason.ERROR;
import static com.mongodb.internal.Locks.lockInterruptibly;
import static com.mongodb.internal.Locks.withLock;
import static com.mongodb.internal.TimeoutContext.createMongoTimeoutException;
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.internal.connection.ConcurrentPool.INFINITE_SIZE;
Expand Down Expand Up @@ -193,7 +195,7 @@ public InternalConnection get(final OperationContext operationContext) {
Timeout waitQueueTimeout = operationContext.getTimeoutContext().startWaitQueueTimeout(checkoutStart);
try {
stateAndGeneration.throwIfClosedOrPaused();
PooledConnection connection = getPooledConnection(waitQueueTimeout, checkoutStart);
PooledConnection connection = getPooledConnection(waitQueueTimeout, checkoutStart, operationContext);
if (!connection.opened()) {
connection = openConcurrencyLimiter.openOrGetAvailable(operationContext, connection, waitQueueTimeout, checkoutStart);
}
Expand All @@ -208,7 +210,7 @@ public InternalConnection get(final OperationContext operationContext) {
@Override
public void getAsync(final OperationContext operationContext, final SingleResultCallback<InternalConnection> callback) {
StartTime checkoutStart = connectionCheckoutStarted(operationContext);
Timeout maxWaitTimeout = checkoutStart.timeoutAfterOrInfiniteIfNegative(settings.getMaxWaitTime(NANOSECONDS), NANOSECONDS);
Timeout maxWaitTimeout = operationContext.getTimeoutContext().startWaitQueueTimeout(checkoutStart);
SingleResultCallback<PooledConnection> eventSendingCallback = (connection, failure) -> {
SingleResultCallback<InternalConnection> errHandlingCallback = errorHandlingCallback(callback, LOGGER);
if (failure == null) {
Expand All @@ -225,13 +227,13 @@ public void getAsync(final OperationContext operationContext, final SingleResult
eventSendingCallback.onResult(null, e);
return;
}
asyncWorkManager.enqueue(new Task(maxWaitTimeout, checkoutStart, t -> {
asyncWorkManager.enqueue(new Task(maxWaitTimeout, checkoutStart, operationContext, t -> {
if (t != null) {
eventSendingCallback.onResult(null, t);
} else {
PooledConnection connection;
try {
connection = getPooledConnection(maxWaitTimeout, checkoutStart);
connection = getPooledConnection(maxWaitTimeout, checkoutStart, operationContext);
} catch (Exception e) {
eventSendingCallback.onResult(null, e);
return;
Expand Down Expand Up @@ -330,7 +332,9 @@ public int getGeneration() {
return stateAndGeneration.generation();
}

private PooledConnection getPooledConnection(final Timeout waitQueueTimeout, final StartTime startTime) throws MongoTimeoutException {
private PooledConnection getPooledConnection(final Timeout waitQueueTimeout,
final StartTime startTime,
final OperationContext operationContext) throws MongoTimeoutException {
try {
UsageTrackingInternalConnection internalConnection = waitQueueTimeout.call(NANOSECONDS,
() -> pool.get(-1L, NANOSECONDS),
Expand All @@ -345,7 +349,7 @@ private PooledConnection getPooledConnection(final Timeout waitQueueTimeout, fin
}
return new PooledConnection(internalConnection);
} catch (MongoTimeoutException e) {
throw createTimeoutException(startTime);
throw createTimeoutException(startTime, operationContext.getTimeoutContext());
}
}

Expand All @@ -359,13 +363,14 @@ private PooledConnection getPooledConnectionImmediate() {
return internalConnection == null ? null : new PooledConnection(internalConnection);
}

private MongoTimeoutException createTimeoutException(final StartTime startTime) {
private MongoTimeoutException createTimeoutException(final StartTime startTime, final TimeoutContext timeoutContext) {
long elapsedMs = startTime.elapsed().toMillis();
int numPinnedToCursor = pinnedStatsManager.getNumPinnedToCursor();
int numPinnedToTransaction = pinnedStatsManager.getNumPinnedToTransaction();
if (numPinnedToCursor == 0 && numPinnedToTransaction == 0) {
return new MongoTimeoutException(format("Timed out after %d ms while waiting for a connection to server %s.",
elapsedMs, serverId.getAddress()));
String errorMessage = format("Timed out after %d ms while waiting for a connection to server %s.",
elapsedMs, serverId.getAddress());
return timeoutContext.hasTimeoutMS() ? createMongoTimeoutException(errorMessage) : new MongoTimeoutException(errorMessage);
} else {
int maxSize = pool.getMaxSize();
int numInUse = pool.getInUseCount();
Expand Down Expand Up @@ -394,12 +399,13 @@ private MongoTimeoutException createTimeoutException(final StartTime startTime)
int numOtherInUse = numInUse - numPinnedToCursor - numPinnedToTransaction;
assertTrue(numOtherInUse >= 0);
assertTrue(numPinnedToCursor + numPinnedToTransaction + numOtherInUse <= maxSize);
return new MongoTimeoutException(format("Timed out after %d ms while waiting for a connection to server %s. Details: "
String errorMessage = format("Timed out after %d ms while waiting for a connection to server %s. Details: "
+ "maxPoolSize: %s, connections in use by cursors: %d, connections in use by transactions: %d, "
+ "connections in use by other operations: %d",
elapsedMs, serverId.getAddress(),
sizeToString(maxSize), numPinnedToCursor, numPinnedToTransaction,
numOtherInUse));
numOtherInUse);
return timeoutContext.hasTimeoutMS() ? createMongoTimeoutException(errorMessage) : new MongoTimeoutException(errorMessage);
}
}

Expand Down Expand Up @@ -965,7 +971,7 @@ private PooledConnection openWithConcurrencyLimit(final OperationContext operati
PooledConnection availableConnection;
try {//phase one
availableConnection = acquirePermitOrGetAvailableOpenedConnection(
mode == OpenWithConcurrencyLimitMode.TRY_GET_AVAILABLE, waitQueueTimeout, startTime);
mode == OpenWithConcurrencyLimitMode.TRY_GET_AVAILABLE, waitQueueTimeout, startTime, operationContext);
} catch (Exception e) {
connection.closeSilently();
throw e;
Expand Down Expand Up @@ -1007,7 +1013,7 @@ void openWithConcurrencyLimitAsync(
final SingleResultCallback<PooledConnection> callback) {
PooledConnection availableConnection;
try {//phase one
availableConnection = acquirePermitOrGetAvailableOpenedConnection(true, maxWaitTimeout, startTime);
availableConnection = acquirePermitOrGetAvailableOpenedConnection(true, maxWaitTimeout, startTime, operationContext);
} catch (Exception e) {
connection.closeSilently();
callback.onResult(null, e);
Expand Down Expand Up @@ -1038,7 +1044,8 @@ void openWithConcurrencyLimitAsync(
*/
@Nullable
private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boolean tryGetAvailable,
final Timeout waitQueueTimeout, final StartTime startTime)
final Timeout waitQueueTimeout, final StartTime startTime,
final OperationContext operationContext)
throws MongoTimeoutException, MongoInterruptedException {
PooledConnection availableConnection = null;
boolean expressedDesireToGetAvailableConnection = false;
Expand Down Expand Up @@ -1067,7 +1074,7 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole
& (availableConnection = tryGetAvailable ? tryGetAvailableConnection() : null) == null) {

Timeout.onExistsAndExpired(waitQueueTimeout, () -> {
throw createTimeoutException(startTime);
throw createTimeoutException(startTime, operationContext.getTimeoutContext());
});
waitQueueTimeout.awaitOn(permitAvailableOrHandedOverOrClosedOrPausedCondition,
() -> "acquiring permit or getting available opened connection");
Expand Down Expand Up @@ -1389,10 +1396,15 @@ final class Task {
private final Timeout timeout;
private final StartTime startTime;
private final Consumer<RuntimeException> action;
private final OperationContext operationContext;
private boolean completed;

Task(final Timeout timeout, final StartTime startTime, final Consumer<RuntimeException> action) {
Task(final Timeout timeout,
final StartTime startTime,
final OperationContext operationContext,
final Consumer<RuntimeException> action) {
this.timeout = timeout;
this.operationContext = operationContext;
this.startTime = startTime;
this.action = action;
}
Expand All @@ -1406,7 +1418,7 @@ void failAsClosed() {
}

void failAsTimedOut() {
doComplete(() -> createTimeoutException(startTime));
doComplete(() -> createTimeoutException(startTime, operationContext.getTimeoutContext()));
}

private void doComplete(final Supplier<RuntimeException> failureSupplier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import com.mongodb.internal.function.CheckedFunction;
import com.mongodb.internal.function.CheckedRunnable;
import com.mongodb.internal.function.CheckedSupplier;
import com.mongodb.lang.Nullable;
import com.mongodb.lang.NonNull;
import com.mongodb.lang.Nullable;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -40,6 +40,8 @@
/**
* A Timeout is a "deadline", point in time by which something must happen.
*
* Implementations of this interface must be immutable.
*
* @see TimePoint
*/
public interface Timeout {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,13 @@
import static com.mongodb.ClusterFixture.createOperationContext;
import static com.mongodb.internal.time.Timeout.ZeroSemantics.ZERO_DURATION_MEANS_EXPIRED;
import static java.lang.Long.MAX_VALUE;
import static java.lang.Thread.sleep;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -133,6 +135,33 @@ public void shouldThrowOnTimeout() throws InterruptedException {
assertTrue(connectionGetter.isGotTimeout());
}

@Test
public void shouldNotUseMaxAwaitTimeMSOnWhenTimeoutMsIsSet() throws InterruptedException {
// given
provider = new DefaultConnectionPool(SERVER_ID, connectionFactory,
ConnectionPoolSettings.builder()
.maxSize(1)
.build(),
mockSdamProvider(), OPERATION_CONTEXT_FACTORY);
provider.ready();
TimeoutSettings timeoutSettings = TIMEOUT_SETTINGS
.withTimeout(100L, MILLISECONDS)
.withMaxWaitTimeMS(50);

InternalConnection internalConnection = provider.get(createOperationContext(timeoutSettings));

// when
TimeoutTrackingConnectionGetter connectionGetter = new TimeoutTrackingConnectionGetter(provider, timeoutSettings);
new Thread(connectionGetter).start();

sleep(70); // wait for more than maxWaitTimeMS but less than timeoutMs.
internalConnection.close();
connectionGetter.getLatch().await();

// then
assertFalse(connectionGetter.isGotTimeout());
}

@Test
public void shouldThrowOnPoolClosed() {
provider = new DefaultConnectionPool(SERVER_ID, connectionFactory,
Expand Down Expand Up @@ -166,7 +195,7 @@ public void shouldExpireConnectionAfterMaxLifeTime() throws InterruptedException

// when
provider.get(OPERATION_CONTEXT).close();
Thread.sleep(100);
sleep(100);
provider.doMaintenance();
provider.get(OPERATION_CONTEXT);

Expand All @@ -187,7 +216,7 @@ public void shouldExpireConnectionAfterLifeTimeOnClose() throws InterruptedExcep

// when
InternalConnection connection = provider.get(OPERATION_CONTEXT);
Thread.sleep(50);
sleep(50);
connection.close();

// then
Expand All @@ -208,7 +237,7 @@ public void shouldExpireConnectionAfterMaxIdleTime() throws InterruptedException

// when
provider.get(OPERATION_CONTEXT).close();
Thread.sleep(100);
sleep(100);
provider.doMaintenance();
provider.get(OPERATION_CONTEXT);

Expand All @@ -230,7 +259,7 @@ public void shouldCloseConnectionAfterExpiration() throws InterruptedException {

// when
provider.get(OPERATION_CONTEXT).close();
Thread.sleep(50);
sleep(50);
provider.doMaintenance();
provider.get(OPERATION_CONTEXT);

Expand All @@ -252,7 +281,7 @@ public void shouldCreateNewConnectionAfterExpiration() throws InterruptedExcepti

// when
provider.get(OPERATION_CONTEXT).close();
Thread.sleep(50);
sleep(50);
provider.doMaintenance();
InternalConnection secondConnection = provider.get(OPERATION_CONTEXT);

Expand All @@ -277,7 +306,7 @@ public void shouldPruneAfterMaintenanceTaskRuns() throws InterruptedException {


// when
Thread.sleep(10);
sleep(10);
provider.doMaintenance();

// then
Expand Down Expand Up @@ -594,7 +623,7 @@ private static void useConcurrently(final DefaultConnectionPool pool, final int
*/
private static void sleepMillis(final long millis) {
try {
Thread.sleep(millis);
sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Expand Down
Loading