Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for concurrent write on Iceberg transformed column #24160

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2741,8 +2741,10 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
RowDelta rowDelta = transaction.newRowDelta();
table.getSnapshotId().map(icebergTable::snapshot).ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId()));
TupleDomain<IcebergColumnHandle> dataColumnPredicate = table.getEnforcedPredicate().filter((column, domain) -> !isMetadataColumnId(column.getId()));
if (!dataColumnPredicate.isAll()) {
rowDelta.conflictDetectionFilter(toIcebergExpression(dataColumnPredicate));
TupleDomain<IcebergColumnHandle> convertibleUnenforcedPredicate = table.getUnenforcedPredicate().filter((_, domain) -> isConvertableToIcebergExpression(domain));
TupleDomain<IcebergColumnHandle> effectivePredicate = dataColumnPredicate.intersect(convertibleUnenforcedPredicate);
if (!effectivePredicate.isAll()) {
rowDelta.conflictDetectionFilter(toIcebergExpression(effectivePredicate));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty cool @pajaks 🎉

Outstanding test coverage.

}
IsolationLevel isolationLevel = IsolationLevel.fromName(icebergTable.properties().getOrDefault(DELETE_ISOLATION_LEVEL, DELETE_ISOLATION_LEVEL_DEFAULT));
if (isolationLevel == IsolationLevel.SERIALIZABLE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,24 @@
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.MaterializedResult;
import io.trino.testing.QueryRunner;
import io.trino.testing.sql.TestTable;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.concurrent.MoreFutures.tryGetFutureValue;
import static io.trino.testing.QueryAssertions.getTrinoExceptionCause;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static java.lang.String.format;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -768,6 +772,255 @@ void testConcurrentDeleteAndDeletePushdownAndInsert()
}
}

/**
 * Runs four UPDATE statements at the same time against a table partitioned by {@code month(part)}.
 * Every statement filters on a date that belongs to a different month, and the test expects
 * all four updates to commit and the table to contain the doubled {@code data} values afterwards.
 */
@Test
void testConcurrentUpdateWithPartitionTransformation()
        throws Exception
{
    int threads = 4;
    // All workers wait on this barrier so that the UPDATE statements are issued together
    CyclicBarrier barrier = new CyclicBarrier(threads);
    ExecutorService executor = newFixedThreadPool(threads);
    List<String> rows = ImmutableList.of("('A', DATE '2024-01-01')", "('B', DATE '2024-02-02')", "('C', DATE '2024-03-03')", "('D', DATE '2024-04-04')");
    List<String> partitions = ImmutableList.of("DATE '2024-01-01'", "DATE '2024-02-02'", "DATE '2024-03-03'", "DATE '2024-04-04'");

    try (TestTable table = new TestTable(
            getQueryRunner()::execute,
            "test_concurrent_update_partition_transform_table_",
            "(data varchar, part date) with (partitioning = array['month(part)'])")) {
        String tableName = table.getName();

        String insertSql = "INSERT INTO " + tableName + " VALUES " + String.join(", ", rows);
        assertUpdate(insertSql, 4);

        // Submit one UPDATE per worker, each restricted to its own date value
        ImmutableList.Builder<Future<Boolean>> submitted = ImmutableList.builder();
        for (int index = 0; index < threads; index++) {
            String partitionValue = partitions.get(index);
            Future<Boolean> task = executor.submit(() -> {
                barrier.await(10, SECONDS);
                getQueryRunner().execute(format("UPDATE %s SET data = data || data WHERE part = %s", tableName, partitionValue));
                return true;
            });
            submitted.add(task);
        }
        List<Future<Boolean>> futures = submitted.build();

        // Every worker must finish within the timeout and report success
        for (Future<Boolean> pending : futures) {
            Optional<Boolean> outcome = tryGetFutureValue(pending, 20, SECONDS);
            checkState(outcome.isPresent(), "Task did not complete in time");
            boolean updateSuccessful = outcome.get();
            checkState(updateSuccessful, "Task did not complete successfully");
        }

        assertThat(query("SELECT data, part FROM " + tableName))
                .skippingTypesCheck()
                .matches("VALUES ('AA', DATE '2024-01-01'), ('BB', DATE '2024-02-02'), ('CC', DATE '2024-03-03'), ('DD', DATE '2024-04-04')");
    }
    finally {
        executor.shutdownNow();
        boolean terminated = executor.awaitTermination(10, SECONDS);
        assertThat(terminated).isTrue();
    }
}

/**
 * Runs four UPDATE statements at the same time against a table partitioned by
 * {@code month("parent.part")}, i.e. a transform applied to a field nested inside a ROW column.
 * Every statement filters on a date that belongs to a different month, and the test expects
 * all four updates to commit and the table to contain the doubled {@code data} values afterwards.
 */
@Test
void testConcurrentUpdateWithNestedPartitionTransformation()
        throws Exception
{
    int threads = 4;
    // All workers wait on this barrier so that the UPDATE statements are issued together
    CyclicBarrier barrier = new CyclicBarrier(threads);
    ExecutorService executor = newFixedThreadPool(threads);
    List<String> rows = ImmutableList.of("('A', ROW(DATE '2024-01-01'))", "('B', ROW(DATE '2024-02-02'))", "('C', ROW(DATE '2024-03-03'))", "('D', ROW(DATE '2024-04-04'))");
    List<String> partitions = ImmutableList.of("DATE '2024-01-01'", "DATE '2024-02-02'", "DATE '2024-03-03'", "DATE '2024-04-04'");

    try (TestTable table = new TestTable(
            getQueryRunner()::execute,
            // Distinct prefix so tables created by this test can be told apart from the non-nested variant
            "test_concurrent_update_nested_partition_transform_table_",
            "(data varchar, parent ROW (part date)) with (partitioning = array['month(\"parent.part\")'])")) {
        String tableName = table.getName();

        assertUpdate("INSERT INTO " + tableName + " VALUES " + String.join(", ", rows), 4);

        // Submit one UPDATE per worker, each restricted to its own nested date value
        List<Future<Boolean>> futures = IntStream.range(0, threads)
                .mapToObj(threadNumber -> executor.submit(() -> {
                    barrier.await(10, SECONDS);
                    getQueryRunner().execute(format("UPDATE %s SET data = data || data WHERE parent.part = %s", tableName, partitions.get(threadNumber)));
                    return true;
                }))
                .collect(toImmutableList());

        // Every worker must finish within the timeout and report success
        futures.forEach(future -> {
            Optional<Boolean> value = tryGetFutureValue(future, 20, SECONDS);
            checkState(value.isPresent(), "Task did not complete in time");
            boolean updateSuccessful = value.get();
            checkState(updateSuccessful, "Task did not complete successfully");
        });

        assertThat(query("SELECT data, parent.part FROM " + tableName))
                .skippingTypesCheck()
                .matches("VALUES ('AA', DATE '2024-01-01'), ('BB', DATE '2024-02-02'), ('CC', DATE '2024-03-03'), ('DD', DATE '2024-04-04')");
    }
    finally {
        executor.shutdownNow();
        assertThat(executor.awaitTermination(10, SECONDS)).isTrue();
    }
}

/**
 * Runs four UPDATE statements at the same time against a table partitioned by
 * {@code hour(part1)}, {@code bucket(part2, 10)} and {@code truncate(part3, 2)}.
 * Each statement filters on all three partition source columns, and the test expects
 * all four updates to commit and the table to contain the doubled {@code data} values afterwards.
 */
@Test
void testConcurrentUpdateWithMultiplePartitionTransformation()
        throws Exception
{
    int threads = 4;
    // All workers wait on this barrier so that the UPDATE statements are issued together
    CyclicBarrier barrier = new CyclicBarrier(threads);
    ExecutorService executor = newFixedThreadPool(threads);
    List<String> rows = ImmutableList.of(
            "('A', TIMESTAMP '2024-01-01 01:01', 1, 'aaa')",
            "('B', TIMESTAMP '2024-01-01 02:02', 1, 'aab')",
            "('C', TIMESTAMP '2024-01-01 03:03', 1, 'aac')",
            "('D', TIMESTAMP '2024-01-01 04:04', 1, 'aad')");
    // Only the hour partition does not overlap between the rows
    List<String> partitions1 = ImmutableList.of("TIMESTAMP '2024-01-01 01:01'", "TIMESTAMP '2024-01-01 02:02'", "TIMESTAMP '2024-01-01 03:03'", "TIMESTAMP '2024-01-01 04:04'");
    List<String> partitions2 = ImmutableList.of("1", "1", "1", "1");
    List<String> partitions3 = ImmutableList.of("'aaa'", "'aab'", "'aac'", "'aad'");

    try (TestTable table = new TestTable(
            getQueryRunner()::execute,
            "test_concurrent_update_multiple_partition_transform_table_",
            "(data varchar, part1 timestamp, part2 int, part3 varchar) with (partitioning = array['hour(part1)', 'bucket(part2, 10)', 'truncate(part3, 2)'])")) {
        String tableName = table.getName();

        String insertSql = "INSERT INTO " + tableName + " VALUES " + String.join(", ", rows);
        assertUpdate(insertSql, 4);

        // Submit one UPDATE per worker, each filtering on its own combination of the three columns
        ImmutableList.Builder<Future<Boolean>> submitted = ImmutableList.builder();
        for (int index = 0; index < threads; index++) {
            String hourValue = partitions1.get(index);
            String bucketValue = partitions2.get(index);
            String truncateValue = partitions3.get(index);
            Future<Boolean> task = executor.submit(() -> {
                barrier.await(10, SECONDS);
                getQueryRunner().execute(format(
                        "UPDATE %s SET data = data || data WHERE part1 = %s AND part2 = %s AND part3 = %s",
                        tableName,
                        hourValue,
                        bucketValue,
                        truncateValue));
                return true;
            });
            submitted.add(task);
        }
        List<Future<Boolean>> futures = submitted.build();

        // Every worker must finish within the timeout and report success
        for (Future<Boolean> pending : futures) {
            Optional<Boolean> outcome = tryGetFutureValue(pending, 20, SECONDS);
            checkState(outcome.isPresent(), "Task did not complete in time");
            boolean updateSuccessful = outcome.get();
            checkState(updateSuccessful, "Task did not complete successfully");
        }

        assertThat(query("SELECT data, part1, part2, part3 FROM " + tableName))
                .skippingTypesCheck()
                .matches("VALUES ('AA', TIMESTAMP '2024-01-01 01:01', 1, 'aaa'), " +
                        "('BB', TIMESTAMP '2024-01-01 02:02', 1, 'aab')," +
                        " ('CC', TIMESTAMP '2024-01-01 03:03', 1, 'aac'), " +
                        "('DD', TIMESTAMP '2024-01-01 04:04', 1, 'aad')");
    }
    finally {
        executor.shutdownNow();
        boolean terminated = executor.awaitTermination(10, SECONDS);
        assertThat(terminated).isTrue();
    }
}

/**
 * Runs four UPDATE statements at the same time against a table partitioned by {@code month(part)},
 * where two of the targeted dates (2024-01-01 and 2024-01-02) belong to the same month.
 * The test expects exactly three updates to commit; a failing update must report a
 * commit failure message. Either the 'A' row or the 'B' row is left unchanged.
 */
@Test
void testConcurrentUpdateWithOverlappingPartitionTransformation()
        throws Exception
{
    int threads = 4;
    // All workers wait on this barrier so that the UPDATE statements are issued together
    CyclicBarrier barrier = new CyclicBarrier(threads);
    ExecutorService executor = newFixedThreadPool(threads);
    List<String> rows = ImmutableList.of("('A', DATE '2024-01-01')", "('B', DATE '2024-01-02')", "('C', DATE '2024-03-03')", "('D', DATE '2024-04-04')");
    List<String> partitions = ImmutableList.of("DATE '2024-01-01'", "DATE '2024-01-02'", "DATE '2024-03-03'", "DATE '2024-04-04'");

    try (TestTable table = new TestTable(
            getQueryRunner()::execute,
            "test_concurrent_update_overlapping_partition_transform_table_",
            "(data varchar, part date) with (partitioning = array['month(part)'])")) {
        String tableName = table.getName();

        String insertSql = "INSERT INTO " + tableName + " VALUES " + String.join(", ", rows);
        assertUpdate(insertSql, 4);

        // Each worker returns true when its UPDATE commits and false when it fails with the expected commit error
        ImmutableList.Builder<Future<Boolean>> submitted = ImmutableList.builder();
        for (int index = 0; index < threads; index++) {
            String partitionValue = partitions.get(index);
            Future<Boolean> task = executor.submit(() -> {
                barrier.await(10, SECONDS);
                try {
                    getQueryRunner().execute(format("UPDATE %s SET data = data || data WHERE part = %s", tableName, partitionValue));
                    return true;
                }
                catch (Exception e) {
                    RuntimeException trinoException = getTrinoExceptionCause(e);
                    try {
                        assertThat(trinoException).hasMessageMatching("Failed to commit the transaction during write.*|" +
                                "Failed to commit during write.*");
                    }
                    catch (Throwable verifyFailure) {
                        // Keep the original query failure attached to the verification failure
                        if (verifyFailure != e) {
                            verifyFailure.addSuppressed(e);
                        }
                        throw verifyFailure;
                    }
                    return false;
                }
            });
            submitted.add(task);
        }
        List<Future<Boolean>> futures = submitted.build();

        long successfulWrites = 0;
        for (Future<Boolean> pending : futures) {
            boolean succeeded = tryGetFutureValue(pending, 10, SECONDS).orElseThrow(() -> new RuntimeException("Wait timed out"));
            if (succeeded) {
                successfulWrites++;
            }
        }

        assertThat(successfulWrites).isEqualTo(3);

        // There are two possible results, depending on which thread fails
        MaterializedResult expected1 = computeActual("VALUES (VARCHAR 'AA', DATE '2024-01-01'), ('B', DATE '2024-01-02'), ('CC', DATE '2024-03-03'), ('DD', DATE '2024-04-04')");
        MaterializedResult expected2 = computeActual("VALUES (VARCHAR 'A', DATE '2024-01-01'), ('BB', DATE '2024-01-02'), ('CC', DATE '2024-03-03'), ('DD', DATE '2024-04-04')");
        MaterializedResult actual = computeActual("SELECT data, part FROM " + tableName + " ORDER BY data");
        assertThat(actual).isIn(expected1, expected2);
    }
    finally {
        executor.shutdownNow();
        boolean terminated = executor.awaitTermination(10, SECONDS);
        assertThat(terminated).isTrue();
    }
}

/**
 * Runs four UPDATE statements at the same time against a table partitioned by the identity
 * column {@code part1} and by {@code month(part2)}. Each statement filters on both columns
 * with values that are distinct per worker, and the test expects all four updates to commit
 * and the table to contain the doubled {@code data} values afterwards.
 */
@Test
void testConcurrentUpdateWithEnforcedAndUnenforcedPartitions()
        throws Exception
{
    int threads = 4;
    // All workers wait on this barrier so that the UPDATE statements are issued together
    CyclicBarrier barrier = new CyclicBarrier(threads);
    ExecutorService executor = newFixedThreadPool(threads);
    List<String> rows = ImmutableList.of("('A', 'a', DATE '2024-01-01')", "('B', 'b', DATE '2024-02-02')", "('C', 'c', DATE '2024-03-03')", "('D', 'd', DATE '2024-04-04')");
    List<String> partitions1 = ImmutableList.of("'a'", "'b'", "'c'", "'d'");
    List<String> partitions2 = ImmutableList.of("DATE '2024-01-01'", "DATE '2024-02-02'", "DATE '2024-03-03'", "DATE '2024-04-04'");

    try (TestTable table = new TestTable(
            getQueryRunner()::execute,
            "test_concurrent_update_enforced_unenforced_partition_transform_table_",
            // part1 is enforced, while part2 is unenforced because it is partitioned through a transformation
            "(data varchar, part1 varchar, part2 date) with (partitioning = array['part1', 'month(part2)'])")) {
        String tableName = table.getName();

        String insertSql = "INSERT INTO " + tableName + " VALUES " + String.join(", ", rows);
        assertUpdate(insertSql, 4);

        // Submit one UPDATE per worker, each filtering on its own part1/part2 pair
        ImmutableList.Builder<Future<Boolean>> submitted = ImmutableList.builder();
        for (int index = 0; index < threads; index++) {
            String identityValue = partitions1.get(index);
            String dateValue = partitions2.get(index);
            Future<Boolean> task = executor.submit(() -> {
                barrier.await(10, SECONDS);
                getQueryRunner().execute(format("UPDATE %s SET data = data || data WHERE part1 = %s AND part2 = %s", tableName, identityValue, dateValue));
                return true;
            });
            submitted.add(task);
        }
        List<Future<Boolean>> futures = submitted.build();

        // Every worker must finish within the timeout and report success
        for (Future<Boolean> pending : futures) {
            Optional<Boolean> outcome = tryGetFutureValue(pending, 20, SECONDS);
            checkState(outcome.isPresent(), "Task did not complete in time");
            boolean updateSuccessful = outcome.get();
            checkState(updateSuccessful, "Task did not complete successfully");
        }

        assertThat(query("SELECT data, part1, part2 FROM " + tableName))
                .skippingTypesCheck()
                .matches("VALUES ('AA', 'a', DATE '2024-01-01'), ('BB', 'b', DATE '2024-02-02'), ('CC', 'c', DATE '2024-03-03'), ('DD', 'd', DATE '2024-04-04')");
    }
    finally {
        executor.shutdownNow();
        boolean terminated = executor.awaitTermination(10, SECONDS);
        assertThat(terminated).isTrue();
    }
}

private long getCurrentSnapshotId(String tableName)
{
return (long) computeScalar("SELECT snapshot_id FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES");
Expand Down