Skip to content

Commit

Permalink
Add support for concurrent write on Iceberg transformed column
Browse files Browse the repository at this point in the history
  • Loading branch information
pajaks committed Nov 18, 2024
1 parent 9f77156 commit 0f00cdd
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -2740,7 +2740,8 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col

RowDelta rowDelta = transaction.newRowDelta();
table.getSnapshotId().map(icebergTable::snapshot).ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId()));
TupleDomain<IcebergColumnHandle> dataColumnPredicate = table.getEnforcedPredicate().filter((column, domain) -> !isMetadataColumnId(column.getId()));
TupleDomain<IcebergColumnHandle> dataColumnPredicate = table.getEnforcedPredicate().intersect(table.getUnenforcedPredicate())
.filter((column, domain) -> !isMetadataColumnId(column.getId()));
if (!dataColumnPredicate.isAll()) {
rowDelta.conflictDetectionFilter(toIcebergExpression(dataColumnPredicate));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Matcher;
Expand All @@ -96,13 +99,15 @@
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.collect.MoreCollectors.onlyElement;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static io.airlift.concurrent.MoreFutures.tryGetFutureValue;
import static io.trino.SystemSessionProperties.DETERMINE_PARTITION_COUNT_FOR_WRITE_ENABLED;
import static io.trino.SystemSessionProperties.ENABLE_DYNAMIC_FILTERING;
import static io.trino.SystemSessionProperties.SCALE_WRITERS;
Expand Down Expand Up @@ -148,6 +153,7 @@
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -420,6 +426,49 @@ public void testDecimal()
testDecimalWithPrecisionAndScale(38, 37);
}

/**
 * Checks that UPDATE statements issued at the same time against a table partitioned by
 * {@code month(part)} all succeed when each statement filters on a different {@code part} date.
 * <p>
 * Four rows are inserted, each dated in a different month. Four tasks then meet at a shared
 * barrier before each one runs a single UPDATE with an equality predicate on the source column
 * {@code part}. Every task has to report success within the wait limit, and the table must end
 * up holding the doubled {@code data} value for every row.
 */
@Test
public void testUpdateRowsWithPartitionTransformationConcurrently()
        throws Exception
{
    int workerCount = 4;
    // Tasks wait on this barrier so that their UPDATE statements are issued together
    CyclicBarrier startGate = new CyclicBarrier(workerCount);
    ExecutorService workers = newFixedThreadPool(workerCount);
    List<String> initialRows = ImmutableList.of("('A', DATE '2024-01-01')", "('B', DATE '2024-02-02')", "('C', DATE '2024-03-03')", "('D', DATE '2024-04-04')");
    List<String> partitionDates = ImmutableList.of("DATE '2024-01-01'", "DATE '2024-02-02'", "DATE '2024-03-03'", "DATE '2024-04-04'");

    try (TestTable table = new TestTable(
            getQueryRunner()::execute,
            "test_concurrent_udpate",
            "(data varchar, part date) with (partitioning = array['month(part)'])")) {
        String tableName = table.getName();

        assertUpdate(format("INSERT INTO %s VALUES %s", tableName, String.join(", ", initialRows)), 4);

        // Submit every task before waiting on any of them; task i updates only the row dated partitionDates[i]
        ImmutableList.Builder<Future<Boolean>> submitted = ImmutableList.builder();
        for (int worker = 0; worker < workerCount; worker++) {
            String updateSql = format("UPDATE %s SET data = data || data WHERE part = %s", tableName, partitionDates.get(worker));
            submitted.add(workers.submit(() -> {
                startGate.await(10, SECONDS);
                getQueryRunner().execute(updateSql);
                return true;
            }));
        }

        for (Future<Boolean> pending : submitted.build()) {
            Optional<Boolean> outcome = tryGetFutureValue(pending, 20, SECONDS);
            checkState(outcome.isPresent(), "Task did not complete in time");
            checkState(outcome.get(), "Task did not complete successfully");
        }

        assertThat(query("SELECT data, part FROM " + tableName))
                .skippingTypesCheck()
                .matches("VALUES ('AA', DATE '2024-01-01'), ('BB', DATE '2024-02-02'), ('CC', DATE '2024-03-03'), ('DD', DATE '2024-04-04')");
    }
    finally {
        // Stop the pool whatever the outcome and require that its threads actually exit
        workers.shutdownNow();
        boolean terminated = workers.awaitTermination(10, SECONDS);
        assertThat(terminated).isTrue();
    }
}

private void testDecimalWithPrecisionAndScale(int precision, int scale)
{
checkArgument(precision >= 1 && precision <= 38, "Decimal precision (%s) must be between 1 and 38 inclusive", precision);
Expand Down

0 comments on commit 0f00cdd

Please sign in to comment.