Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 37 additions & 22 deletions api/src/main/java/org/apache/iceberg/ManifestFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,33 @@
* Represents a manifest file that can be scanned to find data files in a table.
*/
public interface ManifestFile {
// Field definitions for the manifest file schema, declared here in field-ID order.
// SCHEMA lists these constants in its own order, so a field's ID does not indicate
// its position in the schema.
Types.NestedField PATH = required(500, "manifest_path", Types.StringType.get());
Types.NestedField LENGTH = required(501, "manifest_length", Types.LongType.get());
Types.NestedField SPEC_ID = required(502, "partition_spec_id", Types.IntegerType.get());
Types.NestedField SNAPSHOT_ID = optional(503, "added_snapshot_id", Types.LongType.get());
Types.NestedField ADDED_FILES_COUNT = optional(504, "added_data_files_count", Types.IntegerType.get());
Types.NestedField EXISTING_FILES_COUNT = optional(505, "existing_data_files_count", Types.IntegerType.get());
Types.NestedField DELETED_FILES_COUNT = optional(506, "deleted_data_files_count", Types.IntegerType.get());
// Struct type of each element of the "partitions" list (field IDs 509-511). It is declared
// before PARTITION_SUMMARIES because that field's list type references it.
Types.StructType PARTITION_SUMMARY_TYPE = Types.StructType.of(
required(509, "contains_null", Types.BooleanType.get()),
optional(510, "lower_bound", Types.BinaryType.get()), // null if no non-null values
optional(511, "upper_bound", Types.BinaryType.get())
);
// Optional list field (ID 507) whose required elements (element ID 508) are partition summaries.
Types.NestedField PARTITION_SUMMARIES = optional(507, "partitions",
Types.ListType.ofRequired(508, PARTITION_SUMMARY_TYPE));
Types.NestedField ADDED_ROWS_COUNT = optional(512, "added_rows_count", Types.LongType.get());
Types.NestedField EXISTING_ROWS_COUNT = optional(513, "existing_rows_count", Types.LongType.get());
Types.NestedField DELETED_ROWS_COUNT = optional(514, "deleted_rows_count", Types.LongType.get());
// Both sequence number fields are optional in the schema, while the matching accessors return a
// primitive long; GenericManifestFile#set stores 0 when the value read for either field is null.
Types.NestedField SEQUENCE_NUMBER = optional(515, "sequence_number", Types.LongType.get());
Types.NestedField MIN_SEQUENCE_NUMBER = optional(516, "min_sequence_number", Types.LongType.get());
// next ID to assign: 517

Schema SCHEMA = new Schema(
required(500, "manifest_path", Types.StringType.get()),
required(501, "manifest_length", Types.LongType.get()),
required(502, "partition_spec_id", Types.IntegerType.get()),
optional(503, "added_snapshot_id", Types.LongType.get()),
optional(504, "added_data_files_count", Types.IntegerType.get()),
optional(505, "existing_data_files_count", Types.IntegerType.get()),
optional(506, "deleted_data_files_count", Types.IntegerType.get()),
optional(507, "partitions", Types.ListType.ofRequired(508, Types.StructType.of(
required(509, "contains_null", Types.BooleanType.get()),
optional(510, "lower_bound", Types.BinaryType.get()), // null if no non-null values
optional(511, "upper_bound", Types.BinaryType.get())
))),
optional(512, "added_rows_count", Types.LongType.get()),
optional(513, "existing_rows_count", Types.LongType.get()),
optional(514, "deleted_rows_count", Types.LongType.get()));
PATH, LENGTH, SPEC_ID,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember we had issues with reordering fields in ManifestFile as GenericAvroWriter was using ordinal positions instead of field ids. Did we solve that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this is handled in V1Metadata.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the IndexedRecord field order needs to match the schema order. This is why I added a test for this as well.

SEQUENCE_NUMBER, MIN_SEQUENCE_NUMBER, SNAPSHOT_ID,
ADDED_FILES_COUNT, EXISTING_FILES_COUNT, DELETED_FILES_COUNT,
ADDED_ROWS_COUNT, EXISTING_ROWS_COUNT, DELETED_ROWS_COUNT,
PARTITION_SUMMARIES);

static Schema schema() {
return SCHEMA;
Expand All @@ -66,6 +77,16 @@ static Schema schema() {
*/
int partitionSpecId();

/**
* Returns the sequence number associated with this manifest file.
* <p>
* The return type is a primitive {@code long}, so an implementation always supplies a value even
* though the {@code sequence_number} schema field is optional.
*
* @return the sequence number of the commit that added the manifest file
*/
long sequenceNumber();

/**
* Returns the minimum sequence number across the data files tracked by this manifest.
* <p>
* The return type is a primitive {@code long}, so an implementation always supplies a value even
* though the {@code min_sequence_number} schema field is optional.
*
* @return the lowest sequence number of any data file in the manifest
*/
long minSequenceNumber();

/**
* @return ID of the snapshot that added the manifest file to table metadata
*/
Expand Down Expand Up @@ -152,14 +173,8 @@ default boolean hasDeletedFiles() {
* Summarizes the values of one partition field stored in a manifest file.
*/
interface PartitionFieldSummary {
Types.StructType TYPE = ManifestFile.schema()
.findType("partitions")
.asListType()
.elementType()
.asStructType();

static Types.StructType getType() {
return TYPE;
return PARTITION_SUMMARY_TYPE;
}

/**
Expand Down
10 changes: 10 additions & 0 deletions api/src/test/java/org/apache/iceberg/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,16 @@ public int partitionSpecId() {
return specId;
}

// This test implementation returns a constant 0 rather than a stored sequence number.
@Override
public long sequenceNumber() {
return 0;
}

// This test implementation returns a constant 0 rather than a stored minimum sequence number.
@Override
public long minSequenceNumber() {
return 0;
}

@Override
public Long snapshotId() {
return snapshotId;
Expand Down
16 changes: 1 addition & 15 deletions core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
Expand Down Expand Up @@ -125,20 +124,7 @@ public Map<String, String> summary() {
public List<ManifestFile> manifests() {
if (manifests == null) {
// if manifests isn't set, then the snapshotFile is set and should be read to get the list
try (CloseableIterable<ManifestFile> files = Avro.read(manifestList)
.rename("manifest_file", GenericManifestFile.class.getName())
.rename("partitions", GenericPartitionFieldSummary.class.getName())
.rename("r508", GenericPartitionFieldSummary.class.getName())
.classLoader(GenericManifestFile.class.getClassLoader())
.project(ManifestFile.schema())
.reuseContainers(false)
.build()) {

this.manifests = Lists.newLinkedList(files);

} catch (IOException e) {
throw new RuntimeIOException(e, "Cannot read manifest list file: %s", manifestList.location());
}
this.manifests = ManifestLists.read(manifestList);
}

return manifests;
Expand Down
89 changes: 49 additions & 40 deletions core/src/main/java/org/apache/iceberg/GenericManifestFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ public class GenericManifestFile
private String manifestPath = null;
private Long length = null;
private int specId = -1;
private long sequenceNumber = 0;
private long minSequenceNumber = 0;
private Long snapshotId = null;
private Integer addedFilesCount = null;
private Long addedRowsCount = null;
private Integer existingFilesCount = null;
private Long existingRowsCount = null;
private Integer deletedFilesCount = null;
private Long addedRowsCount = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we are trying to match the new ordering of fields in ManifestFile. Earlier, we co-located ...FilesCount with ...RowsCount to match the ordering of methods in ManifestFile and args in constructors. Is this change intentional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I think it's likely that these are going to be null in some cases, like manifests that contain equality delete files. Instead of interleaving null and non-null columns (null, non-null, null, non-null, etc.), I think it's better to keep the probably-null columns colocated for compression.

private Long existingRowsCount = null;
private Long deletedRowsCount = null;
private List<PartitionFieldSummary> partitions = null;

Expand All @@ -61,10 +63,7 @@ public class GenericManifestFile
public GenericManifestFile(org.apache.avro.Schema avroSchema) {
this.avroSchema = avroSchema;

List<Types.NestedField> fields = AvroSchemaUtil.convert(avroSchema)
.asNestedType()
.asStructType()
.fields();
List<Types.NestedField> fields = AvroSchemaUtil.convert(avroSchema).asStructType().fields();
List<Types.NestedField> allFields = ManifestFile.schema().asStruct().fields();

this.fromProjectionPos = new int[fields.size()];
Expand All @@ -89,6 +88,8 @@ public GenericManifestFile(org.apache.avro.Schema avroSchema) {
this.manifestPath = file.location();
this.length = null; // lazily loaded from file
this.specId = specId;
this.sequenceNumber = 0;
this.minSequenceNumber = 0;
this.snapshotId = null;
this.addedFilesCount = null;
this.addedRowsCount = null;
Expand All @@ -100,32 +101,17 @@ public GenericManifestFile(org.apache.avro.Schema avroSchema) {
this.fromProjectionPos = null;
}

public GenericManifestFile(String path, long length, int specId, Long snapshotId,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though it was public, I don't think anyone is using it. Seems OK to change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this should be fine. Classes in core are only semi-public and not part of the API; it is the api module that has the stronger compatibility guarantees.

int addedFilesCount, int existingFilesCount, int deletedFilesCount,
List<PartitionFieldSummary> partitions) {
this.avroSchema = AVRO_SCHEMA;
this.manifestPath = path;
this.length = length;
this.specId = specId;
this.snapshotId = snapshotId;
this.addedFilesCount = addedFilesCount;
this.addedRowsCount = null;
this.existingFilesCount = existingFilesCount;
this.existingRowsCount = null;
this.deletedFilesCount = deletedFilesCount;
this.deletedRowsCount = null;
this.partitions = partitions;
this.fromProjectionPos = null;
}

public GenericManifestFile(String path, long length, int specId, Long snapshotId,
public GenericManifestFile(String path, long length, int specId,
long sequenceNumber, long minSequenceNumber, Long snapshotId,
int addedFilesCount, long addedRowsCount, int existingFilesCount,
long existingRowsCount, int deletedFilesCount, long deletedRowsCount,
List<PartitionFieldSummary> partitions) {
this.avroSchema = AVRO_SCHEMA;
this.manifestPath = path;
this.length = length;
this.specId = specId;
this.sequenceNumber = sequenceNumber;
this.minSequenceNumber = minSequenceNumber;
this.snapshotId = snapshotId;
this.addedFilesCount = addedFilesCount;
this.addedRowsCount = addedRowsCount;
Expand All @@ -147,6 +133,8 @@ private GenericManifestFile(GenericManifestFile toCopy) {
this.manifestPath = toCopy.manifestPath;
this.length = toCopy.length;
this.specId = toCopy.specId;
this.sequenceNumber = toCopy.sequenceNumber;
this.minSequenceNumber = toCopy.minSequenceNumber;
this.snapshotId = toCopy.snapshotId;
this.addedFilesCount = toCopy.addedFilesCount;
this.addedRowsCount = toCopy.addedRowsCount;
Expand Down Expand Up @@ -192,6 +180,16 @@ public int partitionSpecId() {
return specId;
}

// Returns the stored sequence number. The field is initialized to 0, and set(int, T) keeps it at
// 0 when the incoming value is null, so this never has to represent a missing value.
@Override
public long sequenceNumber() {
return sequenceNumber;
}

// Returns the stored minimum sequence number. The field is initialized to 0, and set(int, T)
// keeps it at 0 when the incoming value is null, so this never has to represent a missing value.
@Override
public long minSequenceNumber() {
return minSequenceNumber;
}

@Override
public Long snapshotId() {
return snapshotId;
Expand Down Expand Up @@ -257,21 +255,25 @@ public Object get(int i) {
case 2:
return specId;
case 3:
return snapshotId;
return sequenceNumber;
case 4:
return addedFilesCount;
return minSequenceNumber;
case 5:
return existingFilesCount;
return snapshotId;
case 6:
return deletedFilesCount;
return addedFilesCount;
case 7:
return partitions;
return existingFilesCount;
case 8:
return addedRowsCount;
return deletedFilesCount;
case 9:
return existingRowsCount;
return addedRowsCount;
case 10:
return existingRowsCount;
case 11:
return deletedRowsCount;
case 12:
return partitions;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
}
Expand All @@ -297,29 +299,35 @@ public <T> void set(int i, T value) {
this.specId = (Integer) value;
return;
case 3:
this.snapshotId = (Long) value;
this.sequenceNumber = value != null ? (Long) value : 0;
return;
case 4:
this.addedFilesCount = (Integer) value;
this.minSequenceNumber = value != null ? (Long) value : 0;
return;
case 5:
this.existingFilesCount = (Integer) value;
this.snapshotId = (Long) value;
return;
case 6:
this.deletedFilesCount = (Integer) value;
this.addedFilesCount = (Integer) value;
return;
case 7:
this.partitions = (List<PartitionFieldSummary>) value;
this.existingFilesCount = (Integer) value;
return;
case 8:
this.addedRowsCount = (Long) value;
this.deletedFilesCount = (Integer) value;
return;
case 9:
this.existingRowsCount = (Long) value;
this.addedRowsCount = (Long) value;
return;
case 10:
this.existingRowsCount = (Long) value;
return;
case 11:
this.deletedRowsCount = (Long) value;
return;
case 12:
this.partitions = (List<PartitionFieldSummary>) value;
return;
default:
// ignore the object, it must be from a newer version of the format
}
Expand Down Expand Up @@ -385,7 +393,8 @@ private CopyBuilder(ManifestFile toCopy) {
this.manifestFile = new GenericManifestFile((GenericManifestFile) toCopy);
} else {
this.manifestFile = new GenericManifestFile(
toCopy.path(), toCopy.length(), toCopy.partitionSpecId(), toCopy.snapshotId(),
toCopy.path(), toCopy.length(), toCopy.partitionSpecId(),
toCopy.sequenceNumber(), toCopy.minSequenceNumber(), toCopy.snapshotId(),
toCopy.addedFilesCount(), toCopy.addedRowsCount(), toCopy.existingFilesCount(),
toCopy.existingRowsCount(), toCopy.deletedFilesCount(), toCopy.deletedRowsCount(),
copyList(toCopy.partitions(), PartitionFieldSummary::copy));
Expand Down
Loading