Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public class IcebergConfig
private boolean objectStoreLayoutEnabled;
private int metadataParallelism = 8;
private boolean bucketExecutionEnabled = true;
private boolean fileBasedConflictDetectionEnabled = true;

public CatalogType getCatalogType()
{
Expand Down Expand Up @@ -580,4 +581,17 @@ public IcebergConfig setBucketExecutionEnabled(boolean bucketExecutionEnabled)
this.bucketExecutionEnabled = bucketExecutionEnabled;
return this;
}

public boolean isFileBasedConflictDetectionEnabled()
{
return fileBasedConflictDetectionEnabled;
}

@Config("iceberg.file-based-conflict-detection")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ConfigHidden - this is a kill-switch as i see it .

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That doesn't mean it has to be hidden, it should just be removed eventually

@ConfigDescription("Enable file-based conflict detection: take partition information from the actual written files as a source for the conflict detection system")
public IcebergConfig setFileBasedConflictDetectionEnabled(boolean fileBasedConflictDetectionEnabled)
{
this.fileBasedConflictDetectionEnabled = fileBasedConflictDetectionEnabled;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@
import static io.trino.plugin.iceberg.IcebergSessionProperties.isBucketExecutionEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isCollectExtendedStatisticsOnWrite;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isExtendedStatisticsEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isFileBasedConflictDetectionEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isIncrementalRefreshEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isMergeManifestsOnWrite;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isProjectionPushdownEnabled;
Expand Down Expand Up @@ -307,6 +308,7 @@
import static io.trino.plugin.iceberg.IcebergUtil.getFileFormat;
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableProperties;
import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys;
import static io.trino.plugin.iceberg.IcebergUtil.getPartitionValues;
import static io.trino.plugin.iceberg.IcebergUtil.getProjectedColumns;
import static io.trino.plugin.iceberg.IcebergUtil.getSnapshotIdAsOfTime;
import static io.trino.plugin.iceberg.IcebergUtil.getTableComment;
Expand Down Expand Up @@ -353,6 +355,7 @@
import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.UNKNOWN;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.connector.RowChangeParadigm.DELETE_ROW_AND_INSERT_ROW;
import static io.trino.spi.predicate.TupleDomain.withColumnDomains;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc;
import static io.trino.spi.type.DateType.DATE;
Expand Down Expand Up @@ -3033,8 +3036,13 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
RowDelta rowDelta = transaction.newRowDelta();
table.getSnapshotId().map(icebergTable::snapshot).ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId()));
TupleDomain<IcebergColumnHandle> dataColumnPredicate = table.getEnforcedPredicate().filter((column, domain) -> !isMetadataColumnId(column.getId()));
TupleDomain<IcebergColumnHandle> convertibleUnenforcedPredicate = table.getUnenforcedPredicate().filter((_, domain) -> isConvertibleToIcebergExpression(domain));
TupleDomain<IcebergColumnHandle> effectivePredicate = dataColumnPredicate.intersect(convertibleUnenforcedPredicate);
TupleDomain<IcebergColumnHandle> effectivePredicate = dataColumnPredicate.intersect(table.getUnenforcedPredicate());
if (isFileBasedConflictDetectionEnabled(session)) {
effectivePredicate = effectivePredicate.intersect(extractTupleDomainsFromCommitTasks(table, icebergTable, commitTasks, typeManager));
}

effectivePredicate = effectivePredicate.filter((_, domain) -> isConvertibleToIcebergExpression(domain));

if (!effectivePredicate.isAll()) {
rowDelta.conflictDetectionFilter(toIcebergExpression(effectivePredicate));
}
Expand Down Expand Up @@ -3099,6 +3107,40 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
commitUpdateAndTransaction(rowDelta, session, transaction, "write");
}

static TupleDomain<IcebergColumnHandle> extractTupleDomainsFromCommitTasks(IcebergTableHandle table, Table icebergTable, List<CommitTaskData> commitTasks, TypeManager typeManager)
{
Set<IcebergColumnHandle> partitionColumns = new HashSet<>(getProjectedColumns(icebergTable.schema(), typeManager, identityPartitionColumnsInAllSpecs(icebergTable)));
PartitionSpec partitionSpec = icebergTable.spec();
Type[] partitionColumnTypes = partitionSpec.fields().stream()
.map(field -> field.transform().getResultType(icebergTable.schema().findType(field.sourceId())))
.toArray(Type[]::new);
Schema schema = SchemaParser.fromJson(table.getTableSchemaJson());
Map<IcebergColumnHandle, List<Domain>> domainsFromTasks = new HashMap<>();
for (CommitTaskData commitTask : commitTasks) {
PartitionSpec taskPartitionSpec = PartitionSpecParser.fromJson(schema, commitTask.partitionSpecJson());
if (commitTask.partitionDataJson().isEmpty() || taskPartitionSpec.isUnpartitioned() || !taskPartitionSpec.equals(partitionSpec)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can potentially use something like io.trino.plugin.iceberg.IcebergSplitSource#createFileStatisticsDomain for un-partitioned columns. Not necessary for current PR though.

// We should not produce any specific domains if there are no partitions or current partitions does not match task partitions for any of tasks
// As each partition value narrows down conflict scope we should produce values from all commit tasks or not at all, to avoid partial information
return TupleDomain.all();
}

PartitionData partitionData = PartitionData.fromJson(commitTask.partitionDataJson().get(), partitionColumnTypes);
Map<Integer, Optional<String>> partitionKeys = getPartitionKeys(partitionData, partitionSpec);
Map<ColumnHandle, NullableValue> partitionValues = getPartitionValues(partitionColumns, partitionKeys);

for (Map.Entry<ColumnHandle, NullableValue> entry : partitionValues.entrySet()) {
IcebergColumnHandle columnHandle = (IcebergColumnHandle) entry.getKey();
NullableValue value = entry.getValue();
Domain newDomain = value.isNull() ? Domain.onlyNull(columnHandle.getType()) : Domain.singleValue(columnHandle.getType(), value.getValue());
domainsFromTasks.computeIfAbsent(columnHandle, _ -> new ArrayList<>()).add(newDomain);
}
}
return withColumnDomains(domainsFromTasks.entrySet().stream()
.collect(toImmutableMap(
Map.Entry::getKey,
entry -> Domain.union(entry.getValue()))));
}

@Override
public void createView(ConnectorSession session, SchemaTableName viewName, ConnectorViewDefinition definition, Map<String, Object> viewProperties, boolean replace)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public final class IcebergSessionProperties
private static final String QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS = "query_partition_filter_required_schemas";
private static final String INCREMENTAL_REFRESH_ENABLED = "incremental_refresh_enabled";
public static final String BUCKET_EXECUTION_ENABLED = "bucket_execution_enabled";
public static final String FILE_BASED_CONFLICT_DETECTION_ENABLED = "file_based_conflict_detection_enabled";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -398,6 +399,11 @@ public IcebergSessionProperties(
"Enable bucket-aware execution: use physical bucketing information to optimize queries",
icebergConfig.isBucketExecutionEnabled(),
false))
.add(booleanProperty(
FILE_BASED_CONFLICT_DETECTION_ENABLED,
"Enable file-based conflict detection: take partition information from the actual written files as a source for the conflict detection system",
icebergConfig.isFileBasedConflictDetectionEnabled(),
false))
.build();
}

Expand Down Expand Up @@ -646,4 +652,9 @@ public static boolean isBucketExecutionEnabled(ConnectorSession session)
{
return session.getProperty(BUCKET_EXECUTION_ENABLED, Boolean.class);
}

public static boolean isFileBasedConflictDetectionEnabled(ConnectorSession session)
{
return session.getProperty(FILE_BASED_CONFLICT_DETECTION_ENABLED, Boolean.class);
}
}
Loading