|
98 | 98 | import static com.mongodb.assertions.Assertions.isTrue;
|
99 | 99 | import static com.mongodb.assertions.Assertions.notNull;
|
100 | 100 | import static com.mongodb.event.ConnectionClosedEvent.Reason.ERROR;
|
| 101 | +import static com.mongodb.internal.Locks.lockInterruptibly; |
101 | 102 | import static com.mongodb.internal.Locks.withLock;
|
| 103 | +import static com.mongodb.internal.Locks.withUnfairLock; |
102 | 104 | import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
|
103 | 105 | import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
|
104 | 106 | import static com.mongodb.internal.connection.ConcurrentPool.INFINITE_SIZE;
|
105 |
| -import static com.mongodb.internal.Locks.lockInterruptibly; |
106 |
| -import static com.mongodb.internal.Locks.lockInterruptiblyUnfair; |
107 | 107 | import static com.mongodb.internal.connection.ConcurrentPool.sizeToString;
|
108 | 108 | import static com.mongodb.internal.event.EventListenerHelper.getConnectionPoolListener;
|
109 | 109 | import static com.mongodb.internal.logging.LogMessage.Component.CONNECTION;
|
@@ -1103,14 +1103,11 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole
|
1103 | 1103 | }
|
1104 | 1104 |
|
1105 | 1105 | private void releasePermit() {
|
1106 |
| - lockInterruptiblyUnfair(lock); |
1107 |
| - try { |
| 1106 | + withUnfairLock(lock, () -> { |
1108 | 1107 | assertTrue(permits < maxPermits);
|
1109 | 1108 | permits++;
|
1110 | 1109 | permitAvailableOrHandedOverOrClosedOrPausedCondition.signal();
|
1111 |
| - } finally { |
1112 |
| - lock.unlock(); |
1113 |
| - } |
| 1110 | + }); |
1114 | 1111 | }
|
1115 | 1112 |
|
1116 | 1113 | private void expressDesireToGetAvailableConnection() {
|
@@ -1141,29 +1138,24 @@ private void giveUpOnTryingToGetAvailableConnection() {
|
1141 | 1138 | * from threads that are waiting for a permit to open a connection.
|
1142 | 1139 | */
|
1143 | 1140 | void tryHandOverOrRelease(final UsageTrackingInternalConnection openConnection) {
|
1144 |
| - lockInterruptiblyUnfair(lock); |
1145 |
| - try { |
| 1141 | + boolean handedOver = withUnfairLock(lock, () -> { |
1146 | 1142 | for (//iterate from first (head) to last (tail)
|
1147 | 1143 | MutableReference<PooledConnection> desiredConnectionSlot : desiredConnectionSlots) {
|
1148 | 1144 | if (desiredConnectionSlot.reference == null) {
|
1149 | 1145 | desiredConnectionSlot.reference = new PooledConnection(openConnection);
|
1150 | 1146 | permitAvailableOrHandedOverOrClosedOrPausedCondition.signal();
|
1151 |
| - return; |
| 1147 | + return true; |
1152 | 1148 | }
|
1153 | 1149 | }
|
1154 |
| - } finally { |
1155 |
| - lock.unlock(); |
| 1150 | + return false; |
| 1151 | + }); |
| 1152 | + if (!handedOver) { |
| 1153 | + pool.release(openConnection); |
1156 | 1154 | }
|
1157 |
| - pool.release(openConnection); |
1158 | 1155 | }
|
1159 | 1156 |
|
1160 | 1157 | void signalClosedOrPaused() {
|
1161 |
| - lockInterruptiblyUnfair(lock); |
1162 |
| - try { |
1163 |
| - permitAvailableOrHandedOverOrClosedOrPausedCondition.signalAll(); |
1164 |
| - } finally { |
1165 |
| - lock.unlock(); |
1166 |
| - } |
| 1158 | + withUnfairLock(lock, permitAvailableOrHandedOverOrClosedOrPausedCondition::signalAll); |
1167 | 1159 | }
|
1168 | 1160 |
|
1169 | 1161 | /**
|
@@ -1327,16 +1319,16 @@ private static class AsyncWorkManager implements AutoCloseable {
|
1327 | 1319 | }
|
1328 | 1320 |
|
1329 | 1321 | void enqueue(final Task task) {
|
1330 |
| - lockInterruptibly(lock); |
1331 |
| - try { |
| 1322 | + boolean closed = withLock(lock, () -> { |
1332 | 1323 | if (initUnlessClosed()) {
|
1333 | 1324 | tasks.add(task);
|
1334 |
| - return; |
| 1325 | + return false; |
1335 | 1326 | }
|
1336 |
| - } finally { |
1337 |
| - lock.unlock(); |
| 1327 | + return true; |
| 1328 | + }); |
| 1329 | + if (closed) { |
| 1330 | + task.failAsClosed(); |
1338 | 1331 | }
|
1339 |
| - task.failAsClosed(); |
1340 | 1332 | }
|
1341 | 1333 |
|
1342 | 1334 | /**
|
|
0 commit comments