6969import io .opencensus .trace .Status ;
7070import io .opencensus .trace .Tracer ;
7171import io .opencensus .trace .Tracing ;
72+ import java .util .Arrays ;
7273import java .util .HashSet ;
7374import java .util .Iterator ;
7475import java .util .LinkedList ;
@@ -848,7 +849,7 @@ private void keepAlive() {
848849 }
849850 }
850851
851- private void markUsed () {
852+ void markUsed () {
852853 lastUseTime = clock .instant ();
853854 }
854855
@@ -929,24 +930,30 @@ private SessionOrError pollUninterruptiblyWithTimeout(long timeoutMillis) {
929930 }
930931 }
931932
932- // Background task to maintain the pool. It closes idle sessions, keeps alive sessions that have
933- // not been used for a user configured time and creates session if needed to bring pool up to
934- // minimum required sessions. We keep track of the number of concurrent sessions being used.
935- // The maximum value of that over a window (10 minutes) tells us how many sessions we need in the
936- // pool. We close the remaining sessions. To prevent bursty traffic, we smear this out over the
937- // window length. We also smear out the keep alive traffic over the keep alive period.
933+ /**
934+ * Background task to maintain the pool. Tasks:
935+ *
936+ * <ul>
937+ * <li>Removes idle sessions from the pool. Sessions that go above MinSessions that have not
938+ * been used for the last 55 minutes will be removed from the pool. These will automatically
939+ * be garbage collected by the backend.
940+ * <li>Keeps alive sessions that have not been used for a user configured time in order to keep
941+ * MinSessions sessions alive in the pool at any time. The keep-alive traffic is smeared out
942+ * over a window of 10 minutes to avoid bursty traffic.
943+ * </ul>
944+ */
938945 final class PoolMaintainer {
939946 // Length of the window in millis over which we keep track of maximum number of concurrent
940947 // sessions in use.
941948 private final Duration windowLength = Duration .ofMillis (TimeUnit .MINUTES .toMillis (10 ));
942949 // Frequency of the timer loop.
943- @ VisibleForTesting static final long LOOP_FREQUENCY = 10 * 1000L ;
950+ @ VisibleForTesting final long loopFrequency = options . getLoopFrequency () ;
944951 // Number of loop iterations in which we need to to close all the sessions waiting for closure.
945- @ VisibleForTesting final long numClosureCycles = windowLength .toMillis () / LOOP_FREQUENCY ;
952+ @ VisibleForTesting final long numClosureCycles = windowLength .toMillis () / loopFrequency ;
946953 private final Duration keepAliveMilis =
947954 Duration .ofMillis (TimeUnit .MINUTES .toMillis (options .getKeepAliveIntervalMinutes ()));
948955 // Number of loop iterations in which we need to keep alive all the sessions
949- @ VisibleForTesting final long numKeepAliveCycles = keepAliveMilis .toMillis () / LOOP_FREQUENCY ;
956+ @ VisibleForTesting final long numKeepAliveCycles = keepAliveMilis .toMillis () / loopFrequency ;
950957
951958 Instant lastResetTime = Instant .ofEpochMilli (0 );
952959 int numSessionsToClose = 0 ;
@@ -969,8 +976,8 @@ public void run() {
969976 maintainPool ();
970977 }
971978 },
972- LOOP_FREQUENCY ,
973- LOOP_FREQUENCY ,
979+ loopFrequency ,
980+ loopFrequency ,
974981 TimeUnit .MILLISECONDS );
975982 }
976983 }
@@ -993,7 +1000,7 @@ void maintainPool() {
9931000 running = true ;
9941001 }
9951002 Instant currTime = clock .instant ();
996- closeIdleSessions (currTime );
1003+ removeIdleSessions (currTime );
9971004 // Now go over all the remaining sessions and see if they need to be kept alive explicitly.
9981005 keepAliveSessions (currTime );
9991006 replenishPool ();
@@ -1005,46 +1012,43 @@ void maintainPool() {
10051012 }
10061013 }
10071014
1008- private void closeIdleSessions (Instant currTime ) {
1009- LinkedList <PooledSession > sessionsToClose = new LinkedList <>();
1015+ private void removeIdleSessions (Instant currTime ) {
10101016 synchronized (lock ) {
1011- // Every ten minutes figure out how many sessions need to be closed then close them over
1012- // next ten minutes.
1013- if (currTime .isAfter (lastResetTime .plus (windowLength ))) {
1014- int sessionsToKeep =
1015- Math .max (options .getMinSessions (), maxSessionsInUse + options .getMaxIdleSessions ());
1016- numSessionsToClose = totalSessions () - sessionsToKeep ;
1017- sessionsToClosePerLoop = (int ) Math .ceil ((double ) numSessionsToClose / numClosureCycles );
1018- maxSessionsInUse = 0 ;
1019- lastResetTime = currTime ;
1020- }
1021- if (numSessionsToClose > 0 ) {
1022- while (sessionsToClose .size () < Math .min (numSessionsToClose , sessionsToClosePerLoop )) {
1023- PooledSession sess =
1024- readSessions .size () > 0 ? readSessions .poll () : writePreparedSessions .poll ();
1025- if (sess != null ) {
1026- if (sess .state != SessionState .CLOSING ) {
1027- sess .markClosing ();
1028- sessionsToClose .add (sess );
1017+ // Determine the minimum last use time for a session to be deemed to still be alive. Remove
1018+ // all sessions that have a lastUseTime before that time, unless it would cause us to go
1019+ // below MinSessions. Prefer to remove read sessions above write-prepared sessions.
1020+ Instant minLastUseTime = currTime .minus (options .getRemoveInactiveSessionAfter ());
1021+ for (Iterator <PooledSession > iterator :
1022+ Arrays .asList (
1023+ readSessions .descendingIterator (), writePreparedSessions .descendingIterator ())) {
1024+ while (iterator .hasNext ()) {
1025+ PooledSession session = iterator .next ();
1026+ if (session .lastUseTime .isBefore (minLastUseTime )) {
1027+ if (session .state != SessionState .CLOSING ) {
1028+ removeFromPool (session );
1029+ iterator .remove ();
10291030 }
1030- } else {
1031- break ;
10321031 }
10331032 }
1034- numSessionsToClose -= sessionsToClose .size ();
10351033 }
10361034 }
1037- for (PooledSession sess : sessionsToClose ) {
1038- logger .log (Level .FINE , "Closing session {0}" , sess .getName ());
1039- closeSessionAsync (sess );
1040- }
10411035 }
10421036
10431037 private void keepAliveSessions (Instant currTime ) {
10441038 long numSessionsToKeepAlive = 0 ;
10451039 synchronized (lock ) {
1040+ if (numSessionsInUse >= (options .getMinSessions () + options .getMaxIdleSessions ())) {
1041+ // At least MinSessions are in use, so we don't have to ping any sessions.
1042+ return ;
1043+ }
10461044 // In each cycle only keep alive a subset of sessions to prevent burst of traffic.
1047- numSessionsToKeepAlive = (long ) Math .ceil ((double ) totalSessions () / numKeepAliveCycles );
1045+ numSessionsToKeepAlive =
1046+ (long )
1047+ Math .ceil (
1048+ (double )
1049+ ((options .getMinSessions () + options .getMaxIdleSessions ())
1050+ - numSessionsInUse )
1051+ / numKeepAliveCycles );
10481052 }
10491053 // Now go over all the remaining sessions and see if they need to be kept alive explicitly.
10501054 Instant keepAliveThreshold = currTime .minus (keepAliveMilis );
@@ -1053,9 +1057,11 @@ private void keepAliveSessions(Instant currTime) {
10531057 while (numSessionsToKeepAlive > 0 ) {
10541058 PooledSession sessionToKeepAlive = null ;
10551059 synchronized (lock ) {
1056- sessionToKeepAlive = findSessionToKeepAlive (readSessions , keepAliveThreshold );
1060+ sessionToKeepAlive = findSessionToKeepAlive (readSessions , keepAliveThreshold , 0 );
10571061 if (sessionToKeepAlive == null ) {
1058- sessionToKeepAlive = findSessionToKeepAlive (writePreparedSessions , keepAliveThreshold );
1062+ sessionToKeepAlive =
1063+ findSessionToKeepAlive (
1064+ writePreparedSessions , keepAliveThreshold , readSessions .size ());
10591065 }
10601066 }
10611067 if (sessionToKeepAlive == null ) {
@@ -1137,13 +1143,18 @@ private static enum Position {
11371143 @ GuardedBy ("lock" )
11381144 private long numSessionsReleased = 0 ;
11391145
1146+ @ GuardedBy ("lock" )
1147+ private long numIdleSessionsRemoved = 0 ;
1148+
11401149 private AtomicLong numWaiterTimeouts = new AtomicLong ();
11411150
11421151 @ GuardedBy ("lock" )
11431152 private final Set <PooledSession > allSessions = new HashSet <>();
11441153
11451154 private final SessionConsumer sessionConsumer = new SessionConsumerImpl ();
11461155
1156+ @ VisibleForTesting Function <PooledSession , Void > idleSessionRemovedListener ;
1157+
11471158 /**
11481159 * Create a session pool with the given options and for the given database. It will also start
11491160 * eagerly creating sessions if {@link SessionPoolOptions#getMinSessions()} is greater than 0.
@@ -1232,6 +1243,28 @@ private SessionPool(
12321243 this .initMetricsCollection (metricRegistry , labelValues );
12331244 }
12341245
1246+ @ VisibleForTesting
1247+ void removeFromPool (PooledSession session ) {
1248+ synchronized (lock ) {
1249+ if (isClosed ()) {
1250+ decrementPendingClosures (1 );
1251+ return ;
1252+ }
1253+ session .markClosing ();
1254+ allSessions .remove (session );
1255+ numIdleSessionsRemoved ++;
1256+ if (idleSessionRemovedListener != null ) {
1257+ idleSessionRemovedListener .apply (session );
1258+ }
1259+ }
1260+ }
1261+
1262+ long numIdleSessionsRemoved () {
1263+ synchronized (lock ) {
1264+ return numIdleSessionsRemoved ;
1265+ }
1266+ }
1267+
12351268 @ VisibleForTesting
12361269 int getNumberOfAvailableWritePreparedSessions () {
12371270 synchronized (lock ) {
@@ -1313,14 +1346,18 @@ private void invalidateSession(PooledSession session) {
13131346 }
13141347
13151348 private PooledSession findSessionToKeepAlive (
1316- Queue <PooledSession > queue , Instant keepAliveThreshold ) {
1349+ Queue <PooledSession > queue , Instant keepAliveThreshold , int numAlreadyChecked ) {
1350+ int numChecked = 0 ;
13171351 Iterator <PooledSession > iterator = queue .iterator ();
1318- while (iterator .hasNext ()) {
1352+ while (iterator .hasNext ()
1353+ && (numChecked + numAlreadyChecked )
1354+ < (options .getMinSessions () + options .getMaxIdleSessions () - numSessionsInUse )) {
13191355 PooledSession session = iterator .next ();
13201356 if (session .lastUseTime .isBefore (keepAliveThreshold )) {
13211357 iterator .remove ();
13221358 return session ;
13231359 }
1360+ numChecked ++;
13241361 }
13251362 return null ;
13261363 }
0 commit comments