Skip to content

Commit

Permalink
fix: make sure commitAsync always finishes (#3216)
Browse files Browse the repository at this point in the history
The future that is returned by commitAsync() appears to never complete
in some cases. The exact cause is not yet known, so this change
adds a number of safety precautions that should ensure that the
future always completes eventually, either with a result or with an error.
A future that fails is always easier to debug and handle than one that never returns a value.
  • Loading branch information
olavloite authored Jul 23, 2024
1 parent e859b29 commit 440c88b
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -306,12 +306,23 @@ private void createTxnAsync(final SettableApiFuture<Void> res) {

void commit() {
try {
commitResponse = commitAsync().get();
} catch (InterruptedException e) {
// Normally, Gax will take care of any timeouts, but we add a timeout for getting the value
// from the future here as well to make sure the call always finishes, even if the future
// never resolves.
commitResponse =
commitAsync()
.get(
rpc.getCommitRetrySettings().getTotalTimeout().getSeconds() + 5,
TimeUnit.SECONDS);
} catch (InterruptedException | TimeoutException e) {
if (commitFuture != null) {
commitFuture.cancel(true);
}
throw SpannerExceptionFactory.propagateInterrupt(e);
if (e instanceof InterruptedException) {
throw SpannerExceptionFactory.propagateInterrupt((InterruptedException) e);
} else {
throw SpannerExceptionFactory.propagateTimeout((TimeoutException) e);
}
} catch (ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause());
}
Expand Down Expand Up @@ -422,6 +433,14 @@ public void run() {
commitFuture.addListener(
() -> {
try (IScope ignore = tracer.withSpan(opSpan)) {
if (!commitFuture.isDone()) {
// This should not be possible, considering that we are in a listener for the
// future, but we add a result here as well as a safety precaution.
res.setException(
SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL, "commitFuture is not done"));
return;
}
com.google.spanner.v1.CommitResponse proto = commitFuture.get();
if (!proto.hasCommitTimestamp()) {
throw newSpannerException(
Expand All @@ -430,30 +449,35 @@ public void run() {
span.addAnnotation("Commit Done");
opSpan.end();
res.set(new CommitResponse(proto));
} catch (Throwable e) {
if (e instanceof ExecutionException) {
e =
SpannerExceptionFactory.newSpannerException(
e.getCause() == null ? e : e.getCause());
} else if (e instanceof InterruptedException) {
e = SpannerExceptionFactory.propagateInterrupt((InterruptedException) e);
} else {
e = SpannerExceptionFactory.newSpannerException(e);
} catch (Throwable throwable) {
SpannerException resultException;
try {
if (throwable instanceof ExecutionException) {
resultException =
SpannerExceptionFactory.asSpannerException(
throwable.getCause() == null ? throwable : throwable.getCause());
} else if (throwable instanceof InterruptedException) {
resultException =
SpannerExceptionFactory.propagateInterrupt(
(InterruptedException) throwable);
} else {
resultException = SpannerExceptionFactory.asSpannerException(throwable);
}
span.addAnnotation("Commit Failed", resultException);
opSpan.setStatus(resultException);
opSpan.end();
res.setException(onError(resultException, false));
} catch (Throwable unexpectedError) {
// This is a safety precaution to make sure that a result is always returned.
res.setException(unexpectedError);
}
span.addAnnotation("Commit Failed", e);
opSpan.setStatus(e);
opSpan.end();
res.setException(onError((SpannerException) e, false));
}
},
MoreExecutors.directExecutor());
} catch (InterruptedException e) {
res.setException(SpannerExceptionFactory.propagateInterrupt(e));
} catch (TimeoutException e) {
res.setException(SpannerExceptionFactory.propagateTimeout(e));
} catch (ExecutionException e) {
res.setException(
SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause()));
} catch (Throwable e) {
res.setException(
SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ public class GapicSpannerRpc implements SpannerRpc {
private final Set<Code> executeQueryRetryableCodes;
private final RetrySettings readRetrySettings;
private final Set<Code> readRetryableCodes;
private final RetrySettings commitRetrySettings;
private final SpannerStub partitionedDmlStub;
private final RetrySettings partitionedDmlRetrySettings;
private final InstanceAdminStubSettings instanceAdminStubSettings;
Expand Down Expand Up @@ -398,6 +399,8 @@ public GapicSpannerRpc(final SpannerOptions options) {
options.getSpannerStubSettings().executeStreamingSqlSettings().getRetrySettings();
this.executeQueryRetryableCodes =
options.getSpannerStubSettings().executeStreamingSqlSettings().getRetryableCodes();
this.commitRetrySettings =
options.getSpannerStubSettings().commitSettings().getRetrySettings();
partitionedDmlRetrySettings =
options
.getSpannerStubSettings()
Expand Down Expand Up @@ -508,6 +511,8 @@ public <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUnaryCalla
this.readRetryableCodes = null;
this.executeQueryRetrySettings = null;
this.executeQueryRetryableCodes = null;
this.commitRetrySettings =
SpannerStubSettings.newBuilder().commitSettings().getRetrySettings();
this.partitionedDmlStub = null;
this.databaseAdminStubSettings = null;
this.instanceAdminStubSettings = null;
Expand Down Expand Up @@ -1801,6 +1806,11 @@ public CommitResponse commit(CommitRequest commitRequest, @Nullable Map<Option,
return get(commitAsync(commitRequest, options));
}

/**
 * Returns the retry settings that this RPC instance holds for Commit calls.
 *
 * <p>The returned value is the {@code commitRetrySettings} field. In this change the field is
 * assigned either from {@code options.getSpannerStubSettings().commitSettings()} or, on the
 * alternative initialization path, from a default {@code SpannerStubSettings} builder.
 *
 * @return the commit retry settings stored on this instance
 */
@Override
public RetrySettings getCommitRetrySettings() {
return commitRetrySettings;
}

@Override
public ApiFuture<Empty> rollbackAsync(RollbackRequest request, @Nullable Map<Option, ?> options) {
GrpcCallContext context =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,10 @@ CommitResponse commit(CommitRequest commitRequest, @Nullable Map<Option, ?> opti
ApiFuture<CommitResponse> commitAsync(
CommitRequest commitRequest, @Nullable Map<Option, ?> options);

/**
 * Returns the retry settings that apply to Commit calls.
 *
 * <p>This default implementation returns the commit retry settings of a newly created
 * {@code SpannerStubSettings} builder, so implementations that do not override it still
 * supply a value. Callers in this change read the total timeout from the result to bound how
 * long they wait on the commit future.
 *
 * @return the retry settings for Commit calls
 */
default RetrySettings getCommitRetrySettings() {
return SpannerStubSettings.newBuilder().commitSettings().getRetrySettings();
}

void rollback(RollbackRequest request, @Nullable Map<Option, ?> options) throws SpannerException;

ApiFuture<Empty> rollbackAsync(RollbackRequest request, @Nullable Map<Option, ?> options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ public void setUp() {
when(rpc.getExecuteQueryRetryableCodes())
.thenReturn(
SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetryableCodes());
when(rpc.getCommitRetrySettings())
.thenReturn(SpannerStubSettings.newBuilder().commitSettings().getRetrySettings());
session = spanner.getSessionClient(db).createSession();
Span oTspan = mock(Span.class);
ISpan span = new OpenTelemetrySpan(oTspan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.api.core.ApiFutures;
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.rpc.Code;
Expand Down Expand Up @@ -80,6 +81,8 @@ public void setup() {
when(tracer.spanBuilderWithExplicitParent(
eq(SpannerImpl.BATCH_UPDATE), eq(span), any(Attributes.class)))
.thenReturn(span);
when(rpc.getCommitRetrySettings())
.thenReturn(SpannerStubSettings.newBuilder().commitSettings().getRetrySettings());
}

private TransactionContextImpl createContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.TransactionManager.TransactionState;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.spanner.v1.BeginTransactionRequest;
Expand Down Expand Up @@ -248,6 +249,8 @@ public void usesPreparedTransaction() {
com.google.protobuf.Timestamp.newBuilder()
.setSeconds(System.currentTimeMillis() * 1000))
.build()));
when(rpc.getCommitRetrySettings())
.thenReturn(SpannerStubSettings.newBuilder().commitSettings().getRetrySettings());
DatabaseId db = DatabaseId.of("test", "test", "test");
try (SpannerImpl spanner = new SpannerImpl(rpc, options)) {
DatabaseClient client = spanner.getDatabaseClient(db);
Expand Down Expand Up @@ -332,6 +335,8 @@ public void inlineBegin() {
com.google.protobuf.Timestamp.newBuilder()
.setSeconds(System.currentTimeMillis() * 1000))
.build()));
when(rpc.getCommitRetrySettings())
.thenReturn(SpannerStubSettings.newBuilder().commitSettings().getRetrySettings());
DatabaseId db = DatabaseId.of("test", "test", "test");
try (SpannerImpl spanner = new SpannerImpl(rpc, options)) {
DatabaseClient client = spanner.getDatabaseClient(db);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.google.cloud.spanner.SessionClient.SessionId;
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
Expand Down Expand Up @@ -141,6 +142,8 @@ public void setUp() {
CommitResponse.newBuilder()
.setCommitTimestamp(Timestamp.getDefaultInstance())
.build()));
when(rpc.getCommitRetrySettings())
.thenReturn(SpannerStubSettings.newBuilder().commitSettings().getRetrySettings());
when(rpc.rollbackAsync(Mockito.any(RollbackRequest.class), Mockito.anyMap()))
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
Span oTspan = mock(Span.class);
Expand Down Expand Up @@ -196,6 +199,8 @@ public void usesPreparedTransaction() {
.setCommitTimestamp(
Timestamp.newBuilder().setSeconds(System.currentTimeMillis() * 1000))
.build()));
when(rpc.getCommitRetrySettings())
.thenReturn(SpannerStubSettings.newBuilder().commitSettings().getRetrySettings());
DatabaseId db = DatabaseId.of("test", "test", "test");
try (SpannerImpl spanner = new SpannerImpl(rpc, options)) {
DatabaseClient client = spanner.getDatabaseClient(db);
Expand Down

0 comments on commit 440c88b

Please sign in to comment.