diff --git a/api/src/main/java/org/apache/iceberg/ManifestFile.java b/api/src/main/java/org/apache/iceberg/ManifestFile.java
index 716398085945..176bbbd3293c 100644
--- a/api/src/main/java/org/apache/iceberg/ManifestFile.java
+++ b/api/src/main/java/org/apache/iceberg/ManifestFile.java
@@ -42,7 +42,10 @@ public interface ManifestFile {
required(509, "contains_null", Types.BooleanType.get()),
optional(510, "lower_bound", Types.BinaryType.get()), // null if no non-null values
optional(511, "upper_bound", Types.BinaryType.get())
- ))));
+ ))),
+ optional(512, "added_rows_count", Types.LongType.get()),
+ optional(513, "existing_rows_count", Types.LongType.get()),
+ optional(514, "deleted_rows_count", Types.LongType.get()));
static Schema schema() {
return SCHEMA;
@@ -82,6 +85,11 @@ default boolean hasAddedFiles() {
*/
Integer addedFilesCount();
+ /**
+ * @return the total number of rows in all data files with status ADDED in the manifest file, or null if unknown
+ */
+ Long addedRowsCount();
+
/**
* Returns true if the manifest contains EXISTING entries or if the count is not known.
*
@@ -96,6 +104,11 @@ default boolean hasExistingFiles() {
*/
Integer existingFilesCount();
+ /**
+ * @return the total number of rows in all data files with status EXISTING in the manifest file, or null if unknown
+ */
+ Long existingRowsCount();
+
/**
* Returns true if the manifest contains DELETED entries or if the count is not known.
*
@@ -110,6 +123,11 @@ default boolean hasDeletedFiles() {
*/
Integer deletedFilesCount();
+ /**
+ * @return the total number of rows in all data files with status DELETED in the manifest file, or null if unknown
+ */
+ Long deletedRowsCount();
+
/**
* Returns a list of {@link PartitionFieldSummary partition field summaries}.
*
diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java
index 8ed45988f3b9..5f5018110015 100644
--- a/api/src/test/java/org/apache/iceberg/TestHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -165,8 +165,11 @@ public static class TestManifestFile implements ManifestFile {
private final int specId;
private final Long snapshotId;
private final Integer addedFiles;
+ private final Long addedRows;
private final Integer existingFiles;
+ private final Long existingRows;
private final Integer deletedFiles;
+ private final Long deletedRows;
private final List partitions;
public TestManifestFile(String path, long length, int specId, Long snapshotId,
@@ -177,8 +180,28 @@ public TestManifestFile(String path, long length, int specId, Long snapshotId,
this.specId = specId;
this.snapshotId = snapshotId;
this.addedFiles = addedFiles;
+ this.addedRows = null;
this.existingFiles = existingFiles;
+ this.existingRows = null;
this.deletedFiles = deletedFiles;
+ this.deletedRows = null;
+ this.partitions = partitions;
+ }
+
+ public TestManifestFile(String path, long length, int specId, Long snapshotId,
+ Integer addedFiles, Long addedRows, Integer existingFiles,
+ Long existingRows, Integer deletedFiles, Long deletedRows,
+ List<PartitionFieldSummary> partitions) {
+ this.path = path;
+ this.length = length;
+ this.specId = specId;
+ this.snapshotId = snapshotId;
+ this.addedFiles = addedFiles;
+ this.addedRows = addedRows; // row counts are boxed and may be null, like the file counts
+ this.existingFiles = existingFiles;
+ this.existingRows = existingRows;
+ this.deletedFiles = deletedFiles;
+ this.deletedRows = deletedRows;
this.partitions = partitions;
}
@@ -207,16 +230,31 @@ public Integer addedFilesCount() {
return addedFiles;
}
+ @Override
+ public Long addedRowsCount() {
+ return addedRows; // null when constructed without row counts
+ }
+
@Override
public Integer existingFilesCount() {
return existingFiles;
}
+ @Override
+ public Long existingRowsCount() {
+ return existingRows; // null when constructed without row counts
+ }
+
@Override
public Integer deletedFilesCount() {
return deletedFiles;
}
+ @Override
+ public Long deletedRowsCount() {
+ return deletedRows; // null when constructed without row counts
+ }
+
@Override
public List partitions() {
return partitions;
diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java
index 54981217279d..b999baf52e6f 100644
--- a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java
@@ -47,8 +47,11 @@ public class GenericManifestFile
private int specId = -1;
private Long snapshotId = null;
private Integer addedFilesCount = null;
+ private Long addedRowsCount = null;
private Integer existingFilesCount = null;
+ private Long existingRowsCount = null;
private Integer deletedFilesCount = null;
+ private Long deletedRowsCount = null;
private List partitions = null;
/**
@@ -87,8 +90,11 @@ public GenericManifestFile(org.apache.avro.Schema avroSchema) {
this.specId = specId;
this.snapshotId = null;
this.addedFilesCount = null;
+ this.addedRowsCount = null;
this.existingFilesCount = null;
+ this.existingRowsCount = null;
this.deletedFilesCount = null;
+ this.deletedRowsCount = null;
this.partitions = null;
this.fromProjectionPos = null;
}
@@ -102,8 +108,30 @@ public GenericManifestFile(String path, long length, int specId, long snapshotId
this.specId = specId;
this.snapshotId = snapshotId;
this.addedFilesCount = addedFilesCount;
+ this.addedRowsCount = null;
this.existingFilesCount = existingFilesCount;
+ this.existingRowsCount = null;
this.deletedFilesCount = deletedFilesCount;
+ this.deletedRowsCount = null;
+ this.partitions = partitions;
+ this.fromProjectionPos = null;
+ }
+
+ public GenericManifestFile(String path, long length, int specId, long snapshotId,
+ int addedFilesCount, long addedRowsCount, int existingFilesCount,
+ long existingRowsCount, int deletedFilesCount, long deletedRowsCount,
+ List<PartitionFieldSummary> partitions) {
+ this.avroSchema = AVRO_SCHEMA;
+ this.manifestPath = path;
+ this.length = length;
+ this.specId = specId;
+ this.snapshotId = snapshotId;
+ this.addedFilesCount = addedFilesCount;
+ this.addedRowsCount = addedRowsCount; // primitive long is boxed here, so never null via this constructor
+ this.existingFilesCount = existingFilesCount;
+ this.existingRowsCount = existingRowsCount;
+ this.deletedFilesCount = deletedFilesCount;
+ this.deletedRowsCount = deletedRowsCount;
this.partitions = partitions;
this.fromProjectionPos = null;
}
@@ -120,8 +148,11 @@ private GenericManifestFile(GenericManifestFile toCopy) {
this.specId = toCopy.specId;
this.snapshotId = toCopy.snapshotId;
this.addedFilesCount = toCopy.addedFilesCount;
+ this.addedRowsCount = toCopy.addedRowsCount;
this.existingFilesCount = toCopy.existingFilesCount;
+ this.existingRowsCount = toCopy.existingRowsCount;
this.deletedFilesCount = toCopy.deletedFilesCount;
+ this.deletedRowsCount = toCopy.deletedRowsCount;
this.partitions = ImmutableList.copyOf(Iterables.transform(toCopy.partitions, PartitionFieldSummary::copy));
this.fromProjectionPos = toCopy.fromProjectionPos;
}
@@ -170,16 +201,31 @@ public Integer addedFilesCount() {
return addedFilesCount;
}
+ @Override
+ public Long addedRowsCount() {
+ return addedRowsCount; // may be null, e.g. when read from a manifest list without this column
+ }
+
@Override
public Integer existingFilesCount() {
return existingFilesCount;
}
+ @Override
+ public Long existingRowsCount() {
+ return existingRowsCount; // may be null, e.g. when read from a manifest list without this column
+ }
+
@Override
public Integer deletedFilesCount() {
return deletedFilesCount;
}
+ @Override
+ public Long deletedRowsCount() {
+ return deletedRowsCount; // may be null, e.g. when read from a manifest list without this column
+ }
+
@Override
public List partitions() {
return partitions;
@@ -219,6 +265,12 @@ public Object get(int i) {
return deletedFilesCount;
case 7:
return partitions;
+ case 8:
+ return addedRowsCount;
+ case 9:
+ return existingRowsCount;
+ case 10:
+ return deletedRowsCount;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
}
@@ -258,6 +310,15 @@ public void set(int i, T value) {
case 7:
this.partitions = (List) value;
return;
+ case 8:
+ this.addedRowsCount = (Long) value;
+ return;
+ case 9:
+ this.existingRowsCount = (Long) value;
+ return;
+ case 10:
+ this.deletedRowsCount = (Long) value;
+ return;
default:
// ignore the object, it must be from a newer version of the format
}
@@ -302,8 +363,11 @@ public String toString() {
.add("partition_spec_id", specId)
.add("added_snapshot_id", snapshotId)
.add("added_data_files_count", addedFilesCount)
+ .add("added_rows_count", addedRowsCount)
.add("existing_data_files_count", existingFilesCount)
+ .add("existing_rows_count", existingRowsCount)
.add("deleted_data_files_count", deletedFilesCount)
+ .add("deleted_rows_count", deletedRowsCount)
.add("partitions", partitions)
.toString();
}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index e7441e6aafc8..6facccfbc3cd 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -105,8 +105,11 @@ public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) {
private boolean closed = false;
private int addedFiles = 0;
+ private long addedRows = 0L;
private int existingFiles = 0;
+ private long existingRows = 0L;
private int deletedFiles = 0;
+ private long deletedRows = 0L;
ManifestWriter(PartitionSpec spec, OutputFile file, long snapshotId) {
this.file = file;
@@ -121,12 +124,15 @@ void addEntry(ManifestEntry entry) {
switch (entry.status()) {
case ADDED:
addedFiles += 1;
+ addedRows += entry.file().recordCount();
break;
case EXISTING:
existingFiles += 1;
+ existingRows += entry.file().recordCount();
break;
case DELETED:
deletedFiles += 1;
+ deletedRows += entry.file().recordCount();
break;
}
stats.update(entry.file().partition());
@@ -195,7 +201,7 @@ public long length() {
public ManifestFile toManifestFile() {
Preconditions.checkState(closed, "Cannot build ManifestFile, writer is not closed");
return new GenericManifestFile(file.location(), writer.length(), specId, snapshotId,
- addedFiles, existingFiles, deletedFiles, stats.summaries());
+ addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries());
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index a37efe9cacc0..3aab6d33327a 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -312,8 +312,11 @@ private static ManifestFile addMetadata(TableOperations ops, ManifestFile manife
ops.io().newInputFile(manifest.path()), ops.current().specsById())) {
PartitionSummary stats = new PartitionSummary(ops.current().spec(manifest.partitionSpecId()));
int addedFiles = 0;
+ long addedRows = 0L;
int existingFiles = 0;
+ long existingRows = 0L;
int deletedFiles = 0;
+ long deletedRows = 0L;
Long snapshotId = null;
long maxSnapshotId = Long.MIN_VALUE;
@@ -325,15 +328,18 @@ private static ManifestFile addMetadata(TableOperations ops, ManifestFile manife
switch (entry.status()) {
case ADDED:
addedFiles += 1;
+ addedRows += entry.file().recordCount();
if (snapshotId == null) {
snapshotId = entry.snapshotId();
}
break;
case EXISTING:
existingFiles += 1;
+ existingRows += entry.file().recordCount();
break;
case DELETED:
deletedFiles += 1;
+ deletedRows += entry.file().recordCount();
if (snapshotId == null) {
snapshotId = entry.snapshotId();
}
@@ -349,7 +355,8 @@ private static ManifestFile addMetadata(TableOperations ops, ManifestFile manife
}
return new GenericManifestFile(manifest.path(), manifest.length(), manifest.partitionSpecId(),
- snapshotId, addedFiles, existingFiles, deletedFiles, stats.summaries());
+ snapshotId, addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows,
+ stats.summaries());
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to read manifest: %s", manifest.path());
diff --git a/core/src/test/java/org/apache/iceberg/TestGenericManifestFile.java b/core/src/test/java/org/apache/iceberg/TestGenericManifestFile.java
new file mode 100644
index 000000000000..f50b8505d3ce
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestGenericManifestFile.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestGenericManifestFile {
+
+ private static final FileIO FILE_IO = new TestTables.LocalFileIO();
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ @Test
+ public void testManifestsWithoutRowStats() throws IOException { // absent row counts must read as null
+ File manifestListFile = temp.newFile("manifest-list.avro");
+ Assert.assertTrue(manifestListFile.delete());
+
+ Collection<String> columnNamesWithoutRowStats = ImmutableList.of(
+ "manifest_path", "manifest_length", "partition_spec_id", "added_snapshot_id",
+ "added_data_files_count", "existing_data_files_count", "deleted_data_files_count",
+ "partitions");
+ Schema schemaWithoutRowStats = ManifestFile.schema().select(columnNamesWithoutRowStats); // no *_rows_count
+
+ OutputFile outputFile = FILE_IO.newOutputFile(manifestListFile.getCanonicalPath());
+ try (FileAppender<ManifestFile> appender = Avro.write(outputFile)
+ .schema(schemaWithoutRowStats)
+ .named("manifest_file")
+ .overwrite()
+ .build()) {
+
+ appender.add(new GenericManifestFile("path/to/manifest.avro", 1024, 1, 100L, 2, 3, 4, ImmutableList.of()));
+ }
+
+ InputFile inputFile = FILE_IO.newInputFile(manifestListFile.getCanonicalPath());
+ try (CloseableIterable<ManifestFile> files = Avro.read(inputFile)
+ .rename("manifest_file", GenericManifestFile.class.getName())
+ .rename("partitions", GenericPartitionFieldSummary.class.getName())
+ .rename("r508", GenericPartitionFieldSummary.class.getName())
+ .project(ManifestFile.schema()) // read back with the full schema, row counts included
+ .reuseContainers(false)
+ .build()) {
+
+ ManifestFile manifest = Iterables.getOnlyElement(files);
+
+ Assert.assertTrue("Added files should be present", manifest.hasAddedFiles());
+ Assert.assertEquals("Added files count should match", 2, (int) manifest.addedFilesCount());
+ Assert.assertNull("Added rows count should be null", manifest.addedRowsCount());
+
+ Assert.assertTrue("Existing files should be present", manifest.hasExistingFiles());
+ Assert.assertEquals("Existing files count should match", 3, (int) manifest.existingFilesCount());
+ Assert.assertNull("Existing rows count should be null", manifest.existingRowsCount());
+
+ Assert.assertTrue("Deleted files should be present", manifest.hasDeletedFiles());
+ Assert.assertEquals("Deleted files count should match", 4, (int) manifest.deletedFilesCount());
+ Assert.assertNull("Deleted rows count should be null", manifest.deletedRowsCount());
+ }
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java
new file mode 100644
index 000000000000..9a67ac7b45de
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.util.UUID;
+import org.apache.iceberg.ManifestEntry.Status;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestManifestWriter extends TableTestBase {
+
+ @Test
+ public void testManifestStats() throws IOException { // file and row counts are tallied per entry status
+ ManifestFile manifest = writeManifest(
+ "manifest.avro",
+ manifestEntry(Status.ADDED, 100L, newFile(10)),
+ manifestEntry(Status.ADDED, 100L, newFile(20)),
+ manifestEntry(Status.ADDED, 100L, newFile(5)),
+ manifestEntry(Status.ADDED, 100L, newFile(5)),
+ manifestEntry(Status.EXISTING, 100L, newFile(15)),
+ manifestEntry(Status.EXISTING, 100L, newFile(10)),
+ manifestEntry(Status.EXISTING, 100L, newFile(1)),
+ manifestEntry(Status.DELETED, 100L, newFile(5)),
+ manifestEntry(Status.DELETED, 100L, newFile(2)));
+
+ Assert.assertTrue("Added files should be present", manifest.hasAddedFiles());
+ Assert.assertEquals("Added files count should match", 4, (int) manifest.addedFilesCount());
+ Assert.assertEquals("Added rows count should match", 40L, (long) manifest.addedRowsCount()); // 10 + 20 + 5 + 5
+
+ Assert.assertTrue("Existing files should be present", manifest.hasExistingFiles());
+ Assert.assertEquals("Existing files count should match", 3, (int) manifest.existingFilesCount());
+ Assert.assertEquals("Existing rows count should match", 26L, (long) manifest.existingRowsCount()); // 15 + 10 + 1
+
+ Assert.assertTrue("Deleted files should be present", manifest.hasDeletedFiles());
+ Assert.assertEquals("Deleted files count should match", 2, (int) manifest.deletedFilesCount());
+ Assert.assertEquals("Deleted rows count should match", 7L, (long) manifest.deletedRowsCount()); // 5 + 2
+ }
+
+ private DataFile newFile(long recordCount) { // data file with a random path and the given record count
+ String fileName = UUID.randomUUID().toString();
+ return DataFiles.builder(SPEC)
+ .withPath("data_bucket=0/" + fileName + ".parquet")
+ .withFileSizeInBytes(1024)
+ .withRecordCount(recordCount)
+ .build();
+ }
+}
diff --git a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
index 5458c1e6cd0f..d5279ffce777 100644
--- a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
+++ b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
@@ -454,10 +454,16 @@ object SparkTableUtil {
override def addedFilesCount: Integer = null
+ override def addedRowsCount: java.lang.Long = null // null: no added-row count is reported
+
override def existingFilesCount: Integer = null
+ override def existingRowsCount: java.lang.Long = null // null: no existing-row count is reported
+
override def deletedFilesCount: Integer = null
+ override def deletedRowsCount: java.lang.Long = null // null: no deleted-row count is reported
+
override def partitions: java.util.List[ManifestFile.PartitionFieldSummary] = null
override def copy: ManifestFile = this