diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java index 51040a281419..fdf962e07f10 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java @@ -96,6 +96,7 @@ public class IcebergConfig private boolean objectStoreLayoutEnabled; private int metadataParallelism = 8; private boolean bucketExecutionEnabled = true; + private boolean fileBasedConflictDetectionEnabled = true; public CatalogType getCatalogType() { @@ -580,4 +581,17 @@ public IcebergConfig setBucketExecutionEnabled(boolean bucketExecutionEnabled) this.bucketExecutionEnabled = bucketExecutionEnabled; return this; } + + public boolean isFileBasedConflictDetectionEnabled() + { + return fileBasedConflictDetectionEnabled; + } + + @Config("iceberg.file-based-conflict-detection") + @ConfigDescription("Enable file-based conflict detection: take partition information from the actual written files as a source for the conflict detection system") + public IcebergConfig setFileBasedConflictDetectionEnabled(boolean fileBasedConflictDetectionEnabled) + { + this.fileBasedConflictDetectionEnabled = fileBasedConflictDetectionEnabled; + return this; + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 1b41cb6cf4ad..a0a13bb1f2df 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -272,6 +272,7 @@ import static io.trino.plugin.iceberg.IcebergSessionProperties.isBucketExecutionEnabled; import static io.trino.plugin.iceberg.IcebergSessionProperties.isCollectExtendedStatisticsOnWrite; import static 
io.trino.plugin.iceberg.IcebergSessionProperties.isExtendedStatisticsEnabled; +import static io.trino.plugin.iceberg.IcebergSessionProperties.isFileBasedConflictDetectionEnabled; import static io.trino.plugin.iceberg.IcebergSessionProperties.isIncrementalRefreshEnabled; import static io.trino.plugin.iceberg.IcebergSessionProperties.isMergeManifestsOnWrite; import static io.trino.plugin.iceberg.IcebergSessionProperties.isProjectionPushdownEnabled; @@ -307,6 +308,7 @@ import static io.trino.plugin.iceberg.IcebergUtil.getFileFormat; import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableProperties; import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys; +import static io.trino.plugin.iceberg.IcebergUtil.getPartitionValues; import static io.trino.plugin.iceberg.IcebergUtil.getProjectedColumns; import static io.trino.plugin.iceberg.IcebergUtil.getSnapshotIdAsOfTime; import static io.trino.plugin.iceberg.IcebergUtil.getTableComment; @@ -353,6 +355,7 @@ import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.UNKNOWN; import static io.trino.spi.connector.RetryMode.NO_RETRIES; import static io.trino.spi.connector.RowChangeParadigm.DELETE_ROW_AND_INSERT_ROW; +import static io.trino.spi.predicate.TupleDomain.withColumnDomains; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc; import static io.trino.spi.type.DateType.DATE; @@ -3033,8 +3036,13 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col RowDelta rowDelta = transaction.newRowDelta(); table.getSnapshotId().map(icebergTable::snapshot).ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId())); TupleDomain dataColumnPredicate = table.getEnforcedPredicate().filter((column, domain) -> !isMetadataColumnId(column.getId())); - TupleDomain convertibleUnenforcedPredicate = table.getUnenforcedPredicate().filter((_, domain) -> isConvertibleToIcebergExpression(domain)); - TupleDomain 
effectivePredicate = dataColumnPredicate.intersect(convertibleUnenforcedPredicate); + TupleDomain<IcebergColumnHandle> effectivePredicate = dataColumnPredicate.intersect(table.getUnenforcedPredicate()); + if (isFileBasedConflictDetectionEnabled(session)) { + effectivePredicate = effectivePredicate.intersect(extractTupleDomainsFromCommitTasks(table, icebergTable, commitTasks, typeManager)); + } + + effectivePredicate = effectivePredicate.filter((_, domain) -> isConvertibleToIcebergExpression(domain)); + if (!effectivePredicate.isAll()) { rowDelta.conflictDetectionFilter(toIcebergExpression(effectivePredicate)); } @@ -3099,6 +3107,40 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col commitUpdateAndTransaction(rowDelta, session, transaction, "write"); } + static TupleDomain<IcebergColumnHandle> extractTupleDomainsFromCommitTasks(IcebergTableHandle table, Table icebergTable, List<CommitTaskData> commitTasks, TypeManager typeManager) + { + Set<IcebergColumnHandle> partitionColumns = new HashSet<>(getProjectedColumns(icebergTable.schema(), typeManager, identityPartitionColumnsInAllSpecs(icebergTable))); + PartitionSpec partitionSpec = icebergTable.spec(); + Type[] partitionColumnTypes = partitionSpec.fields().stream() + .map(field -> field.transform().getResultType(icebergTable.schema().findType(field.sourceId()))) + .toArray(Type[]::new); + Schema schema = SchemaParser.fromJson(table.getTableSchemaJson()); + Map<IcebergColumnHandle, List<Domain>> domainsFromTasks = new HashMap<>(); + for (CommitTaskData commitTask : commitTasks) { + PartitionSpec taskPartitionSpec = PartitionSpecParser.fromJson(schema, commitTask.partitionSpecJson()); + if (commitTask.partitionDataJson().isEmpty() || taskPartitionSpec.isUnpartitioned() || !taskPartitionSpec.equals(partitionSpec)) { + // We should not produce any specific domains if there are no partitions, or if the current partition spec does not match the task partition spec for any of the tasks + // As each partition value narrows down the conflict scope, we should produce values from all commit tasks or not at all, to avoid partial
information + return TupleDomain.all(); + } + + PartitionData partitionData = PartitionData.fromJson(commitTask.partitionDataJson().get(), partitionColumnTypes); + Map<Integer, Optional<String>> partitionKeys = getPartitionKeys(partitionData, partitionSpec); + Map<ColumnHandle, NullableValue> partitionValues = getPartitionValues(partitionColumns, partitionKeys); + + for (Map.Entry<ColumnHandle, NullableValue> entry : partitionValues.entrySet()) { + IcebergColumnHandle columnHandle = (IcebergColumnHandle) entry.getKey(); + NullableValue value = entry.getValue(); + Domain newDomain = value.isNull() ? Domain.onlyNull(columnHandle.getType()) : Domain.singleValue(columnHandle.getType(), value.getValue()); + domainsFromTasks.computeIfAbsent(columnHandle, _ -> new ArrayList<>()).add(newDomain); + } + } + return withColumnDomains(domainsFromTasks.entrySet().stream() + .collect(toImmutableMap( + Map.Entry::getKey, + entry -> Domain.union(entry.getValue())))); + } + @Override public void createView(ConnectorSession session, SchemaTableName viewName, ConnectorViewDefinition definition, Map<String, Object> viewProperties, boolean replace) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java index 64a8b088d2b0..2e39057f6a41 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java @@ -110,6 +110,7 @@ public final class IcebergSessionProperties private static final String QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS = "query_partition_filter_required_schemas"; private static final String INCREMENTAL_REFRESH_ENABLED = "incremental_refresh_enabled"; public static final String BUCKET_EXECUTION_ENABLED = "bucket_execution_enabled"; + public static final String FILE_BASED_CONFLICT_DETECTION_ENABLED = "file_based_conflict_detection_enabled"; private final List<PropertyMetadata<?>> sessionProperties; @@ -398,6 +399,11 @@ public
IcebergSessionProperties( "Enable bucket-aware execution: use physical bucketing information to optimize queries", icebergConfig.isBucketExecutionEnabled(), false)) + .add(booleanProperty( + FILE_BASED_CONFLICT_DETECTION_ENABLED, + "Enable file-based conflict detection: take partition information from the actual written files as a source for the conflict detection system", + icebergConfig.isFileBasedConflictDetectionEnabled(), + false)) .build(); } @@ -646,4 +652,9 @@ public static boolean isBucketExecutionEnabled(ConnectorSession session) { return session.getProperty(BUCKET_EXECUTION_ENABLED, Boolean.class); } + + public static boolean isFileBasedConflictDetectionEnabled(ConnectorSession session) + { + return session.getProperty(FILE_BASED_CONFLICT_DETECTION_ENABLED, Boolean.class); + } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestFileBasedConflictDetection.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestFileBasedConflictDetection.java new file mode 100644 index 000000000000..bbfb093e5e68 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestFileBasedConflictDetection.java @@ -0,0 +1,288 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.trino.spi.connector.CatalogHandle; +import io.trino.spi.predicate.Domain; +import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.type.RowType; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.Table; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.stream.Stream; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.iceberg.ColumnIdentity.TypeCategory.PRIMITIVE; +import static io.trino.plugin.iceberg.ColumnIdentity.TypeCategory.STRUCT; +import static io.trino.plugin.iceberg.IcebergMetadata.extractTupleDomainsFromCommitTasks; +import static io.trino.spi.type.IntegerType.INTEGER; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static org.apache.iceberg.FileContent.DATA; +import static org.apache.iceberg.FileContent.POSITION_DELETES; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.assertj.core.api.Assertions.assertThat; + +class TestFileBasedConflictDetection +{ + private static final HadoopTables HADOOP_TABLES = new HadoopTables(new Configuration(false)); + private static final String COLUMN_1_NAME = "col1"; + private static final ColumnIdentity COLUMN_1_IDENTITY = new ColumnIdentity(1, COLUMN_1_NAME, PRIMITIVE, ImmutableList.of()); + private static final IcebergColumnHandle COLUMN_1_HANDLE = new 
IcebergColumnHandle(COLUMN_1_IDENTITY, INTEGER, ImmutableList.of(), INTEGER, true, Optional.empty()); + private static final String COLUMN_2_NAME = "part"; + private static final ColumnIdentity COLUMN_2_IDENTITY = new ColumnIdentity(2, COLUMN_2_NAME, PRIMITIVE, ImmutableList.of()); + private static final IcebergColumnHandle COLUMN_2_HANDLE = new IcebergColumnHandle(COLUMN_2_IDENTITY, INTEGER, ImmutableList.of(), INTEGER, true, Optional.empty()); + private static final String CHILD_COLUMN_NAME = "child"; + private static final ColumnIdentity CHILD_COLUMN_IDENTITY = new ColumnIdentity(4, CHILD_COLUMN_NAME, PRIMITIVE, ImmutableList.of()); + private static final String PARENT_COLUMN_NAME = "parent"; + private static final ColumnIdentity PARENT_COLUMN_IDENTITY = new ColumnIdentity(3, PARENT_COLUMN_NAME, STRUCT, ImmutableList.of(CHILD_COLUMN_IDENTITY)); + private static final IcebergColumnHandle CHILD_COLUMN_HANDLE = new IcebergColumnHandle(PARENT_COLUMN_IDENTITY, RowType.rowType(new RowType.Field(Optional.of(CHILD_COLUMN_NAME), INTEGER)), ImmutableList.of(CHILD_COLUMN_IDENTITY.getId()), INTEGER, true, Optional.empty()); + private static final Schema TABLE_SCHEMA = new Schema( + optional(COLUMN_1_IDENTITY.getId(), COLUMN_1_NAME, Types.IntegerType.get()), + optional(COLUMN_2_IDENTITY.getId(), COLUMN_2_NAME, Types.IntegerType.get()), + optional( + PARENT_COLUMN_IDENTITY.getId(), + PARENT_COLUMN_NAME, + Types.StructType.of(optional(CHILD_COLUMN_IDENTITY.getId(), CHILD_COLUMN_NAME, Types.IntegerType.get())))); + + @Test + void testConflictDetectionOnNonPartitionedTable() + { + PartitionSpec partitionSpec = PartitionSpec.unpartitioned(); + Table icebergTable = createIcebergTable(partitionSpec); + + List commitTasks = getCommitTaskDataForUpdate(partitionSpec, Optional.empty()); + TupleDomain icebergColumnHandleTupleDomain = extractTupleDomainsFromCommitTasks(getIcebergTableHandle(partitionSpec), icebergTable, commitTasks, null); + 
assertThat(icebergColumnHandleTupleDomain.getDomains().orElseThrow()).isEmpty(); + + dropIcebergTable(icebergTable); + } + + @Test + void testConflictDetectionOnPartitionedTable() + { + PartitionSpec partitionSpec = PartitionSpec.builderFor(TABLE_SCHEMA) + .identity(COLUMN_2_NAME) + .build(); + Table icebergTable = createIcebergTable(partitionSpec); + + String partitionDataJson = + """ + {"partitionValues":[40]} + """; + Map expectedDomains = Map.of(COLUMN_2_HANDLE, Domain.singleValue(INTEGER, 40L)); + List commitTasks = getCommitTaskDataForUpdate(partitionSpec, Optional.of(partitionDataJson)); + TupleDomain icebergColumnHandleTupleDomain = extractTupleDomainsFromCommitTasks(getIcebergTableHandle(partitionSpec), icebergTable, commitTasks, null); + assertThat(icebergColumnHandleTupleDomain.getDomains().orElseThrow()).isEqualTo(expectedDomains); + + dropIcebergTable(icebergTable); + } + + @Test + void testConflictDetectionOnPartitionedTableWithMultiplePartitionValues() + { + PartitionSpec partitionSpec = PartitionSpec.builderFor(TABLE_SCHEMA) + .identity(COLUMN_2_NAME) + .build(); + Table icebergTable = createIcebergTable(partitionSpec); + + String partitionDataJson1 = + """ + {"partitionValues":[40]} + """; + String partitionDataJson2 = + """ + {"partitionValues":[50]} + """; + Map expectedDomains = Map.of(COLUMN_2_HANDLE, Domain.multipleValues(INTEGER, ImmutableList.of(40L, 50L))); + // Create commit tasks for updates in two partitions, with values 40 and 50 + List commitTasks = Stream.concat(getCommitTaskDataForUpdate(partitionSpec, Optional.of(partitionDataJson1)).stream(), + getCommitTaskDataForUpdate(partitionSpec, Optional.of(partitionDataJson2)).stream()).collect(toImmutableList()); + TupleDomain icebergColumnHandleTupleDomain = extractTupleDomainsFromCommitTasks(getIcebergTableHandle(partitionSpec), icebergTable, commitTasks, null); + assertThat(icebergColumnHandleTupleDomain.getDomains().orElseThrow()).isEqualTo(expectedDomains); + + 
dropIcebergTable(icebergTable); + } + + @Test + void testConflictDetectionOnNestedPartitionedTable() + { + PartitionSpec partitionSpec = PartitionSpec.builderFor(TABLE_SCHEMA) + .identity(PARENT_COLUMN_NAME + "." + CHILD_COLUMN_NAME) + .build(); + Table icebergTable = createIcebergTable(partitionSpec); + + String partitionDataJson = + """ + {"partitionValues":[40]} + """; + Map expectedDomains = Map.of(CHILD_COLUMN_HANDLE, Domain.singleValue(INTEGER, 40L)); + List commitTasks = getCommitTaskDataForUpdate(partitionSpec, Optional.of(partitionDataJson)); + TupleDomain icebergColumnHandleTupleDomain = extractTupleDomainsFromCommitTasks(getIcebergTableHandle(partitionSpec), icebergTable, commitTasks, null); + assertThat(icebergColumnHandleTupleDomain.getDomains().orElseThrow()).isEqualTo(expectedDomains); + + dropIcebergTable(icebergTable); + } + + @Test + void testConflictDetectionOnTableWithTwoPartitions() + { + PartitionSpec partitionSpec = PartitionSpec.builderFor(TABLE_SCHEMA) + .identity(COLUMN_2_NAME) + .identity(COLUMN_1_NAME) + .build(); + Table icebergTable = createIcebergTable(partitionSpec); + + String partitionDataJson = + """ + {"partitionValues":[40, 12]} + """; + Map expectedDomains = Map.of(COLUMN_2_HANDLE, Domain.singleValue(INTEGER, 40L), COLUMN_1_HANDLE, Domain.singleValue(INTEGER, 12L)); + List commitTasks = getCommitTaskDataForUpdate(partitionSpec, Optional.of(partitionDataJson)); + TupleDomain icebergColumnHandleTupleDomain = extractTupleDomainsFromCommitTasks(getIcebergTableHandle(partitionSpec), icebergTable, commitTasks, null); + assertThat(icebergColumnHandleTupleDomain.getDomains().orElseThrow()).isEqualTo(expectedDomains); + + dropIcebergTable(icebergTable); + } + + @Test + void testConflictDetectionOnTableWithTwoPartitionsAndMissingPartitionData() + { + PartitionSpec partitionSpec = PartitionSpec.builderFor(TABLE_SCHEMA) + .identity(COLUMN_2_NAME) + .identity(COLUMN_1_NAME) + .build(); + Table icebergTable = 
createIcebergTable(partitionSpec); + + String partitionDataJson = + """ + {"partitionValues":[40]} + """; + Map expectedDomains = Map.of(COLUMN_2_HANDLE, Domain.singleValue(INTEGER, 40L), COLUMN_1_HANDLE, Domain.onlyNull(INTEGER)); + List commitTasks = getCommitTaskDataForUpdate(partitionSpec, Optional.of(partitionDataJson)); + TupleDomain icebergColumnHandleTupleDomain = extractTupleDomainsFromCommitTasks(getIcebergTableHandle(partitionSpec), icebergTable, commitTasks, null); + assertThat(icebergColumnHandleTupleDomain.getDomains().orElseThrow()).isEqualTo(expectedDomains); + + dropIcebergTable(icebergTable); + } + + @Test + void testConflictDetectionOnEvolvedTable() + { + PartitionSpec previousPartitionSpec = PartitionSpec.builderFor(TABLE_SCHEMA) + .identity(COLUMN_1_NAME) + .build(); + PartitionSpec currentPartitionSpec = PartitionSpec.builderFor(TABLE_SCHEMA) + .identity(COLUMN_2_NAME) + .build(); + Table icebergTable = createIcebergTable(currentPartitionSpec); + + String partitionDataJson = + """ + {"partitionValues":[40]} + """; + CommitTaskData commitTaskData1 = new CommitTaskData("test_location/data/new.parquet", IcebergFileFormat.PARQUET, 0, new MetricsWrapper(new Metrics()), PartitionSpecParser.toJson(currentPartitionSpec), + Optional.of(partitionDataJson), DATA, Optional.empty(), Optional.empty()); + // Remove file from version with previous partition specification + CommitTaskData commitTaskData2 = new CommitTaskData("test_location/data/old.parquet", IcebergFileFormat.PARQUET, 0, new MetricsWrapper(new Metrics()), PartitionSpecParser.toJson(previousPartitionSpec), + Optional.of(partitionDataJson), POSITION_DELETES, Optional.empty(), Optional.empty()); + TupleDomain icebergColumnHandleTupleDomain = extractTupleDomainsFromCommitTasks(getIcebergTableHandle(currentPartitionSpec), icebergTable, List.of(commitTaskData1, commitTaskData2), null); + assertThat(icebergColumnHandleTupleDomain.getDomains().orElseThrow()).isEmpty(); + + 
dropIcebergTable(icebergTable); + } + + private static List getCommitTaskDataForUpdate(PartitionSpec partitionSpec, Optional partitionDataJson) + { + // Update operation contains two commit tasks + CommitTaskData commitTaskData1 = new CommitTaskData( + "test_location/data/new.parquet", + IcebergFileFormat.PARQUET, + 0, + new MetricsWrapper(new Metrics()), + PartitionSpecParser.toJson(partitionSpec), + partitionDataJson, + DATA, + Optional.empty(), + Optional.empty()); + CommitTaskData commitTaskData2 = new CommitTaskData( + "test_location/data/old.parquet", + IcebergFileFormat.PARQUET, + 0, + new MetricsWrapper(new Metrics()), + PartitionSpecParser.toJson(partitionSpec), + partitionDataJson, + POSITION_DELETES, + Optional.empty(), + Optional.empty()); + + return List.of(commitTaskData1, commitTaskData2); + } + + private static IcebergTableHandle getIcebergTableHandle(PartitionSpec partitionSpec) + { + String partitionSpecJson = PartitionSpecParser.toJson(partitionSpec); + return new IcebergTableHandle( + CatalogHandle.fromId("iceberg:NORMAL:v12345"), + "schemaName", + "tableName", + TableType.DATA, + Optional.empty(), + SchemaParser.toJson(TABLE_SCHEMA), + Optional.of(partitionSpecJson), + 1, + TupleDomain.all(), + TupleDomain.all(), + OptionalLong.empty(), + ImmutableSet.of(), + Optional.empty(), + "dummy_table_location", + ImmutableMap.of(), + Optional.empty(), + false, + Optional.empty(), + ImmutableSet.of(), + Optional.of(false)); + } + + private static Table createIcebergTable(PartitionSpec partitionSpec) + { + return HADOOP_TABLES.create( + TABLE_SCHEMA, + partitionSpec, + SortOrder.unsorted(), + ImmutableMap.of("write.format.default", "ORC"), + "table_location" + randomNameSuffix()); + } + + private static void dropIcebergTable(Table icebergTable) + { + HADOOP_TABLES.dropTable(icebergTable.location()); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java 
b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java index 8edfa5ee4c39..24ebe70e72b6 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java @@ -79,7 +79,8 @@ public void testDefaults() .setIncrementalRefreshEnabled(true) .setObjectStoreLayoutEnabled(false) .setMetadataParallelism(8) - .setBucketExecutionEnabled(true)); + .setBucketExecutionEnabled(true) + .setFileBasedConflictDetectionEnabled(true)); } @Test @@ -120,6 +121,7 @@ public void testExplicitPropertyMappings() .put("iceberg.object-store-layout.enabled", "true") .put("iceberg.metadata.parallelism", "10") .put("iceberg.bucket-execution", "false") + .put("iceberg.file-based-conflict-detection", "false") .buildOrThrow(); IcebergConfig expected = new IcebergConfig() @@ -157,7 +159,8 @@ public void testExplicitPropertyMappings() .setIncrementalRefreshEnabled(false) .setObjectStoreLayoutEnabled(true) .setMetadataParallelism(10) - .setBucketExecutionEnabled(false); + .setBucketExecutionEnabled(false) + .setFileBasedConflictDetectionEnabled(false); assertFullMapping(properties, expected); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergLocalConcurrentWrites.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergLocalConcurrentWrites.java index ab9b5a6f5d5c..a950f5296e97 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergLocalConcurrentWrites.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergLocalConcurrentWrites.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import io.airlift.concurrent.MoreFutures; +import io.trino.Session; import io.trino.plugin.blackhole.BlackHolePlugin; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; @@ -39,6 +40,7 @@ import static 
com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.airlift.concurrent.MoreFutures.tryGetFutureValue; +import static io.trino.plugin.iceberg.IcebergSessionProperties.FILE_BASED_CONFLICT_DETECTION_ENABLED; import static io.trino.testing.QueryAssertions.getTrinoExceptionCause; import static io.trino.testing.TestingNames.randomNameSuffix; import static java.lang.String.format; @@ -369,36 +371,94 @@ void testConcurrentTruncateAndInserts() @RepeatedTest(3) void testConcurrentNonOverlappingUpdate() throws Exception + { + testConcurrentNonOverlappingUpdate(getSession()); + testConcurrentNonOverlappingUpdate(withFileBasedConflictDetectionDisabledSession()); + } + + private void testConcurrentNonOverlappingUpdate(Session session) + throws InterruptedException { int threads = 3; CyclicBarrier barrier = new CyclicBarrier(threads); ExecutorService executor = newFixedThreadPool(threads); String tableName = "test_concurrent_non_overlapping_updates_table_" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioning = ARRAY['part']) AS VALUES (1, 10), (11, 20), (21, 30), (31, 40)", 4); + assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioning = ARRAY['part']) AS VALUES (1, 10), (11, 20), (21, NULL), (31, 40)", 4); try { // update data concurrently by using non-overlapping partition predicate executor.invokeAll(ImmutableList.>builder() .add(() -> { barrier.await(10, SECONDS); - getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1 WHERE part = 10"); + getQueryRunner().execute(session, "UPDATE " + tableName + " SET a = a + 1 WHERE part = 10"); return null; }) .add(() -> { barrier.await(10, SECONDS); - getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1 WHERE part = 20"); + getQueryRunner().execute(session, "UPDATE " + tableName + " SET a = a + 1 WHERE part = 20"); return null; }) .add(() -> { barrier.await(10, SECONDS); - 
getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1 WHERE part = 30"); + getQueryRunner().execute(session, "UPDATE " + tableName + " SET a = a + 1 WHERE part IS NULL"); return null; }) .build()) .forEach(MoreFutures::getDone); - assertThat(query("SELECT * FROM " + tableName)).matches("VALUES (2, 10), (12, 20), (22, 30), (31, 40)"); + assertThat(query("SELECT * FROM " + tableName)).matches("VALUES (2, 10), (12, 20), (22, NULL), (31, 40)"); + } + finally { + assertUpdate("DROP TABLE " + tableName); + executor.shutdownNow(); + assertThat(executor.awaitTermination(10, SECONDS)).isTrue(); + } + } + + // Repeat test with invocationCount for better test coverage, since the tested aspect is inherently non-deterministic. + @RepeatedTest(3) + void testConcurrentNonOverlappingUpdateMultipleDataFiles() + throws Exception + { + int threads = 3; + CyclicBarrier barrier = new CyclicBarrier(threads); + ExecutorService executor = newFixedThreadPool(threads); + String tableName = "test_concurrent_non_overlapping_updates_table_" + randomNameSuffix(); + // Force creating more parquet files + Session session = Session.builder(getSession()) + .setCatalogSessionProperty("iceberg", "target_max_file_size", "1kB") + .build(); + + assertUpdate("CREATE TABLE " + tableName + " (a BIGINT, part BIGINT) WITH (partitioning = ARRAY['part'])"); + assertUpdate(session, " INSERT INTO " + tableName + " SELECT * FROM " + + "(select * from UNNEST(SEQUENCE(1, 10000)) AS t(a)) CROSS JOIN (select * from UNNEST(SEQUENCE(1, 3)) AS t(part))", 30000); + + // UPDATE will increase every value by 1 + long expectedDataSum = (long) computeScalar("SELECT sum(a + 1) FROM " + tableName); + + try { + // update data concurrently by using non-overlapping partition predicate + executor.invokeAll(ImmutableList.>builder() + .add(() -> { + barrier.await(10, SECONDS); + getQueryRunner().execute(session, "UPDATE " + tableName + " SET a = a + 1 WHERE part = 1"); + return null; + }) + .add(() -> { + barrier.await(10, 
SECONDS); + getQueryRunner().execute(session, "UPDATE " + tableName + " SET a = a + 1 WHERE part = 2"); + return null; + }) + .add(() -> { + barrier.await(10, SECONDS); + getQueryRunner().execute(session, "UPDATE " + tableName + " SET a = a + 1 WHERE part = 3"); + return null; + }) + .build()) + .forEach(MoreFutures::getDone); + + assertThat((long) computeScalar("SELECT SUM(a) FROM " + tableName)).isEqualTo(expectedDataSum); } finally { assertUpdate("DROP TABLE " + tableName); @@ -426,7 +486,7 @@ private void testConcurrentOverlappingUpdate(boolean partitioned) assertUpdate("CREATE TABLE " + tableName + " (a, part) " + (partitioned ? " WITH (partitioning = ARRAY['part'])" : "") + - " AS VALUES (1, 10), (11, 20), (21, 30), (31, 40)", 4); + " AS VALUES (1, 10), (11, 20), (21, NULL), (31, 40)", 4); try { List> futures = IntStream.range(0, threads) @@ -461,9 +521,9 @@ private void testConcurrentOverlappingUpdate(boolean partitioned) assertThat(successes).isGreaterThanOrEqualTo(1); //There can be different possible results depending on query order execution. 
switch ((int) successes) { - case 1 -> assertThat(query("SELECT * FROM " + tableName)).matches("VALUES (1, 10), (11, 20), (22, 30), (32, 40)"); - case 2 -> assertThat(query("SELECT * FROM " + tableName)).matches("VALUES (1, 10), (11, 20), (23, 30), (33, 40)"); - case 3 -> assertThat(query("SELECT * FROM " + tableName)).matches("VALUES (1, 10), (11, 20), (24, 30), (34, 40)"); + case 1 -> assertThat(query("SELECT * FROM " + tableName)).matches("VALUES (1, 10), (11, 20), (22, NULL), (32, 40)"); + case 2 -> assertThat(query("SELECT * FROM " + tableName)).matches("VALUES (1, 10), (11, 20), (23, NULL), (33, 40)"); + case 3 -> assertThat(query("SELECT * FROM " + tableName)).matches("VALUES (1, 10), (11, 20), (24, NULL), (34, 40)"); } } finally { @@ -477,6 +537,13 @@ private void testConcurrentOverlappingUpdate(boolean partitioned) @RepeatedTest(3) void testConcurrentNonOverlappingUpdateOnNestedPartition() throws Exception + { + testConcurrentNonOverlappingUpdateOnNestedPartition(getSession()); + testConcurrentNonOverlappingUpdateOnNestedPartition(withFileBasedConflictDetectionDisabledSession()); + } + + private void testConcurrentNonOverlappingUpdateOnNestedPartition(Session session) + throws Exception { int threads = 3; CyclicBarrier barrier = new CyclicBarrier(threads); @@ -488,7 +555,7 @@ void testConcurrentNonOverlappingUpdateOnNestedPartition() "INSERT INTO " + tableName + " VALUES " + "(1, ROW(10)), " + "(11, ROW(20)), " + - "(21, ROW(30)), " + + "(21, ROW(NULL)), " + "(31, ROW(40))", 4); try { @@ -496,23 +563,23 @@ void testConcurrentNonOverlappingUpdateOnNestedPartition() executor.invokeAll(ImmutableList.>builder() .add(() -> { barrier.await(10, SECONDS); - getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1 WHERE parent.child = 10"); + getQueryRunner().execute(session, "UPDATE " + tableName + " SET a = a + 1 WHERE parent.child = 10"); return null; }) .add(() -> { barrier.await(10, SECONDS); - getQueryRunner().execute("UPDATE " + tableName + " SET a = 
a + 1 WHERE parent.child = 20"); + getQueryRunner().execute(session, "UPDATE " + tableName + " SET a = a + 1 WHERE parent.child = 20"); return null; }) .add(() -> { barrier.await(10, SECONDS); - getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1 WHERE parent.child = 30"); + getQueryRunner().execute(session, "UPDATE " + tableName + " SET a = a + 1 WHERE parent.child IS NULL"); return null; }) .build()) .forEach(MoreFutures::getDone); - assertThat(query("SELECT a, parent.child FROM " + tableName)).matches("VALUES (2, 10), (12, 20), (22, 30), (31, 40)"); + assertThat(query("SELECT a, parent.child FROM " + tableName)).matches("VALUES (2, 10), (12, 20), (22, NULL), (31, 40)"); } finally { assertUpdate("DROP TABLE " + tableName); @@ -678,6 +745,80 @@ void testConcurrentUpdateAndInserts() } } + // Repeat test with invocationCount for better test coverage, since the tested aspect is inherently non-deterministic. + @RepeatedTest(3) + public void testConcurrentMerge() + throws Exception + { + int threads = 4; + CyclicBarrier barrier = new CyclicBarrier(threads); + ExecutorService executor = newFixedThreadPool(threads); + String tableName = "test_concurrent_merges_table_" + randomNameSuffix(); + String sourceTableName = "test_concurrent_merges_source_table_" + randomNameSuffix(); + + // Helper table to simulate longer query time during MERGE + assertUpdate("CREATE TABLE " + sourceTableName + " (a, part, string_rep) AS SELECT *, format('a%spart%s', a, part) FROM " + + "(select * from UNNEST(SEQUENCE(1, 2000)) AS t(a)) CROSS JOIN (select * from UNNEST(SEQUENCE(1, 2000)) AS t(part))", 4000000); + assertUpdate("INSERT INTO " + sourceTableName + " VALUES (42, NULL, 'a42partNULL')", 1); + + assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioning = ARRAY['part']) AS VALUES (1, 10), (11, 20), (21, 30), (31, 40), (41, NULL)", 5); + // Add more files in the partition 30 + assertUpdate("INSERT INTO " + tableName + " VALUES (22, 30)", 1); + try { + // 
merge data concurrently by using non-overlapping partition predicate + executor.invokeAll(ImmutableList.>builder() + .add(() -> { + barrier.await(10, SECONDS); + getQueryRunner().execute( + """ + MERGE INTO %s t USING (select a, part from %s where string_rep LIKE '%%a12part20') AS s + ON (FALSE) + WHEN NOT MATCHED THEN INSERT (a, part) VALUES(s.a, s.part) + """.formatted(tableName, sourceTableName)); + return null; + }) + .add(() -> { + barrier.await(10, SECONDS); + getQueryRunner().execute( + """ + MERGE INTO %s t USING (select a, part from %s where string_rep LIKE '%%a42partNULL') AS s + ON (FALSE) + WHEN NOT MATCHED THEN INSERT (a, part) VALUES(s.a, s.part) + """.formatted(tableName, sourceTableName)); + return null; + }) + .add(() -> { + barrier.await(10, SECONDS); + getQueryRunner().execute( + """ + MERGE INTO %s t USING (VALUES (21, 30)) AS s(a, part) + ON (t.part = s.part) + WHEN MATCHED THEN DELETE + """.formatted(tableName)); + return null; + }) + .add(() -> { + barrier.await(10, SECONDS); + getQueryRunner().execute( + """ + MERGE INTO %s t USING (VALUES (32, 40)) AS s(a, part) + ON (t.part = s.part) + WHEN MATCHED THEN UPDATE SET a = s.a + """.formatted(tableName)); + return null; + }) + .build()) + .forEach(MoreFutures::getDone); + + assertThat(query("SELECT * FROM " + tableName)).matches("VALUES (1, 10), (11, 20), (12, 20), (32, 40), (41, NULL), (42, NULL)"); + } + finally { + assertUpdate("DROP TABLE " + tableName); + executor.shutdownNow(); + assertThat(executor.awaitTermination(10, SECONDS)).isTrue(); + } + } + // Repeat test with invocationCount for better test coverage, since the tested aspect is inherently non-deterministic. 
@RepeatedTest(3) void testConcurrentMergeAndInserts() @@ -1148,4 +1289,11 @@ private long getCurrentSnapshotId(String tableName) { return (long) computeScalar("SELECT snapshot_id FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES"); } + + private Session withFileBasedConflictDetectionDisabledSession() + { + return Session.builder(getSession()) + .setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), FILE_BASED_CONFLICT_DETECTION_ENABLED, "false") + .build(); + } }