From e066959278a031e437e6cf289f9bff3fbd853fe2 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Fri, 6 May 2022 18:10:21 +0900 Subject: [PATCH] Add base connector test to verify concurrent INSERT --- .../BaseDeltaLakeMinioConnectorTest.java | 14 ++++ .../plugin/kafka/TestKafkaConnectorTest.java | 6 ++ .../kudu/AbstractKuduConnectorTest.java | 7 ++ .../io/trino/testing/BaseConnectorTest.java | 69 +++++++++++++++++++ 4 files changed, 96 insertions(+) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java index 33ff1cf385d2..34a2b0bcbe44 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java @@ -121,6 +121,20 @@ protected void verifyConcurrentUpdateFailurePermissible(Exception e) "|Target file .* was created during locking"); } + @Override + protected void verifyConcurrentInsertFailurePermissible(Exception e) + { + assertThat(e) + .hasMessage("Failed to write Delta Lake transaction log entry") + .getCause() + .hasMessageMatching( + "Transaction log locked.*" + + "|.*/_delta_log/\\d+.json already exists" + + "|Conflicting concurrent writes found..*" + + "|Multiple live locks found for:.*" + + "|Target file .* was created during locking"); + } + @Override protected Optional filterCaseSensitiveDataMappingTestData(DataMappingTestSetup dataMappingTestSetup) { diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConnectorTest.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConnectorTest.java index e5e9d49a1032..c3de140a2788 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConnectorTest.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConnectorTest.java @@ -452,6 +452,12 @@ public void testInsertHighestUnicodeCharacter() .containsExactlyInAnyOrder("Hello", "hello测试􏿿world编码"); } + @Override + public void testInsertRowConcurrently() + { + throw new SkipException("TODO Prepare a topic in Kafka and enable this test"); + } + private static KafkaTopicDescription createDescription(SchemaTableName schemaTableName, KafkaTopicFieldDescription key, List fields) { return new KafkaTopicDescription( diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/AbstractKuduConnectorTest.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/AbstractKuduConnectorTest.java index fee3f800c0c8..d769c3aa2585 100644 --- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/AbstractKuduConnectorTest.java +++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/AbstractKuduConnectorTest.java @@ -339,6 +339,13 @@ public void testInsertNegativeDate() throw new SkipException("TODO"); } + @Override + public void testInsertRowConcurrently() + { + // TODO Support these test once kudu connector can create tables with default partitions + throw new SkipException("TODO"); + } + @Test @Override public void testDelete() diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java index ec18a1fbe108..7f49f790b080 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java @@ -43,6 +43,7 @@ import java.util.Deque; import java.util.List; import java.util.Optional; +import java.util.OptionalInt; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.CountDownLatch; @@ -2778,6 +2779,74 @@ protected void verifyConcurrentUpdateFailurePermissible(Exception e) throw new AssertionError("Unexpected concurrent update failure", e); } + // Repeat test with invocationCount for better test coverage, since the tested aspect is inherently non-deterministic. + @Test(timeOut = 60_000, invocationCount = 4) + public void testInsertRowConcurrently() + throws Exception + { + if (!hasBehavior(SUPPORTS_INSERT)) { + // Covered by testInsert + return; + } + + int threads = 4; + CyclicBarrier barrier = new CyclicBarrier(threads); + ExecutorService executor = newFixedThreadPool(threads); + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_insert", "(col integer)")) { + String tableName = table.getName(); + + List> futures = IntStream.range(0, threads) + .mapToObj(threadNumber -> executor.submit(() -> { + barrier.await(10, SECONDS); + try { + getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (" + threadNumber + ")"); + return OptionalInt.of(threadNumber); + } + catch (Exception e) { + RuntimeException trinoException = getTrinoExceptionCause(e); + try { + verifyConcurrentInsertFailurePermissible(trinoException); + } + catch (Throwable verifyFailure) { + if (trinoException != e && verifyFailure != e) { + verifyFailure.addSuppressed(e); + } + throw verifyFailure; + } + return OptionalInt.empty(); + } + })) + .collect(toImmutableList()); + + List values = futures.stream() + .map(future -> tryGetFutureValue(future, 10, SECONDS).orElseThrow(() -> new RuntimeException("Wait timed out"))) + .filter(OptionalInt::isPresent) + .map(OptionalInt::getAsInt) + .collect(toImmutableList()); + + if (values.isEmpty()) { + assertQueryReturnsEmptyResult("TABLE " + tableName); + } + else { + // Cast to integer because some connectors (e.g. Oracle) map integer to different types that skippingTypesCheck can't resolve the mismatch. + assertThat(query("SELECT CAST(col AS INTEGER) FROM " + tableName)) + .matches(values.stream() + .map(value -> format("(%s)", value)) + .collect(joining(",", "VALUES ", ""))); + } + } + finally { + executor.shutdownNow(); + executor.awaitTermination(10, SECONDS); + } + } + + protected void verifyConcurrentInsertFailurePermissible(Exception e) + { + // By default, do not expect INSERT to fail in case of concurrent inserts + throw new AssertionError("Unexpected concurrent insert failure", e); + } + @Test public void testDropTable() {