diff --git a/api/src/main/java/org/apache/iceberg/ContentInfo.java b/api/src/main/java/org/apache/iceberg/ContentInfo.java new file mode 100644 index 000000000000..07ec8ee378a0 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/ContentInfo.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import org.apache.iceberg.types.Types; + +/** + * Metadata about externally stored content such as deletion vectors. + * + *

The deletion vector content is stored at the specified offset and size within the file + * referenced by {@link TrackedFile#location()}. + * + *

This struct must be defined when content_type is POSITION_DELETES, and must be null otherwise. + * + *

Note: For manifest-level deletion vectors (marking entries in a manifest as deleted), see
+ * {@link TrackedFile#manifestDV()} which stores the DV inline as a binary field.
+ */
+public interface ContentInfo {
+  /** Required {@code long} field (ID 144): offset in the file where the content starts. */
+  Types.NestedField OFFSET =
+      Types.NestedField.required(
+          144, "offset", Types.LongType.get(), "Offset in the file where the content starts");
+
+  /** Required {@code long} field (ID 145): length of the referenced content in the file. */
+  Types.NestedField SIZE_IN_BYTES =
+      Types.NestedField.required(
+          145,
+          "size_in_bytes",
+          Types.LongType.get(),
+          "Length of the referenced content stored in the file");
+
+  /** Returns the struct type made of {@link #OFFSET} followed by {@link #SIZE_IN_BYTES}. */
+  static Types.StructType schema() {
+    return Types.StructType.of(OFFSET, SIZE_IN_BYTES);
+  }
+
+  /**
+   * Returns the offset in the file where the deletion vector content starts.
+   *
+   * <p>The file location is specified in the {@link TrackedFile#location()} field.
+   */
+  long offset();
+
+  /** Returns the size in bytes of the deletion vector content. */
+  long sizeInBytes();
+}
diff --git a/api/src/main/java/org/apache/iceberg/FileContent.java b/api/src/main/java/org/apache/iceberg/FileContent.java
index 2c9a2fa51bd2..f700305d68ee 100644
--- a/api/src/main/java/org/apache/iceberg/FileContent.java
+++ b/api/src/main/java/org/apache/iceberg/FileContent.java
@@ -18,11 +18,21 @@
  */
 package org.apache.iceberg;
 
-/** Content type stored in a file, one of DATA, POSITION_DELETES, or EQUALITY_DELETES. */
+/**
+ * Content type stored in a file.
+ *
+ *

For V1-V3 tables: DATA, POSITION_DELETES, or EQUALITY_DELETES. + * + *

For V4 tables: DATA, POSITION_DELETES, EQUALITY_DELETES, DATA_MANIFEST, or DELETE_MANIFEST. + */ public enum FileContent { DATA(0), POSITION_DELETES(1), - EQUALITY_DELETES(2); + EQUALITY_DELETES(2), + /** Data manifest entry (V4+ only) - references data files in a root manifest. */ + DATA_MANIFEST(3), + /** Delete manifest entry (V4+ only) - references delete files in a root manifest. */ + DELETE_MANIFEST(4); private final int id; diff --git a/api/src/main/java/org/apache/iceberg/ManifestStats.java b/api/src/main/java/org/apache/iceberg/ManifestStats.java new file mode 100644 index 000000000000..d81779dc4472 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/ManifestStats.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import org.apache.iceberg.types.Types; + +/** + * Statistics for manifest entries in a V4 tracked file. + * + *

This encapsulates added/removed/existing files/row counts and min_sequence_number for a
+ * manifest. This must be defined when the content_type is a manifest (3 or 4), and null otherwise.
+ */
+public interface ManifestStats {
+  // File counts: required int fields, IDs 504-506.
+  Types.NestedField ADDED_FILES_COUNT =
+      Types.NestedField.required(
+          504, "added_files_count", Types.IntegerType.get(), "Number of files added");
+  Types.NestedField EXISTING_FILES_COUNT =
+      Types.NestedField.required(
+          505, "existing_files_count", Types.IntegerType.get(), "Number of existing files");
+  Types.NestedField DELETED_FILES_COUNT =
+      Types.NestedField.required(
+          506, "deleted_files_count", Types.IntegerType.get(), "Number of deleted files");
+  // Row counts: required long fields, IDs 512-514.
+  Types.NestedField ADDED_ROWS_COUNT =
+      Types.NestedField.required(
+          512, "added_rows_count", Types.LongType.get(), "Number of rows in added files");
+  Types.NestedField EXISTING_ROWS_COUNT =
+      Types.NestedField.required(
+          513, "existing_rows_count", Types.LongType.get(), "Number of rows in existing files");
+  Types.NestedField DELETED_ROWS_COUNT =
+      Types.NestedField.required(
+          514, "deleted_rows_count", Types.LongType.get(), "Number of rows in deleted files");
+  // Required long field, ID 516.
+  Types.NestedField MIN_SEQUENCE_NUMBER =
+      Types.NestedField.required(
+          516,
+          "min_sequence_number",
+          Types.LongType.get(),
+          "Minimum sequence number of files in this manifest");
+
+  /**
+   * Returns the struct type for manifest stats: the three file counts, then the three row counts,
+   * then the minimum sequence number.
+   */
+  static Types.StructType schema() {
+    return Types.StructType.of(
+        ADDED_FILES_COUNT,
+        EXISTING_FILES_COUNT,
+        DELETED_FILES_COUNT,
+        ADDED_ROWS_COUNT,
+        EXISTING_ROWS_COUNT,
+        DELETED_ROWS_COUNT,
+        MIN_SEQUENCE_NUMBER);
+  }
+
+  /** Returns the number of files added by this manifest. */
+  int addedFilesCount();
+
+  /** Returns the number of existing files referenced by this manifest. */
+  int existingFilesCount();
+
+  /** Returns the number of deleted files in this manifest. */
+  int deletedFilesCount();
+
+  /** Returns the number of rows in added files. */
+  long addedRowsCount();
+
+  /** Returns the number of rows in existing files. */
+  long existingRowsCount();
+
+  /** Returns the number of rows in deleted files. */
+  long deletedRowsCount();
+
+  /** Returns the minimum sequence number of files in this manifest. */
+  long minSequenceNumber();
+}
diff --git a/api/src/main/java/org/apache/iceberg/TrackedFile.java b/api/src/main/java/org/apache/iceberg/TrackedFile.java
new file mode 100644
index 000000000000..4f35f7e4669f
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/TrackedFile.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Set;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Represents a V4 content entry in a manifest file.
+ *
+ *

TrackedFile is the V4 equivalent of ContentFile. It provides a unified representation for all + * entry types in a V4 manifest: data files, delete files, manifests, and deletion vectors. + */ +public interface TrackedFile { + // Field IDs from V4 specification + Types.NestedField TRACKING_INFO = + Types.NestedField.required( + 147, "tracking_info", TrackingInfo.schema(), "Tracking information for this entry"); + Types.NestedField CONTENT_TYPE = + Types.NestedField.required( + 134, + "content_type", + Types.IntegerType.get(), + "Type of content: 0=DATA, 1=POSITION_DELETES, 2=EQUALITY_DELETES, 3=DATA_MANIFEST, 4=DELETE_MANIFEST"); + Types.NestedField LOCATION = + Types.NestedField.required(100, "location", Types.StringType.get(), "Location of the file"); + Types.NestedField FILE_FORMAT = + Types.NestedField.required( + 101, + "file_format", + Types.StringType.get(), + "String file format name: avro, orc, parquet, or puffin"); + Types.NestedField PARTITION_SPEC_ID = + Types.NestedField.required( + 149, + "partition_spec_id", + Types.IntegerType.get(), + "ID of partition spec used to write manifest or data/delete files"); + Types.NestedField SORT_ORDER_ID = + Types.NestedField.optional( + 140, + "sort_order_id", + Types.IntegerType.get(), + "ID representing sort order for this file. 
Can only be set if content_type is 0"); + Types.NestedField RECORD_COUNT = + Types.NestedField.required( + 103, + "record_count", + Types.LongType.get(), + "Number of records in this file, or the cardinality of a deletion vector"); + Types.NestedField FILE_SIZE_IN_BYTES = + Types.NestedField.optional( + 104, "file_size_in_bytes", Types.LongType.get(), "Total file size in bytes."); + Types.NestedField CONTENT_STATS = + Types.NestedField.optional( + 146, + "content_stats", + Types.StructType.of(), // TODO: Define ContentStats structure per V4 proposal + "Content statistics for this entry"); + Types.NestedField KEY_METADATA = + Types.NestedField.optional( + 131, + "key_metadata", + Types.BinaryType.get(), + "Implementation-specific key metadata for encryption"); + Types.NestedField SPLIT_OFFSETS = + Types.NestedField.optional( + 132, + "split_offsets", + Types.ListType.ofRequired(133, Types.LongType.get()), + "Split offsets for the data file. Must be sorted ascending"); + Types.NestedField CONTENT_INFO = + Types.NestedField.optional( + 148, + "content_info", + ContentInfo.schema(), + "Content info. Required when content_type is POSITION_DELETES, must be null otherwise"); + Types.NestedField EQUALITY_IDS = + Types.NestedField.optional( + 135, + "equality_ids", + Types.ListType.ofRequired(136, Types.IntegerType.get()), + "Field ids used to determine row equality in equality delete files. Required when content=2"); + Types.NestedField REFERENCED_FILE = + Types.NestedField.optional( + 143, + "referenced_file", + Types.StringType.get(), + "Location of data file that a DV references if content_type is POSITION_DELETES. " + + "Location of affiliated data manifest if content_type is DELETE_MANIFEST, or null if unaffiliated"); + Types.NestedField MANIFEST_STATS = + Types.NestedField.optional( + 150, + "manifest_stats", + ManifestStats.schema(), + "Manifest statistics. 
Required for DATA_MANIFEST and DELETE_MANIFEST"); + Types.NestedField MANIFEST_DV = + Types.NestedField.optional( + 151, + "manifest_dv", + Types.BinaryType.get(), + "Serialized deletion vector marking deleted entry positions in the referenced manifest. " + + "Optional for DATA_MANIFEST and DELETE_MANIFEST, must be null for other content types"); + + /** + * Returns the path of the manifest which this file is referenced in or null if it was not read + * from a manifest. + */ + String manifestLocation(); + + /** + * Returns the tracking information for this entry. + * + *

Contains status, snapshot ID, sequence numbers, and first-row-id. Optional - may be null if + * tracking info is inherited. + */ + TrackingInfo trackingInfo(); + + /** + * Returns the type of content stored by this entry. + * + *

One of: DATA, POSITION_DELETES, EQUALITY_DELETES, DATA_MANIFEST, or DELETE_MANIFEST. + */ + FileContent contentType(); + + /** Returns the location of the file. */ + String location(); + + /** Returns the format of the file (avro, orc, parquet, or puffin). */ + FileFormat fileFormat(); + + /** + * Returns the content info. + * + *

Must be defined if content_type is POSITION_DELETES, must be null otherwise. + */ + ContentInfo contentInfo(); + + /** Returns the ID of the partition spec used to write this file or manifest. */ + int partitionSpecId(); + + /** + * Returns the ID representing sort order for this file. + * + *

Can only be set if content_type is DATA. + */ + Integer sortOrderId(); + + /** Returns the number of records in this file, or the cardinality of a deletion vector. */ + long recordCount(); + + /** + * Returns the total file size in bytes. + * + *

Must be defined if location is defined. + */ + Long fileSizeInBytes(); + + /** + * Returns the content stats for this entry. + * + *

TODO: Define ContentStats structure per V4 proposal. + */ + Object contentStats(); + + /** + * Returns the manifest stats for this entry. + * + *

Must be set if content_type is DATA_MANIFEST or DELETE_MANIFEST, otherwise must be null. + */ + ManifestStats manifestStats(); + + /** + * Returns the manifest deletion vector for this entry. + * + *

When present, this is a serialized deletion vector where each set bit position corresponds + * to an entry in the manifest that should be treated as deleted. This allows marking manifest + * entries as deleted without rewriting the manifest file. + * + *

Optional for DATA_MANIFEST and DELETE_MANIFEST content types, must be null otherwise. + */ + ByteBuffer manifestDV(); + + /** Returns metadata about how this file is encrypted, or null if stored in plain text. */ + ByteBuffer keyMetadata(); + + /** + * Returns list of recommended split locations, if applicable, null otherwise. + * + *

Must be sorted in ascending order. + */ + List splitOffsets(); + + /** + * Returns the set of field IDs used for equality comparison, in equality delete files. + * + *

Required when content_type is EQUALITY_DELETES, must be null otherwise. + */ + List equalityIds(); + + /** + * Returns the location of the referenced file. + * + *

For POSITION_DELETES: location of the data file that the deletion vector references. + * + *

For DELETE_MANIFEST: location of the affiliated data manifest, or null if unaffiliated. + */ + String referencedFile(); + + /** + * Copies this tracked file. + * + *

Manifest readers can reuse file instances; use this method to copy data when collecting + * files from tasks. + */ + TrackedFile copy(); + + /** + * Copies this tracked file without stats. + * + *

Use this method to copy data without stats when collecting files. + */ + TrackedFile copyWithoutStats(); + + /** + * Copies this tracked file with stats only for specific columns. + * + *

Manifest readers can reuse file instances; use this method to copy data with stats only for + * specific columns when collecting files. + * + * @param requestedColumnIds column IDs for which to keep stats + * @return a copy of this tracked file, with content stats for only the requested columns + */ + TrackedFile copyWithStats(Set requestedColumnIds); + + /** + * Returns the ordinal position in the manifest. + * + *

Used for applying manifest deletion vectors. + */ + Long pos(); + + /** + * Converts this tracked file to a DataFile. + * + *

Only valid when content_type is DATA. + * + * @param spec the partition spec for this file, used to interpret partition values + * @return a DataFile representation + * @throws IllegalStateException if content_type is not DATA + */ + DataFile asDataFile(PartitionSpec spec); + + /** + * Converts this tracked file to a DeleteFile. + * + *

Only valid when content_type is POSITION_DELETES or EQUALITY_DELETES. + * + * @param spec the partition spec for this file, used to interpret partition values + * @return a DeleteFile representation + * @throws IllegalStateException if content_type is not a delete type + */ + DeleteFile asDeleteFile(PartitionSpec spec); +} diff --git a/api/src/main/java/org/apache/iceberg/TrackingInfo.java b/api/src/main/java/org/apache/iceberg/TrackingInfo.java new file mode 100644 index 000000000000..ff2aaad487c9 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/TrackingInfo.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import org.apache.iceberg.types.Types; + +/** + * Tracking information for a tracked file entry in a V4 manifest. + * + *

This groups the status, snapshot, and sequence number information for the entry. This enables
+ * accessing the fields for the entry and provides an isolated structure that can be modified.
+ */
+public interface TrackingInfo {
+  /** Status of an entry in a tracked file; each constant carries the integer ID given here. */
+  enum Status {
+    EXISTING(0),
+    ADDED(1),
+    DELETED(2);
+
+    private final int id;
+
+    Status(int id) {
+      this.id = id;
+    }
+
+    /** Returns the integer ID of this status (0, 1, or 2). */
+    public int id() {
+      return id;
+    }
+  }
+
+  Types.NestedField STATUS =
+      Types.NestedField.required(
+          0, "status", Types.IntegerType.get(), "Entry status: 0=existing, 1=added, 2=deleted");
+  Types.NestedField SNAPSHOT_ID =
+      Types.NestedField.optional(
+          1,
+          "snapshot_id",
+          Types.LongType.get(),
+          "Snapshot ID where the file was added or deleted");
+  Types.NestedField SEQUENCE_NUMBER =
+      Types.NestedField.optional(
+          3, "sequence_number", Types.LongType.get(), "Data sequence number of the file");
+  Types.NestedField FILE_SEQUENCE_NUMBER =
+      Types.NestedField.optional(
+          4,
+          "file_sequence_number",
+          Types.LongType.get(),
+          "File sequence number indicating when the file was added");
+  Types.NestedField FIRST_ROW_ID =
+      Types.NestedField.optional(
+          142, "first_row_id", Types.LongType.get(), "ID of the first row in the data file");
+
+  /**
+   * Returns the struct type for tracking info: status, snapshot ID, sequence number, file sequence
+   * number, and first row ID, in that order.
+   */
+  static Types.StructType schema() {
+    return Types.StructType.of(
+        STATUS, SNAPSHOT_ID, SEQUENCE_NUMBER, FILE_SEQUENCE_NUMBER, FIRST_ROW_ID);
+  }
+
+  /**
+   * Returns the status of the entry.
+   *
+   * <p>Status values:
+   *
+   * <ul>
+   *   <li>0: {@link Status#EXISTING}
+   *   <li>1: {@link Status#ADDED}
+   *   <li>2: {@link Status#DELETED}
+   * </ul>
+   */
+  Status status();
+
+  /** Returns the snapshot ID where the file was added or deleted. */
+  Long snapshotId();
+
+  /** Returns the data sequence number of the file. */
+  Long dataSequenceNumber();
+
+  /** Returns the file sequence number indicating when the file was added. */
+  Long fileSequenceNumber();
+
+  /** Returns the ID of the first row in the data file. */
+  Long firstRowId();
+
+  /** Returns the path of the manifest which this entry was read from. */
+  String manifestLocation();
+
+  /** Returns the ordinal position of this entry within the manifest. */
+  long manifestPos();
+}
diff --git a/core/src/main/java/org/apache/iceberg/InheritableTrackedMetadata.java b/core/src/main/java/org/apache/iceberg/InheritableTrackedMetadata.java
new file mode 100644
index 000000000000..f997549c2d96
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/InheritableTrackedMetadata.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.Serializable;
+
+/**
+ * Interface for applying inheritable metadata to tracked file entries in V4 manifests.
+ *
+ *

Similar to {@link InheritableMetadata} but for V4 TrackedFile entries. + */ +interface InheritableTrackedMetadata extends Serializable { + /** + * Apply inheritable metadata to a tracked file entry. + * + * @param entry the tracked file entry (mutable TrackedFileStruct) + * @return the entry with metadata applied + */ + TrackedFileStruct apply(TrackedFileStruct entry); +} diff --git a/core/src/main/java/org/apache/iceberg/InheritableTrackedMetadataFactory.java b/core/src/main/java/org/apache/iceberg/InheritableTrackedMetadataFactory.java new file mode 100644 index 000000000000..bd2cfac4fc5a --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/InheritableTrackedMetadataFactory.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** Factory for creating {@link InheritableTrackedMetadata} instances. */ +class InheritableTrackedMetadataFactory { + + private InheritableTrackedMetadataFactory() {} + + /** + * Creates inheritable metadata from explicit values. 
+ * + * @param snapshotId the snapshot ID + * @param sequenceNumber the sequence number + * @return inheritable metadata instance + */ + static InheritableTrackedMetadata create(long snapshotId, long sequenceNumber) { + return new BaseInheritableTrackedMetadata(snapshotId, sequenceNumber); + } + + /** + * Creates inheritable metadata from a tracked file representing a manifest (for reading leaf + * manifests). + * + * @param manifestEntry the DATA_MANIFEST or DELETE_MANIFEST tracked file from root + * @return inheritable metadata instance + */ + static InheritableTrackedMetadata fromTrackedFile(TrackedFile manifestEntry) { + Preconditions.checkArgument( + manifestEntry.contentType() == FileContent.DATA_MANIFEST + || manifestEntry.contentType() == FileContent.DELETE_MANIFEST, + "Can only create metadata from tracked files for manifests, got: %s", + manifestEntry.contentType()); + + TrackingInfo tracking = manifestEntry.trackingInfo(); + Preconditions.checkNotNull( + tracking, + "Manifest tracked file is missing tracking info and appears to be uncommitted: %s", + manifestEntry.location()); + + Long snapshotId = tracking.snapshotId(); + Long sequenceNumber = tracking.dataSequenceNumber(); + String manifestLocation = manifestEntry.location(); + + Preconditions.checkNotNull( + snapshotId, "Manifest tracked file must have snapshot ID: %s", manifestLocation); + + return new BaseInheritableTrackedMetadata( + snapshotId, sequenceNumber != null ? 
sequenceNumber : 0L, manifestLocation); + } + + static class BaseInheritableTrackedMetadata implements InheritableTrackedMetadata { + private final long snapshotId; + private final long sequenceNumber; + private final String manifestLocation; + + private BaseInheritableTrackedMetadata(long snapshotId, long sequenceNumber) { + this(snapshotId, sequenceNumber, null); + } + + private BaseInheritableTrackedMetadata( + long snapshotId, long sequenceNumber, String manifestLocation) { + this.snapshotId = snapshotId; + this.sequenceNumber = sequenceNumber; + this.manifestLocation = manifestLocation; + } + + @Override + public TrackedFileStruct apply(TrackedFileStruct entry) { + TrackingInfo tracking = entry.trackingInfo(); + + if (tracking == null || tracking.snapshotId() == null) { + entry.ensureTrackingInfo().setSnapshotId(snapshotId); + } + + // in v1 tables, the sequence number is not persisted and can be safely defaulted to 0 + // in v2+ tables, the sequence number should be inherited iff the entry status is ADDED + if (tracking == null || tracking.dataSequenceNumber() == null) { + if (sequenceNumber == 0 + || (tracking != null && tracking.status() == TrackingInfo.Status.ADDED)) { + TrackedFileStruct.TrackingInfoStruct trackingInfo = entry.ensureTrackingInfo(); + trackingInfo.setSequenceNumber(sequenceNumber); + trackingInfo.setFileSequenceNumber(sequenceNumber); + } + } + + if (manifestLocation != null) { + entry.setManifestLocation(manifestLocation); + } + + return entry; + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/ManifestExpander.java b/core/src/main/java/org/apache/iceberg/ManifestExpander.java new file mode 100644 index 000000000000..a71b6dbe2341 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/ManifestExpander.java @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.apache.iceberg.expressions.Expressions.alwaysTrue; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.expressions.Evaluator; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableGroup; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.metrics.ScanMetrics; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ParallelIterable; + +/** + * Expands V4 root manifests and plans file scan tasks. + * + *

Root manifests can contain direct file entries (DATA, POSITION_DELETES, EQUALITY_DELETES) as + * well as references to leaf manifests (DATA_MANIFEST, DELETE_MANIFEST). Handles recursive + * expansion of manifest references and creates scan tasks, similar to ManifestGroup in V3. + * + *

Currently returns DataFileScanInfo (a TrackedFile data file paired with its matched deletes).
+ * Creating full FileScanTasks is blocked on the ContentStats implementation.
+ *
+ *

TODO (after ContentStats is ready):
+ *
+ *

    + *
  1. Implement TrackedFile.asDataFile(spec) + *
  2. Implement TrackedFile.asDeleteFile(spec) + *
  3. Add planFiles() method that returns CloseableIterable<FileScanTask> + *
  4. Handle equality deletes + *
  5. Add manifest-level filtering using ManifestStats + *
+ */ +public class ManifestExpander extends CloseableGroup { + private final V4ManifestReader rootReader; + private final FileIO io; + private final Map specsById; + + private static final Types.StructType FILE_METADATA_TYPE = + Types.StructType.of( + TrackedFile.CONTENT_TYPE, + TrackedFile.RECORD_COUNT, + TrackedFile.FILE_SIZE_IN_BYTES, + TrackedFile.PARTITION_SPEC_ID, + TrackedFile.SORT_ORDER_ID); + + private Expression dataFilter; + private Expression fileFilter; + private Evaluator fileFilterEvaluator; + private Map> deletesByPath; + + private boolean ignoreDeleted; + + private boolean ignoreExisting; + + private Schema customProjection; + + private boolean caseSensitive; + + @SuppressWarnings("UnusedVariable") + private ScanMetrics scanMetrics; + + private ExecutorService executorService; + + public ManifestExpander( + V4ManifestReader rootReader, FileIO io, Map specsById) { + this.rootReader = rootReader; + this.io = io; + this.specsById = specsById; + this.dataFilter = alwaysTrue(); + this.fileFilter = alwaysTrue(); + this.ignoreDeleted = false; + this.ignoreExisting = false; + this.customProjection = null; + this.caseSensitive = true; + this.scanMetrics = ScanMetrics.noop(); + addCloseable(rootReader); + } + + private Map> buildDeleteIndex() { + if (deletesByPath != null) { + return deletesByPath; + } + + Map> index = Maps.newHashMap(); + + try (CloseableIterable allFiles = allTrackedFiles()) { + for (TrackedFile entry : allFiles) { + if (entry.contentType() == FileContent.POSITION_DELETES && entry.referencedFile() != null) { + index.computeIfAbsent(entry.referencedFile(), k -> Lists.newArrayList()).add(entry); + } + } + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to build delete index"); + } + + this.deletesByPath = index; + return index; + } + + private List deleteFilesForDataFile(TrackedFile dataFile) { + Map> index = buildDeleteIndex(); + List deletes = index.get(dataFile.location()); + + if (deletes == null || deletes.isEmpty()) 
{ + return Collections.emptyList(); + } + + TrackingInfo dataTracking = dataFile.trackingInfo(); + Long dataSeq = dataTracking != null ? dataTracking.dataSequenceNumber() : null; + + if (dataSeq == null) { + return deletes; + } + + List filtered = Lists.newArrayList(); + for (TrackedFile delete : deletes) { + TrackingInfo deleteTracking = delete.trackingInfo(); + Long deleteSeq = deleteTracking != null ? deleteTracking.dataSequenceNumber() : null; + + if (deleteSeq == null || deleteSeq <= dataSeq) { + filtered.add(delete); + } + } + + return filtered; + } + + public ManifestExpander filterData(Expression expression) { + this.dataFilter = Expressions.and(dataFilter, expression); + return this; + } + + public ManifestExpander filterFiles(Expression expression) { + this.fileFilter = Expressions.and(fileFilter, expression); + this.fileFilterEvaluator = null; + return this; + } + + public ManifestExpander ignoreDeleted() { + this.ignoreDeleted = true; + return this; + } + + public ManifestExpander ignoreExisting() { + this.ignoreExisting = true; + return this; + } + + /** + * Sets a custom projection schema for fine-grained control. + * + *

Use this for metadata table scans and other custom situations that need specific column
+   * projection. When set, this projection is passed to leaf manifest readers.
+   *
+   * @param projection custom schema to project
+   * @return this expander for method chaining
+   */
+  public ManifestExpander project(Schema projection) {
+    this.customProjection = projection;
+    return this;
+  }
+
+  /**
+   * Sets the case-sensitivity flag stored on this expander.
+   *
+   * @param isCaseSensitive the new value of the flag
+   * @return this expander for method chaining
+   */
+  public ManifestExpander caseSensitive(boolean isCaseSensitive) {
+    this.caseSensitive = isCaseSensitive;
+    return this;
+  }
+
+  /**
+   * Replaces the scan metrics instance stored on this expander.
+   *
+   * @param metrics the scan metrics to store
+   * @return this expander for method chaining
+   */
+  public ManifestExpander scanMetrics(ScanMetrics metrics) {
+    this.scanMetrics = metrics;
+    return this;
+  }
+
+  /**
+   * Sets the executor service stored on this expander.
+   *
+   * <p>NOTE(review): presumably used to read manifests in parallel during planning; the use site
+   * is not in this part of the file, so confirm.
+   *
+   * @param newExecutorService the executor service to store
+   * @return this expander for method chaining
+   */
+  public ManifestExpander planWith(ExecutorService newExecutorService) {
+    this.executorService = newExecutorService;
+    return this;
+  }
+
+  /**
+   * Plans file scan tasks for table scans with delete matching.
+   *
+   *

This is the main entry point for query planning, similar to ManifestGroup.planFiles() in V3. + * + *

Filters and matching: + * + *

    + *
  • Entry-level: Filters by status (ignoreDeleted/ignoreExisting) + *
  • Content-type: Returns only DATA files + *
  • File-level: Filters by file metadata (record_count, file_size, etc.) + *
  • Delete matching: Path-based matching for POSITION_DELETES with referencedFile + *
+ * + *

Returns DataFileScanInfo which pairs each data file with matched delete files. + * + *

TODO: When ContentStats is implemented, add planFiles() method that returns + * CloseableIterable<FileScanTask>: + * + *

{@code
+   * public CloseableIterable planFiles() {
+   *     return CloseableIterable.transform(planDataFiles(), this::createFileScanTask);
+   * }
+   *
+   * private FileScanTask createFileScanTask(DataFileScanInfo scanInfo) {
+   *     TrackedFile tf = scanInfo.dataFile();
+   *     PartitionSpec spec = specsById.get(tf.partitionSpecId());
+   *
+   *     // TrackedFile.asDataFile(spec) extracts partition from contentStats internally
+   *     DataFile dataFile = tf.asDataFile(spec);
+   *
+   *     // Convert TrackedFile deletes → DeleteFile array
+   *     DeleteFile[] deleteFiles = convertDeleteFiles(scanInfo.deleteFiles(), spec);
+   *
+   *     // Build residual evaluator
+   *     ResidualEvaluator residuals = ResidualEvaluator.of(spec, dataFilter, caseSensitive);
+   *
+   *     // Create task
+   *     return new BaseFileScanTask(
+   *         dataFile,
+   *         deleteFiles,
+   *         SchemaParser.toJson(spec.schema()),
+   *         PartitionSpecParser.toJson(spec),
+   *         residuals);
+   * }
+   *
+   * private DeleteFile[] convertDeleteFiles(
+   *     List deleteTrackedFiles,
+   *     PartitionSpec spec) {
+   *     DeleteFile[] deleteFiles = new DeleteFile[deleteTrackedFiles.size()];
+   *     for (int i = 0; i < deleteTrackedFiles.size(); i++) {
+   *         // TrackedFile.asDeleteFile(spec) extracts partition from contentStats internally
+   *         deleteFiles[i] = deleteTrackedFiles.get(i).asDeleteFile(spec);
+   *     }
+   *     return deleteFiles;
+   * }
+   * }
+ * + *

TODO: When ContentStats is implemented: + * + *

    + *
  • Equality deletes (match by partition + equality fields) + *
+ * + * @return iterable of DataFileScanInfo (data file + matched deletes) + */ + public CloseableIterable planDataFiles() { + CloseableIterable allFiles = + CloseableIterable.concat(Lists.newArrayList(directFiles(), expandManifests())); + + allFiles = CloseableIterable.filter(allFiles, tf -> tf.contentType() == FileContent.DATA); + + allFiles = applyStatusFilters(allFiles); + + allFiles = applyFileFilter(allFiles); + + buildDeleteIndex(); + + return CloseableIterable.transform( + allFiles, + dataFile -> { + List deletes = deleteFilesForDataFile(dataFile); + return new DataFileScanInfo(dataFile, deletes); + }); + } + + /** + * Information needed to scan a data file with associated delete files. + * + *

This is the V4 equivalent of the information used to create FileScanTask in V3. When + * ContentStats is implemented, this will be converted to BaseFileScanTask. + * + *

Contains: + * + *

    + *
  • Data file (as TrackedFile) + *
  • Matched delete files (as TrackedFile list) + *
+ * + *

TODO: When ContentStats available, convert this to FileScanTask by: + * + *

    + *
  1. Call TrackedFile.asDataFile(spec) - extracts partition from contentStats internally + *
  2. Call TrackedFile.asDeleteFile(spec) for deletes - extracts partition internally + *
  3. Create BaseFileScanTask with DataFile, DeleteFile array, and residuals + *
+ */ + public static class DataFileScanInfo { + private final TrackedFile dataFile; + private final List deleteFiles; + + DataFileScanInfo(TrackedFile dataFile, List deleteFiles) { + this.dataFile = dataFile; + this.deleteFiles = deleteFiles; + } + + public TrackedFile dataFile() { + return dataFile; + } + + public List deleteFiles() { + return deleteFiles; + } + } + + /** + * Returns all TrackedFiles from the root manifest and all referenced leaf manifests. + * + *

This includes: + * + *

    + *
  • Direct file entries in root (DATA, POSITION_DELETES, EQUALITY_DELETES) + *
  • Files from expanded DATA_MANIFEST entries + *
  • Files from expanded DELETE_MANIFEST entries + *
+ * + *

Returns TrackedFile entries (not converted to DataFile/DeleteFile yet). + * + *

TODO: Add manifest DV support. + * + * @return iterable of all tracked files + */ + public CloseableIterable allTrackedFiles() { + return CloseableIterable.concat(Lists.newArrayList(directFiles(), expandManifests())); + } + + private CloseableIterable applyStatusFilters( + CloseableIterable entries) { + CloseableIterable filtered = entries; + + if (ignoreDeleted) { + filtered = + CloseableIterable.filter( + filtered, + tf -> { + TrackingInfo tracking = tf.trackingInfo(); + return tracking == null || tracking.status() != TrackingInfo.Status.DELETED; + }); + } + + if (ignoreExisting) { + filtered = + CloseableIterable.filter( + filtered, + tf -> { + TrackingInfo tracking = tf.trackingInfo(); + return tracking != null && tracking.status() != TrackingInfo.Status.EXISTING; + }); + } + + return filtered; + } + + private CloseableIterable applyFileFilter(CloseableIterable entries) { + if (fileFilter == null || fileFilter == alwaysTrue()) { + return entries; + } + + return CloseableIterable.filter(entries, this::evaluateFileFilter); + } + + private boolean evaluateFileFilter(TrackedFile file) { + if (fileFilterEvaluator == null) { + fileFilterEvaluator = new Evaluator(FILE_METADATA_TYPE, fileFilter, caseSensitive); + } + return fileFilterEvaluator.eval((StructLike) file); + } + + /** + * Returns direct file entries from the root manifest. + * + *

These are DATA, POSITION_DELETES, or EQUALITY_DELETES entries stored directly in the root
   * manifest
   */
  private CloseableIterable<TrackedFile> directFiles() {
    return CloseableIterable.filter(
        rootReader.liveFiles(),
        tf ->
            tf.contentType() == FileContent.DATA
                || tf.contentType() == FileContent.POSITION_DELETES
                || tf.contentType() == FileContent.EQUALITY_DELETES);
  }

  /**
   * Expands manifest references (DATA_MANIFEST and DELETE_MANIFEST) to their contained files.
   *
   *

Loads all root entries to identify manifests, then reads each leaf manifest. If an + * ExecutorService is provided via planWith(), manifests are read in parallel using + * ParallelIterable. Otherwise, manifests are read serially. + * + *

If a manifest entry has a manifest_dv field set, it is used to filter out deleted positions + * in the leaf manifest without rewriting the manifest file. + */ + private CloseableIterable expandManifests() { + List rootEntries = Lists.newArrayList(rootReader.liveFiles()); + + List> allLeafFiles = Lists.newArrayList(); + + for (TrackedFile manifestEntry : rootEntries) { + if (manifestEntry.contentType() == FileContent.DATA_MANIFEST + || manifestEntry.contentType() == FileContent.DELETE_MANIFEST) { + + V4ManifestReader leafReader = + V4ManifestReaders.readLeaf(manifestEntry, io, specsById) + .withManifestDV(manifestEntry.manifestDV()); + + if (customProjection != null) { + leafReader.project(customProjection); + } + + addCloseable(leafReader); + allLeafFiles.add(leafReader.liveFiles()); + } + } + + if (executorService != null) { + return new ParallelIterable<>(allLeafFiles, executorService); + } else { + return CloseableIterable.concat(allLeafFiles); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/TrackedFileStruct.java b/core/src/main/java/org/apache/iceberg/TrackedFileStruct.java new file mode 100644 index 000000000000..a2365535b68f --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/TrackedFileStruct.java @@ -0,0 +1,1242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.avro.SupportsIndexProjection; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.ByteBuffers; + +/** Internal struct implementation of {@link TrackedFile} for V4 manifests. */ +class TrackedFileStruct extends SupportsIndexProjection + implements TrackedFile, StructLike, Serializable { + + private static final FileContent[] FILE_CONTENT_VALUES = FileContent.values(); + + // Reader-side metadata (not serialized) + private String manifestLocation = null; + private Long position = null; + + // Common file metadata + private FileContent contentType = FileContent.DATA; + private String location = null; + private FileFormat fileFormat = null; + private int partitionSpecId = -1; + private Integer sortOrderId = null; + private long recordCount = -1L; + private Long fileSizeInBytes = null; + private byte[] keyMetadata = null; + private long[] splitOffsets = null; + private int[] equalityIds = null; + private String referencedFile = null; + + // Nested structs + private TrackingInfoStruct trackingInfo = null; + private ContentInfoStruct contentInfo = null; + private ManifestStatsStruct manifestStats = null; + + // Manifest DV (for manifest entries - marks deleted entries without rewriting) + private byte[] manifestDV = null; + + // Content stats placeholder (TODO: implement ContentStats) + private Object contentStats = null; + + // Base type for writing manifests (excludes read-only metadata columns) + static final Types.StructType WRITE_TYPE = + Types.StructType.of( 
+ TrackedFile.CONTENT_TYPE, + TrackedFile.LOCATION, + TrackedFile.FILE_FORMAT, + TrackedFile.PARTITION_SPEC_ID, + TrackedFile.SORT_ORDER_ID, + TrackedFile.RECORD_COUNT, + TrackedFile.FILE_SIZE_IN_BYTES, + TrackedFile.KEY_METADATA, + TrackedFile.SPLIT_OFFSETS, + TrackedFile.EQUALITY_IDS, + TrackedFile.REFERENCED_FILE, + TrackedFile.TRACKING_INFO, + TrackedFile.CONTENT_INFO, + TrackedFile.MANIFEST_STATS, + TrackedFile.MANIFEST_DV); + + // Base type that corresponds to positions for get/set + // Nested structs (tracking_info, content_info, manifest_stats) are embedded as struct fields + // MetadataColumns.ROW_POSITION is included to support safe position tracking during reads + static final Types.StructType BASE_TYPE = + Types.StructType.of( + TrackedFile.CONTENT_TYPE, + TrackedFile.LOCATION, + TrackedFile.FILE_FORMAT, + TrackedFile.PARTITION_SPEC_ID, + TrackedFile.SORT_ORDER_ID, + TrackedFile.RECORD_COUNT, + TrackedFile.FILE_SIZE_IN_BYTES, + TrackedFile.KEY_METADATA, + TrackedFile.SPLIT_OFFSETS, + TrackedFile.EQUALITY_IDS, + TrackedFile.REFERENCED_FILE, + TrackedFile.TRACKING_INFO, + TrackedFile.CONTENT_INFO, + TrackedFile.MANIFEST_STATS, + TrackedFile.MANIFEST_DV, + MetadataColumns.ROW_POSITION); + + /** Used by internal readers to instantiate this class with a projection schema. */ + TrackedFileStruct(Types.StructType projection) { + super(BASE_TYPE, projection); + } + + TrackedFileStruct() { + super(BASE_TYPE.fields().size()); // includes MetadataColumns.ROW_POSITION + } + + /** + * Copy constructor. 
+ * + * @param toCopy a tracked file to copy + * @param requestedColumnIds column IDs for which to keep stats, or null for all stats + */ + private TrackedFileStruct(TrackedFileStruct toCopy, Set requestedColumnIds) { + super(toCopy); + + // Reader-side metadata + this.manifestLocation = toCopy.manifestLocation; + this.position = toCopy.position; + + // Common file metadata + this.contentType = toCopy.contentType; + this.location = toCopy.location; + this.fileFormat = toCopy.fileFormat; + this.partitionSpecId = toCopy.partitionSpecId; + this.sortOrderId = toCopy.sortOrderId; + this.recordCount = toCopy.recordCount; + this.fileSizeInBytes = toCopy.fileSizeInBytes; + this.keyMetadata = + toCopy.keyMetadata != null + ? Arrays.copyOf(toCopy.keyMetadata, toCopy.keyMetadata.length) + : null; + this.splitOffsets = + toCopy.splitOffsets != null + ? Arrays.copyOf(toCopy.splitOffsets, toCopy.splitOffsets.length) + : null; + this.equalityIds = + toCopy.equalityIds != null + ? Arrays.copyOf(toCopy.equalityIds, toCopy.equalityIds.length) + : null; + this.referencedFile = toCopy.referencedFile; + + // Nested structs (always copy) + this.trackingInfo = + toCopy.trackingInfo != null ? new TrackingInfoStruct(toCopy.trackingInfo) : null; + this.contentInfo = + toCopy.contentInfo != null ? new ContentInfoStruct(toCopy.contentInfo) : null; + + // Manifest DV (always copy - not dependent on stats) + this.manifestDV = + toCopy.manifestDV != null + ? Arrays.copyOf(toCopy.manifestDV, toCopy.manifestDV.length) + : null; + + if (requestedColumnIds != null) { + this.manifestStats = + toCopy.manifestStats != null ? 
new ManifestStatsStruct(toCopy.manifestStats) : null; + // TODO: When ContentStats structure is implemented, filter stats to only requestedColumnIds + this.contentStats = toCopy.contentStats; + } + } + + @Override + public String manifestLocation() { + return manifestLocation; + } + + public void setManifestLocation(String manifestLocation) { + this.manifestLocation = manifestLocation; + } + + @Override + public TrackingInfo trackingInfo() { + return trackingInfo; + } + + public void setTrackingInfo(TrackingInfo info) { + this.trackingInfo = info != null ? new TrackingInfoStruct(info) : null; + } + + @Override + public Long pos() { + return position; + } + + void setPos(Long pos) { + this.position = pos; + } + + /** + * Returns the mutable tracking info, creating one if needed. + * + *

Use this to modify tracking info fields directly rather than through TrackedFileStruct. + */ + TrackingInfoStruct ensureTrackingInfo() { + if (trackingInfo == null) { + trackingInfo = new TrackingInfoStruct(); + } + return trackingInfo; + } + + @Override + public FileContent contentType() { + return contentType; + } + + public void setContentType(FileContent contentType) { + this.contentType = contentType; + } + + @Override + public String location() { + return location; + } + + public void setLocation(String location) { + this.location = location; + } + + @Override + public FileFormat fileFormat() { + return fileFormat; + } + + public void setFileFormat(FileFormat fileFormat) { + this.fileFormat = fileFormat; + } + + @Override + public ContentInfo contentInfo() { + return contentInfo; + } + + public void setContentInfo(ContentInfo info) { + this.contentInfo = info != null ? new ContentInfoStruct(info) : null; + } + + @Override + public int partitionSpecId() { + return partitionSpecId; + } + + public void setPartitionSpecId(int partitionSpecId) { + this.partitionSpecId = partitionSpecId; + } + + @Override + public Integer sortOrderId() { + return sortOrderId; + } + + public void setSortOrderId(Integer sortOrderId) { + this.sortOrderId = sortOrderId; + } + + @Override + public long recordCount() { + return recordCount; + } + + public void setRecordCount(long recordCount) { + this.recordCount = recordCount; + } + + @Override + public Long fileSizeInBytes() { + return fileSizeInBytes; + } + + public void setFileSizeInBytes(Long fileSizeInBytes) { + this.fileSizeInBytes = fileSizeInBytes; + } + + @Override + public Object contentStats() { + return contentStats; + } + + @Override + public ManifestStats manifestStats() { + return manifestStats; + } + + public void setManifestStats(ManifestStats stats) { + this.manifestStats = stats != null ? new ManifestStatsStruct(stats) : null; + } + + @Override + public ByteBuffer manifestDV() { + return manifestDV != null ? 
ByteBuffer.wrap(manifestDV) : null; + } + + public void setManifestDV(ByteBuffer dv) { + this.manifestDV = ByteBuffers.toByteArray(dv); + } + + @Override + public ByteBuffer keyMetadata() { + return keyMetadata != null ? ByteBuffer.wrap(keyMetadata) : null; + } + + public void setKeyMetadata(ByteBuffer keyMetadata) { + this.keyMetadata = ByteBuffers.toByteArray(keyMetadata); + } + + @Override + public List splitOffsets() { + return ArrayUtil.toUnmodifiableLongList(splitOffsets); + } + + public void setSplitOffsets(List offsets) { + this.splitOffsets = ArrayUtil.toLongArray(offsets); + } + + @Override + public List equalityIds() { + return ArrayUtil.toUnmodifiableIntList(equalityIds); + } + + public void setEqualityIds(List ids) { + this.equalityIds = ArrayUtil.toIntArray(ids); + } + + @Override + public String referencedFile() { + return referencedFile; + } + + public void setReferencedFile(String referencedFile) { + this.referencedFile = referencedFile; + } + + @Override + public TrackedFile copy() { + return new TrackedFileStruct(this, Collections.emptySet()); + } + + @Override + public TrackedFile copyWithoutStats() { + return new TrackedFileStruct(this, null); + } + + @Override + public TrackedFile copyWithStats(Set requestedColumnIds) { + return new TrackedFileStruct(this, requestedColumnIds); + } + + @Override + protected void internalSet(int pos, T value) { + switch (pos) { + case 0: + this.contentType = value != null ? FILE_CONTENT_VALUES[(Integer) value] : null; + return; + case 1: + this.location = (String) value; + return; + case 2: + this.fileFormat = value != null ? FileFormat.fromString(value.toString()) : null; + return; + case 3: + this.partitionSpecId = value != null ? (Integer) value : -1; + return; + case 4: + this.sortOrderId = (Integer) value; + return; + case 5: + this.recordCount = value != null ? 
(Long) value : -1L; + return; + case 6: + this.fileSizeInBytes = (Long) value; + return; + case 7: + this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value); + return; + case 8: + this.splitOffsets = ArrayUtil.toLongArray((List) value); + return; + case 9: + this.equalityIds = ArrayUtil.toIntArray((List) value); + return; + case 10: + this.referencedFile = (String) value; + return; + case 11: + this.trackingInfo = TrackingInfoStruct.fromStructLike((StructLike) value); + return; + case 12: + this.contentInfo = ContentInfoStruct.fromStructLike((StructLike) value); + return; + case 13: + this.manifestStats = ManifestStatsStruct.fromStructLike((StructLike) value); + return; + case 14: + this.manifestDV = ByteBuffers.toByteArray((ByteBuffer) value); + return; + case 15: + // MetadataColumns.ROW_POSITION - set the ordinal position in the manifest + this.position = (Long) value; + return; + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + pos); + } + } + + @Override + protected T internalGet(int pos, Class javaClass) { + return javaClass.cast(getByPos(pos)); + } + + public Object get(int pos) { + return getByPos(pos); + } + + private Object getByPos(int pos) { + switch (pos) { + case 0: + return contentType != null ? contentType.id() : null; + case 1: + return location; + case 2: + return fileFormat != null ? fileFormat.name().toLowerCase(Locale.ROOT) : null; + case 3: + return partitionSpecId; + case 4: + return sortOrderId; + case 5: + return recordCount; + case 6: + return fileSizeInBytes; + case 7: + return keyMetadata(); + case 8: + return splitOffsets(); + case 9: + return equalityIds(); + case 10: + return referencedFile; + case 11: + return trackingInfo; + case 12: + return contentInfo; + case 13: + return manifestStats; + case 14: + return manifestDV != null ? 
ByteBuffer.wrap(manifestDV) : null; + case 15: + // MetadataColumns.ROW_POSITION + return position; + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + pos); + } + } + + @Override + public DataFile asDataFile(PartitionSpec spec) { + if (contentType != FileContent.DATA) { + throw new IllegalStateException( + "Cannot convert TrackedFile with content type " + contentType + " to DataFile"); + } + return new TrackedDataFile(this, spec); + } + + @Override + public DeleteFile asDeleteFile(PartitionSpec spec) { + if (contentType != FileContent.POSITION_DELETES + && contentType != FileContent.EQUALITY_DELETES) { + throw new IllegalStateException( + "Cannot convert TrackedFile with content type " + contentType + " to DeleteFile"); + } + return new TrackedDeleteFile(this, spec); + } + + /** Wrapper that presents a TrackedFile as a DataFile. */ + private static class TrackedDataFile implements DataFile { + private final TrackedFile trackedFile; + + @SuppressWarnings("UnusedVariable") + private final PartitionSpec spec; + + private TrackedDataFile(TrackedFile trackedFile, PartitionSpec spec) { + this.trackedFile = trackedFile; + this.spec = spec; + } + + @Override + public String manifestLocation() { + return trackedFile.manifestLocation(); + } + + @Override + public Long pos() { + return trackedFile.pos(); + } + + @Override + public int specId() { + return trackedFile.partitionSpecId(); + } + + @Override + public FileContent content() { + return trackedFile.contentType(); + } + + @Override + public CharSequence path() { + return trackedFile.location(); + } + + @Override + public String location() { + return trackedFile.location(); + } + + @Override + public FileFormat format() { + return trackedFile.fileFormat(); + } + + @Override + public StructLike partition() { + // TODO: Implement partition value adaptation from content stats - key for validating v4 + // partition design + return null; + } + + @Override + public long recordCount() { + return 
trackedFile.recordCount(); + } + + @Override + public long fileSizeInBytes() { + return trackedFile.fileSizeInBytes(); + } + + @Override + public Map columnSizes() { + // TODO: Adapt from new column stats structure + return null; + } + + @Override + public Map valueCounts() { + // TODO: Adapt from new column stats structure + return null; + } + + @Override + public Map nullValueCounts() { + // TODO: Adapt from new column stats structure + return null; + } + + @Override + public Map nanValueCounts() { + // TODO: Adapt from new column stats structure + return null; + } + + @Override + public Map lowerBounds() { + // TODO: Adapt from new column stats structure + return null; + } + + @Override + public Map upperBounds() { + // TODO: Adapt from new column stats structure + return null; + } + + @Override + public ByteBuffer keyMetadata() { + return trackedFile.keyMetadata(); + } + + @Override + public List splitOffsets() { + return trackedFile.splitOffsets(); + } + + @Override + public List equalityFieldIds() { + return null; + } + + @Override + public Integer sortOrderId() { + return trackedFile.sortOrderId(); + } + + @Override + public Long dataSequenceNumber() { + TrackingInfo trackingInfo = trackedFile.trackingInfo(); + return trackingInfo != null ? trackingInfo.dataSequenceNumber() : null; + } + + @Override + public Long fileSequenceNumber() { + TrackingInfo trackingInfo = trackedFile.trackingInfo(); + return trackingInfo != null ? trackingInfo.fileSequenceNumber() : null; + } + + @Override + public Long firstRowId() { + TrackingInfo trackingInfo = trackedFile.trackingInfo(); + return trackingInfo != null ? 
trackingInfo.firstRowId() : null; + } + + @Override + public DataFile copy() { + return new TrackedDataFile(trackedFile.copy(), spec); + } + + @Override + public DataFile copyWithoutStats() { + return new TrackedDataFile(trackedFile.copyWithoutStats(), spec); + } + + @Override + public DataFile copyWithStats(Set requestedColumnIds) { + return new TrackedDataFile(trackedFile.copyWithStats(requestedColumnIds), spec); + } + } + + /** Wrapper that presents a TrackedFile as a DeleteFile. */ + private static class TrackedDeleteFile implements DeleteFile { + private final TrackedFile trackedFile; + + @SuppressWarnings("UnusedVariable") + private final PartitionSpec spec; + + private TrackedDeleteFile(TrackedFile trackedFile, PartitionSpec spec) { + this.trackedFile = trackedFile; + this.spec = spec; + } + + @Override + public String manifestLocation() { + return trackedFile.manifestLocation(); + } + + @Override + public Long pos() { + return trackedFile.pos(); + } + + @Override + public int specId() { + return trackedFile.partitionSpecId(); + } + + @Override + public FileContent content() { + return trackedFile.contentType(); + } + + @Override + public CharSequence path() { + return trackedFile.location(); + } + + @Override + public String location() { + return trackedFile.location(); + } + + @Override + public FileFormat format() { + return trackedFile.fileFormat(); + } + + @Override + public StructLike partition() { + // TODO: Implement partition value adaptation from content stats + return null; + } + + @Override + public long recordCount() { + return trackedFile.recordCount(); + } + + @Override + public long fileSizeInBytes() { + return trackedFile.fileSizeInBytes(); + } + + @Override + public Map columnSizes() { + // TODO: Adapt from new column stats structure + return null; + } + + @Override + public Map valueCounts() { + return null; + } + + @Override + public Map nullValueCounts() { + return null; + } + + @Override + public Map nanValueCounts() { + // TODO: Adapt 
from new column stats structure + return null; + } + + @Override + public Map lowerBounds() { + return null; + } + + @Override + public Map upperBounds() { + return null; + } + + @Override + public ByteBuffer keyMetadata() { + return trackedFile.keyMetadata(); + } + + @Override + public List splitOffsets() { + return trackedFile.splitOffsets(); + } + + @Override + public List equalityFieldIds() { + return trackedFile.equalityIds(); + } + + @Override + public Integer sortOrderId() { + return trackedFile.sortOrderId(); + } + + @Override + public Long dataSequenceNumber() { + TrackingInfo trackingInfo = trackedFile.trackingInfo(); + return trackingInfo != null ? trackingInfo.dataSequenceNumber() : null; + } + + @Override + public Long fileSequenceNumber() { + TrackingInfo trackingInfo = trackedFile.trackingInfo(); + return trackingInfo != null ? trackingInfo.fileSequenceNumber() : null; + } + + @Override + public DeleteFile copy() { + return new TrackedDeleteFile(trackedFile.copy(), spec); + } + + @Override + public DeleteFile copyWithoutStats() { + return new TrackedDeleteFile(trackedFile.copyWithoutStats(), spec); + } + + @Override + public DeleteFile copyWithStats(Set requestedColumnIds) { + return new TrackedDeleteFile(trackedFile.copyWithStats(requestedColumnIds), spec); + } + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("content_type", contentType) + .add("location", location) + .add("file_format", fileFormat) + .add("partition_spec_id", partitionSpecId) + .add("record_count", recordCount) + .add("file_size_in_bytes", fileSizeInBytes) + .add("sort_order_id", sortOrderId) + .add("key_metadata", keyMetadata == null ? "null" : "(redacted)") + .add("split_offsets", splitOffsets) + .add("equality_ids", equalityIds) + .add("referenced_file", referencedFile) + .add("tracking_info", trackingInfo) + .add("content_info", contentInfo) + .add("manifest_stats", manifestStats) + .add("manifest_dv", manifestDV == null ? 
"null" : "(present)") + .toString(); + } + + /** Mutable struct implementation of TrackingInfo. */ + static class TrackingInfoStruct implements TrackingInfo, StructLike, Serializable { + // Serialized fields + private TrackingInfo.Status status = null; + private Long snapshotId = null; + private Long sequenceNumber = null; + private Long fileSequenceNumber = null; + private Long firstRowId = null; + + // Reader-side metadata (not serialized) + private String manifestLocation = null; + private long manifestPos = -1; + + TrackingInfoStruct() {} + + TrackingInfoStruct(TrackingInfo toCopy) { + this.status = toCopy.status(); + this.snapshotId = toCopy.snapshotId(); + this.sequenceNumber = toCopy.dataSequenceNumber(); + this.fileSequenceNumber = toCopy.fileSequenceNumber(); + this.firstRowId = toCopy.firstRowId(); + this.manifestLocation = toCopy.manifestLocation(); + this.manifestPos = toCopy.manifestPos(); + } + + static TrackingInfoStruct fromStructLike(StructLike struct) { + if (struct == null) { + return null; + } + if (struct instanceof TrackingInfoStruct) { + return (TrackingInfoStruct) struct; + } + TrackingInfoStruct result = new TrackingInfoStruct(); + Integer statusId = struct.get(0, Integer.class); + result.status = statusId != null ? 
TrackingInfo.Status.values()[statusId] : null; + result.snapshotId = struct.get(1, Long.class); + result.sequenceNumber = struct.get(2, Long.class); + result.fileSequenceNumber = struct.get(3, Long.class); + result.firstRowId = struct.get(4, Long.class); + return result; + } + + @Override + public Status status() { + return status; + } + + @Override + public Long snapshotId() { + return snapshotId; + } + + @Override + public Long dataSequenceNumber() { + return sequenceNumber; + } + + @Override + public Long fileSequenceNumber() { + return fileSequenceNumber; + } + + @Override + public Long firstRowId() { + return firstRowId; + } + + void setStatus(TrackingInfo.Status status) { + this.status = status; + } + + void setSnapshotId(Long snapshotId) { + this.snapshotId = snapshotId; + } + + void setSequenceNumber(Long sequenceNumber) { + this.sequenceNumber = sequenceNumber; + } + + void setFileSequenceNumber(Long fileSequenceNumber) { + this.fileSequenceNumber = fileSequenceNumber; + } + + void setFirstRowId(Long firstRowId) { + this.firstRowId = firstRowId; + } + + @Override + public String manifestLocation() { + return manifestLocation; + } + + void setManifestLocation(String manifestLocation) { + this.manifestLocation = manifestLocation; + } + + @Override + public long manifestPos() { + return manifestPos; + } + + void setManifestPos(long manifestPos) { + this.manifestPos = manifestPos; + } + + @Override + public int size() { + return 5; + } + + @Override + public T get(int pos, Class javaClass) { + Object value; + switch (pos) { + case 0: + value = status != null ? 
status.id() : null; + break; + case 1: + value = snapshotId; + break; + case 2: + value = sequenceNumber; + break; + case 3: + value = fileSequenceNumber; + break; + case 4: + value = firstRowId; + break; + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + pos); + } + return javaClass.cast(value); + } + + @Override + public void set(int pos, T value) { + switch (pos) { + case 0: + this.status = value != null ? TrackingInfo.Status.values()[(Integer) value] : null; + break; + case 1: + this.snapshotId = (Long) value; + break; + case 2: + this.sequenceNumber = (Long) value; + break; + case 3: + this.fileSequenceNumber = (Long) value; + break; + case 4: + this.firstRowId = (Long) value; + break; + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + pos); + } + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("status", status) + .add("snapshot_id", snapshotId) + .add("sequence_number", sequenceNumber) + .add("file_sequence_number", fileSequenceNumber) + .add("first_row_id", firstRowId) + .add("manifest_location", manifestLocation) + .add("manifest_pos", manifestPos) + .toString(); + } + } + + /** Mutable struct implementation of ContentInfo. 
*/ + static class ContentInfoStruct implements ContentInfo, StructLike, Serializable { + private long offset; + private long sizeInBytes; + + ContentInfoStruct() {} + + ContentInfoStruct(ContentInfo toCopy) { + this.offset = toCopy.offset(); + this.sizeInBytes = toCopy.sizeInBytes(); + } + + static ContentInfoStruct fromStructLike(StructLike struct) { + if (struct == null) { + return null; + } + if (struct instanceof ContentInfoStruct) { + return (ContentInfoStruct) struct; + } + ContentInfoStruct result = new ContentInfoStruct(); + result.offset = struct.get(0, Long.class); + result.sizeInBytes = struct.get(1, Long.class); + return result; + } + + @Override + public long offset() { + return offset; + } + + @Override + public long sizeInBytes() { + return sizeInBytes; + } + + void setOffset(long offset) { + this.offset = offset; + } + + void setSizeInBytes(long sizeInBytes) { + this.sizeInBytes = sizeInBytes; + } + + @Override + public int size() { + return 2; + } + + @Override + public T get(int pos, Class javaClass) { + Object value; + switch (pos) { + case 0: + value = offset; + break; + case 1: + value = sizeInBytes; + break; + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + pos); + } + return javaClass.cast(value); + } + + @Override + public void set(int pos, T value) { + switch (pos) { + case 0: + this.offset = (Long) value; + break; + case 1: + this.sizeInBytes = (Long) value; + break; + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + pos); + } + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("offset", offset) + .add("size_in_bytes", sizeInBytes) + .toString(); + } + } + + /** Mutable struct implementation of ManifestStats. 
*/ + static class ManifestStatsStruct implements ManifestStats, StructLike, Serializable { + private int addedFilesCount; + private int existingFilesCount; + private int deletedFilesCount; + private long addedRowsCount; + private long existingRowsCount; + private long deletedRowsCount; + private long minSequenceNumber; + + ManifestStatsStruct() {} + + ManifestStatsStruct(ManifestStats toCopy) { + this.addedFilesCount = toCopy.addedFilesCount(); + this.existingFilesCount = toCopy.existingFilesCount(); + this.deletedFilesCount = toCopy.deletedFilesCount(); + this.addedRowsCount = toCopy.addedRowsCount(); + this.existingRowsCount = toCopy.existingRowsCount(); + this.deletedRowsCount = toCopy.deletedRowsCount(); + this.minSequenceNumber = toCopy.minSequenceNumber(); + } + + static ManifestStatsStruct fromStructLike(StructLike struct) { + if (struct == null) { + return null; + } + if (struct instanceof ManifestStatsStruct) { + return (ManifestStatsStruct) struct; + } + ManifestStatsStruct result = new ManifestStatsStruct(); + result.addedFilesCount = struct.get(0, Integer.class); + result.existingFilesCount = struct.get(1, Integer.class); + result.deletedFilesCount = struct.get(2, Integer.class); + result.addedRowsCount = struct.get(3, Long.class); + result.existingRowsCount = struct.get(4, Long.class); + result.deletedRowsCount = struct.get(5, Long.class); + result.minSequenceNumber = struct.get(6, Long.class); + return result; + } + + @Override + public int addedFilesCount() { + return addedFilesCount; + } + + @Override + public int existingFilesCount() { + return existingFilesCount; + } + + @Override + public int deletedFilesCount() { + return deletedFilesCount; + } + + @Override + public long addedRowsCount() { + return addedRowsCount; + } + + @Override + public long existingRowsCount() { + return existingRowsCount; + } + + @Override + public long deletedRowsCount() { + return deletedRowsCount; + } + + @Override + public long minSequenceNumber() { + return 
minSequenceNumber; + } + + void setAddedFilesCount(int count) { + this.addedFilesCount = count; + } + + void setExistingFilesCount(int count) { + this.existingFilesCount = count; + } + + void setDeletedFilesCount(int count) { + this.deletedFilesCount = count; + } + + void setAddedRowsCount(long count) { + this.addedRowsCount = count; + } + + void setExistingRowsCount(long count) { + this.existingRowsCount = count; + } + + void setDeletedRowsCount(long count) { + this.deletedRowsCount = count; + } + + void setMinSequenceNumber(long minSeqNum) { + this.minSequenceNumber = minSeqNum; + } + + @Override + public int size() { + return 7; + } + + @Override + public T get(int pos, Class javaClass) { + Object value; + switch (pos) { + case 0: + value = addedFilesCount; + break; + case 1: + value = existingFilesCount; + break; + case 2: + value = deletedFilesCount; + break; + case 3: + value = addedRowsCount; + break; + case 4: + value = existingRowsCount; + break; + case 5: + value = deletedRowsCount; + break; + case 6: + value = minSequenceNumber; + break; + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + pos); + } + return javaClass.cast(value); + } + + @Override + public void set(int pos, T value) { + switch (pos) { + case 0: + this.addedFilesCount = (Integer) value; + break; + case 1: + this.existingFilesCount = (Integer) value; + break; + case 2: + this.deletedFilesCount = (Integer) value; + break; + case 3: + this.addedRowsCount = (Long) value; + break; + case 4: + this.existingRowsCount = (Long) value; + break; + case 5: + this.deletedRowsCount = (Long) value; + break; + case 6: + this.minSequenceNumber = (Long) value; + break; + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + pos); + } + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("added_files_count", addedFilesCount) + .add("existing_files_count", existingFilesCount) + .add("deleted_files_count", 
deletedFilesCount) + .add("added_rows_count", addedRowsCount) + .add("existing_rows_count", existingRowsCount) + .add("deleted_rows_count", deletedRowsCount) + .add("min_sequence_number", minSequenceNumber) + .toString(); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/V4ManifestReader.java b/core/src/main/java/org/apache/iceberg/V4ManifestReader.java new file mode 100644 index 000000000000..6bb92bfce18c --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/V4ManifestReader.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Set; +import java.util.function.Function; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableGroup; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; +import org.roaringbitmap.RoaringBitmap; + +/** + * Reader for V4 manifest files containing TrackedFile entries. + * + *

<p>Supports reading both root manifests and leaf manifests. Returns TrackedFile entries which can + * represent data files, delete files, or manifest references. + * + *
<p>Projection capabilities: + * + *
<ul> + *
  <li>By default, all columns are projected + *
  <li>Use {@link #filterData(Expression)} for automatic filter-based stats projection + *
  <li>Use {@link #projectStats(Set)} for custom stats field projection (unioned with + * filter-based) + *
  <li>Use {@link #project(Schema)} for fine-grained projection in use cases like metadata table + * scans + * </ul>
+ */ +class V4ManifestReader extends CloseableGroup implements CloseableIterable { + + private final InputFile file; + private final FileFormat format; + private final InheritableTrackedMetadata inheritableMetadata; + private final Long manifestFirstRowId; + + private ByteBuffer manifestDV = null; + + @SuppressWarnings("UnusedVariable") + private Expression filter = Expressions.alwaysTrue(); + + @SuppressWarnings("UnusedVariable") + private Set statsFieldIds = null; + + private Schema customProjection = null; + + protected V4ManifestReader( + InputFile file, + FileFormat format, + InheritableTrackedMetadata inheritableMetadata, + Long manifestFirstRowId) { + this.file = file; + this.format = format; + this.inheritableMetadata = inheritableMetadata; + this.manifestFirstRowId = manifestFirstRowId; + } + + /** + * Sets the manifest deletion vector to filter entries. + * + * @param dv serialized deletion vector of deleted entry positions, or null for no filtering + */ + public V4ManifestReader withManifestDV(ByteBuffer dv) { + this.manifestDV = dv; + return this; + } + + /** + * Sets a filter expression for automatic stats projection. + * + *

Stats for columns referenced in the filter will be projected automatically. This is more + * efficient than projecting all stats when only a subset is needed for filtering. + * + * @param expr filter expression to push down + * @return this reader for method chaining + */ + public V4ManifestReader filterData(Expression expr) { + this.filter = expr; + return this; + } + + /** + * Sets custom stats field IDs to project. + * + *

These field IDs are unioned with any fields extracted from {@link #filterData(Expression)}. + * Use this when you need stats for specific columns beyond what the filter requires. + * + * @param fieldIds table schema field IDs for which to project stats + * @return this reader for method chaining + */ + public V4ManifestReader projectStats(Set fieldIds) { + this.statsFieldIds = fieldIds; + return this; + } + + /** + * Sets a custom projection schema for fine-grained control. + * + *

Use this for metadata table scans and other custom situations (like {@code + * ManifestFiles.readPaths}) that need specific column projection. When set, this takes precedence + * over filter-based and custom stats projection. + * + *

Note: The tracking_info struct is always projected regardless of the custom schema to ensure + * correctness. + * + * @param projection custom schema to project + * @return this reader for method chaining + */ + public V4ManifestReader project(Schema projection) { + this.customProjection = projection; + return this; + } + + public CloseableIterable allFiles() { + return open(); + } + + public CloseableIterable liveFiles() { + return filterLiveFiles(open()); + } + + /** + * Builds the projection schema based on configured projection options. + * + *

If a custom projection is set, it is used with tracking_info always included to ensure + * correctness. Otherwise, projects all columns by default. Filter-based stats projection will be + * implemented when content stats structure is finalized. + * + *

Always includes: + * + *

    + *
  • {@link TrackedFile#TRACKING_INFO} - required for correct first_row_id handling + *
  • {@link MetadataColumns#ROW_POSITION} - for safe position tracking (handles row group + * skipping) + *
+ */ + private Schema buildProjection() { + List fields = Lists.newArrayList(); + + if (customProjection != null) { + fields.addAll(customProjection.asStruct().fields()); + + // Always project tracking_info to ensure first_row_id and status are available. + // This prevents bugs when reading via metadata tables with partial projection. + if (customProjection.findField(TrackedFile.TRACKING_INFO.fieldId()) == null) { + fields.add(TrackedFile.TRACKING_INFO); + } + + // Always add ROW_POSITION for safe position tracking (handles row group skipping) + if (customProjection.findField(MetadataColumns.ROW_POSITION.fieldId()) == null) { + fields.add(MetadataColumns.ROW_POSITION); + } + } else { + // Default: project all columns including ROW_POSITION + // TODO: When content stats structure is implemented, union filter-based field IDs + // with statsFieldIds to project only required stats columns + fields.addAll(TrackedFileStruct.BASE_TYPE.fields()); + } + + return new Schema(fields); + } + + private CloseableIterable open() { + Schema projection = buildProjection(); + + CloseableIterable entries = + InternalData.read(format, file) + .project(projection) + .setRootType(TrackedFileStruct.class) + .build(); + + addCloseable(entries); + + CloseableIterable transformed = + CloseableIterable.transform(entries, inheritableMetadata::apply); + + transformed = CloseableIterable.transform(transformed, rowIdAssigner(manifestFirstRowId)); + + if (manifestDV != null) { + RoaringBitmap deletedPositions = deserializeManifestDV(manifestDV); + transformed = + CloseableIterable.filter( + transformed, + entry -> { + Long pos = entry.pos(); + Preconditions.checkNotNull( + pos, "Position should not be null when applying manifest deletion vector"); + return !deletedPositions.contains(pos.intValue()); + }); + } + + return CloseableIterable.transform(transformed, e -> e); + } + + private static RoaringBitmap deserializeManifestDV(ByteBuffer dv) { + byte[] bytes = ByteBuffers.toByteArray(dv); + + 
RoaringBitmap deletedPositions = new RoaringBitmap(); + try { + deletedPositions.deserialize(new DataInputStream(new ByteArrayInputStream(bytes))); + } catch (IOException e) { + throw new UncheckedIOException("Failed to deserialize manifest DV", e); + } + + return deletedPositions; + } + + private CloseableIterable filterLiveFiles(CloseableIterable files) { + return CloseableIterable.filter( + files, + entry -> { + TrackingInfo tracking = entry.trackingInfo(); + Preconditions.checkNotNull( + tracking, "Tracking info should not be null for committed data files"); + return tracking.status() != TrackingInfo.Status.DELETED; + }); + } + + private static Function rowIdAssigner(Long firstRowId) { + if (firstRowId == null) { + // Explicitly clear first_row_id for v2->v3/v4 migration. + // Passing null signals that all row IDs should be cleared, not preserved. + return entry -> { + if (entry.contentType() == FileContent.DATA) { + // tracking_info is always projected and should never be null for manifest entries. + // If null, fail fast with NPE rather than silently ignoring. + TrackingInfo tracking = entry.trackingInfo(); + if (tracking.firstRowId() != null) { + entry.ensureTrackingInfo().setFirstRowId(null); + } + } + + return entry; + }; + } + + return new Function<>() { + private long nextRowId = firstRowId; + + @Override + public TrackedFileStruct apply(TrackedFileStruct entry) { + if (entry.contentType() == FileContent.DATA) { + // tracking_info is always projected and should never be null for manifest entries. + // If null, fail fast with NPE rather than silently ignoring. 
+ TrackingInfo tracking = entry.trackingInfo(); + if (tracking.status() != TrackingInfo.Status.DELETED && tracking.firstRowId() == null) { + entry.ensureTrackingInfo().setFirstRowId(nextRowId); + nextRowId = nextRowId + entry.recordCount(); + } + } + + return entry; + } + }; + } + + @Override + public CloseableIterator iterator() { + return CloseableIterable.transform(liveFiles(), TrackedFile::copy).iterator(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/V4ManifestReaders.java b/core/src/main/java/org/apache/iceberg/V4ManifestReaders.java new file mode 100644 index 000000000000..89c4b0735d95 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/V4ManifestReaders.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.Map; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** Factory methods for creating {@link V4ManifestReader} instances. */ +public class V4ManifestReaders { + + private V4ManifestReaders() {} + + /** + * Create a reader for a root manifest. + * + *

In v4, root manifests should be written with explicit {@code first_row_id} values, similar + * to how manifest lists work in v3. Row ID assignment only happens when reading leaf manifests + * via {@link #readLeaf}, not for root manifests. + * + * @param rootManifestPath path to the root manifest file + * @param io file IO for reading + * @param snapshotId snapshot ID for metadata inheritance + * @param sequenceNumber sequence number for metadata inheritance + * @return a V4ManifestReader for the root manifest + */ + public static V4ManifestReader readRoot( + String rootManifestPath, FileIO io, long snapshotId, long sequenceNumber) { + InputFile inputFile = io.newInputFile(rootManifestPath); + InheritableTrackedMetadata metadata = + InheritableTrackedMetadataFactory.create(snapshotId, sequenceNumber); + + // Root manifests have explicit first_row_id values; no assignment needed + return new V4ManifestReader( + inputFile, FileFormat.AVRO, metadata, /* manifestFirstRowId= */ null); + } + + /** + * Create a reader for a leaf manifest referenced from a root manifest. + * + *

Row ID assignment happens during leaf manifest reading. The {@code first_row_id} is + * extracted from the manifest entry's tracking info and used to assign row IDs to data files that + * don't already have them. + * + * @param manifestEntry the DATA_MANIFEST or DELETE_MANIFEST entry from root + * @param io file IO for reading + * @param specsById map of partition specs by ID (currently unused, reserved for future use) + * @return a V4ManifestReader for the leaf manifest + */ + public static V4ManifestReader readLeaf( + TrackedFile manifestEntry, FileIO io, Map specsById) { + Preconditions.checkArgument( + manifestEntry.contentType() == FileContent.DATA_MANIFEST + || manifestEntry.contentType() == FileContent.DELETE_MANIFEST, + "Can only read manifest entries, got: %s", + manifestEntry.contentType()); + + InputFile inputFile = io.newInputFile(manifestEntry.location()); + + InheritableTrackedMetadata metadata = + InheritableTrackedMetadataFactory.fromTrackedFile(manifestEntry); + + TrackingInfo tracking = manifestEntry.trackingInfo(); + Long firstRowId = tracking != null ? tracking.firstRowId() : null; + + return new V4ManifestReader(inputFile, FileFormat.AVRO, metadata, firstRowId); + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestManifestExpander.java b/core/src/test/java/org/apache/iceberg/TestManifestExpander.java new file mode 100644 index 000000000000..2ac684b161f4 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestManifestExpander.java @@ -0,0 +1,646 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.iceberg.inmemory.InMemoryFileIO; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.roaringbitmap.RoaringBitmap; + +public class TestManifestExpander { + + private final FileIO io = new InMemoryFileIO(); + private final Map specsById; + + private static final long SNAPSHOT_ID = 12345L; + private static final long SEQUENCE_NUMBER = 100L; + + @TempDir private Path temp; + + public TestManifestExpander() { + Schema schema = + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())); + PartitionSpec spec = PartitionSpec.unpartitioned(); + this.specsById = ImmutableMap.of(spec.specId(), spec); + } + + @Test + public void testRootWithOnlyDirectFiles() throws IOException { + TrackedFileStruct file1 = createDataFile("file1.parquet", 
1000L); + file1.ensureTrackingInfo().setStatus(TrackingInfo.Status.ADDED); + + TrackedFileStruct file2 = createDataFile("file2.parquet", 2000L); + file2.ensureTrackingInfo().setStatus(TrackingInfo.Status.EXISTING); + + String rootPath = writeRootManifest(file1, file2); + + V4ManifestReader rootReader = + V4ManifestReaders.readRoot(rootPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER); + ManifestExpander reader = new ManifestExpander(rootReader, io, specsById); + + List allFiles = Lists.newArrayList(reader.allTrackedFiles()); + + assertThat(allFiles).hasSize(2); + assertThat(allFiles.get(0).contentType()).isEqualTo(FileContent.DATA); + assertThat(allFiles.get(1).contentType()).isEqualTo(FileContent.DATA); + } + + @Test + public void testRootWithOnlyManifests() throws IOException { + TrackedFileStruct dataFile1 = createDataFile("data1.parquet", 1000L); + TrackedFileStruct dataFile2 = createDataFile("data2.parquet", 2000L); + String dataManifestPath = writeLeafManifest(dataFile1, dataFile2); + + TrackedFileStruct manifestEntry = createManifestEntry(dataManifestPath, 2, 2000L); + + String rootPath = writeRootManifest(manifestEntry); + + V4ManifestReader rootReader = + V4ManifestReaders.readRoot(rootPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER); + ManifestExpander reader = new ManifestExpander(rootReader, io, specsById); + + List allFiles = Lists.newArrayList(reader.allTrackedFiles()); + + assertThat(allFiles).hasSize(2); + assertThat(allFiles.get(0).location()).endsWith("data1.parquet"); + assertThat(allFiles.get(1).location()).endsWith("data2.parquet"); + } + + @Test + public void testRootWithMixedDirectAndManifests() throws IOException { + TrackedFileStruct directFile = createDataFile("direct.parquet", 500L); + directFile.ensureTrackingInfo().setStatus(TrackingInfo.Status.ADDED); + + TrackedFileStruct leafFile1 = createDataFile("leaf1.parquet", 1000L); + TrackedFileStruct leafFile2 = createDataFile("leaf2.parquet", 2000L); + String leafManifestPath = writeLeafManifest(leafFile1, 
leafFile2); + + TrackedFileStruct manifestEntry = createManifestEntry(leafManifestPath, 2, 3000L); + + String rootPath = writeRootManifest(directFile, manifestEntry); + + V4ManifestReader rootReader = + V4ManifestReaders.readRoot(rootPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER); + ManifestExpander reader = new ManifestExpander(rootReader, io, specsById); + + List allFiles = Lists.newArrayList(reader.allTrackedFiles()); + + assertThat(allFiles).hasSize(3); + + List locations = Lists.newArrayList(); + for (TrackedFile file : allFiles) { + locations.add(file.location()); + } + + assertThat(locations) + .anyMatch(loc -> loc.endsWith("direct.parquet")) + .anyMatch(loc -> loc.endsWith("leaf1.parquet")) + .anyMatch(loc -> loc.endsWith("leaf2.parquet")); + } + + @Test + public void testMultipleDataManifests() throws IOException { + TrackedFileStruct file1 = createDataFile("file1.parquet", 1000L); + TrackedFileStruct file2 = createDataFile("file2.parquet", 2000L); + String manifest1Path = writeLeafManifest(file1, file2); + + TrackedFileStruct file3 = createDataFile("file3.parquet", 3000L); + String manifest2Path = writeLeafManifest(file3); + + TrackedFileStruct manifestEntry1 = createManifestEntry(manifest1Path, 2, 3000L); + TrackedFileStruct manifestEntry2 = createManifestEntry(manifest2Path, 1, 3000L); + + String rootPath = writeRootManifest(manifestEntry1, manifestEntry2); + + V4ManifestReader rootReader = + V4ManifestReaders.readRoot(rootPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER); + ManifestExpander reader = new ManifestExpander(rootReader, io, specsById); + + List allFiles = Lists.newArrayList(reader.allTrackedFiles()); + + assertThat(allFiles).hasSize(3); + assertThat(allFiles).allMatch(tf -> tf.contentType() == FileContent.DATA); + } + + @Test + public void testDeleteManifests() throws IOException { + TrackedFileStruct deleteFile1 = createDeleteFile("delete1.parquet", 100L); + TrackedFileStruct deleteFile2 = createDeleteFile("delete2.parquet", 200L); + String 
deleteManifestPath = writeLeafManifest(deleteFile1, deleteFile2); + + TrackedFileStruct manifestEntry = createDeleteManifestEntry(deleteManifestPath, 2, 300L); + + String rootPath = writeRootManifest(manifestEntry); + + V4ManifestReader rootReader = + V4ManifestReaders.readRoot(rootPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER); + ManifestExpander reader = new ManifestExpander(rootReader, io, specsById); + + List allFiles = Lists.newArrayList(reader.allTrackedFiles()); + + assertThat(allFiles).hasSize(2); + assertThat(allFiles).allMatch(tf -> tf.contentType() == FileContent.POSITION_DELETES); + } + + /** Root manifest references one data manifest and one delete manifest, each with a single file; allTrackedFiles() must return exactly one DATA and one POSITION_DELETES entry. */ @Test + public void testMixedDataAndDeleteManifests() throws IOException { + TrackedFileStruct dataFile = createDataFile("data.parquet", 1000L); + String dataManifestPath = writeLeafManifest(dataFile); + + TrackedFileStruct deleteFile = createDeleteFile("delete.parquet", 100L); + String deleteManifestPath = writeLeafManifest(deleteFile); + + TrackedFileStruct dataManifestEntry = createManifestEntry(dataManifestPath, 1, 1000L); + TrackedFileStruct deleteManifestEntry = createDeleteManifestEntry(deleteManifestPath, 1, 100L); + + String rootPath = writeRootManifest(dataManifestEntry, deleteManifestEntry); + + V4ManifestReader rootReader = + V4ManifestReaders.readRoot(rootPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER); + ManifestExpander reader = new ManifestExpander(rootReader, io, specsById); + + List allFiles = Lists.newArrayList(reader.allTrackedFiles()); + + assertThat(allFiles).hasSize(2); + + long dataFiles = allFiles.stream().filter(tf -> tf.contentType() == FileContent.DATA).count(); + long deleteFiles = + allFiles.stream().filter(tf -> tf.contentType() == FileContent.POSITION_DELETES).count(); + + assertThat(dataFiles).isEqualTo(1); + assertThat(deleteFiles).isEqualTo(1); + } + + /** Root manifest directly tracks one data file and one position-delete file; planDataFiles() must yield a single scan info whose data file is data.parquet with content type DATA. */ @Test + public void testPlanDataFilesOnlyReturnsData() throws IOException { + TrackedFileStruct dataFile = createDataFile("data.parquet", 1000L); + TrackedFileStruct deleteFile = 
createDeleteFile("delete.parquet", 100L); + + String rootPath = writeRootManifest(dataFile, deleteFile); + + V4ManifestReader rootReader = + V4ManifestReaders.readRoot(rootPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER); + ManifestExpander expander = new ManifestExpander(rootReader, io, specsById); + + List scanInfos = + Lists.newArrayList(expander.planDataFiles()); + + assertThat(scanInfos).hasSize(1); + assertThat(scanInfos.get(0).dataFile().contentType()).isEqualTo(FileContent.DATA); + assertThat(scanInfos.get(0).dataFile().location()).endsWith("data.parquet"); + } + + /** Root manifest tracks ADDED, DELETED and EXISTING data files; with ignoreDeleted() applied, planDataFiles() must return two scan infos and none for deleted.parquet. */ @Test + public void testIgnoreDeleted() throws IOException { + TrackedFileStruct added = createDataFile("added.parquet", 1000L); + added.ensureTrackingInfo().setStatus(TrackingInfo.Status.ADDED); + + TrackedFileStruct deleted = createDataFile("deleted.parquet", 2000L); + deleted.ensureTrackingInfo().setStatus(TrackingInfo.Status.DELETED); + + TrackedFileStruct existing = createDataFile("existing.parquet", 3000L); + existing.ensureTrackingInfo().setStatus(TrackingInfo.Status.EXISTING); + + String rootPath = writeRootManifest(added, deleted, existing); + + V4ManifestReader rootReader = + V4ManifestReaders.readRoot(rootPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER); + ManifestExpander expander = new ManifestExpander(rootReader, io, specsById).ignoreDeleted(); + + List scanInfos = + Lists.newArrayList(expander.planDataFiles()); + + assertThat(scanInfos).hasSize(2); + assertThat(scanInfos).noneMatch(info -> info.dataFile().location().endsWith("deleted.parquet")); + } + + /** Root manifest tracks one ADDED and one EXISTING data file; with ignoreExisting() applied, planDataFiles() must return only added.parquet. */ @Test + public void testIgnoreExisting() throws IOException { + TrackedFileStruct added = createDataFile("added.parquet", 1000L); + added.ensureTrackingInfo().setStatus(TrackingInfo.Status.ADDED); + + TrackedFileStruct existing = createDataFile("existing.parquet", 2000L); + existing.ensureTrackingInfo().setStatus(TrackingInfo.Status.EXISTING); + + String rootPath = writeRootManifest(added, existing); + + V4ManifestReader rootReader = + 
V4ManifestReaders.readRoot(rootPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER); + ManifestExpander expander = new ManifestExpander(rootReader, io, specsById).ignoreExisting(); + + List scanInfos = + Lists.newArrayList(expander.planDataFiles()); + + assertThat(scanInfos).hasSize(1); + assertThat(scanInfos.get(0).dataFile().location()).endsWith("added.parquet"); + } + + /** Root manifest references a two-file data leaf manifest; planDataFiles() must return two scan infos whose data files all have content type DATA. */ @Test + public void testPlanDataFilesFromManifests() throws IOException { + TrackedFileStruct dataFile1 = createDataFile("data1.parquet", 1000L); + TrackedFileStruct dataFile2 = createDataFile("data2.parquet", 2000L); + String dataManifestPath = writeLeafManifest(dataFile1, dataFile2); + + TrackedFileStruct manifestEntry = createManifestEntry(dataManifestPath, 2, 3000L); + + String rootPath = writeRootManifest(manifestEntry); + + V4ManifestReader rootReader = + V4ManifestReaders.readRoot(rootPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER); + ManifestExpander expander = new ManifestExpander(rootReader, io, specsById); + + List scanInfos = + Lists.newArrayList(expander.planDataFiles()); + + assertThat(scanInfos).hasSize(2); + assertThat(scanInfos).allMatch(info -> info.dataFile().contentType() == FileContent.DATA); + } + + /** A position-delete file whose referencedFile equals the data file's location must be attached to that data file's scan info by planDataFiles(). */ @Test + public void testDeleteMatching() throws IOException { + String dataFilePath = "s3://bucket/table/data/file1.parquet"; + TrackedFileStruct dataFile = createDataFile("file1.parquet", 1000L); + dataFile.setLocation(dataFilePath); + + TrackedFileStruct deleteFile = createDeleteFile("delete1.parquet", 50L); + deleteFile.setReferencedFile(dataFilePath); + + String rootPath = writeRootManifest(dataFile, deleteFile); + + V4ManifestReader rootReader = + V4ManifestReaders.readRoot(rootPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER); + ManifestExpander expander = new ManifestExpander(rootReader, io, specsById); + + List scanInfos = + Lists.newArrayList(expander.planDataFiles()); + + assertThat(scanInfos).hasSize(1); + ManifestExpander.DataFileScanInfo scanInfo = scanInfos.get(0); + + 
assertThat(scanInfo.dataFile().location()).isEqualTo(dataFilePath); + assertThat(scanInfo.deleteFiles()).hasSize(1); + assertThat(scanInfo.deleteFiles().get(0).location()).endsWith("delete1.parquet"); + } + + /** Two position-delete files that both reference the same data file location must both appear in that data file's single scan info. */ @Test + public void testMultipleDeletesMatchToSameFile() throws IOException { + String dataFilePath = "s3://bucket/table/data/file1.parquet"; + TrackedFileStruct dataFile = createDataFile("file1.parquet", 1000L); + dataFile.setLocation(dataFilePath); + + TrackedFileStruct deleteFile1 = createDeleteFile("delete1.parquet", 50L); + deleteFile1.setReferencedFile(dataFilePath); + + TrackedFileStruct deleteFile2 = createDeleteFile("delete2.parquet", 30L); + deleteFile2.setReferencedFile(dataFilePath); + + String rootPath = writeRootManifest(dataFile, deleteFile1, deleteFile2); + + V4ManifestReader rootReader = + V4ManifestReaders.readRoot(rootPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER); + ManifestExpander expander = new ManifestExpander(rootReader, io, specsById); + + List scanInfos = + Lists.newArrayList(expander.planDataFiles()); + + assertThat(scanInfos).hasSize(1); + assertThat(scanInfos.get(0).deleteFiles()).hasSize(2); + } + + /** A position-delete file on which setReferencedFile is never called must not be attached to the data file: the scan info's delete list is expected to be empty. */ @Test + public void testDeleteWithoutReferencedFileNotMatched() throws IOException { + TrackedFileStruct dataFile = createDataFile("data.parquet", 1000L); + + TrackedFileStruct deleteFile = createDeleteFile("delete.parquet", 50L); + + String rootPath = writeRootManifest(dataFile, deleteFile); + + V4ManifestReader rootReader = + V4ManifestReaders.readRoot(rootPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER); + ManifestExpander expander = new ManifestExpander(rootReader, io, specsById); + + List scanInfos = + Lists.newArrayList(expander.planDataFiles()); + + assertThat(scanInfos).hasSize(1); + assertThat(scanInfos.get(0).deleteFiles()).isEmpty(); + } + + /** A root manifest containing only a data file must produce one scan info with an empty delete list. */ @Test + public void testDataFileWithNoDeletes() throws IOException { + TrackedFileStruct dataFile = createDataFile("data.parquet", 1000L); + + String rootPath = writeRootManifest(dataFile); + + 
V4ManifestReader rootReader = + V4ManifestReaders.readRoot(rootPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER); + ManifestExpander expander = new ManifestExpander(rootReader, io, specsById); + + List scanInfos = + Lists.newArrayList(expander.planDataFiles()); + + assertThat(scanInfos).hasSize(1); + assertThat(scanInfos.get(0).deleteFiles()).isEmpty(); + } + + /** Data file at sequence number 100 with two referencing deletes at 95 and 105; the test expects exactly one delete to be attached and expects it to be delete1.parquet (sequence 95). NOTE(review): this asserts the lower-sequence delete is kept and the higher-sequence delete is dropped -- confirm this direction matches the intended delete-applicability rule. */ @Test + public void testSequenceNumberFiltering() throws IOException { + String dataFilePath = "s3://bucket/table/data/file1.parquet"; + TrackedFileStruct dataFile = createDataFile("file1.parquet", 1000L); + dataFile.setLocation(dataFilePath); + dataFile.ensureTrackingInfo().setSequenceNumber(100L); + + TrackedFileStruct delete1 = createDeleteFile("delete1.parquet", 50L); + delete1.setReferencedFile(dataFilePath); + delete1.ensureTrackingInfo().setSequenceNumber(95L); + + TrackedFileStruct delete2 = createDeleteFile("delete2.parquet", 30L); + delete2.setReferencedFile(dataFilePath); + delete2.ensureTrackingInfo().setSequenceNumber(105L); + + String rootPath = writeRootManifest(dataFile, delete1, delete2); + + V4ManifestReader rootReader = + V4ManifestReaders.readRoot(rootPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER); + ManifestExpander expander = new ManifestExpander(rootReader, io, specsById); + + List scanInfos = + Lists.newArrayList(expander.planDataFiles()); + + assertThat(scanInfos).hasSize(1); + assertThat(scanInfos.get(0).deleteFiles()).hasSize(1); + assertThat(scanInfos.get(0).deleteFiles().get(0).location()).endsWith("delete1.parquet"); + } + + /** Builds a DATA tracked file located at "s3://bucket/table/data/" + filename, in PARQUET format with spec id 0, the given record count, a file size of recordCount * 100, and tracking info set to ADDED under SNAPSHOT_ID. No sequence number is set here. */ private TrackedFileStruct createDataFile(String filename, long recordCount) { + TrackedFileStruct file = new TrackedFileStruct(); + file.setContentType(FileContent.DATA); + file.setLocation("s3://bucket/table/data/" + filename); + file.setFileFormat(FileFormat.PARQUET); + file.setPartitionSpecId(0); + file.setRecordCount(recordCount); + file.setFileSizeInBytes(recordCount * 100); + TrackedFileStruct.TrackingInfoStruct tracking = file.ensureTrackingInfo(); + 
tracking.setStatus(TrackingInfo.Status.ADDED); + tracking.setSnapshotId(SNAPSHOT_ID); + return file; + } + + /** Builds a POSITION_DELETES tracked file located at "s3://bucket/table/deletes/" + filename, in PARQUET format with spec id 0, the given record count, a file size of recordCount * 50, and tracking info set to ADDED under SNAPSHOT_ID. referencedFile is left unset. */ private TrackedFileStruct createDeleteFile(String filename, long recordCount) { + TrackedFileStruct file = new TrackedFileStruct(); + file.setContentType(FileContent.POSITION_DELETES); + file.setLocation("s3://bucket/table/deletes/" + filename); + file.setFileFormat(FileFormat.PARQUET); + file.setPartitionSpecId(0); + file.setRecordCount(recordCount); + file.setFileSizeInBytes(recordCount * 50); + TrackedFileStruct.TrackingInfoStruct tracking = file.ensureTrackingInfo(); + tracking.setStatus(TrackingInfo.Status.ADDED); + tracking.setSnapshotId(SNAPSHOT_ID); + return file; + } + + /** Builds a DATA_MANIFEST entry pointing at manifestLocation. fileCount is used as both the record count and the added-files count, totalRows as the added-rows count; existing/deleted counters are zero, the file size is fixed at 10000, and tracking info is ADDED with SNAPSHOT_ID and SEQUENCE_NUMBER for both sequence fields. NOTE(review): the file format is set to PARQUET although writeManifest produces ".avro" files -- presumably not consulted when reading; confirm. */ private TrackedFileStruct createManifestEntry( + String manifestLocation, int fileCount, long totalRows) { + TrackedFileStruct entry = new TrackedFileStruct(); + entry.setContentType(FileContent.DATA_MANIFEST); + entry.setLocation(manifestLocation); + entry.setFileFormat(FileFormat.PARQUET); + entry.setPartitionSpecId(0); + entry.setRecordCount(fileCount); + entry.setFileSizeInBytes(10000L); + + TrackedFileStruct.ManifestStatsStruct stats = new TrackedFileStruct.ManifestStatsStruct(); + stats.setAddedFilesCount(fileCount); + stats.setExistingFilesCount(0); + stats.setDeletedFilesCount(0); + stats.setAddedRowsCount(totalRows); + stats.setExistingRowsCount(0L); + stats.setDeletedRowsCount(0L); + stats.setMinSequenceNumber(SEQUENCE_NUMBER); + entry.setManifestStats(stats); + + TrackedFileStruct.TrackingInfoStruct tracking = entry.ensureTrackingInfo(); + tracking.setStatus(TrackingInfo.Status.ADDED); + tracking.setSnapshotId(SNAPSHOT_ID); + tracking.setSequenceNumber(SEQUENCE_NUMBER); + tracking.setFileSequenceNumber(SEQUENCE_NUMBER); + + return entry; + } + + /** Same construction as createManifestEntry but with content type DELETE_MANIFEST and a fixed file size of 5000. */ private TrackedFileStruct createDeleteManifestEntry( + String manifestLocation, int fileCount, long totalRows) { + TrackedFileStruct entry = new TrackedFileStruct(); + entry.setContentType(FileContent.DELETE_MANIFEST); + 
entry.setLocation(manifestLocation); + entry.setFileFormat(FileFormat.PARQUET); + entry.setPartitionSpecId(0); + entry.setRecordCount(fileCount); + entry.setFileSizeInBytes(5000L); + + TrackedFileStruct.ManifestStatsStruct deleteStats = new TrackedFileStruct.ManifestStatsStruct(); + deleteStats.setAddedFilesCount(fileCount); + deleteStats.setExistingFilesCount(0); + deleteStats.setDeletedFilesCount(0); + deleteStats.setAddedRowsCount(totalRows); + deleteStats.setExistingRowsCount(0L); + deleteStats.setDeletedRowsCount(0L); + deleteStats.setMinSequenceNumber(SEQUENCE_NUMBER); + entry.setManifestStats(deleteStats); + + TrackedFileStruct.TrackingInfoStruct tracking = entry.ensureTrackingInfo(); + tracking.setStatus(TrackingInfo.Status.ADDED); + tracking.setSnapshotId(SNAPSHOT_ID); + tracking.setSequenceNumber(SEQUENCE_NUMBER); + tracking.setFileSequenceNumber(SEQUENCE_NUMBER); + + return entry; + } + + /** Writes the entries to a new manifest file whose name starts with "root-manifest" and returns its location. */ private String writeRootManifest(TrackedFileStruct... entries) throws IOException { + return writeManifest("root-manifest", entries); + } + + /** Writes the entries to a new manifest file whose name starts with "leaf-manifest" and returns its location. */ private String writeLeafManifest(TrackedFileStruct... entries) throws IOException { + return writeManifest("leaf-manifest", entries); + } + + /** Appends the entries, in argument order, to a new Avro file named prefix + "-" + System.nanoTime() + ".avro" (the nanoTime suffix keeps names distinct within a test), using a schema built from TrackedFileStruct.WRITE_TYPE, and returns the output file's location. */ private String writeManifest(String prefix, TrackedFileStruct... 
entries) throws IOException { + OutputFile outputFile = io.newOutputFile(prefix + "-" + System.nanoTime() + ".avro"); + + try (FileAppender appender = + InternalData.write(FileFormat.AVRO, outputFile) + .schema(new Schema(TrackedFileStruct.WRITE_TYPE.fields())) + .named("tracked_file") + .build()) { + for (TrackedFileStruct entry : entries) { + appender.add(entry); + } + } + + return outputFile.location(); + } + + /** Leaf manifest with three data files whose manifest entry carries an inline DV containing position 1; planDataFiles() must return file1 and file3 and must not return file2 (the second entry written). */ @Test + public void testManifestDVFiltersPositions() throws IOException { + TrackedFileStruct file1 = createDataFile("file1.parquet", 1000L); + TrackedFileStruct file2 = createDataFile("file2.parquet", 2000L); + TrackedFileStruct file3 = createDataFile("file3.parquet", 3000L); + String leafManifestPath = writeLeafManifest(file1, file2, file3); + + // Create manifest entry with inline manifest DV that marks position 1 as deleted + TrackedFileStruct manifestEntry = createManifestEntry(leafManifestPath, 3, 6000L); + ByteBuffer manifestDvBitmap = serializeManifestDV(new long[] {1}); + manifestEntry.setManifestDV(manifestDvBitmap); + + String rootPath = writeRootManifest(manifestEntry); + + V4ManifestReader rootReader = + V4ManifestReaders.readRoot(rootPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER); + ManifestExpander expander = new ManifestExpander(rootReader, io, specsById); + + List scanInfos = + Lists.newArrayList(expander.planDataFiles()); + + assertThat(scanInfos).hasSize(2); + assertThat(scanInfos) + .anyMatch(info -> info.dataFile().location().endsWith("file1.parquet")) + .anyMatch(info -> info.dataFile().location().endsWith("file3.parquet")); + assertThat(scanInfos).noneMatch(info -> info.dataFile().location().endsWith("file2.parquet")); + } + + /** A manifest entry on which setManifestDV is never called must expose every file of its leaf manifest: one scan info for file1.parquet is expected. */ @Test + public void testManifestWithoutDV() throws IOException { + TrackedFileStruct file1 = createDataFile("file1.parquet", 1000L); + String leafManifestPath = writeLeafManifest(file1); + + TrackedFileStruct manifestEntry = createManifestEntry(leafManifestPath, 1, 1000L); + + String rootPath = 
writeRootManifest(manifestEntry); + + V4ManifestReader rootReader = + V4ManifestReaders.readRoot(rootPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER); + ManifestExpander expander = new ManifestExpander(rootReader, io, specsById); + + List scanInfos = + Lists.newArrayList(expander.planDataFiles()); + + assertThat(scanInfos).hasSize(1); + assertThat(scanInfos.get(0).dataFile().location()).endsWith("file1.parquet"); + } + + /** Leaf manifest with four data files whose manifest entry carries an inline DV containing positions 0 and 2; planDataFiles() must return exactly two scan infos, for file2 and file4. */ @Test + public void testManifestDVDeletesMultiplePositions() throws IOException { + TrackedFileStruct file1 = createDataFile("file1.parquet", 1000L); + TrackedFileStruct file2 = createDataFile("file2.parquet", 2000L); + TrackedFileStruct file3 = createDataFile("file3.parquet", 3000L); + TrackedFileStruct file4 = createDataFile("file4.parquet", 4000L); + String leafManifestPath = writeLeafManifest(file1, file2, file3, file4); + + // Create manifest entry with inline manifest DV that marks positions 0 and 2 as deleted + TrackedFileStruct manifestEntry = createManifestEntry(leafManifestPath, 4, 10000L); + ByteBuffer manifestDvBitmap = serializeManifestDV(new long[] {0, 2}); + manifestEntry.setManifestDV(manifestDvBitmap); + + String rootPath = writeRootManifest(manifestEntry); + + V4ManifestReader rootReader = + V4ManifestReaders.readRoot(rootPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER); + ManifestExpander expander = new ManifestExpander(rootReader, io, specsById); + + List scanInfos = + Lists.newArrayList(expander.planDataFiles()); + + // Positions 0 (file1) and 2 (file3) should be filtered out by the manifest DV + assertThat(scanInfos).hasSize(2); + assertThat(scanInfos) + .anyMatch(info -> info.dataFile().location().endsWith("file2.parquet")) + .anyMatch(info -> info.dataFile().location().endsWith("file4.parquet")); + } + + /** Adds each position to a RoaringBitmap and returns the bitmap's serialized bytes wrapped in a ByteBuffer. Each long position is narrowed to int with a plain cast before being added. */ private ByteBuffer serializeManifestDV(long[] deletedPositions) throws IOException { + RoaringBitmap dv = new RoaringBitmap(); + for (long pos : deletedPositions) { + dv.add((int) pos); + } + + ByteArrayOutputStream baos = new 
ByteArrayOutputStream(); + dv.serialize(new DataOutputStream(baos)); + return ByteBuffer.wrap(baos.toByteArray()); + } + + /** Root manifest references two leaf manifests (one file and two files); the expander is configured with planWith(executor) on a two-thread fixed pool, and planDataFiles() must return all three file locations (order not asserted). The executor is shut down in a finally block. NOTE(review): the test checks results only; it does not itself observe whether manifests were read concurrently. */ @Test + public void testParallelManifestReading() throws IOException { + TrackedFileStruct file1 = createDataFile("file1.parquet", 1000L); + String manifest1Path = writeLeafManifest(file1); + + TrackedFileStruct file2 = createDataFile("file2.parquet", 2000L); + TrackedFileStruct file3 = createDataFile("file3.parquet", 3000L); + String manifest2Path = writeLeafManifest(file2, file3); + + TrackedFileStruct manifestEntry1 = createManifestEntry(manifest1Path, 1, 1000L); + TrackedFileStruct manifestEntry2 = createManifestEntry(manifest2Path, 2, 5000L); + + String rootPath = writeRootManifest(manifestEntry1, manifestEntry2); + + ExecutorService executor = Executors.newFixedThreadPool(2); + try { + V4ManifestReader rootReader = + V4ManifestReaders.readRoot(rootPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER); + ManifestExpander expander = + new ManifestExpander(rootReader, io, specsById).planWith(executor); + + List scanInfos = + Lists.newArrayList(expander.planDataFiles()); + + assertThat(scanInfos).hasSize(3); + List locations = Lists.newArrayList(); + for (ManifestExpander.DataFileScanInfo info : scanInfos) { + locations.add(info.dataFile().location()); + } + + assertThat(locations) + .anyMatch(loc -> loc.endsWith("file1.parquet")) + .anyMatch(loc -> loc.endsWith("file2.parquet")) + .anyMatch(loc -> loc.endsWith("file3.parquet")); + } finally { + executor.shutdown(); + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestTrackedFileStruct.java b/core/src/test/java/org/apache/iceberg/TestTrackedFileStruct.java new file mode 100644 index 000000000000..967a5da209ea --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestTrackedFileStruct.java @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.List; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestTrackedFileStruct { + + @TempDir private Path temp; + + /** Schema used by the tests when writing: the fields of TrackedFileStruct.WRITE_TYPE. */ private static Schema getWriteSchema() { + return new Schema(TrackedFileStruct.WRITE_TYPE.fields()); + } + + /** Schema used by the tests when projecting on read: the fields of TrackedFileStruct.BASE_TYPE. */ private static Schema getReadSchema() { + return new Schema(TrackedFileStruct.BASE_TYPE.fields()); + } + + /** Round-trips a DATA tracked file (sort order id, split offsets, tracking info and key metadata populated) through InternalData write/read and checks every populated field on the record read back. NOTE(review): despite the "Avro" name this test writes and reads FileFormat.PARQUET ("tracked-file.parquet"), and the local {@code written} is assigned but never read -- confirm both are intended. */ @Test + public void testAvroRoundTripDataFile() throws IOException { + // Create a tracked file representing a data file + TrackedFileStruct original = new TrackedFileStruct(); + original.setContentType(FileContent.DATA); + original.setLocation("s3://bucket/table/data/file1.parquet"); + original.setFileFormat(FileFormat.PARQUET); + original.setPartitionSpecId(0); + 
original.setRecordCount(1000L); + original.setFileSizeInBytes(50000L); + original.setSortOrderId(1); + original.setSplitOffsets(ImmutableList.of(0L, 10000L, 20000L)); + + // Set tracking info + TrackedFileStruct.TrackingInfoStruct trackingStruct = original.ensureTrackingInfo(); + trackingStruct.setStatus(TrackingInfo.Status.ADDED); + trackingStruct.setSnapshotId(12345L); + trackingStruct.setSequenceNumber(100L); + trackingStruct.setFileSequenceNumber(100L); + trackingStruct.setFirstRowId(0L); + + // Set key metadata + original.setKeyMetadata(ByteBuffer.wrap(new byte[] {1, 2, 3, 4})); + + // Write to file + OutputFile outputFile = Files.localOutput(temp.resolve("tracked-file.parquet").toFile()); + List written; + try (FileAppender appender = + InternalData.write(FileFormat.PARQUET, outputFile) + .schema(getWriteSchema()) + .named("tracked_file") + .build()) { + appender.add(original); + written = ImmutableList.of(original); + } + + // Read back + InputFile inputFile = outputFile.toInputFile(); + List read; + try (CloseableIterable files = + InternalData.read(FileFormat.PARQUET, inputFile) + .setRootType(TrackedFileStruct.class) + .project(getReadSchema()) + .build()) { + read = Lists.newArrayList(files); + } + + // Verify + assertThat(read).hasSize(1); + TrackedFileStruct roundTripped = read.get(0); + + assertThat(roundTripped.contentType()).isEqualTo(FileContent.DATA); + assertThat(roundTripped.location()).isEqualTo("s3://bucket/table/data/file1.parquet"); + assertThat(roundTripped.fileFormat()).isEqualTo(FileFormat.PARQUET); + assertThat(roundTripped.partitionSpecId()).isEqualTo(0); + assertThat(roundTripped.recordCount()).isEqualTo(1000L); + assertThat(roundTripped.fileSizeInBytes()).isEqualTo(50000L); + assertThat(roundTripped.sortOrderId()).isEqualTo(1); + assertThat(roundTripped.splitOffsets()).containsExactly(0L, 10000L, 20000L); + + // Verify tracking info + TrackingInfo trackingInfo = roundTripped.trackingInfo(); + assertThat(trackingInfo).isNotNull(); + 
assertThat(trackingInfo.status()).isEqualTo(TrackingInfo.Status.ADDED); + assertThat(trackingInfo.snapshotId()).isEqualTo(12345L); + assertThat(trackingInfo.dataSequenceNumber()).isEqualTo(100L); + assertThat(trackingInfo.fileSequenceNumber()).isEqualTo(100L); + assertThat(trackingInfo.firstRowId()).isEqualTo(0L); + + // Verify key metadata + assertThat(roundTripped.keyMetadata()).isEqualTo(ByteBuffer.wrap(new byte[] {1, 2, 3, 4})); + } + + /** Round-trips a POSITION_DELETES tracked file (PUFFIN format, a referenced data file, and content info with offset 100 and size 500) through an Avro file and checks content type, location, format, record count, referenced file and the content info offset/size on the record read back. */ @Test + public void testAvroRoundTripPositionDeletesWithDeletionVector() throws IOException { + // Create a tracked file representing position deletes with external deletion vector + TrackedFileStruct original = new TrackedFileStruct(); + original.setContentType(FileContent.POSITION_DELETES); + original.setLocation("s3://bucket/table/deletes/dv1.puffin"); + original.setFileFormat(FileFormat.PUFFIN); + original.setPartitionSpecId(0); + original.setRecordCount(50L); // 50 deleted positions + original.setFileSizeInBytes(1000L); + original.setReferencedFile("s3://bucket/table/data/file1.parquet"); + + // Set content info (for external deletion vector) + TrackedFileStruct.ContentInfoStruct contentInfo = new TrackedFileStruct.ContentInfoStruct(); + contentInfo.setOffset(100L); + contentInfo.setSizeInBytes(500L); + original.setContentInfo(contentInfo); + + // Set tracking info + TrackedFileStruct.TrackingInfoStruct tracking = original.ensureTrackingInfo(); + tracking.setStatus(TrackingInfo.Status.ADDED); + tracking.setSnapshotId(12346L); + tracking.setSequenceNumber(101L); + tracking.setFileSequenceNumber(101L); + + // Write to file + OutputFile outputFile = Files.localOutput(temp.resolve("dv-tracked-file.avro").toFile()); + try (FileAppender appender = + InternalData.write(FileFormat.AVRO, outputFile) + .schema(getWriteSchema()) + .named("tracked_file") + .build()) { + appender.add(original); + } + + // Read back + InputFile inputFile = outputFile.toInputFile(); + List read; + try (CloseableIterable files = + 
InternalData.read(FileFormat.AVRO, inputFile) + .setRootType(TrackedFileStruct.class) + .project(getReadSchema()) + .build()) { + read = Lists.newArrayList(files); + } + + // Verify + assertThat(read).hasSize(1); + TrackedFileStruct roundTripped = read.get(0); + + assertThat(roundTripped.contentType()).isEqualTo(FileContent.POSITION_DELETES); + assertThat(roundTripped.location()).isEqualTo("s3://bucket/table/deletes/dv1.puffin"); + assertThat(roundTripped.fileFormat()).isEqualTo(FileFormat.PUFFIN); + assertThat(roundTripped.recordCount()).isEqualTo(50L); + assertThat(roundTripped.referencedFile()).isEqualTo("s3://bucket/table/data/file1.parquet"); + + // Verify deletion vector + ContentInfo dv = roundTripped.contentInfo(); + assertThat(dv).isNotNull(); + assertThat(dv.offset()).isEqualTo(100L); + assertThat(dv.sizeInBytes()).isEqualTo(500L); + } + + /** Round-trips a DATA_MANIFEST tracked file carrying a one-byte inline manifest DV through an Avro file and checks that content type, location, record count and the manifest DV bytes are read back unchanged. The DV is compared only as opaque bytes here. */ @Test + public void testAvroRoundTripDataManifestWithManifestDV() throws IOException { + // Create a tracked file representing a data manifest entry with an inline manifest DV + TrackedFileStruct original = new TrackedFileStruct(); + original.setContentType(FileContent.DATA_MANIFEST); + original.setLocation("s3://bucket/table/metadata/manifest-data-1.avro"); + original.setFileFormat(FileFormat.AVRO); + original.setPartitionSpecId(0); + original.setRecordCount(100L); // 100 entries in the manifest + original.setFileSizeInBytes(25000L); + + // Set manifest DV - marks positions 0, 1, 2 as deleted in the referenced manifest + byte[] dvContent = new byte[] {(byte) 0b00000111}; + original.setManifestDV(ByteBuffer.wrap(dvContent)); + + // Set tracking info + original.ensureTrackingInfo().setStatus(TrackingInfo.Status.ADDED); + original.ensureTrackingInfo().setSnapshotId(12347L); + + // Write to file + OutputFile outputFile = Files.localOutput(temp.resolve("manifest-with-dv.avro").toFile()); + try (FileAppender appender = + InternalData.write(FileFormat.AVRO, outputFile) + .schema(getWriteSchema()) + .named("tracked_file") + 
.build()) { + appender.add(original); + } + + // Read back + InputFile inputFile = outputFile.toInputFile(); + List read; + try (CloseableIterable files = + InternalData.read(FileFormat.AVRO, inputFile) + .setRootType(TrackedFileStruct.class) + .project(getReadSchema()) + .build()) { + read = Lists.newArrayList(files); + } + + // Verify + assertThat(read).hasSize(1); + TrackedFileStruct roundTripped = read.get(0); + + assertThat(roundTripped.contentType()).isEqualTo(FileContent.DATA_MANIFEST); + assertThat(roundTripped.location()) + .isEqualTo("s3://bucket/table/metadata/manifest-data-1.avro"); + assertThat(roundTripped.recordCount()).isEqualTo(100L); + + // Verify manifest DV + ByteBuffer manifestDV = roundTripped.manifestDV(); + assertThat(manifestDV).isNotNull(); + assertThat(manifestDV).isEqualTo(ByteBuffer.wrap(dvContent)); + } + + /** Round-trips a DATA_MANIFEST tracked file with fully populated manifest stats and EXISTING tracking info through an Avro file, then checks the basic file fields, all seven manifest-stats values, and the tracking status and first row id on the record read back. */ @Test + public void testAvroRoundTripDataManifestWithStats() throws IOException { + // Create a tracked file representing a data manifest entry + TrackedFileStruct original = new TrackedFileStruct(); + original.setContentType(FileContent.DATA_MANIFEST); + original.setLocation("s3://bucket/table/metadata/manifest-data-1.avro"); + original.setFileFormat(FileFormat.AVRO); + original.setPartitionSpecId(0); + original.setRecordCount(100L); // 100 entries in the manifest + original.setFileSizeInBytes(25000L); + + // Set manifest stats + TrackedFileStruct.ManifestStatsStruct manifestStats = + new TrackedFileStruct.ManifestStatsStruct(); + manifestStats.setAddedFilesCount(10); + manifestStats.setExistingFilesCount(85); + manifestStats.setDeletedFilesCount(5); + manifestStats.setAddedRowsCount(10000L); + manifestStats.setExistingRowsCount(850000L); + manifestStats.setDeletedRowsCount(5000L); + manifestStats.setMinSequenceNumber(50L); + original.setManifestStats(manifestStats); + + // Set tracking info + TrackedFileStruct.TrackingInfoStruct trackingStruct = original.ensureTrackingInfo(); + 
trackingStruct.setStatus(TrackingInfo.Status.EXISTING); + trackingStruct.setSnapshotId(12348L); + trackingStruct.setSequenceNumber(102L); + trackingStruct.setFileSequenceNumber(100L); + trackingStruct.setFirstRowId(100000L); // Starting row ID for new data files + + // Write to file + OutputFile outputFile = + Files.localOutput(temp.resolve("data-manifest-tracked-file.avro").toFile()); + try (FileAppender appender = + InternalData.write(FileFormat.AVRO, outputFile) + .schema(getWriteSchema()) + .named("tracked_file") + .build()) { + appender.add(original); + } + + // Read back + InputFile inputFile = outputFile.toInputFile(); + List read; + try (CloseableIterable files = + InternalData.read(FileFormat.AVRO, inputFile) + .setRootType(TrackedFileStruct.class) + .project(getReadSchema()) + .build()) { + read = Lists.newArrayList(files); + } + + // Verify + assertThat(read).hasSize(1); + TrackedFileStruct roundTripped = read.get(0); + + assertThat(roundTripped.contentType()).isEqualTo(FileContent.DATA_MANIFEST); + assertThat(roundTripped.location()) + .isEqualTo("s3://bucket/table/metadata/manifest-data-1.avro"); + assertThat(roundTripped.fileFormat()).isEqualTo(FileFormat.AVRO); + assertThat(roundTripped.recordCount()).isEqualTo(100L); + assertThat(roundTripped.fileSizeInBytes()).isEqualTo(25000L); + + // Verify manifest stats + ManifestStats stats = roundTripped.manifestStats(); + assertThat(stats).isNotNull(); + assertThat(stats.addedFilesCount()).isEqualTo(10); + assertThat(stats.existingFilesCount()).isEqualTo(85); + assertThat(stats.deletedFilesCount()).isEqualTo(5); + assertThat(stats.addedRowsCount()).isEqualTo(10000L); + assertThat(stats.existingRowsCount()).isEqualTo(850000L); + assertThat(stats.deletedRowsCount()).isEqualTo(5000L); + assertThat(stats.minSequenceNumber()).isEqualTo(50L); + + // Verify tracking info + TrackingInfo trackingInfo = roundTripped.trackingInfo(); + assertThat(trackingInfo).isNotNull(); + 
assertThat(trackingInfo.status()).isEqualTo(TrackingInfo.Status.EXISTING); + assertThat(trackingInfo.firstRowId()).isEqualTo(100000L); + } + + /** Round-trips an EQUALITY_DELETES tracked file (equality ids 1, 2, 5 and sort order id 1) through an Avro file and checks content type, location, format, record count, equality ids and sort order id on the record read back. */ @Test + public void testAvroRoundTripEqualityDeletes() throws IOException { + // Create a tracked file representing equality deletes + TrackedFileStruct original = new TrackedFileStruct(); + original.setContentType(FileContent.EQUALITY_DELETES); + original.setLocation("s3://bucket/table/deletes/eq-delete-1.parquet"); + original.setFileFormat(FileFormat.PARQUET); + original.setPartitionSpecId(0); + original.setRecordCount(25L); + original.setFileSizeInBytes(5000L); + original.setEqualityIds(ImmutableList.of(1, 2, 5)); // Field IDs for equality comparison + original.setSortOrderId(1); + + // Set tracking info + TrackedFileStruct.TrackingInfoStruct tracking = original.ensureTrackingInfo(); + tracking.setStatus(TrackingInfo.Status.ADDED); + tracking.setSnapshotId(12349L); + tracking.setSequenceNumber(103L); + tracking.setFileSequenceNumber(103L); + + // Write to file + OutputFile outputFile = + Files.localOutput(temp.resolve("equality-delete-tracked-file.avro").toFile()); + try (FileAppender appender = + InternalData.write(FileFormat.AVRO, outputFile) + .schema(getWriteSchema()) + .named("tracked_file") + .build()) { + appender.add(original); + } + + // Read back + InputFile inputFile = outputFile.toInputFile(); + List read; + try (CloseableIterable files = + InternalData.read(FileFormat.AVRO, inputFile) + .setRootType(TrackedFileStruct.class) + .project(getReadSchema()) + .build()) { + read = Lists.newArrayList(files); + } + + // Verify + assertThat(read).hasSize(1); + TrackedFileStruct roundTripped = read.get(0); + + assertThat(roundTripped.contentType()).isEqualTo(FileContent.EQUALITY_DELETES); + assertThat(roundTripped.location()).isEqualTo("s3://bucket/table/deletes/eq-delete-1.parquet"); + assertThat(roundTripped.fileFormat()).isEqualTo(FileFormat.PARQUET); + assertThat(roundTripped.recordCount()).isEqualTo(25L); + 
assertThat(roundTripped.equalityIds()).containsExactly(1, 2, 5); + assertThat(roundTripped.sortOrderId()).isEqualTo(1); + } + + @Test + public void testCopy() { + TrackedFileStruct original = new TrackedFileStruct(); + original.setContentType(FileContent.DATA); + original.setLocation("s3://bucket/table/data/file1.parquet"); + original.setFileFormat(FileFormat.PARQUET); + original.setPartitionSpecId(0); + original.setRecordCount(1000L); + original.setFileSizeInBytes(50000L); + + // Set manifest stats (should be copied) + TrackedFileStruct.ManifestStatsStruct stats = new TrackedFileStruct.ManifestStatsStruct(); + stats.setAddedFilesCount(10); + stats.setMinSequenceNumber(50L); + original.setManifestStats(stats); + + TrackedFile copy = original.copy(); + + // Verify copy is equal but separate instance + assertThat(copy).isNotSameAs(original); + assertThat(copy.contentType()).isEqualTo(original.contentType()); + assertThat(copy.location()).isEqualTo(original.location()); + assertThat(copy.recordCount()).isEqualTo(original.recordCount()); + + // Verify stats were copied + assertThat(copy.manifestStats()).isNotNull(); + assertThat(copy.manifestStats().addedFilesCount()).isEqualTo(10); + } + + @Test + public void testCopyWithoutStats() { + TrackedFileStruct original = new TrackedFileStruct(); + original.setContentType(FileContent.DATA); + original.setLocation("s3://bucket/table/data/file1.parquet"); + original.setFileFormat(FileFormat.PARQUET); + original.setPartitionSpecId(0); + original.setRecordCount(1000L); + original.setFileSizeInBytes(50000L); + + // Set manifest stats (should NOT be copied) + TrackedFileStruct.ManifestStatsStruct stats = new TrackedFileStruct.ManifestStatsStruct(); + stats.setAddedFilesCount(10); + stats.setMinSequenceNumber(50L); + original.setManifestStats(stats); + + TrackedFile copy = original.copyWithoutStats(); + + // Verify copy is equal but stats are dropped + assertThat(copy).isNotSameAs(original); + 
assertThat(copy.contentType()).isEqualTo(original.contentType()); + assertThat(copy.location()).isEqualTo(original.location()); + assertThat(copy.recordCount()).isEqualTo(original.recordCount()); + + // Verify stats were NOT copied + assertThat(copy.manifestStats()).isNull(); + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestV4ManifestReader.java b/core/src/test/java/org/apache/iceberg/TestV4ManifestReader.java new file mode 100644 index 000000000000..4ca04259ca2c --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestV4ManifestReader.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.inmemory.InMemoryFileIO;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * Tests for {@link V4ManifestReader}.
+ *
+ * <p>Each test builds {@link TrackedFileStruct} entries, writes them as an Avro manifest through
+ * an {@link InMemoryFileIO}, and reads them back with {@link V4ManifestReaders}.
+ */
+public class TestV4ManifestReader {
+
+  // Snapshot ID and sequence number handed to readRoot in the tests below.
+  private static final long SNAPSHOT_ID = 12345L;
+  private static final long SEQUENCE_NUMBER = 100L;
+
+  private final FileIO io = new InMemoryFileIO();
+
+  // NOTE(review): no test in this class references temp; manifests are written through io.
+  @TempDir private Path temp;
+
+  /**
+   * Reads a two-entry manifest with {@code readRoot} and checks location, record count, file size
+   * and position of the entries in the order they were written.
+   */
+  @Test
+  public void testReadFlatManifest() throws IOException {
+    TrackedFileStruct file1 = createDataFile("file1.parquet", 1000L, 50000L);
+    TrackedFileStruct.TrackingInfoStruct tracking1 = file1.ensureTrackingInfo();
+    tracking1.setStatus(TrackingInfo.Status.ADDED);
+    tracking1.setSnapshotId(SNAPSHOT_ID);
+    tracking1.setSequenceNumber(SEQUENCE_NUMBER);
+    tracking1.setFileSequenceNumber(SEQUENCE_NUMBER);
+
+    TrackedFileStruct file2 = createDataFile("file2.parquet", 2000L, 100000L);
+    TrackedFileStruct.TrackingInfoStruct tracking2 = file2.ensureTrackingInfo();
+    tracking2.setStatus(TrackingInfo.Status.EXISTING);
+    tracking2.setSnapshotId(SNAPSHOT_ID - 1);
+    tracking2.setSequenceNumber(SEQUENCE_NUMBER - 1);
+    tracking2.setFileSequenceNumber(SEQUENCE_NUMBER - 1);
+
+    String manifestPath = writeManifest(file1, file2);
+
+    V4ManifestReader reader =
+        V4ManifestReaders.readRoot(manifestPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER);
+
+    List<TrackedFile> files = Lists.newArrayList(reader);
+
+    assertThat(files).hasSize(2);
+
+    TrackedFile read1 = files.get(0);
+    assertThat(read1.location()).isNotNull();
+    assertThat(read1.location()).endsWith("file1.parquet");
+    assertThat(read1.recordCount()).isEqualTo(1000L);
+    assertThat(read1.fileSizeInBytes()).isEqualTo(50000L);
+    assertThat(read1.pos()).isNotNull();
+    assertThat(read1.pos()).isEqualTo(0L);
+
+    TrackedFile read2 = files.get(1);
+    assertThat(read2.location()).endsWith("file2.parquet");
+    assertThat(read2.recordCount()).isEqualTo(2000L);
+    assertThat(read2.pos()).isEqualTo(1L);
+  }
+
+  /**
+   * Writes an ADDED entry with no snapshot ID or sequence numbers set and expects the entry read
+   * back to report the snapshot ID and sequence number that were passed to {@code readRoot}.
+   */
+  @Test
+  public void testInheritSnapshotId() throws IOException {
+    TrackedFileStruct file1 = createDataFile("file1.parquet", 1000L, 50000L);
+    file1.ensureTrackingInfo().setStatus(TrackingInfo.Status.ADDED);
+
+    String manifestPath = writeManifest(file1);
+
+    V4ManifestReader reader =
+        V4ManifestReaders.readRoot(manifestPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER);
+
+    List<TrackedFile> files = Lists.newArrayList(reader);
+
+    assertThat(files).hasSize(1);
+    TrackedFile read = files.get(0);
+
+    TrackingInfo tracking = read.trackingInfo();
+    assertThat(tracking).isNotNull();
+    assertThat(tracking.snapshotId()).isEqualTo(SNAPSHOT_ID);
+    assertThat(tracking.dataSequenceNumber()).isEqualTo(SEQUENCE_NUMBER);
+    assertThat(tracking.fileSequenceNumber()).isEqualTo(SEQUENCE_NUMBER);
+  }
+
+  /**
+   * Expects an ADDED entry without a sequence number to report the reader's sequence number,
+   * while an EXISTING entry keeps the sequence number it was written with.
+   */
+  @Test
+  public void testInheritSequenceNumberForAddedOnly() throws IOException {
+    TrackedFileStruct added = createDataFile("added.parquet", 1000L, 50000L);
+    TrackedFileStruct.TrackingInfoStruct addedTracking = added.ensureTrackingInfo();
+    addedTracking.setStatus(TrackingInfo.Status.ADDED);
+    addedTracking.setSnapshotId(SNAPSHOT_ID);
+
+    TrackedFileStruct existing = createDataFile("existing.parquet", 2000L, 100000L);
+    TrackedFileStruct.TrackingInfoStruct existingTracking = existing.ensureTrackingInfo();
+    existingTracking.setStatus(TrackingInfo.Status.EXISTING);
+    existingTracking.setSnapshotId(SNAPSHOT_ID - 1);
+    existingTracking.setSequenceNumber(SEQUENCE_NUMBER - 10);
+    existingTracking.setFileSequenceNumber(SEQUENCE_NUMBER - 10);
+
+    String manifestPath = writeManifest(added, existing);
+
+    V4ManifestReader reader =
+        V4ManifestReaders.readRoot(manifestPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER);
+
+    List<TrackedFile> files = Lists.newArrayList(reader);
+
+    assertThat(files).hasSize(2);
+
+    TrackedFile readAdded = files.get(0);
+    assertThat(readAdded.trackingInfo().status()).isEqualTo(TrackingInfo.Status.ADDED);
+    assertThat(readAdded.trackingInfo().dataSequenceNumber()).isEqualTo(SEQUENCE_NUMBER);
+
+    TrackedFile readExisting = files.get(1);
+    assertThat(readExisting.trackingInfo().status()).isEqualTo(TrackingInfo.Status.EXISTING);
+    assertThat(readExisting.trackingInfo().dataSequenceNumber()).isEqualTo(SEQUENCE_NUMBER - 10);
+  }
+
+  /**
+   * Reads a leaf manifest through a DATA_MANIFEST entry whose first row ID is 1000 and expects
+   * the two ADDED files (1000 and 2000 records) to report first row IDs 1000 and 2000.
+   */
+  @Test
+  public void testRowIdAssignmentInLeafManifest() throws IOException {
+    // Create data files for the leaf manifest
+    TrackedFileStruct file1 = createDataFile("file1.parquet", 1000L, 50000L);
+    TrackedFileStruct.TrackingInfoStruct tracking1 = file1.ensureTrackingInfo();
+    tracking1.setStatus(TrackingInfo.Status.ADDED);
+    tracking1.setSnapshotId(SNAPSHOT_ID);
+
+    TrackedFileStruct file2 = createDataFile("file2.parquet", 2000L, 100000L);
+    TrackedFileStruct.TrackingInfoStruct tracking2 = file2.ensureTrackingInfo();
+    tracking2.setStatus(TrackingInfo.Status.ADDED);
+    tracking2.setSnapshotId(SNAPSHOT_ID);
+
+    String manifestPath = writeManifest(file1, file2);
+
+    // Create a DATA_MANIFEST entry that points to the manifest, with first_row_id set
+    long startingRowId = 1000L;
+    TrackedFileStruct manifestEntry = new TrackedFileStruct();
+    manifestEntry.setContentType(FileContent.DATA_MANIFEST);
+    manifestEntry.setLocation(manifestPath);
+    manifestEntry.setFileFormat(FileFormat.AVRO);
+    TrackedFileStruct.TrackingInfoStruct manifestTracking = manifestEntry.ensureTrackingInfo();
+    manifestTracking.setStatus(TrackingInfo.Status.ADDED);
+    manifestTracking.setSnapshotId(SNAPSHOT_ID);
+    manifestTracking.setSequenceNumber(SEQUENCE_NUMBER);
+    manifestTracking.setFirstRowId(startingRowId);
+
+    Map<Integer, PartitionSpec> specsById = ImmutableMap.of();
+
+    V4ManifestReader reader = V4ManifestReaders.readLeaf(manifestEntry, io, specsById);
+
+    List<TrackedFile> files = Lists.newArrayList(reader);
+
+    assertThat(files).hasSize(2);
+
+    // First file starts at the manifest's first row ID
+    TrackedFile read1 = files.get(0);
+    assertThat(read1.trackingInfo().firstRowId()).isEqualTo(1000L);
+
+    // Second file is expected at 1000 + 1000 (the first file's record count)
+    TrackedFile read2 = files.get(1);
+    assertThat(read2.trackingInfo().firstRowId()).isEqualTo(2000L);
+  }
+
+  /**
+   * Writes ADDED, DELETED and EXISTING entries and expects {@code liveFiles()} to return only the
+   * ADDED and EXISTING ones.
+   */
+  @Test
+  public void testLiveEntriesFilterDeleted() throws IOException {
+    TrackedFileStruct added = createDataFile("added.parquet", 1000L, 50000L);
+    TrackedFileStruct.TrackingInfoStruct addedTracking = added.ensureTrackingInfo();
+    addedTracking.setStatus(TrackingInfo.Status.ADDED);
+    addedTracking.setSnapshotId(SNAPSHOT_ID);
+
+    TrackedFileStruct deleted = createDataFile("deleted.parquet", 2000L, 100000L);
+    TrackedFileStruct.TrackingInfoStruct deletedTracking = deleted.ensureTrackingInfo();
+    deletedTracking.setStatus(TrackingInfo.Status.DELETED);
+    deletedTracking.setSnapshotId(SNAPSHOT_ID);
+
+    TrackedFileStruct existing = createDataFile("existing.parquet", 3000L, 150000L);
+    TrackedFileStruct.TrackingInfoStruct existingTracking = existing.ensureTrackingInfo();
+    existingTracking.setStatus(TrackingInfo.Status.EXISTING);
+    existingTracking.setSnapshotId(SNAPSHOT_ID - 1);
+
+    String manifestPath = writeManifest(added, deleted, existing);
+
+    V4ManifestReader reader =
+        V4ManifestReaders.readRoot(manifestPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER);
+
+    List<TrackedFile> liveFiles = Lists.newArrayList(reader.liveFiles());
+
+    assertThat(liveFiles).hasSize(2);
+
+    List<String> locations = Lists.newArrayList();
+    List<TrackingInfo.Status> statuses = Lists.newArrayList();
+    for (TrackedFile file : liveFiles) {
+      locations.add(file.location());
+      statuses.add(file.trackingInfo().status());
+    }
+
+    assertThat(locations)
+        .anyMatch(loc -> loc.endsWith("added.parquet"))
+        .anyMatch(loc -> loc.endsWith("existing.parquet"));
+    assertThat(statuses)
+        .containsExactlyInAnyOrder(TrackingInfo.Status.ADDED, TrackingInfo.Status.EXISTING);
+    assertThat(locations).noneMatch(loc -> loc.endsWith("deleted.parquet"));
+  }
+
+  /**
+   * Reads with a projection of only the location and record count fields and checks that both
+   * projected values are returned.
+   */
+  @Test
+  public void testColumnProjection() throws IOException {
+    TrackedFileStruct file1 = createDataFile("file1.parquet", 1000L, 50000L);
+    TrackedFileStruct.TrackingInfoStruct tracking = file1.ensureTrackingInfo();
+    tracking.setStatus(TrackingInfo.Status.ADDED);
+    tracking.setSnapshotId(SNAPSHOT_ID);
+    file1.setSortOrderId(5);
+
+    String manifestPath = writeManifest(file1);
+
+    // Create a projection schema with only location and record_count fields
+    Schema projection = new Schema(TrackedFile.LOCATION, TrackedFile.RECORD_COUNT);
+
+    V4ManifestReader reader =
+        V4ManifestReaders.readRoot(manifestPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER)
+            .project(projection);
+
+    List<TrackedFile> files = Lists.newArrayList(reader);
+
+    assertThat(files).hasSize(1);
+    TrackedFile read = files.get(0);
+    assertThat(read.location()).endsWith("file1.parquet");
+    assertThat(read.recordCount()).isEqualTo(1000L);
+  }
+
+  /**
+   * Writes three ADDED entries and expects {@code allFiles()} to return entries whose positions
+   * are 0, 1 and 2; the order of the returned entries is not asserted.
+   */
+  @Test
+  public void testPositionTracking() throws IOException {
+    TrackedFileStruct file1 = createDataFile("file1.parquet", 1000L, 50000L);
+    file1.ensureTrackingInfo().setStatus(TrackingInfo.Status.ADDED);
+
+    TrackedFileStruct file2 = createDataFile("file2.parquet", 2000L, 100000L);
+    file2.ensureTrackingInfo().setStatus(TrackingInfo.Status.ADDED);
+
+    TrackedFileStruct file3 = createDataFile("file3.parquet", 3000L, 150000L);
+    file3.ensureTrackingInfo().setStatus(TrackingInfo.Status.ADDED);
+
+    String manifestPath = writeManifest(file1, file2, file3);
+
+    V4ManifestReader reader =
+        V4ManifestReaders.readRoot(manifestPath, io, SNAPSHOT_ID, SEQUENCE_NUMBER);
+
+    List<TrackedFile> files = Lists.newArrayList(reader.allFiles());
+
+    assertThat(files).hasSize(3);
+
+    List<Long> positions = Lists.newArrayList();
+    for (TrackedFile file : files) {
+      assertThat(file.pos()).isNotNull();
+      positions.add(file.pos());
+    }
+
+    assertThat(positions).containsExactlyInAnyOrder(0L, 1L, 2L);
+  }
+
+  /**
+   * Builds a DATA entry in PARQUET format with partition spec ID 0.
+   *
+   * @param filename file name appended to {@code s3://bucket/table/data/}
+   * @param recordCount record count to set on the entry
+   * @param fileSize file size in bytes to set on the entry
+   * @return the new entry, with no tracking info set
+   */
+  private TrackedFileStruct createDataFile(String filename, long recordCount, long fileSize) {
+    TrackedFileStruct file = new TrackedFileStruct();
+    file.setContentType(FileContent.DATA);
+    file.setLocation("s3://bucket/table/data/" + filename);
+    file.setFileFormat(FileFormat.PARQUET);
+    file.setPartitionSpecId(0);
+    file.setRecordCount(recordCount);
+    file.setFileSizeInBytes(fileSize);
+    return file;
+  }
+
+  /**
+   * Writes the given entries, in order, to a new Avro manifest created through {@code io}.
+   *
+   * @param files entries to append
+   * @return the location of the written manifest
+   * @throws IOException if closing the appender fails
+   */
+  private String writeManifest(TrackedFileStruct... files) throws IOException {
+    // System.nanoTime() in the name is presumably there to keep manifests from different calls apart
+    OutputFile outputFile = io.newOutputFile("manifest-" + System.nanoTime() + ".avro");
+
+    try (FileAppender<TrackedFileStruct> appender =
+        InternalData.write(FileFormat.AVRO, outputFile)
+            .schema(new Schema(TrackedFileStruct.WRITE_TYPE.fields()))
+            .named("tracked_file")
+            .build()) {
+      for (TrackedFileStruct file : files) {
+        appender.add(file);
+      }
+    }
+
+    return outputFile.location();
+  }
+}