Skip to content

Commit fb1fdc4

Browse files
committed
tests: fix flaky tests
1 parent 81f9ecc commit fb1fdc4

File tree

3 files changed

+71
-40
lines changed

3 files changed

+71
-40
lines changed

google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -258,14 +258,15 @@ public void asyncRunnerCommitAborted() throws Exception {
258258
new AsyncWork<Long>() {
259259
@Override
260260
public ApiFuture<Long> doWorkAsync(TransactionContext txn) {
261-
ApiFuture<Long> updateCount = txn.executeUpdateAsync(UPDATE_STATEMENT);
262-
if (attempt.incrementAndGet() == 1) {
263-
mockSpanner.abortTransaction(txn);
264-
} else {
261+
if (attempt.get() > 0) {
265262
// Set the result of the update statement back to 1 row.
266263
mockSpanner.putStatementResult(
267264
StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT));
268265
}
266+
ApiFuture<Long> updateCount = txn.executeUpdateAsync(UPDATE_STATEMENT);
267+
if (attempt.incrementAndGet() == 1) {
268+
mockSpanner.abortTransaction(txn);
269+
}
269270
return updateCount;
270271
}
271272
},

google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadAsyncTest.java

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,11 @@
4242
import java.util.ArrayList;
4343
import java.util.Arrays;
4444
import java.util.Comparator;
45+
import java.util.Deque;
4546
import java.util.LinkedList;
4647
import java.util.List;
4748
import java.util.concurrent.BlockingQueue;
49+
import java.util.concurrent.ConcurrentLinkedDeque;
4850
import java.util.concurrent.CountDownLatch;
4951
import java.util.concurrent.ExecutionException;
5052
import java.util.concurrent.ExecutorService;
@@ -88,7 +90,7 @@ public static void setup() throws Exception {
8890
.build()
8991
.start();
9092
channelProvider = LocalChannelProvider.create(uniqueName);
91-
executor = Executors.newSingleThreadExecutor();
93+
executor = Executors.newScheduledThreadPool(8);
9294
}
9395

9496
@AfterClass
@@ -350,72 +352,85 @@ public void pauseResume() throws Exception {
350352
StatementResult.query(
351353
evenStatement, generateKeyValueResultSet(ImmutableSet.of(2, 4, 6, 8, 10))));
352354

355+
final Object lock = new Object();
353356
final SettableApiFuture<Boolean> evenFinished = SettableApiFuture.create();
354357
final SettableApiFuture<Boolean> unevenFinished = SettableApiFuture.create();
355-
final List<String> allValues = new LinkedList<>();
358+
final CountDownLatch unevenReturnedFirstRow = new CountDownLatch(1);
359+
final Deque<String> allValues = new ConcurrentLinkedDeque<>();
356360
try (ReadOnlyTransaction tx = client.readOnlyTransaction()) {
357361
try (AsyncResultSet evenRs = tx.executeQueryAsync(evenStatement);
358362
AsyncResultSet unevenRs = tx.executeQueryAsync(unevenStatement)) {
359-
evenRs.setCallback(
363+
unevenRs.setCallback(
360364
executor,
361365
new ReadyCallback() {
362-
private boolean firstRow = true;
363-
364366
@Override
365367
public CallbackResponse cursorReady(AsyncResultSet resultSet) {
366-
if (firstRow) {
367-
// Make sure the uneven result set returns the first result.
368-
firstRow = false;
369-
return CallbackResponse.PAUSE;
370-
}
371368
try {
372369
while (true) {
373370
switch (resultSet.tryNext()) {
374371
case DONE:
375-
evenFinished.set(true);
372+
unevenFinished.set(true);
376373
return CallbackResponse.DONE;
377374
case NOT_READY:
378375
return CallbackResponse.CONTINUE;
379376
case OK:
380-
allValues.add(resultSet.getString("Value"));
377+
synchronized (lock) {
378+
allValues.add(resultSet.getString("Value"));
379+
}
380+
unevenReturnedFirstRow.countDown();
381381
return CallbackResponse.PAUSE;
382382
}
383383
}
384384
} catch (Throwable t) {
385-
evenFinished.setException(t);
385+
unevenFinished.setException(t);
386386
return CallbackResponse.DONE;
387-
} finally {
388-
unevenRs.resume();
389387
}
390388
}
391389
});
392-
393-
unevenRs.setCallback(
390+
evenRs.setCallback(
394391
executor,
395392
new ReadyCallback() {
396393
@Override
397394
public CallbackResponse cursorReady(AsyncResultSet resultSet) {
398395
try {
396+
// Make sure the uneven result set has returned the first before we start the even
397+
// results.
398+
unevenReturnedFirstRow.await();
399399
while (true) {
400400
switch (resultSet.tryNext()) {
401401
case DONE:
402-
unevenFinished.set(true);
402+
evenFinished.set(true);
403403
return CallbackResponse.DONE;
404404
case NOT_READY:
405405
return CallbackResponse.CONTINUE;
406406
case OK:
407-
allValues.add(resultSet.getString("Value"));
407+
synchronized (lock) {
408+
allValues.add(resultSet.getString("Value"));
409+
}
408410
return CallbackResponse.PAUSE;
409411
}
410412
}
411413
} catch (Throwable t) {
412-
unevenFinished.setException(t);
414+
evenFinished.setException(t);
413415
return CallbackResponse.DONE;
414-
} finally {
415-
evenRs.resume();
416416
}
417417
}
418418
});
419+
while (!(evenFinished.isDone() && unevenFinished.isDone())) {
420+
synchronized (lock) {
421+
if (allValues.peekLast() != null) {
422+
if (Integer.valueOf(allValues.peekLast().substring(1)) % 2 == 1) {
423+
evenRs.resume();
424+
} else {
425+
unevenRs.resume();
426+
}
427+
}
428+
if (allValues.size() == 10) {
429+
unevenRs.resume();
430+
evenRs.resume();
431+
}
432+
}
433+
}
419434
}
420435
}
421436
assertThat(ApiFutures.allAsList(Arrays.asList(evenFinished, unevenFinished)).get())

google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncExamplesTest.java

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.ArrayList;
4949
import java.util.Arrays;
5050
import java.util.Comparator;
51+
import java.util.Deque;
5152
import java.util.LinkedList;
5253
import java.util.List;
5354
import java.util.concurrent.CountDownLatch;
@@ -343,9 +344,11 @@ public void pauseResume() throws Exception {
343344
Statement.of(
344345
"SELECT * FROM TestTable WHERE MOD(CAST(SUBSTR(Key, 2) AS INT64), 2) = 0 ORDER BY CAST(SUBSTR(Key, 2) AS INT64)");
345346

347+
final Object lock = new Object();
346348
final SettableApiFuture<Boolean> evenFinished = SettableApiFuture.create();
347349
final SettableApiFuture<Boolean> unevenFinished = SettableApiFuture.create();
348-
final List<String> allValues = new LinkedList<>();
350+
final CountDownLatch evenReturnedFirstRow = new CountDownLatch(1);
351+
final Deque<String> allValues = new LinkedList<>();
349352
try (ReadOnlyTransaction tx = client.readOnlyTransaction()) {
350353
try (AsyncResultSet evenRs = tx.executeQueryAsync(evenStatement);
351354
AsyncResultSet unevenRs = tx.executeQueryAsync(unevenStatement)) {
@@ -363,32 +366,29 @@ public CallbackResponse cursorReady(AsyncResultSet resultSet) {
363366
case NOT_READY:
364367
return CallbackResponse.CONTINUE;
365368
case OK:
366-
allValues.add(resultSet.getString("StringValue"));
369+
synchronized (lock) {
370+
allValues.add(resultSet.getString("StringValue"));
371+
}
372+
evenReturnedFirstRow.countDown();
367373
return CallbackResponse.PAUSE;
368374
}
369375
}
370376
} catch (Throwable t) {
371377
evenFinished.setException(t);
372378
return CallbackResponse.DONE;
373-
} finally {
374-
unevenRs.resume();
375379
}
376380
}
377381
});
378382

379383
unevenRs.setCallback(
380384
executor,
381385
new ReadyCallback() {
382-
private boolean firstRow = true;
383-
384386
@Override
385387
public CallbackResponse cursorReady(AsyncResultSet resultSet) {
386-
if (firstRow) {
387-
// Make sure the even result set returns the first result.
388-
firstRow = false;
389-
return CallbackResponse.PAUSE;
390-
}
391388
try {
389+
// Make sure the even result set has returned the first before we start the uneven
390+
// results.
391+
evenReturnedFirstRow.await();
392392
while (true) {
393393
switch (resultSet.tryNext()) {
394394
case DONE:
@@ -397,18 +397,33 @@ public CallbackResponse cursorReady(AsyncResultSet resultSet) {
397397
case NOT_READY:
398398
return CallbackResponse.CONTINUE;
399399
case OK:
400-
allValues.add(resultSet.getString("StringValue"));
400+
synchronized (lock) {
401+
allValues.add(resultSet.getString("StringValue"));
402+
}
401403
return CallbackResponse.PAUSE;
402404
}
403405
}
404406
} catch (Throwable t) {
405407
unevenFinished.setException(t);
406408
return CallbackResponse.DONE;
407-
} finally {
408-
evenRs.resume();
409409
}
410410
}
411411
});
412+
while (!(evenFinished.isDone() && unevenFinished.isDone())) {
413+
synchronized (lock) {
414+
if (allValues.peekLast() != null) {
415+
if (Integer.valueOf(allValues.peekLast().substring(1)) % 2 == 1) {
416+
evenRs.resume();
417+
} else {
418+
unevenRs.resume();
419+
}
420+
}
421+
if (allValues.size() == 15) {
422+
unevenRs.resume();
423+
evenRs.resume();
424+
}
425+
}
426+
}
412427
}
413428
}
414429
assertThat(ApiFutures.allAsList(Arrays.asList(evenFinished, unevenFinished)).get())

0 commit comments

Comments
 (0)