Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,23 @@ Property Name Description

``iceberg.delete-as-join-rewrite-enabled`` When enabled, equality delete row filtering is applied ``true`` Yes No, Equality delete read is not supported
as a join with the data of the equality delete files.
Deprecated: This property is deprecated and will be removed
in a future release. Use the
``iceberg.delete-as-join-rewrite-max-delete-columns``
configuration property instead.

``iceberg.delete-as-join-rewrite-max-delete-columns`` When set to a number greater than 0, this property enables ``400`` Yes No, Equality delete read is not supported
equality delete row filtering as a join with the data of the
equality delete files. The value of this property is the
maximum number of columns that can be used in the equality
delete files. If the number of columns in the equality delete
files exceeds this value, then the optimization is not
applied and the equality delete files are applied directly to
each row in the data files.

This property is only applicable when
``iceberg.delete-as-join-rewrite-enabled`` is set to
``true``.

``iceberg.enable-parquet-dereference-pushdown`` Enable parquet dereference pushdown. ``true`` Yes No

Expand Down Expand Up @@ -499,6 +516,12 @@ Property Name Description
===================================================== ======================================================================= =================== =============================================
``iceberg.delete_as_join_rewrite_enabled`` Overrides the behavior of the connector property Yes No, Equality delete read is not supported
``iceberg.delete-as-join-rewrite-enabled`` in the current session.

Deprecated: This property is deprecated and will be removed. Use
``iceberg.delete_as_join_rewrite_max_delete_columns`` instead.
``iceberg.delete_as_join_rewrite_max_delete_columns`` Overrides the behavior of the connector property Yes No, Equality delete read is not supported
``iceberg.delete-as-join-rewrite-max-delete-columns`` in the
current session.
``iceberg.hive_statistics_merge_strategy`` Overrides the behavior of the connector property Yes Yes
``iceberg.hive-statistics-merge-strategy`` in the current session.
``iceberg.rows_for_metadata_optimization_threshold`` Overrides the behavior of the connector property Yes Yes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.ConfigDescription;
import com.facebook.airlift.configuration.LegacyConfig;
import com.facebook.presto.hive.HiveCompressionCodec;
import com.facebook.presto.spi.statistics.ColumnStatisticType;
import com.google.common.base.Splitter;
Expand Down Expand Up @@ -60,6 +61,7 @@ public class IcebergConfig
private double statisticSnapshotRecordDifferenceWeight;
private boolean pushdownFilterEnabled;
private boolean deleteAsJoinRewriteEnabled = true;
private int deleteAsJoinRewriteMaxDeleteColumns = 400;
private int rowsForMetadataOptimizationThreshold = 1000;
private int metadataPreviousVersionsMax = METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT;
private boolean metadataDeleteAfterCommit = METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT;
Expand Down Expand Up @@ -267,19 +269,39 @@ public boolean isPushdownFilterEnabled()
return pushdownFilterEnabled;
}

@Config("iceberg.delete-as-join-rewrite-enabled")
@ConfigDescription("When enabled, equality delete row filtering will be implemented by rewriting the query plan to join with the delete keys.")
@LegacyConfig(value = "iceberg.delete-as-join-rewrite-enabled")
@Config("deprecated.iceberg.delete-as-join-rewrite-enabled")
@ConfigDescription("When enabled, equality delete row filtering will be implemented by rewriting the query plan to join with the delete keys. " +
"Deprecated: Set 'iceberg.delete-as-join-rewrite-max-delete-columns' to 0 to control the enabling of this feature. This will be removed in a future release.")
@Deprecated
public IcebergConfig setDeleteAsJoinRewriteEnabled(boolean deleteAsJoinPushdownEnabled)
{
this.deleteAsJoinRewriteEnabled = deleteAsJoinPushdownEnabled;
return this;
}

@Deprecated
public boolean isDeleteAsJoinRewriteEnabled()
{
return deleteAsJoinRewriteEnabled;
}

@Config("iceberg.delete-as-join-rewrite-max-delete-columns")
@ConfigDescription("The maximum number of columns that can be used in a delete as join rewrite. " +
"If the number of columns exceeds this value, the delete as join rewrite will not be applied.")
@Min(0)
@Max(400)
public IcebergConfig setDeleteAsJoinRewriteMaxDeleteColumns(int deleteAsJoinRewriteMaxDeleteColumns)
{
this.deleteAsJoinRewriteMaxDeleteColumns = deleteAsJoinRewriteMaxDeleteColumns;
return this;
}

public int getDeleteAsJoinRewriteMaxDeleteColumns()
{
return deleteAsJoinRewriteMaxDeleteColumns;
}

@Config("iceberg.rows-for-metadata-optimization-threshold")
@ConfigDescription("The max partitions number to utilize metadata optimization. 0 means skip the metadata optimization directly.")
public IcebergConfig setRowsForMetadataOptimizationThreshold(int rowsForMetadataOptimizationThreshold)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.facebook.presto.iceberg.nessie.IcebergNessieConfig;
import com.facebook.presto.iceberg.util.StatisticsUtil;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.session.PropertyMetadata;
import com.facebook.presto.spi.statistics.ColumnStatisticType;
import com.google.common.base.Joiner;
Expand All @@ -33,15 +34,18 @@
import java.util.List;
import java.util.Optional;

import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
import static com.facebook.presto.iceberg.util.StatisticsUtil.SUPPORTED_MERGE_FLAGS;
import static com.facebook.presto.iceberg.util.StatisticsUtil.decodeMergeFlags;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_SESSION_PROPERTY;
import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.doubleProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.longProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty;
import static java.lang.String.format;

public final class IcebergSessionProperties
{
Expand All @@ -62,6 +66,7 @@ public final class IcebergSessionProperties
public static final String MERGE_ON_READ_MODE_ENABLED = "merge_on_read_enabled";
public static final String PUSHDOWN_FILTER_ENABLED = "pushdown_filter_enabled";
public static final String DELETE_AS_JOIN_REWRITE_ENABLED = "delete_as_join_rewrite_enabled";
public static final String DELETE_AS_JOIN_REWRITE_MAX_DELETE_COLUMNS = "delete_as_join_rewrite_max_delete_columns";
public static final String HIVE_METASTORE_STATISTICS_MERGE_STRATEGY = "hive_statistics_merge_strategy";
public static final String STATISTIC_SNAPSHOT_RECORD_DIFFERENCE_WEIGHT = "statistic_snapshot_record_difference_weight";
public static final String ROWS_FOR_METADATA_OPTIMIZATION_THRESHOLD = "rows_for_metadata_optimization_threshold";
Expand Down Expand Up @@ -181,6 +186,23 @@ public IcebergSessionProperties(
"When enabled equality delete row filtering will be pushed down into a join.",
icebergConfig.isDeleteAsJoinRewriteEnabled(),
false))
.add(new PropertyMetadata<>(
DELETE_AS_JOIN_REWRITE_MAX_DELETE_COLUMNS,
"The maximum number of columns that can be used in a delete as join rewrite. " +
"If the number of columns exceeds this value, the delete as join rewrite will not be applied.",
INTEGER,
Integer.class,
icebergConfig.getDeleteAsJoinRewriteMaxDeleteColumns(),
false,
value -> {
int intValue = ((Number) value).intValue();
if (intValue < 0 || intValue > 400) {
throw new PrestoException(INVALID_SESSION_PROPERTY,
format("Invalid value for %s: %s. It must be between 0 and 400.", DELETE_AS_JOIN_REWRITE_MAX_DELETE_COLUMNS, intValue));
}
return intValue;
},
integer -> integer))
.add(integerProperty(
ROWS_FOR_METADATA_OPTIMIZATION_THRESHOLD,
"The max partitions number to utilize metadata optimization. When partitions number " +
Expand Down Expand Up @@ -311,6 +333,11 @@ public static boolean isDeleteToJoinPushdownEnabled(ConnectorSession session)
return session.getProperty(DELETE_AS_JOIN_REWRITE_ENABLED, Boolean.class);
}

public static int getDeleteAsJoinRewriteMaxDeleteColumns(ConnectorSession session)
{
return session.getProperty(DELETE_AS_JOIN_REWRITE_MAX_DELETE_COLUMNS, Integer.class);
}

public static int getRowsForMetadataOptimizationThreshold(ConnectorSession session)
{
return session.getProperty(ROWS_FOR_METADATA_OPTIMIZATION_THRESHOLD, Integer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,13 @@
import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_HANDLE;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.DATA_SEQUENCE_NUMBER;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getDeleteAsJoinRewriteMaxDeleteColumns;
import static com.facebook.presto.iceberg.IcebergSessionProperties.isDeleteToJoinPushdownEnabled;
import static com.facebook.presto.iceberg.IcebergUtil.getDeleteFiles;
import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
import static com.facebook.presto.spi.ConnectorPlanRewriter.rewriteWith;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

/**
Expand Down Expand Up @@ -130,7 +132,9 @@ public class IcebergEqualityDeleteAsJoin
@Override
public PlanNode optimize(PlanNode maxSubplan, ConnectorSession session, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator)
{
if (!isDeleteToJoinPushdownEnabled(session)) {
int maxDeleteColumns = getDeleteAsJoinRewriteMaxDeleteColumns(session);
checkArgument(maxDeleteColumns >= 0, "maxDeleteColumns must be non-negative, got %s", maxDeleteColumns);
if (!isDeleteToJoinPushdownEnabled(session) || maxDeleteColumns == 0) {
return maxSubplan;
}
return rewriteWith(new DeleteAsJoinRewriter(functionResolution,
Expand Down Expand Up @@ -190,6 +194,10 @@ public PlanNode visitTableScan(TableScanNode node, RewriteContext<Void> context)
// no equality deletes
return node;
}
if (deleteSchemas.keySet().stream().anyMatch(equalityIds -> equalityIds.size() > getDeleteAsJoinRewriteMaxDeleteColumns(session))) {
// Too many fields in the delete schema, don't rewrite
return node;
}

// Add all the fields required by the join that were not added by the user's query
ImmutableMap<VariableReferenceExpression, ColumnHandle> unselectedAssignments = createAssignmentsForUnselectedFields(node, deleteSchemas, icebergTable);
Expand Down
Loading
Loading