Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,20 @@ protected void verifyConcurrentUpdateFailurePermissible(Exception e)
"|Target file .* was created during locking");
}

@Override
protected void verifyConcurrentInsertFailurePermissible(Exception e)
{
assertThat(e)
.hasMessage("Failed to write Delta Lake transaction log entry")
.getCause()
.hasMessageMatching(
"Transaction log locked.*" +
"|.*/_delta_log/\\d+.json already exists" +
"|Conflicting concurrent writes found..*" +
"|Multiple live locks found for:.*" +
"|Target file .* was created during locking");
}

@Override
protected Optional<DataMappingTestSetup> filterCaseSensitiveDataMappingTestData(DataMappingTestSetup dataMappingTestSetup)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,12 @@ public void testInsertHighestUnicodeCharacter()
.containsExactlyInAnyOrder("Hello", "hello测试􏿿world编码");
}

@Override
public void testInsertRowConcurrently()
{
throw new SkipException("TODO Prepare a topic in Kafka and enable this test");
}

private static KafkaTopicDescription createDescription(SchemaTableName schemaTableName, KafkaTopicFieldDescription key, List<KafkaTopicFieldDescription> fields)
{
return new KafkaTopicDescription(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,13 @@ public void testInsertNegativeDate()
throw new SkipException("TODO");
}

@Override
public void testInsertRowConcurrently()
{
// TODO Support these test once kudu connector can create tables with default partitions
throw new SkipException("TODO");
}

@Test
@Override
public void testDelete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -2778,6 +2779,74 @@ protected void verifyConcurrentUpdateFailurePermissible(Exception e)
throw new AssertionError("Unexpected concurrent update failure", e);
}

// Repeat test with invocationCount for better test coverage, since the tested aspect is inherently non-deterministic.
@Test(timeOut = 60_000, invocationCount = 4)
public void testInsertRowConcurrently()
throws Exception
{
if (!hasBehavior(SUPPORTS_INSERT)) {
// Covered by testInsert
return;
}

int threads = 4;
CyclicBarrier barrier = new CyclicBarrier(threads);
ExecutorService executor = newFixedThreadPool(threads);
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_insert", "(col integer)")) {
String tableName = table.getName();

List<Future<OptionalInt>> futures = IntStream.range(0, threads)
.mapToObj(threadNumber -> executor.submit(() -> {
barrier.await(10, SECONDS);
try {
getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (" + threadNumber + ")");
return OptionalInt.of(threadNumber);
}
catch (Exception e) {
RuntimeException trinoException = getTrinoExceptionCause(e);
try {
verifyConcurrentInsertFailurePermissible(trinoException);
}
catch (Throwable verifyFailure) {
if (trinoException != e && verifyFailure != e) {
verifyFailure.addSuppressed(e);
}
throw verifyFailure;
}
return OptionalInt.empty();
}
}))
.collect(toImmutableList());

List<Integer> values = futures.stream()
.map(future -> tryGetFutureValue(future, 10, SECONDS).orElseThrow(() -> new RuntimeException("Wait timed out")))
.filter(OptionalInt::isPresent)
.map(OptionalInt::getAsInt)
.collect(toImmutableList());

if (values.isEmpty()) {
assertQueryReturnsEmptyResult("TABLE " + tableName);
}
else {
// Cast to integer because some connectors (e.g. Oracle) map integer to different types that skippingTypesCheck can't resolve the mismatch.
assertThat(query("SELECT CAST(col AS INTEGER) FROM " + tableName))
.matches(values.stream()
.map(value -> format("(%s)", value))
.collect(joining(",", "VALUES ", "")));
}
}
finally {
executor.shutdownNow();
executor.awaitTermination(10, SECONDS);
}
}

protected void verifyConcurrentInsertFailurePermissible(Exception e)
{
// By default, do not expect INSERT to fail in case of concurrent inserts
throw new AssertionError("Unexpected concurrent insert failure", e);
}

@Test
public void testDropTable()
{
Expand Down