Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"iceberg.allow-legacy-snapshot-syntax",
"iceberg.experimental.extended-statistics.enabled",
"iceberg.extended-statistics.enabled",
"iceberg.file-based-conflict-detection",
})
public class IcebergConfig
{
Expand Down Expand Up @@ -103,7 +104,6 @@ public class IcebergConfig
private boolean objectStoreLayoutEnabled;
private int metadataParallelism = 8;
private boolean bucketExecutionEnabled = true;
private boolean fileBasedConflictDetectionEnabled = true;

public CatalogType getCatalogType()
{
Expand Down Expand Up @@ -682,17 +682,4 @@ public IcebergConfig setBucketExecutionEnabled(boolean bucketExecutionEnabled)
this.bucketExecutionEnabled = bucketExecutionEnabled;
return this;
}

public boolean isFileBasedConflictDetectionEnabled()
{
return fileBasedConflictDetectionEnabled;
}

@Config("iceberg.file-based-conflict-detection")
Copy link
Copy Markdown
Member

@raunaqmorarka raunaqmorarka Apr 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Defunct ?

@ConfigDescription("Enable file-based conflict detection: take partition information from the actual written files as a source for the conflict detection system")
public IcebergConfig setFileBasedConflictDetectionEnabled(boolean fileBasedConflictDetectionEnabled)
{
this.fileBasedConflictDetectionEnabled = fileBasedConflictDetectionEnabled;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@
import static io.trino.plugin.iceberg.IcebergSessionProperties.getRemoveOrphanFilesMinRetention;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isBucketExecutionEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isCollectExtendedStatisticsOnWrite;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isFileBasedConflictDetectionEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isIncrementalRefreshEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isMergeManifestsOnWrite;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isProjectionPushdownEnabled;
Expand Down Expand Up @@ -3604,10 +3603,7 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
}
TupleDomain<IcebergColumnHandle> dataColumnPredicate = table.getEnforcedPredicate().filter((column, domain) -> !isMetadataColumnId(column.getId()));
TupleDomain<IcebergColumnHandle> effectivePredicate = dataColumnPredicate.intersect(table.getUnenforcedPredicate());
if (isFileBasedConflictDetectionEnabled(session)) {
effectivePredicate = effectivePredicate.intersect(extractTupleDomainsFromCommitTasks(table, icebergTable, commitTasks, typeManager));
}

effectivePredicate = effectivePredicate.intersect(extractTupleDomainsFromCommitTasks(table, icebergTable, commitTasks, typeManager));
effectivePredicate = effectivePredicate.filter((_, domain) -> isConvertibleToIcebergExpression(domain));

if (!effectivePredicate.isAll()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ public final class IcebergSessionProperties
private static final String QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS = "query_partition_filter_required_schemas";
private static final String INCREMENTAL_REFRESH_ENABLED = "incremental_refresh_enabled";
public static final String BUCKET_EXECUTION_ENABLED = "bucket_execution_enabled";
public static final String FILE_BASED_CONFLICT_DETECTION_ENABLED = "file_based_conflict_detection_enabled";
private static final String MAX_PARTITIONS_PER_WRITER = "max_partitions_per_writer";

private final List<PropertyMetadata<?>> sessionProperties;
Expand Down Expand Up @@ -385,11 +384,6 @@ public IcebergSessionProperties(
"Enable bucket-aware execution: use physical bucketing information to optimize queries",
icebergConfig.isBucketExecutionEnabled(),
false))
.add(booleanProperty(
FILE_BASED_CONFLICT_DETECTION_ENABLED,
"Enable file-based conflict detection: take partition information from the actual written files as a source for the conflict detection system",
icebergConfig.isFileBasedConflictDetectionEnabled(),
false))
.add(integerProperty(
MAX_PARTITIONS_PER_WRITER,
"Maximum number of partitions per writer",
Expand Down Expand Up @@ -641,11 +635,6 @@ public static boolean isBucketExecutionEnabled(ConnectorSession session)
return session.getProperty(BUCKET_EXECUTION_ENABLED, Boolean.class);
}

public static boolean isFileBasedConflictDetectionEnabled(ConnectorSession session)
{
return session.getProperty(FILE_BASED_CONFLICT_DETECTION_ENABLED, Boolean.class);
}

public static int maxPartitionsPerWriter(ConnectorSession session)
{
return session.getProperty(MAX_PARTITIONS_PER_WRITER, Integer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ public void testDefaults()
.setMaterializedViewRefreshSnapshotRetentionPeriod(new Duration(4, HOURS))
.setObjectStoreLayoutEnabled(false)
.setMetadataParallelism(8)
.setBucketExecutionEnabled(true)
.setFileBasedConflictDetectionEnabled(true));
.setBucketExecutionEnabled(true));
}

@Test
Expand Down Expand Up @@ -134,7 +133,6 @@ public void testExplicitPropertyMappings()
.put("iceberg.object-store-layout.enabled", "true")
.put("iceberg.metadata.parallelism", "10")
.put("iceberg.bucket-execution", "false")
.put("iceberg.file-based-conflict-detection", "false")
.buildOrThrow();

IcebergConfig expected = new IcebergConfig()
Expand Down Expand Up @@ -178,8 +176,7 @@ public void testExplicitPropertyMappings()
.setMaterializedViewRefreshSnapshotRetentionPeriod(new Duration(1, HOURS))
.setObjectStoreLayoutEnabled(true)
.setMetadataParallelism(10)
.setBucketExecutionEnabled(false)
.setFileBasedConflictDetectionEnabled(false);
.setBucketExecutionEnabled(false);

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.concurrent.MoreFutures.tryGetFutureValue;
import static io.trino.plugin.iceberg.IcebergSessionProperties.FILE_BASED_CONFLICT_DETECTION_ENABLED;
import static io.trino.testing.QueryAssertions.getTrinoExceptionCause;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static java.lang.String.format;
Expand Down Expand Up @@ -371,13 +370,6 @@ void testConcurrentTruncateAndInserts()
@RepeatedTest(3)
void testConcurrentNonOverlappingUpdate()
throws Exception
{
testConcurrentNonOverlappingUpdate(getSession());
testConcurrentNonOverlappingUpdate(withFileBasedConflictDetectionDisabledSession());
}

private void testConcurrentNonOverlappingUpdate(Session session)
throws InterruptedException
{
int threads = 3;
CyclicBarrier barrier = new CyclicBarrier(threads);
Expand All @@ -391,17 +383,17 @@ private void testConcurrentNonOverlappingUpdate(Session session)
executor.invokeAll(ImmutableList.<Callable<Void>>builder()
.add(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute(session, "UPDATE " + tableName + " SET a = a + 1 WHERE part = 10");
getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1 WHERE part = 10");
return null;
})
.add(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute(session, "UPDATE " + tableName + " SET a = a + 1 WHERE part = 20");
getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1 WHERE part = 20");
return null;
})
.add(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute(session, "UPDATE " + tableName + " SET a = a + 1 WHERE part IS NULL");
getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1 WHERE part IS NULL");
return null;
})
.build())
Expand Down Expand Up @@ -537,13 +529,6 @@ private void testConcurrentOverlappingUpdate(boolean partitioned)
@RepeatedTest(3)
void testConcurrentNonOverlappingUpdateOnNestedPartition()
throws Exception
{
testConcurrentNonOverlappingUpdateOnNestedPartition(getSession());
testConcurrentNonOverlappingUpdateOnNestedPartition(withFileBasedConflictDetectionDisabledSession());
}

private void testConcurrentNonOverlappingUpdateOnNestedPartition(Session session)
throws Exception
{
int threads = 3;
CyclicBarrier barrier = new CyclicBarrier(threads);
Expand All @@ -563,17 +548,17 @@ private void testConcurrentNonOverlappingUpdateOnNestedPartition(Session session
executor.invokeAll(ImmutableList.<Callable<Void>>builder()
.add(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute(session, "UPDATE " + tableName + " SET a = a + 1 WHERE parent.child = 10");
getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1 WHERE parent.child = 10");
return null;
})
.add(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute(session, "UPDATE " + tableName + " SET a = a + 1 WHERE parent.child = 20");
getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1 WHERE parent.child = 20");
return null;
})
.add(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute(session, "UPDATE " + tableName + " SET a = a + 1 WHERE parent.child IS NULL");
getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1 WHERE parent.child IS NULL");
return null;
})
.build())
Expand Down Expand Up @@ -1403,11 +1388,4 @@ private long getCurrentSnapshotId(String tableName)
{
return (long) computeScalar("SELECT snapshot_id FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES");
}

private Session withFileBasedConflictDetectionDisabledSession()
{
return Session.builder(getSession())
.setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), FILE_BASED_CONFLICT_DETECTION_ENABLED, "false")
.build();
}
}