|
31 | 31 | import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; |
32 | 32 | import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; |
33 | 33 | import com.google.common.base.Function; |
| 34 | +import com.google.common.collect.ContiguousSet; |
34 | 35 | import com.google.common.collect.ImmutableList; |
| 36 | +import com.google.common.collect.ImmutableSet; |
35 | 37 | import com.google.common.collect.Iterables; |
36 | 38 | import com.google.common.util.concurrent.SettableFuture; |
37 | 39 | import io.grpc.Server; |
|
40 | 42 | import java.util.ArrayList; |
41 | 43 | import java.util.Arrays; |
42 | 44 | import java.util.Comparator; |
| 45 | +import java.util.LinkedList; |
43 | 46 | import java.util.List; |
44 | 47 | import java.util.concurrent.BlockingQueue; |
45 | 48 | import java.util.concurrent.CountDownLatch; |
@@ -282,9 +285,9 @@ public void readOnlyTransaction() throws Exception { |
282 | 285 | Statement.of("SELECT * FROM TestTable WHERE Key IN ('k10', 'k11', 'k12')"); |
283 | 286 | Statement statement2 = Statement.of("SELECT * FROM TestTable WHERE Key IN ('k1', 'k2', 'k3"); |
284 | 287 | mockSpanner.putStatementResult( |
285 | | - StatementResult.query(statement1, generateKeyValueResultSet(10, 12))); |
| 288 | + StatementResult.query(statement1, generateKeyValueResultSet(ContiguousSet.closed(10, 12)))); |
286 | 289 | mockSpanner.putStatementResult( |
287 | | - StatementResult.query(statement2, generateKeyValueResultSet(1, 3))); |
| 290 | + StatementResult.query(statement2, generateKeyValueResultSet(ContiguousSet.closed(1, 3)))); |
288 | 291 |
|
289 | 292 | ApiFuture<ImmutableList<String>> values1; |
290 | 293 | ApiFuture<ImmutableList<String>> values2; |
@@ -333,4 +336,140 @@ public int compare(String o1, String o2) { |
333 | 336 | executor); |
334 | 337 | assertThat(allValues.get()).containsExactly("v1", "v2", "v3", "v10", "v11", "v12"); |
335 | 338 | } |
| 339 | + |
| 340 | + @Test |
| 341 | + public void pauseResume() throws Exception { |
| 342 | + Statement unevenStatement = |
| 343 | + Statement.of("SELECT * FROM TestTable WHERE MOD(CAST(SUBSTR(Key, 2) AS INT64), 2) = 1"); |
| 344 | + Statement evenStatement = |
| 345 | + Statement.of("SELECT * FROM TestTable WHERE MOD(CAST(SUBSTR(Key, 2) AS INT64), 2) = 0"); |
| 346 | + mockSpanner.putStatementResult( |
| 347 | + StatementResult.query( |
| 348 | + unevenStatement, generateKeyValueResultSet(ImmutableSet.of(1, 3, 5, 7, 9)))); |
| 349 | + mockSpanner.putStatementResult( |
| 350 | + StatementResult.query( |
| 351 | + evenStatement, generateKeyValueResultSet(ImmutableSet.of(2, 4, 6, 8, 10)))); |
| 352 | + |
| 353 | + final SettableApiFuture<Boolean> evenFinished = SettableApiFuture.create(); |
| 354 | + final SettableApiFuture<Boolean> unevenFinished = SettableApiFuture.create(); |
| 355 | + final List<String> allValues = new LinkedList<>(); |
| 356 | + try (ReadOnlyTransaction tx = client.readOnlyTransaction()) { |
| 357 | + try (AsyncResultSet evenRs = tx.executeQueryAsync(evenStatement); |
| 358 | + AsyncResultSet unevenRs = tx.executeQueryAsync(unevenStatement)) { |
| 359 | + evenRs.setCallback( |
| 360 | + executor, |
| 361 | + new ReadyCallback() { |
| 362 | + private boolean firstRow = true; |
| 363 | + |
| 364 | + @Override |
| 365 | + public CallbackResponse cursorReady(AsyncResultSet resultSet) { |
| 366 | + if (firstRow) { |
| 367 | + // Make sure the uneven result set returns the first result. |
| 368 | + firstRow = false; |
| 369 | + return CallbackResponse.PAUSE; |
| 370 | + } |
| 371 | + try { |
| 372 | + while (true) { |
| 373 | + switch (resultSet.tryNext()) { |
| 374 | + case DONE: |
| 375 | + evenFinished.set(true); |
| 376 | + return CallbackResponse.DONE; |
| 377 | + case NOT_READY: |
| 378 | + return CallbackResponse.CONTINUE; |
| 379 | + case OK: |
| 380 | + allValues.add(resultSet.getString("Value")); |
| 381 | + return CallbackResponse.PAUSE; |
| 382 | + } |
| 383 | + } |
| 384 | + } catch (Throwable t) { |
| 385 | + evenFinished.setException(t); |
| 386 | + return CallbackResponse.DONE; |
| 387 | + } finally { |
| 388 | + unevenRs.resume(); |
| 389 | + } |
| 390 | + } |
| 391 | + }); |
| 392 | + |
| 393 | + unevenRs.setCallback( |
| 394 | + executor, |
| 395 | + new ReadyCallback() { |
| 396 | + @Override |
| 397 | + public CallbackResponse cursorReady(AsyncResultSet resultSet) { |
| 398 | + try { |
| 399 | + while (true) { |
| 400 | + switch (resultSet.tryNext()) { |
| 401 | + case DONE: |
| 402 | + unevenFinished.set(true); |
| 403 | + return CallbackResponse.DONE; |
| 404 | + case NOT_READY: |
| 405 | + return CallbackResponse.CONTINUE; |
| 406 | + case OK: |
| 407 | + allValues.add(resultSet.getString("Value")); |
| 408 | + return CallbackResponse.PAUSE; |
| 409 | + } |
| 410 | + } |
| 411 | + } catch (Throwable t) { |
| 412 | + unevenFinished.setException(t); |
| 413 | + return CallbackResponse.DONE; |
| 414 | + } finally { |
| 415 | + evenRs.resume(); |
| 416 | + } |
| 417 | + } |
| 418 | + }); |
| 419 | + } |
| 420 | + } |
| 421 | + assertThat(ApiFutures.allAsList(Arrays.asList(evenFinished, unevenFinished)).get()) |
| 422 | + .containsExactly(Boolean.TRUE, Boolean.TRUE); |
| 423 | + assertThat(allValues) |
| 424 | + .containsExactly("v1", "v2", "v3", "v4", "v5", "v6", "v7", "v8", "v9", "v10"); |
| 425 | + } |
| 426 | + |
| 427 | + @Test |
| 428 | + public void cancel() throws Exception { |
| 429 | + final List<String> values = new LinkedList<>(); |
| 430 | + final SettableApiFuture<Boolean> finished = SettableApiFuture.create(); |
| 431 | + final CountDownLatch receivedFirstRow = new CountDownLatch(1); |
| 432 | + final CountDownLatch cancelled = new CountDownLatch(1); |
| 433 | + try (AsyncResultSet rs = |
| 434 | + client.singleUse().readAsync(READ_TABLE_NAME, KeySet.all(), READ_COLUMN_NAMES)) { |
| 435 | + rs.setCallback( |
| 436 | + executor, |
| 437 | + new ReadyCallback() { |
| 438 | + @Override |
| 439 | + public CallbackResponse cursorReady(AsyncResultSet resultSet) { |
| 440 | + try { |
| 441 | + while (true) { |
| 442 | + switch (resultSet.tryNext()) { |
| 443 | + case DONE: |
| 444 | + finished.set(true); |
| 445 | + return CallbackResponse.DONE; |
| 446 | + case NOT_READY: |
| 447 | + return CallbackResponse.CONTINUE; |
| 448 | + case OK: |
| 449 | + values.add(resultSet.getString("Value")); |
| 450 | + receivedFirstRow.countDown(); |
| 451 | + cancelled.await(); |
| 452 | + break; |
| 453 | + } |
| 454 | + } |
| 455 | + } catch (Throwable t) { |
| 456 | + finished.setException(t); |
| 457 | + return CallbackResponse.DONE; |
| 458 | + } |
| 459 | + } |
| 460 | + }); |
| 461 | + receivedFirstRow.await(); |
| 462 | + rs.cancel(); |
| 463 | + } |
| 464 | + cancelled.countDown(); |
| 465 | + try { |
| 466 | + finished.get(); |
| 467 | + fail("missing expected exception"); |
| 468 | + } catch (ExecutionException e) { |
| 469 | + assertThat(e.getCause()).isInstanceOf(SpannerException.class); |
| 470 | + SpannerException se = (SpannerException) e.getCause(); |
| 471 | + assertThat(se.getErrorCode()).isEqualTo(ErrorCode.CANCELLED); |
| 472 | + assertThat(values).containsExactly("v1"); |
| 473 | + } |
| 474 | + } |
336 | 475 | } |
0 commit comments