Skip to content

Commit

Permalink
Add support for concurrent write on Iceberg transformed column
Browse files Browse the repository at this point in the history
  • Loading branch information
pajaks authored and ebyhr committed Nov 28, 2024
1 parent 9f8c860 commit 5b82e10
Show file tree
Hide file tree
Showing 2 changed files with 257 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2767,8 +2767,10 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
RowDelta rowDelta = transaction.newRowDelta();
table.getSnapshotId().map(icebergTable::snapshot).ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId()));
TupleDomain<IcebergColumnHandle> dataColumnPredicate = table.getEnforcedPredicate().filter((column, domain) -> !isMetadataColumnId(column.getId()));
if (!dataColumnPredicate.isAll()) {
rowDelta.conflictDetectionFilter(toIcebergExpression(dataColumnPredicate));
TupleDomain<IcebergColumnHandle> convertibleUnenforcedPredicate = table.getUnenforcedPredicate().filter((_, domain) -> isConvertableToIcebergExpression(domain));
TupleDomain<IcebergColumnHandle> effectivePredicate = dataColumnPredicate.intersect(convertibleUnenforcedPredicate);
if (!effectivePredicate.isAll()) {
rowDelta.conflictDetectionFilter(toIcebergExpression(effectivePredicate));
}
IsolationLevel isolationLevel = IsolationLevel.fromName(icebergTable.properties().getOrDefault(DELETE_ISOLATION_LEVEL, DELETE_ISOLATION_LEVEL_DEFAULT));
if (isolationLevel == IsolationLevel.SERIALIZABLE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,24 @@
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.MaterializedResult;
import io.trino.testing.QueryRunner;
import io.trino.testing.sql.TestTable;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.concurrent.MoreFutures.tryGetFutureValue;
import static io.trino.testing.QueryAssertions.getTrinoExceptionCause;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static java.lang.String.format;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -768,6 +772,255 @@ void testConcurrentDeleteAndDeletePushdownAndInsert()
}
}

@Test
void testConcurrentUpdateWithPartitionTransformation()
throws Exception
{
int threads = 4;
CyclicBarrier barrier = new CyclicBarrier(threads);
ExecutorService executor = newFixedThreadPool(threads);
List<String> rows = ImmutableList.of("('A', DATE '2024-01-01')", "('B', DATE '2024-02-02')", "('C', DATE '2024-03-03')", "('D', DATE '2024-04-04')");
List<String> partitions = ImmutableList.of("DATE '2024-01-01'", "DATE '2024-02-02'", "DATE '2024-03-03'", "DATE '2024-04-04'");

try (TestTable table = new TestTable(
getQueryRunner()::execute,
"test_concurrent_update_partition_transform_table_",
"(data varchar, part date) with (partitioning = array['month(part)'])")) {
String tableName = table.getName();

assertUpdate("INSERT INTO " + tableName + " VALUES " + String.join(", ", rows), 4);

List<Future<Boolean>> futures = IntStream.range(0, threads)
.mapToObj(threadNumber -> executor.submit(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute(format("UPDATE %s SET data = data || data WHERE part = %s", tableName, partitions.get(threadNumber)));
return true;
}))
.collect(toImmutableList());

futures.forEach(future -> {
Optional<Boolean> value = tryGetFutureValue(future, 20, SECONDS);
checkState(value.isPresent(), "Task did not complete in time");
boolean updateSuccessful = value.get();
checkState(updateSuccessful, "Task did not complete successfully");
});

assertThat(query("SELECT data, part FROM " + tableName))
.skippingTypesCheck()
.matches("VALUES ('AA', DATE '2024-01-01'), ('BB', DATE '2024-02-02'), ('CC', DATE '2024-03-03'), ('DD', DATE '2024-04-04')");
}
finally {
executor.shutdownNow();
assertThat(executor.awaitTermination(10, SECONDS)).isTrue();
}
}

@Test
void testConcurrentUpdateWithNestedPartitionTransformation()
throws Exception
{
int threads = 4;
CyclicBarrier barrier = new CyclicBarrier(threads);
ExecutorService executor = newFixedThreadPool(threads);
List<String> rows = ImmutableList.of("('A', ROW(DATE '2024-01-01'))", "('B', ROW(DATE '2024-02-02'))", "('C', ROW(DATE '2024-03-03'))", "('D', ROW(DATE '2024-04-04'))");
List<String> partitions = ImmutableList.of("DATE '2024-01-01'", "DATE '2024-02-02'", "DATE '2024-03-03'", "DATE '2024-04-04'");

try (TestTable table = new TestTable(
getQueryRunner()::execute,
"test_concurrent_update_partition_transform_table_",
"(data varchar, parent ROW (part date)) with (partitioning = array['month(\"parent.part\")'])")) {
String tableName = table.getName();

assertUpdate("INSERT INTO " + tableName + " VALUES " + String.join(", ", rows), 4);

List<Future<Boolean>> futures = IntStream.range(0, threads)
.mapToObj(threadNumber -> executor.submit(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute(format("UPDATE %s SET data = data || data WHERE parent.part = %s", tableName, partitions.get(threadNumber)));
return true;
}))
.collect(toImmutableList());

futures.forEach(future -> {
Optional<Boolean> value = tryGetFutureValue(future, 20, SECONDS);
checkState(value.isPresent(), "Task did not complete in time");
boolean updateSuccessful = value.get();
checkState(updateSuccessful, "Task did not complete successfully");
});

assertThat(query("SELECT data, parent.part FROM " + tableName))
.skippingTypesCheck()
.matches("VALUES ('AA', DATE '2024-01-01'), ('BB', DATE '2024-02-02'), ('CC', DATE '2024-03-03'), ('DD', DATE '2024-04-04')");
}
finally {
executor.shutdownNow();
assertThat(executor.awaitTermination(10, SECONDS)).isTrue();
}
}

@Test
void testConcurrentUpdateWithMultiplePartitionTransformation()
throws Exception
{
int threads = 4;
CyclicBarrier barrier = new CyclicBarrier(threads);
ExecutorService executor = newFixedThreadPool(threads);
List<String> rows = ImmutableList.of("('A', TIMESTAMP '2024-01-01 01:01', 1, 'aaa')",
"('B', TIMESTAMP '2024-01-01 02:02', 1, 'aab')",
"('C', TIMESTAMP '2024-01-01 03:03', 1, 'aac')",
"('D', TIMESTAMP '2024-01-01 04:04', 1, 'aad')");
// Only hour partition is not-overlapping
List<String> partitions1 = ImmutableList.of("TIMESTAMP '2024-01-01 01:01'", "TIMESTAMP '2024-01-01 02:02'", "TIMESTAMP '2024-01-01 03:03'", "TIMESTAMP '2024-01-01 04:04'");
List<String> partitions2 = ImmutableList.of("1", "1", "1", "1");
List<String> partitions3 = ImmutableList.of("'aaa'", "'aab'", "'aac'", "'aad'");

try (TestTable table = new TestTable(
getQueryRunner()::execute,
"test_concurrent_update_multiple_partition_transform_table_",
"(data varchar, part1 timestamp, part2 int, part3 varchar) with (partitioning = array['hour(part1)', 'bucket(part2, 10)', 'truncate(part3, 2)'])")) {
String tableName = table.getName();

assertUpdate("INSERT INTO " + tableName + " VALUES " + String.join(", ", rows), 4);

List<Future<Boolean>> futures = IntStream.range(0, threads)
.mapToObj(threadNumber -> executor.submit(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute(format(
"UPDATE %s SET data = data || data WHERE part1 = %s AND part2 = %s AND part3 = %s",
tableName,
partitions1.get(threadNumber),
partitions2.get(threadNumber),
partitions3.get(threadNumber)));
return true;
}))
.collect(toImmutableList());

futures.forEach(future -> {
Optional<Boolean> value = tryGetFutureValue(future, 20, SECONDS);
checkState(value.isPresent(), "Task did not complete in time");
boolean updateSuccessful = value.get();
checkState(updateSuccessful, "Task did not complete successfully");
});

assertThat(query("SELECT data, part1, part2, part3 FROM " + tableName))
.skippingTypesCheck()
.matches("VALUES ('AA', TIMESTAMP '2024-01-01 01:01', 1, 'aaa'), " +
"('BB', TIMESTAMP '2024-01-01 02:02', 1, 'aab')," +
" ('CC', TIMESTAMP '2024-01-01 03:03', 1, 'aac'), " +
"('DD', TIMESTAMP '2024-01-01 04:04', 1, 'aad')");
}
finally {
executor.shutdownNow();
assertThat(executor.awaitTermination(10, SECONDS)).isTrue();
}
}

@Test
void testConcurrentUpdateWithOverlappingPartitionTransformation()
throws Exception
{
int threads = 4;
CyclicBarrier barrier = new CyclicBarrier(threads);
ExecutorService executor = newFixedThreadPool(threads);
List<String> rows = ImmutableList.of("('A', DATE '2024-01-01')", "('B', DATE '2024-01-02')", "('C', DATE '2024-03-03')", "('D', DATE '2024-04-04')");
List<String> partitions = ImmutableList.of("DATE '2024-01-01'", "DATE '2024-01-02'", "DATE '2024-03-03'", "DATE '2024-04-04'");

try (TestTable table = new TestTable(
getQueryRunner()::execute,
"test_concurrent_update_overlapping_partition_transform_table_",
"(data varchar, part date) with (partitioning = array['month(part)'])")) {
String tableName = table.getName();

assertUpdate("INSERT INTO " + tableName + " VALUES " + String.join(", ", rows), 4);

List<Future<Boolean>> futures = IntStream.range(0, threads)
.mapToObj(threadNumber -> executor.submit(() -> {
barrier.await(10, SECONDS);
try {
getQueryRunner().execute(format("UPDATE %s SET data = data || data WHERE part = %s", tableName, partitions.get(threadNumber)));
return true;
}
catch (Exception e) {
RuntimeException trinoException = getTrinoExceptionCause(e);
try {
assertThat(trinoException).hasMessageMatching("Failed to commit the transaction during write.*|" +
"Failed to commit during write.*");
}
catch (Throwable verifyFailure) {
if (verifyFailure != e) {
verifyFailure.addSuppressed(e);
}
throw verifyFailure;
}
return false;
}
}))
.collect(toImmutableList());

long successfulWrites = futures.stream()
.map(future -> tryGetFutureValue(future, 10, SECONDS).orElseThrow(() -> new RuntimeException("Wait timed out")))
.filter(success -> success)
.count();

assertThat(successfulWrites).isEqualTo(3);

//There can be two possible results depended on which thread fails
MaterializedResult expected1 = computeActual("VALUES (VARCHAR 'AA', DATE '2024-01-01'), ('B', DATE '2024-01-02'), ('CC', DATE '2024-03-03'), ('DD', DATE '2024-04-04')");
MaterializedResult expected2 = computeActual("VALUES (VARCHAR 'A', DATE '2024-01-01'), ('BB', DATE '2024-01-02'), ('CC', DATE '2024-03-03'), ('DD', DATE '2024-04-04')");
assertThat(computeActual("SELECT data, part FROM " + tableName + " ORDER BY data"))
.isIn(expected1, expected2);
}
finally {
executor.shutdownNow();
assertThat(executor.awaitTermination(10, SECONDS)).isTrue();
}
}

@Test
void testConcurrentUpdateWithEnforcedAndUnenforcedPartitions()
throws Exception
{
int threads = 4;
CyclicBarrier barrier = new CyclicBarrier(threads);
ExecutorService executor = newFixedThreadPool(threads);
List<String> rows = ImmutableList.of("('A', 'a', DATE '2024-01-01')", "('B', 'b', DATE '2024-02-02')", "('C', 'c', DATE '2024-03-03')", "('D', 'd', DATE '2024-04-04')");
List<String> partitions1 = ImmutableList.of("'a'", "'b'", "'c'", "'d'");
List<String> partitions2 = ImmutableList.of("DATE '2024-01-01'", "DATE '2024-02-02'", "DATE '2024-03-03'", "DATE '2024-04-04'");

try (TestTable table = new TestTable(
getQueryRunner()::execute,
"test_concurrent_update_enforced_unenforced_partition_transform_table_",
// part1 is enforced and part2 is unenforced as it has transformation
"(data varchar, part1 varchar, part2 date) with (partitioning = array['part1', 'month(part2)'])")) {
String tableName = table.getName();

assertUpdate("INSERT INTO " + tableName + " VALUES " + String.join(", ", rows), 4);

List<Future<Boolean>> futures = IntStream.range(0, threads)
.mapToObj(threadNumber -> executor.submit(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute(format("UPDATE %s SET data = data || data WHERE part1 = %s AND part2 = %s", tableName, partitions1.get(threadNumber), partitions2.get(threadNumber)));
return true;
}))
.collect(toImmutableList());

futures.forEach(future -> {
Optional<Boolean> value = tryGetFutureValue(future, 20, SECONDS);
checkState(value.isPresent(), "Task did not complete in time");
boolean updateSuccessful = value.get();
checkState(updateSuccessful, "Task did not complete successfully");
});

assertThat(query("SELECT data, part1, part2 FROM " + tableName))
.skippingTypesCheck()
.matches("VALUES ('AA', 'a', DATE '2024-01-01'), ('BB', 'b', DATE '2024-02-02'), ('CC', 'c', DATE '2024-03-03'), ('DD', 'd', DATE '2024-04-04')");
}
finally {
executor.shutdownNow();
assertThat(executor.awaitTermination(10, SECONDS)).isTrue();
}
}

private long getCurrentSnapshotId(String tableName)
{
return (long) computeScalar("SELECT snapshot_id FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES");
Expand Down

0 comments on commit 5b82e10

Please sign in to comment.