diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java index 59cdb6fac9b6..c7a72000d695 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java @@ -51,6 +51,7 @@ "iceberg.allow-legacy-snapshot-syntax", "iceberg.experimental.extended-statistics.enabled", "iceberg.extended-statistics.enabled", + "iceberg.file-based-conflict-detection", }) public class IcebergConfig { @@ -103,7 +104,6 @@ public class IcebergConfig private boolean objectStoreLayoutEnabled; private int metadataParallelism = 8; private boolean bucketExecutionEnabled = true; - private boolean fileBasedConflictDetectionEnabled = true; public CatalogType getCatalogType() { @@ -682,17 +682,4 @@ public IcebergConfig setBucketExecutionEnabled(boolean bucketExecutionEnabled) this.bucketExecutionEnabled = bucketExecutionEnabled; return this; } - - public boolean isFileBasedConflictDetectionEnabled() - { - return fileBasedConflictDetectionEnabled; - } - - @Config("iceberg.file-based-conflict-detection") - @ConfigDescription("Enable file-based conflict detection: take partition information from the actual written files as a source for the conflict detection system") - public IcebergConfig setFileBasedConflictDetectionEnabled(boolean fileBasedConflictDetectionEnabled) - { - this.fileBasedConflictDetectionEnabled = fileBasedConflictDetectionEnabled; - return this; - } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 942cae8669f3..8e3b523af932 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -334,7 +334,6 @@ import static io.trino.plugin.iceberg.IcebergSessionProperties.getRemoveOrphanFilesMinRetention; import static io.trino.plugin.iceberg.IcebergSessionProperties.isBucketExecutionEnabled; import static io.trino.plugin.iceberg.IcebergSessionProperties.isCollectExtendedStatisticsOnWrite; -import static io.trino.plugin.iceberg.IcebergSessionProperties.isFileBasedConflictDetectionEnabled; import static io.trino.plugin.iceberg.IcebergSessionProperties.isIncrementalRefreshEnabled; import static io.trino.plugin.iceberg.IcebergSessionProperties.isMergeManifestsOnWrite; import static io.trino.plugin.iceberg.IcebergSessionProperties.isProjectionPushdownEnabled; @@ -3604,10 +3603,7 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col } TupleDomain dataColumnPredicate = table.getEnforcedPredicate().filter((column, domain) -> !isMetadataColumnId(column.getId())); TupleDomain effectivePredicate = dataColumnPredicate.intersect(table.getUnenforcedPredicate()); - if (isFileBasedConflictDetectionEnabled(session)) { - effectivePredicate = effectivePredicate.intersect(extractTupleDomainsFromCommitTasks(table, icebergTable, commitTasks, typeManager)); - } - + effectivePredicate = effectivePredicate.intersect(extractTupleDomainsFromCommitTasks(table, icebergTable, commitTasks, typeManager)); effectivePredicate = effectivePredicate.filter((_, domain) -> isConvertibleToIcebergExpression(domain)); if (!effectivePredicate.isAll()) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java index 0e742e49d9ad..84fca8c296ee 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java @@ -106,7 +106,6 @@ public final class IcebergSessionProperties private static final String QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS = "query_partition_filter_required_schemas"; private static final String INCREMENTAL_REFRESH_ENABLED = "incremental_refresh_enabled"; public static final String BUCKET_EXECUTION_ENABLED = "bucket_execution_enabled"; - public static final String FILE_BASED_CONFLICT_DETECTION_ENABLED = "file_based_conflict_detection_enabled"; private static final String MAX_PARTITIONS_PER_WRITER = "max_partitions_per_writer"; private final List> sessionProperties; @@ -385,11 +384,6 @@ public IcebergSessionProperties( "Enable bucket-aware execution: use physical bucketing information to optimize queries", icebergConfig.isBucketExecutionEnabled(), false)) - .add(booleanProperty( - FILE_BASED_CONFLICT_DETECTION_ENABLED, - "Enable file-based conflict detection: take partition information from the actual written files as a source for the conflict detection system", - icebergConfig.isFileBasedConflictDetectionEnabled(), - false)) .add(integerProperty( MAX_PARTITIONS_PER_WRITER, "Maximum number of partitions per writer", @@ -641,11 +635,6 @@ public static boolean isBucketExecutionEnabled(ConnectorSession session) return session.getProperty(BUCKET_EXECUTION_ENABLED, Boolean.class); } - public static boolean isFileBasedConflictDetectionEnabled(ConnectorSession session) - { - return session.getProperty(FILE_BASED_CONFLICT_DETECTION_ENABLED, Boolean.class); - } - public static int maxPartitionsPerWriter(ConnectorSession session) { return session.getProperty(MAX_PARTITIONS_PER_WRITER, Integer.class); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java index d6d0d0b8ffd1..90450734c4a3 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java @@ -86,8 +86,7 @@ public void testDefaults() .setMaterializedViewRefreshSnapshotRetentionPeriod(new Duration(4, HOURS)) .setObjectStoreLayoutEnabled(false) .setMetadataParallelism(8) - .setBucketExecutionEnabled(true) - .setFileBasedConflictDetectionEnabled(true)); + .setBucketExecutionEnabled(true)); } @Test @@ -134,7 +133,6 @@ public void testExplicitPropertyMappings() .put("iceberg.object-store-layout.enabled", "true") .put("iceberg.metadata.parallelism", "10") .put("iceberg.bucket-execution", "false") - .put("iceberg.file-based-conflict-detection", "false") .buildOrThrow(); IcebergConfig expected = new IcebergConfig() @@ -178,8 +176,7 @@ public void testExplicitPropertyMappings() .setMaterializedViewRefreshSnapshotRetentionPeriod(new Duration(1, HOURS)) .setObjectStoreLayoutEnabled(true) .setMetadataParallelism(10) - .setBucketExecutionEnabled(false) - .setFileBasedConflictDetectionEnabled(false); + .setBucketExecutionEnabled(false); assertFullMapping(properties, expected); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergLocalConcurrentWrites.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergLocalConcurrentWrites.java index 375704db5cdd..9fe65fe63476 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergLocalConcurrentWrites.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergLocalConcurrentWrites.java @@ -40,7 +40,6 @@ import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.airlift.concurrent.MoreFutures.tryGetFutureValue; -import static io.trino.plugin.iceberg.IcebergSessionProperties.FILE_BASED_CONFLICT_DETECTION_ENABLED; import static io.trino.testing.QueryAssertions.getTrinoExceptionCause; import static io.trino.testing.TestingNames.randomNameSuffix; import static java.lang.String.format; @@ -371,13 +370,6 @@ void testConcurrentTruncateAndInserts() @RepeatedTest(3) void testConcurrentNonOverlappingUpdate() throws Exception - { - testConcurrentNonOverlappingUpdate(getSession()); - testConcurrentNonOverlappingUpdate(withFileBasedConflictDetectionDisabledSession()); - } - - private void testConcurrentNonOverlappingUpdate(Session session) - throws InterruptedException { int threads = 3; CyclicBarrier barrier = new CyclicBarrier(threads); @@ -391,17 +383,17 @@ private void testConcurrentNonOverlappingUpdate(Session session) executor.invokeAll(ImmutableList.>builder() .add(() -> { barrier.await(10, SECONDS); - getQueryRunner().execute(session, "UPDATE " + tableName + " SET a = a + 1 WHERE part = 10"); + getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1 WHERE part = 10"); return null; }) .add(() -> { barrier.await(10, SECONDS); - getQueryRunner().execute(session, "UPDATE " + tableName + " SET a = a + 1 WHERE part = 20"); + getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1 WHERE part = 20"); return null; }) .add(() -> { barrier.await(10, SECONDS); - getQueryRunner().execute(session, "UPDATE " + tableName + " SET a = a + 1 WHERE part IS NULL"); + getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1 WHERE part IS NULL"); return null; }) .build()) @@ -537,13 +529,6 @@ private void testConcurrentOverlappingUpdate(boolean partitioned) @RepeatedTest(3) void testConcurrentNonOverlappingUpdateOnNestedPartition() throws Exception - { - testConcurrentNonOverlappingUpdateOnNestedPartition(getSession()); - testConcurrentNonOverlappingUpdateOnNestedPartition(withFileBasedConflictDetectionDisabledSession()); - } - - private void testConcurrentNonOverlappingUpdateOnNestedPartition(Session session) - throws Exception { int threads = 3; CyclicBarrier barrier = new CyclicBarrier(threads); @@ -563,17 +548,17 @@ private void testConcurrentNonOverlappingUpdateOnNestedPartition(Session session executor.invokeAll(ImmutableList.>builder() .add(() -> { barrier.await(10, SECONDS); - getQueryRunner().execute(session, "UPDATE " + tableName + " SET a = a + 1 WHERE parent.child = 10"); + getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1 WHERE parent.child = 10"); return null; }) .add(() -> { barrier.await(10, SECONDS); - getQueryRunner().execute(session, "UPDATE " + tableName + " SET a = a + 1 WHERE parent.child = 20"); + getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1 WHERE parent.child = 20"); return null; }) .add(() -> { barrier.await(10, SECONDS); - getQueryRunner().execute(session, "UPDATE " + tableName + " SET a = a + 1 WHERE parent.child IS NULL"); + getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1 WHERE parent.child IS NULL"); return null; }) .build()) @@ -1403,11 +1388,4 @@ private long getCurrentSnapshotId(String tableName) { return (long) computeScalar("SELECT snapshot_id FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES"); } - - private Session withFileBasedConflictDetectionDisabledSession() - { - return Session.builder(getSession()) - .setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), FILE_BASED_CONFLICT_DETECTION_ENABLED, "false") - .build(); - } }