Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
Expand All @@ -79,16 +77,12 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.CharSequenceSet;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -101,7 +95,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
Expand Down Expand Up @@ -131,6 +124,7 @@
import static com.facebook.presto.iceberg.IcebergUtil.getDeleteMode;
import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeyColumnHandles;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionSpecsIncludingValidData;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitions;
import static com.facebook.presto.iceberg.IcebergUtil.getSnapshotIdAsOfTime;
import static com.facebook.presto.iceberg.IcebergUtil.getTableComment;
Expand All @@ -148,7 +142,6 @@
import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
import static com.facebook.presto.iceberg.changelog.ChangelogUtil.getRowTypeFromColumnMeta;
import static com.facebook.presto.iceberg.optimizer.IcebergPlanOptimizer.getEnforcedColumns;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT;
import static com.google.common.base.Verify.verify;
Expand All @@ -159,6 +152,9 @@
import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.MetadataColumns.ROW_POSITION;
import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP;
import static org.apache.iceberg.SnapshotSummary.REMOVED_EQ_DELETES_PROP;
import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP;

public abstract class IcebergAbstractMetadata
implements ConnectorMetadata
Expand Down Expand Up @@ -769,12 +765,7 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
try (CloseableIterable<FileScanTask> files = icebergTable.newScan().planFiles()) {
removeScanFiles(icebergTable, files);
}
catch (IOException e) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, "failed to scan files for delete", e);
}
removeScanFiles(icebergTable, TupleDomain.all());
}

@Override
Expand Down Expand Up @@ -883,11 +874,7 @@ public boolean supportsMetadataDelete(ConnectorSession session, ConnectorTableHa

// Get partition specs that really need to be checked
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
Set<Integer> partitionSpecIds = handle.getIcebergTableName().getSnapshotId().map(
snapshot -> icebergTable.snapshot(snapshot).allManifests(icebergTable.io()).stream()
.map(ManifestFile::partitionSpecId)
.collect(toImmutableSet()))
.orElseGet(() -> ImmutableSet.copyOf(icebergTable.specs().keySet())); // No snapshot, so no data. This case doesn't matter.
Set<Integer> partitionSpecIds = getPartitionSpecsIncludingValidData(icebergTable, handle.getIcebergTableName().getSnapshotId());

Set<Integer> enforcedColumnIds = getEnforcedColumns(icebergTable, partitionSpecIds, domainPredicate, session)
.stream()
Expand Down Expand Up @@ -916,39 +903,29 @@ public OptionalLong metadataDelete(ConnectorSession session, ConnectorTableHandl
throw new TableNotFoundException(handle.getSchemaTableName());
}

TableScan scan = icebergTable.newScan();
TupleDomain<IcebergColumnHandle> domainPredicate = layoutHandle.getValidPredicate();

if (!domainPredicate.isAll()) {
Expression filterExpression = toIcebergExpression(domainPredicate);
scan = scan.filter(filterExpression);
}

try (CloseableIterable<FileScanTask> files = scan.planFiles()) {
return OptionalLong.of(removeScanFiles(icebergTable, files));
}
catch (IOException e) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, "failed to scan files for delete", e);
}
return removeScanFiles(icebergTable, domainPredicate);
}

/**
* Deletes all the files within a particular scan
* Deletes all files matching the given predicate
*
* @return the number of rows deleted from all files
*/
private long removeScanFiles(Table icebergTable, Iterable<FileScanTask> scan)
private OptionalLong removeScanFiles(Table icebergTable, TupleDomain<IcebergColumnHandle> predicate)
{
transaction = icebergTable.newTransaction();
DeleteFiles deletes = transaction.newDelete();
AtomicLong rowsDeleted = new AtomicLong(0L);
scan.forEach(t -> {
deletes.deleteFile(t.file());
rowsDeleted.addAndGet(t.estimatedRowsCount());
});
deletes.commit();
DeleteFiles deleteFiles = transaction.newDelete()
.deleteFromRowFilter(toIcebergExpression(predicate));
deleteFiles.commit();
transaction.commitTransaction();
return rowsDeleted.get();

Map<String, String> summary = icebergTable.currentSnapshot().summary();
long deletedRecords = Long.parseLong(summary.getOrDefault(DELETED_RECORDS_PROP, "0"));
long removedPositionDeletes = Long.parseLong(summary.getOrDefault(REMOVED_POS_DELETES_PROP, "0"));
long removedEqualityDeletes = Long.parseLong(summary.getOrDefault(REMOVED_EQ_DELETES_PROP, "0"));
// The removed-row count may be inaccurate when equality delete files exist
return OptionalLong.of(deletedRecords - removedPositionDeletes - removedEqualityDeletes);
}

private static long getSnapshotIdForTableVersion(Table table, ConnectorTableVersion tableVersion)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,15 @@ public static Map<PartitionField, Integer> getIdentityPartitions(PartitionSpec p
return columns.build();
}

public static Set<Integer> getPartitionSpecsIncludingValidData(Table icebergTable, Optional<Long> snapshotId)
{
    // Without a snapshot the table contains no data, so every known spec is trivially acceptable.
    if (!snapshotId.isPresent()) {
        return ImmutableSet.copyOf(icebergTable.specs().keySet());
    }

    // Keep only spec ids of manifests that still reference live (added or existing) data files;
    // specs whose files have all been deleted impose no constraint.
    ImmutableSet.Builder<Integer> validSpecIds = ImmutableSet.builder();
    for (ManifestFile manifest : icebergTable.snapshot(snapshotId.get()).allManifests(icebergTable.io())) {
        if (manifest.hasAddedFiles() || manifest.hasExistingFiles()) {
            validSpecIds.add(manifest.partitionSpecId());
        }
    }
    return validSpecIds.build();
}

public static List<Column> toHiveColumns(List<NestedField> columns)
{
return columns.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
Expand All @@ -71,11 +70,11 @@
import static com.facebook.presto.iceberg.IcebergTableType.DATA;
import static com.facebook.presto.iceberg.IcebergUtil.getAdjacentValue;
import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionSpecsIncludingValidData;
import static com.facebook.presto.spi.ConnectorPlanRewriter.rewriteWith;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

Expand Down Expand Up @@ -177,12 +176,7 @@ public PlanNode visitFilter(FilterNode filter, RewriteContext<Void> context)
RowExpression subfieldPredicate = rowExpressionService.getDomainTranslator().toPredicate(subfieldTupleDomain);

// Get partition specs that really need to be checked
Set<Integer> partitionSpecIds = tableHandle.getIcebergTableName().getSnapshotId().map(
snapshot -> icebergTable.snapshot(snapshot).allManifests(icebergTable.io()).stream()
.map(ManifestFile::partitionSpecId)
.collect(toImmutableSet()))
.orElseGet(() -> ImmutableSet.copyOf(icebergTable.specs().keySet())); // No snapshot, so no data. This case doesn't matter.

Set<Integer> partitionSpecIds = getPartitionSpecsIncludingValidData(icebergTable, tableHandle.getIcebergTableName().getSnapshotId());
Set<IcebergColumnHandle> enforcedColumns = getEnforcedColumns(icebergTable,
partitionSpecIds,
entireColumnDomain,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1588,6 +1588,58 @@ public void testDeleteOnPartitionedV1Table()
dropTable(session, tableName);
}

@Test(dataProvider = "version_and_mode")
public void testMetadataDeleteOnTableWithUnsupportedSpecsIncludingNoData(String version, String mode)
{
    String table = "test_empty_partition_spec_table";
    try {
        // Start with an unpartitioned table.
        assertUpdate("CREATE TABLE " + table + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "')");

        // Evolve the partition spec (identity partition on new column `c`) before any data is written.
        assertUpdate("ALTER TABLE " + table + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");

        // Every row is written under the evolved spec.
        assertUpdate("INSERT INTO " + table + " VALUES (1, '1001', 1), (2, '1002', 2), (3, '1003', 3), (4, '1004', 4)", 4);

        // Metadata delete filtered on `c` succeeds: the original (unpartitioned) spec never received data.
        assertUpdate("DELETE FROM " + table + " WHERE c in (1, 3)", 2);
        assertQuery("SELECT * FROM " + table, "VALUES (2, '1002', 2), (4, '1004', 4)");
    }
    finally {
        dropTable(getSession(), table);
    }
}

@Test(dataProvider = "version_and_mode")
public void testMetadataDeleteOnTableWithUnsupportedSpecsWhoseDataAllDeleted(String version, String mode)
{
    String table = "test_data_deleted_partition_spec_table";
    String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*";
    try {
        // Write rows under the initial spec, which partitions on `a` only.
        assertUpdate("CREATE TABLE " + table + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "', partitioning = ARRAY['a'])");
        assertUpdate("INSERT INTO " + table + " VALUES (1, '1001'), (2, '1002')", 2);

        // Evolve the spec with an identity partition on new column `c`, then write rows under it.
        assertUpdate("ALTER TABLE " + table + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
        assertUpdate("INSERT INTO " + table + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);

        // A filter on `c` cannot be a metadata delete while old-spec data (not partitioned by `c`) remains.
        assertQueryFails("DELETE FROM " + table + " WHERE c > 3", errorMessage);

        // `a` is a partition column in every spec, so this delete succeeds as a metadata delete.
        assertUpdate("DELETE FROM " + table + " WHERE a in (1, 2)", 2);

        // With the old-spec data gone, a filter on `c` is now eligible for metadata delete.
        assertUpdate("DELETE FROM " + table + " WHERE c > 3", 2);
        assertQuery("SELECT * FROM " + table, "VALUES (3, '1003', 3)");
    }
    finally {
        dropTable(getSession(), table);
    }
}

@DataProvider(name = "version_and_mode")
public Object[][] versionAndMode()
{
Expand Down
Loading