31 changes: 31 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetricsUtil.java
@@ -40,6 +40,37 @@ public class MetricsUtil {

private MetricsUtil() {}

/**
* Copies a metrics object without lower and upper bounds for the given fields.
*
* @param metrics a metrics object to copy
* @param excludedFieldIds field IDs for which the lower and upper bounds must be dropped
* @return a new metrics object without lower and upper bounds for the given fields
*/
public static Metrics copyWithoutFieldBounds(Metrics metrics, Set<Integer> excludedFieldIds) {
return new Metrics(
metrics.recordCount(),
metrics.columnSizes(),
metrics.valueCounts(),
metrics.nullValueCounts(),
metrics.nanValueCounts(),
copyWithoutKeys(metrics.lowerBounds(), excludedFieldIds),
copyWithoutKeys(metrics.upperBounds(), excludedFieldIds));
}

private static <K, V> Map<K, V> copyWithoutKeys(Map<K, V> map, Set<K> keys) {
if (map == null) {
return null;
}

Map<K, V> filteredMap = Maps.newHashMap(map);

for (K key : keys) {
filteredMap.remove(key);
}

return filteredMap.isEmpty() ? null : filteredMap;
}
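
For illustration, a minimal usage sketch of the new utility (not part of this diff; imports elided, values hypothetical, and it assumes the public seven-argument Metrics constructor invoked in copyWithoutFieldBounds above):

// Build a Metrics object with lower/upper bounds for field IDs 1 and 2.
Map<Integer, ByteBuffer> bounds =
ImmutableMap.of(
1, Conversions.toByteBuffer(Types.IntegerType.get(), 0),
2, Conversions.toByteBuffer(Types.LongType.get(), 0L));
Metrics metrics = new Metrics(10L, null, null, null, null, bounds, bounds);

// Copy it, dropping the bounds for field ID 2 only.
Metrics filtered = MetricsUtil.copyWithoutFieldBounds(metrics, ImmutableSet.of(2));

// filtered.lowerBounds() and filtered.upperBounds() now contain only field ID 1.
// Had every key been excluded, copyWithoutKeys would return null rather than an
// empty map, so the bounds maps would be omitted entirely.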

/**
* Constructs a mapping from column ID to NaN value counts using the input metrics and
* metrics config.
core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
@@ -18,18 +18,25 @@
*/
package org.apache.iceberg.deletes;

import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH;
import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Set;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.util.CharSequenceSet;

/**
@@ -40,6 +47,9 @@
* records, consider using {@link SortingPositionOnlyDeleteWriter} instead.
*/
public class PositionDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWriteResult> {
private static final Set<Integer> SINGLE_REFERENCED_FILE_BOUNDS_ONLY =
ImmutableSet.of(DELETE_FILE_PATH.fieldId(), DELETE_FILE_POS.fieldId());

private final FileAppender<StructLike> appender;
private final FileFormat format;
private final String location;
@@ -89,7 +99,7 @@ public void close() throws IOException {
.withEncryptionKeyMetadata(keyMetadata)
.withSplitOffsets(appender.splitOffsets())
.withFileSizeInBytes(appender.length())
.withMetrics(appender.metrics())
.withMetrics(metrics())
.build();
}
}
@@ -107,4 +117,13 @@ public DeleteFile toDeleteFile() {
public DeleteWriteResult result() {
return new DeleteWriteResult(toDeleteFile(), referencedDataFiles());
}

private Metrics metrics() {
Metrics metrics = appender.metrics();
if (referencedDataFiles.size() > 1) {
return MetricsUtil.copyWithoutFieldBounds(metrics, SINGLE_REFERENCED_FILE_BOUNDS_ONLY);
} else {
return metrics;
}
}
}

Review comment (Contributor):
> Looks like the idea is only to drop the bounds for _file and _pos? I guess that would mean that we keep data column ranges that might be used for filtering?

Reply (@aokolnychyi, Contributor Author, Aug 21, 2023):
> I am not aware of any engines that persist deleted rows, but I opted for an incremental change in behavior to be safe.
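
To make the trade-off concrete, here is a hypothetical planning-side sketch (not from this PR; dataFilePath is an assumed String variable) of how a reader could use the retained file_path bounds to prune position delete files:

// When a delete file references a single data file, its file_path lower and
// upper bounds are equal, so a scan of any other data file can skip it.
ByteBuffer lower = deleteFile.lowerBounds().get(MetadataColumns.DELETE_FILE_PATH.fieldId());
ByteBuffer upper = deleteFile.upperBounds().get(MetadataColumns.DELETE_FILE_PATH.fieldId());
String lowerPath = Conversions.fromByteBuffer(Types.StringType.get(), lower).toString();
String upperPath = Conversions.fromByteBuffer(Types.StringType.get(), upper).toString();
boolean mayContainDeletes =
lowerPath.compareTo(dataFilePath) <= 0 && dataFilePath.compareTo(upperPath) <= 0;
// Once a delete file references multiple data files, this change drops these
// bounds, so planning must conservatively assume the delete file applies.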
data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java
@@ -229,6 +229,17 @@ public void testPositionDeleteWriter() throws IOException {
DeleteFile deleteFile = result.first();
CharSequenceSet referencedDataFiles = result.second();

if (fileFormat == FileFormat.AVRO) {
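// Avro appenders collect only record counts, so no column bounds are written.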
Assert.assertNull(deleteFile.lowerBounds());
Assert.assertNull(deleteFile.upperBounds());
} else {
Assert.assertEquals(1, referencedDataFiles.size());
Assert.assertEquals(2, deleteFile.lowerBounds().size());
Assert.assertTrue(deleteFile.lowerBounds().containsKey(DELETE_FILE_PATH.fieldId()));
Assert.assertEquals(2, deleteFile.upperBounds().size());
Assert.assertTrue(deleteFile.upperBounds().containsKey(DELETE_FILE_PATH.fieldId()));
}

Review comment (Contributor):
> Should this also assert that referencedDataFiles.size() == 1?

Reply (Contributor Author):
> I'll add a check.

// verify the written delete file
GenericRecord deleteRecord = GenericRecord.create(DeleteSchemaUtil.pathPosSchema());
List<Record> expectedDeletes =
@@ -302,6 +313,53 @@ public void testPositionDeleteWriterWithRow() throws IOException {
Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
}

@Test
public void testPositionDeleteWriterMultipleDataFiles() throws IOException {
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());

// write two data files
DataFile dataFile1 = writeData(writerFactory, dataRows, table.spec(), partition);
DataFile dataFile2 = writeData(writerFactory, dataRows, table.spec(), partition);

// write a position delete file referencing both
List<PositionDelete<T>> deletes =
ImmutableList.of(
positionDelete(dataFile1.path(), 0L, null),
positionDelete(dataFile1.path(), 2L, null),
positionDelete(dataFile2.path(), 4L, null));
Pair<DeleteFile, CharSequenceSet> result =
writePositionDeletes(writerFactory, deletes, table.spec(), partition);
DeleteFile deleteFile = result.first();
CharSequenceSet referencedDataFiles = result.second();

// verify the written delete file has NO lower and upper bounds
Assert.assertEquals(2, referencedDataFiles.size());
Assert.assertNull(deleteFile.lowerBounds());
Assert.assertNull(deleteFile.upperBounds());

// commit the data and delete files
table
.newRowDelta()
.addRows(dataFile1)
.addRows(dataFile2)
.addDeletes(deleteFile)
.validateDataFilesExist(referencedDataFiles)
.validateDeletedFiles()
.commit();

// verify the delete file is applied correctly
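// dataRows holds five rows per data file in these tests (assumed here to be
// (1, "aaa") through (5, "aaa")); deleting positions 0 and 2 from dataFile1
// and position 4 from dataFile2 leaves 3 + 4 = 7 rows below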
List<T> expectedRows =
ImmutableList.of(
toRow(2, "aaa"),
toRow(4, "aaa"),
toRow(5, "aaa"),
toRow(1, "aaa"),
toRow(2, "aaa"),
toRow(3, "aaa"),
toRow(4, "aaa"));
Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
}

private DataFile writeData(
FileWriterFactory<T> writerFactory, List<T> rows, PartitionSpec spec, StructLike partitionKey)
throws IOException {
38 changes: 38 additions & 0 deletions data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
@@ -203,6 +203,44 @@ public void testPositionDeleteMetrics() throws IOException {
3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), upperBounds.get(5)));
}

@Test
public void testPositionDeleteMetricsCoveringMultipleDataFiles() throws IOException {
FileWriterFactory<T> writerFactory = newWriterFactory(table);
EncryptedOutputFile outputFile = fileFactory.newOutputFile();
PositionDeleteWriter<T> deleteWriter =
writerFactory.newPositionDeleteWriter(outputFile, table.spec(), null);

try {
PositionDelete<T> positionDelete = PositionDelete.create();

positionDelete.set("File A", 1, toRow(3, "3", true, 3L));
deleteWriter.write(positionDelete);

positionDelete.set("File B", 1, toRow(3, "3", true, 3L));
deleteWriter.write(positionDelete);

} finally {
deleteWriter.close();
}

DeleteFile deleteFile = deleteWriter.toDeleteFile();

// should have NO bounds for path and position as the file references multiple
// data files; bounds for the row's data columns (field IDs 1 and 5) are kept
Map<Integer, ByteBuffer> lowerBounds = deleteFile.lowerBounds();
Assert.assertEquals(2, lowerBounds.size());
Assert.assertEquals(
3, (int) Conversions.fromByteBuffer(Types.IntegerType.get(), lowerBounds.get(1)));
Assert.assertEquals(
3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), lowerBounds.get(5)));

Map<Integer, ByteBuffer> upperBounds = deleteFile.upperBounds();
Assert.assertEquals(2, upperBounds.size());
Assert.assertEquals(
3, (int) Conversions.fromByteBuffer(Types.IntegerType.get(), upperBounds.get(1)));
Assert.assertEquals(
3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), upperBounds.get(5)));
}

@Test
public void testMaxColumns() throws IOException {
File tableDir = temp.newFolder();