diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 692a60e97b5..7219389e775 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -306,12 +306,23 @@ private void createTxnAsync(final SettableApiFuture res) { void commit() { try { - commitResponse = commitAsync().get(); - } catch (InterruptedException e) { + // Normally, Gax will take care of any timeouts, but we add a timeout for getting the value + // from the future here as well to make sure the call always finishes, even if the future + // never resolves. + commitResponse = + commitAsync() + .get( + rpc.getCommitRetrySettings().getTotalTimeout().getSeconds() + 5, + TimeUnit.SECONDS); + } catch (InterruptedException | TimeoutException e) { if (commitFuture != null) { commitFuture.cancel(true); } - throw SpannerExceptionFactory.propagateInterrupt(e); + if (e instanceof InterruptedException) { + throw SpannerExceptionFactory.propagateInterrupt((InterruptedException) e); + } else { + throw SpannerExceptionFactory.propagateTimeout((TimeoutException) e); + } } catch (ExecutionException e) { throw SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause()); } @@ -422,6 +433,14 @@ public void run() { commitFuture.addListener( () -> { try (IScope ignore = tracer.withSpan(opSpan)) { + if (!commitFuture.isDone()) { + // This should not be possible, considering that we are in a listener for the + // future, but we add a result here as well as a safety precaution. + res.setException( + SpannerExceptionFactory.newSpannerException( + ErrorCode.INTERNAL, "commitFuture is not done")); + return; + } com.google.spanner.v1.CommitResponse proto = commitFuture.get(); if (!proto.hasCommitTimestamp()) { throw newSpannerException( @@ -430,20 +449,28 @@ public void run() { span.addAnnotation("Commit Done"); opSpan.end(); res.set(new CommitResponse(proto)); - } catch (Throwable e) { - if (e instanceof ExecutionException) { - e = - SpannerExceptionFactory.newSpannerException( - e.getCause() == null ? e : e.getCause()); - } else if (e instanceof InterruptedException) { - e = SpannerExceptionFactory.propagateInterrupt((InterruptedException) e); - } else { - e = SpannerExceptionFactory.newSpannerException(e); + } catch (Throwable throwable) { + SpannerException resultException; + try { + if (throwable instanceof ExecutionException) { + resultException = + SpannerExceptionFactory.asSpannerException( + throwable.getCause() == null ? throwable : throwable.getCause()); + } else if (throwable instanceof InterruptedException) { + resultException = + SpannerExceptionFactory.propagateInterrupt( + (InterruptedException) throwable); + } else { + resultException = SpannerExceptionFactory.asSpannerException(throwable); + } + span.addAnnotation("Commit Failed", resultException); + opSpan.setStatus(resultException); + opSpan.end(); + res.setException(onError(resultException, false)); + } catch (Throwable unexpectedError) { + // This is a safety precaution to make sure that a result is always returned. + res.setException(unexpectedError); } - span.addAnnotation("Commit Failed", e); - opSpan.setStatus(e); - opSpan.end(); - res.setException(onError((SpannerException) e, false)); } }, MoreExecutors.directExecutor()); @@ -451,9 +478,6 @@ public void run() { res.setException(SpannerExceptionFactory.propagateInterrupt(e)); } catch (TimeoutException e) { res.setException(SpannerExceptionFactory.propagateTimeout(e)); - } catch (ExecutionException e) { - res.setException( - SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause())); } catch (Throwable e) { res.setException( SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause())); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index b6016f04f78..2548e1fdc86 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -240,6 +240,7 @@ public class GapicSpannerRpc implements SpannerRpc { private final Set executeQueryRetryableCodes; private final RetrySettings readRetrySettings; private final Set readRetryableCodes; + private final RetrySettings commitRetrySettings; private final SpannerStub partitionedDmlStub; private final RetrySettings partitionedDmlRetrySettings; private final InstanceAdminStubSettings instanceAdminStubSettings; @@ -398,6 +399,8 @@ public GapicSpannerRpc(final SpannerOptions options) { options.getSpannerStubSettings().executeStreamingSqlSettings().getRetrySettings(); this.executeQueryRetryableCodes = options.getSpannerStubSettings().executeStreamingSqlSettings().getRetryableCodes(); + this.commitRetrySettings = + options.getSpannerStubSettings().commitSettings().getRetrySettings(); partitionedDmlRetrySettings = options .getSpannerStubSettings() @@ -508,6 +511,8 @@ public UnaryCallable createUnaryCalla this.readRetryableCodes = null; this.executeQueryRetrySettings = null; this.executeQueryRetryableCodes = null; + this.commitRetrySettings = + SpannerStubSettings.newBuilder().commitSettings().getRetrySettings(); this.partitionedDmlStub = null; this.databaseAdminStubSettings = null; this.instanceAdminStubSettings = null; @@ -1801,6 +1806,11 @@ public CommitResponse commit(CommitRequest commitRequest, @Nullable Map rollbackAsync(RollbackRequest request, @Nullable Map options) { GrpcCallContext context = diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java index f063a7a3138..f07a28fb918 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java @@ -469,6 +469,10 @@ CommitResponse commit(CommitRequest commitRequest, @Nullable Map opti ApiFuture commitAsync( CommitRequest commitRequest, @Nullable Map options); + default RetrySettings getCommitRetrySettings() { + return SpannerStubSettings.newBuilder().commitSettings().getRetrySettings(); + } + void rollback(RollbackRequest request, @Nullable Map options) throws SpannerException; ApiFuture rollbackAsync(RollbackRequest request, @Nullable Map options); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java index 72befe8a2b4..2a850514d0d 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java @@ -138,6 +138,8 @@ public void setUp() { when(rpc.getExecuteQueryRetryableCodes()) .thenReturn( SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetryableCodes()); + when(rpc.getCommitRetrySettings()) + .thenReturn(SpannerStubSettings.newBuilder().commitSettings().getRetrySettings()); session = spanner.getSessionClient(db).createSession(); Span oTspan = mock(Span.class); ISpan span = new OpenTelemetrySpan(oTspan); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionContextImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionContextImplTest.java index c1da423760d..561bfb89008 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionContextImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionContextImplTest.java @@ -28,6 +28,7 @@ import com.google.api.core.ApiFutures; import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.cloud.spanner.v1.stub.SpannerStubSettings; import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; import com.google.rpc.Code; @@ -80,6 +81,8 @@ public void setup() { when(tracer.spanBuilderWithExplicitParent( eq(SpannerImpl.BATCH_UPDATE), eq(span), any(Attributes.class))) .thenReturn(span); + when(rpc.getCommitRetrySettings()) + .thenReturn(SpannerStubSettings.newBuilder().commitSettings().getRetrySettings()); } private TransactionContextImpl createContext() { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java index dc28b333c4f..c3fcf1c7480 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java @@ -35,6 +35,7 @@ import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; import com.google.cloud.spanner.TransactionManager.TransactionState; import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.cloud.spanner.v1.stub.SpannerStubSettings; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; import com.google.spanner.v1.BeginTransactionRequest; @@ -248,6 +249,8 @@ public void usesPreparedTransaction() { com.google.protobuf.Timestamp.newBuilder() .setSeconds(System.currentTimeMillis() * 1000)) .build())); + when(rpc.getCommitRetrySettings()) + .thenReturn(SpannerStubSettings.newBuilder().commitSettings().getRetrySettings()); DatabaseId db = DatabaseId.of("test", "test", "test"); try (SpannerImpl spanner = new SpannerImpl(rpc, options)) { DatabaseClient client = spanner.getDatabaseClient(db); @@ -332,6 +335,8 @@ public void inlineBegin() { com.google.protobuf.Timestamp.newBuilder() .setSeconds(System.currentTimeMillis() * 1000)) .build())); + when(rpc.getCommitRetrySettings()) + .thenReturn(SpannerStubSettings.newBuilder().commitSettings().getRetrySettings()); DatabaseId db = DatabaseId.of("test", "test", "test"); try (SpannerImpl spanner = new SpannerImpl(rpc, options)) { DatabaseClient client = spanner.getDatabaseClient(db); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java index 6a707a490dc..f5d9f1841a2 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java @@ -35,6 +35,7 @@ import com.google.cloud.spanner.SessionClient.SessionId; import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.cloud.spanner.v1.stub.SpannerStubSettings; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import com.google.protobuf.Duration; @@ -141,6 +142,8 @@ public void setUp() { CommitResponse.newBuilder() .setCommitTimestamp(Timestamp.getDefaultInstance()) .build())); + when(rpc.getCommitRetrySettings()) + .thenReturn(SpannerStubSettings.newBuilder().commitSettings().getRetrySettings()); when(rpc.rollbackAsync(Mockito.any(RollbackRequest.class), Mockito.anyMap())) .thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance())); Span oTspan = mock(Span.class); @@ -196,6 +199,8 @@ public void usesPreparedTransaction() { .setCommitTimestamp( Timestamp.newBuilder().setSeconds(System.currentTimeMillis() * 1000)) .build())); + when(rpc.getCommitRetrySettings()) + .thenReturn(SpannerStubSettings.newBuilder().commitSettings().getRetrySettings()); DatabaseId db = DatabaseId.of("test", "test", "test"); try (SpannerImpl spanner = new SpannerImpl(rpc, options)) { DatabaseClient client = spanner.getDatabaseClient(db);