diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 92ecbc3d55e..a5eee9d0db7 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -574,6 +574,23 @@ public PooledSessionFuture replaceSession( } } + static class MultiplexedSessionReplacementHandler + implements SessionReplacementHandler { + @Override + public MultiplexedSessionFuture replaceSession( + SessionNotFoundException e, MultiplexedSessionFuture session) { + /** + * For multiplexed sessions, we would never obtain a {@link SessionNotFoundException}. Hence, + * this method will ideally never be invoked. + */ + logger.log( + Level.WARNING, + String.format( + "Replace session invoked for multiplexed session => %s", session.getName())); + throw e; + } + } + interface SessionNotFoundHandler { /** * Handles the given {@link SessionNotFoundException} by possibly converting it to a different @@ -1420,16 +1437,254 @@ PooledSession get(final boolean eligibleForLongRunning) { } } + class MultiplexedSessionFuture extends SimpleForwardingListenableFuture + implements SessionFuture { + private final ISpan span; + + @VisibleForTesting + MultiplexedSessionFuture(ListenableFuture delegate, ISpan span) { + super(delegate); + this.span = span; + } + + @Override + public Timestamp write(Iterable mutations) throws SpannerException { + return writeWithOptions(mutations).getCommitTimestamp(); + } + + @Override + public CommitResponse writeWithOptions( + Iterable mutations, TransactionOption... options) throws SpannerException { + try { + return get().writeWithOptions(mutations, options); + } finally { + close(); + } + } + + @Override + public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { + return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp(); + } + + @Override + public CommitResponse writeAtLeastOnceWithOptions( + Iterable mutations, TransactionOption... options) throws SpannerException { + try { + return get().writeAtLeastOnceWithOptions(mutations, options); + } finally { + close(); + } + } + + @Override + public ServerStream batchWriteAtLeastOnce( + Iterable mutationGroups, TransactionOption... options) + throws SpannerException { + try { + return get().batchWriteAtLeastOnce(mutationGroups, options); + } finally { + close(); + } + } + + @Override + public ReadContext singleUse() { + try { + return new AutoClosingReadContext<>( + session -> { + MultiplexedSession multiplexedSession = session.get(); + return multiplexedSession.getDelegate().singleUse(); + }, + SessionPool.this, + multiplexedSessionReplacementHandler, + this, + true); + } catch (Exception e) { + close(); + throw e; + } + } + + @Override + public ReadContext singleUse(final TimestampBound bound) { + try { + return new AutoClosingReadContext<>( + session -> { + MultiplexedSession multiplexedSession = session.get(); + return multiplexedSession.getDelegate().singleUse(bound); + }, + SessionPool.this, + multiplexedSessionReplacementHandler, + this, + true); + } catch (Exception e) { + close(); + throw e; + } + } + + @Override + public ReadOnlyTransaction singleUseReadOnlyTransaction() { + return internalReadOnlyTransaction( + session -> { + MultiplexedSession multiplexedSession = session.get(); + return multiplexedSession.getDelegate().singleUseReadOnlyTransaction(); + }, + true); + } + + @Override + public ReadOnlyTransaction singleUseReadOnlyTransaction(final TimestampBound bound) { + return internalReadOnlyTransaction( + session -> { + MultiplexedSession multiplexedSession = session.get(); + return multiplexedSession.getDelegate().singleUseReadOnlyTransaction(bound); + }, + true); + } + + @Override + public ReadOnlyTransaction readOnlyTransaction() { + return internalReadOnlyTransaction( + session -> { + MultiplexedSession multiplexedSession = session.get(); + return multiplexedSession.getDelegate().readOnlyTransaction(); + }, + false); + } + + @Override + public ReadOnlyTransaction readOnlyTransaction(final TimestampBound bound) { + return internalReadOnlyTransaction( + session -> { + MultiplexedSession multiplexedSession = session.get(); + return multiplexedSession.getDelegate().readOnlyTransaction(bound); + }, + false); + } + + private ReadOnlyTransaction internalReadOnlyTransaction( + Function transactionSupplier, + boolean isSingleUse) { + try { + return new AutoClosingReadTransaction<>( + transactionSupplier, + SessionPool.this, + multiplexedSessionReplacementHandler, + this, + isSingleUse); + } catch (Exception e) { + close(); + throw e; + } + } + + @Override + public TransactionRunner readWriteTransaction(TransactionOption... options) { + return new SessionPoolTransactionRunner<>( + this, multiplexedSessionReplacementHandler, options); + } + + @Override + public TransactionManager transactionManager(TransactionOption... options) { + return new AutoClosingTransactionManager<>( + this, multiplexedSessionReplacementHandler, options); + } + + @Override + public AsyncRunner runAsync(TransactionOption... options) { + return new SessionPoolAsyncRunner(this, multiplexedSessionReplacementHandler, options); + } + + @Override + public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) { + return new SessionPoolAsyncTransactionManager<>( + multiplexedSessionReplacementHandler, this, options); + } + + @Override + public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { + try { + return get().executePartitionedUpdate(stmt, options); + } finally { + close(); + } + } + + @Override + public String getName() { + return get().getName(); + } + + @Override + public void prepareReadWriteTransaction() { + get().prepareReadWriteTransaction(); + } + + @Override + public void close() { + try { + asyncClose().get(); + } catch (InterruptedException e) { + throw SpannerExceptionFactory.propagateInterrupt(e); + } catch (ExecutionException e) { + throw SpannerExceptionFactory.asSpannerException(e.getCause()); + } + } + + @Override + public ApiFuture asyncClose() { + MultiplexedSession delegate = getOrNull(); + if (delegate != null) { + return delegate.asyncClose(); + } + return ApiFutures.immediateFuture(Empty.getDefaultInstance()); + } + + private MultiplexedSession getOrNull() { + try { + return get(); + } catch (Throwable ignore) { + // this exception will never be thrown for a multiplexed session since the Future + // object is already initialised. + return null; + } + } + + @Override + public MultiplexedSession get() { + MultiplexedSession res = null; + try { + res = super.get(); + } catch (Throwable e) { + // ignore the exception as it will be handled by the call to super.get() below. + } + if (res != null) { + res.markBusy(span); + } + try { + return super.get(); + } catch (ExecutionException e) { + throw SpannerExceptionFactory.newSpannerException(e.getCause()); + } catch (InterruptedException e) { + throw SpannerExceptionFactory.propagateInterrupt(e); + } + } + } + interface CachedSession extends Session { SessionImpl getDelegate(); + // TODO This method can be removed once we fully migrate to multiplexed sessions. void markBusy(ISpan span); void markUsed(); SpannerException setLastException(SpannerException exception); + // TODO This method can be removed once we fully migrate to multiplexed sessions. boolean isAllowReplacing(); AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption... options); @@ -1730,6 +1985,175 @@ public TransactionManager transactionManager(TransactionOption... options) { } } + class MultiplexedSession implements CachedSession { + final SessionImpl delegate; + private volatile SpannerException lastException; + + MultiplexedSession(SessionImpl session) { + this.delegate = session; + } + + @Override + public boolean isAllowReplacing() { + // for multiplexed session there is only 1 session, hence there is nothing that we + // can replace. + return false; + } + + @Override + public void setAllowReplacing(boolean allowReplacing) { + // for multiplexed session there is only 1 session, there is nothing that can be replaced. + // hence this is no-op. + } + + @Override + public void markBusy(ISpan span) { + // no-op for a multiplexed session since a new span is already created and set in context + // for every handler invocation. + } + + @Override + public void markUsed() { + // no-op for a multiplexed session since we don't track the last-used time + // in case of multiplexed session + } + + @Override + public SpannerException setLastException(SpannerException exception) { + this.lastException = exception; + return exception; + } + + @Override + public SessionImpl getDelegate() { + return delegate; + } + + @Override + public Timestamp write(Iterable mutations) throws SpannerException { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); + } + + @Override + public CommitResponse writeWithOptions( + Iterable mutations, TransactionOption... options) throws SpannerException { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); + } + + @Override + public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); + } + + @Override + public CommitResponse writeAtLeastOnceWithOptions( + Iterable mutations, TransactionOption... options) throws SpannerException { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); + } + + @Override + public ServerStream batchWriteAtLeastOnce( + Iterable mutationGroups, TransactionOption... options) + throws SpannerException { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); + } + + @Override + public ReadContext singleUse() { + return delegate.singleUse(); + } + + @Override + public ReadContext singleUse(TimestampBound bound) { + return delegate.singleUse(bound); + } + + @Override + public ReadOnlyTransaction singleUseReadOnlyTransaction() { + return delegate.singleUseReadOnlyTransaction(); + } + + @Override + public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) { + return delegate.singleUseReadOnlyTransaction(bound); + } + + @Override + public ReadOnlyTransaction readOnlyTransaction() { + return delegate.readOnlyTransaction(); + } + + @Override + public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { + return delegate.readOnlyTransaction(bound); + } + + @Override + public TransactionRunner readWriteTransaction(TransactionOption... options) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); + } + + @Override + public TransactionManager transactionManager(TransactionOption... options) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); + } + + @Override + public AsyncRunner runAsync(TransactionOption... options) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); + } + + @Override + public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption... options) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); + } + + @Override + public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); + } + + @Override + public String getName() { + return delegate.getName(); + } + + @Override + public void prepareReadWriteTransaction() { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); + } + + @Override + public void close() { + synchronized (lock) { + numMultiplexedSessionsReleased++; + if (lastException != null && isDatabaseOrInstanceNotFound(lastException)) { + SessionPool.this.resourceNotFoundException = + MoreObjects.firstNonNull( + SessionPool.this.resourceNotFoundException, + (ResourceNotFoundException) lastException); + } + } + } + + @Override + public ApiFuture asyncClose() { + close(); + return ApiFutures.immediateFuture(Empty.getDefaultInstance()); + } + } + private final class WaiterFuture extends ForwardingListenableFuture { private static final long MAX_SESSION_WAIT_TIMEOUT = 240_000L; private final SettableFuture waiter = SettableFuture.create(); @@ -2162,9 +2586,15 @@ enum Position { @GuardedBy("lock") private long numSessionsAcquired = 0; + @GuardedBy("lock") + private long numMultiplexedSessionsAcquired = 0; + @GuardedBy("lock") private long numSessionsReleased = 0; + @GuardedBy("lock") + private long numMultiplexedSessionsReleased = 0; + @GuardedBy("lock") private long numIdleSessionsRemoved = 0; @@ -2192,7 +2622,8 @@ enum Position { private final CountDownLatch waitOnMinSessionsLatch; private final SessionReplacementHandler pooledSessionReplacementHandler = new PooledSessionReplacementHandler(); - + private static final SessionReplacementHandler multiplexedSessionReplacementHandler = + new MultiplexedSessionReplacementHandler(); /** * Create a session pool with the given options and for the given database. It will also start * eagerly creating sessions if {@link SessionPoolOptions#getMinSessions()} is greater than 0.