Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 54 additions & 26 deletions api/src/main/java/org/apache/iceberg/DataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.BinaryType;
import org.apache.iceberg.types.Types.IntegerType;
import org.apache.iceberg.types.Types.ListType;
Expand All @@ -37,31 +38,58 @@
* Interface for files listed in a table manifest.
*/
public interface DataFile {
// fields for adding delete data files
Types.NestedField CONTENT = optional(134, "content", IntegerType.get(),
"Contents of the file: 0=data, 1=position deletes, 2=equality deletes");
Types.NestedField FILE_PATH = required(100, "file_path", StringType.get(), "Location URI with FS scheme");
Types.NestedField FILE_FORMAT = required(101, "file_format", StringType.get(),
"File format name: avro, orc, or parquet");
Types.NestedField RECORD_COUNT = required(103, "record_count", LongType.get(), "Number of records in the file");
Types.NestedField FILE_SIZE = required(104, "file_size_in_bytes", LongType.get(), "Total file size in bytes");
Types.NestedField COLUMN_SIZES = optional(108, "column_sizes", MapType.ofRequired(117, 118,
IntegerType.get(), LongType.get()), "Map of column id to total size on disk");
Types.NestedField VALUE_COUNTS = optional(109, "value_counts", MapType.ofRequired(119, 120,
IntegerType.get(), LongType.get()), "Map of column id to total count, including null and NaN");
Types.NestedField NULL_VALUE_COUNTS = optional(110, "null_value_counts", MapType.ofRequired(121, 122,
IntegerType.get(), LongType.get()), "Map of column id to null value count");
Types.NestedField LOWER_BOUNDS = optional(125, "lower_bounds", MapType.ofRequired(126, 127,
IntegerType.get(), BinaryType.get()), "Map of column id to lower bound");
Types.NestedField UPPER_BOUNDS = optional(128, "upper_bounds", MapType.ofRequired(129, 130,
IntegerType.get(), BinaryType.get()), "Map of column id to upper bound");
Types.NestedField KEY_METADATA = optional(131, "key_metadata", BinaryType.get(), "Encryption key metadata blob");
Types.NestedField SPLIT_OFFSETS = optional(132, "split_offsets", ListType.ofRequired(133, LongType.get()),
"Splittable offsets");
int PARTITION_ID = 102;
String PARTITION_NAME = "partition";
String PARTITION_DOC = "Partition data tuple, schema based on the partition spec";
// NEXT ID TO ASSIGN: 135

static StructType getType(StructType partitionType) {
// IDs start at 100 to leave room for changes to ManifestEntry
return StructType.of(
required(100, "file_path", StringType.get()),
required(101, "file_format", StringType.get()),
required(102, "partition", partitionType),
required(103, "record_count", LongType.get()),
required(104, "file_size_in_bytes", LongType.get()),
required(105, "block_size_in_bytes", LongType.get()),
optional(108, "column_sizes", MapType.ofRequired(117, 118,
IntegerType.get(), LongType.get())),
optional(109, "value_counts", MapType.ofRequired(119, 120,
IntegerType.get(), LongType.get())),
optional(110, "null_value_counts", MapType.ofRequired(121, 122,
IntegerType.get(), LongType.get())),
optional(125, "lower_bounds", MapType.ofRequired(126, 127,
IntegerType.get(), BinaryType.get())),
optional(128, "upper_bounds", MapType.ofRequired(129, 130,
IntegerType.get(), BinaryType.get())),
optional(131, "key_metadata", BinaryType.get()),
optional(132, "split_offsets", ListType.ofRequired(133, LongType.get()))
// NEXT ID TO ASSIGN: 134
CONTENT,
FILE_PATH,
FILE_FORMAT,
required(PARTITION_ID, PARTITION_NAME, partitionType, PARTITION_DOC),
RECORD_COUNT,
FILE_SIZE,
COLUMN_SIZES,
VALUE_COUNTS,
NULL_VALUE_COUNTS,
LOWER_BOUNDS,
UPPER_BOUNDS,
KEY_METADATA,
SPLIT_OFFSETS
);
}

/**
* @return the content stored in the file; one of DATA, POSITION_DELETES, or EQUALITY_DELETES
*/
default FileContent content() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will DeleteFile extend this or will it be totally separate?

return FileContent.DATA;
}

/**
* @return fully qualified path to the file, suitable for constructing a Hadoop Path
*/
Expand Down Expand Up @@ -118,6 +146,13 @@ static StructType getType(StructType partitionType) {
*/
ByteBuffer keyMetadata();

/**
* @return List of recommended split locations, if applicable, null otherwise.
* When available, this information is used for planning scan tasks whose boundaries
* are determined by these offsets. The returned list must be sorted in ascending order.
*/
List<Long> splitOffsets();

/**
* Copies this {@link DataFile data file}. Manifest readers can reuse data file instances; use
* this method to copy data when collecting files from tasks.
Expand All @@ -133,11 +168,4 @@ static StructType getType(StructType partitionType) {
* @return a copy of this data file, without lower bounds, upper bounds, value counts, or null value counts
*/
DataFile copyWithoutStats();

/**
* @return List of recommended split locations, if applicable, null otherwise.
* When available, this information is used for planning scan tasks whose boundaries
* are determined by these offsets. The returned list must be sorted in ascending order.
*/
List<Long> splitOffsets();
}
39 changes: 39 additions & 0 deletions api/src/main/java/org/apache/iceberg/FileContent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

/**
* Content type stored in a file, one of DATA, POSITION_DELETES, or EQUALITY_DELETES.
*/
public enum FileContent {
DATA(0),
POSITION_DELETES(1),
EQUALITY_DELETES(2);

private final int id;

FileContent(int id) {
this.id = id;
}

public int id() {
return id;
}
}
38 changes: 38 additions & 0 deletions api/src/main/java/org/apache/iceberg/ManifestContent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

/**
* Content type stored in a manifest file, either DATA or DELETES.
*/
public enum ManifestContent {
DATA(0),
DELETES(1);

private final int id;

ManifestContent(int id) {
this.id = id;
}

public int id() {
return id;
}
}
53 changes: 35 additions & 18 deletions api/src/main/java/org/apache/iceberg/ManifestFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,29 +30,41 @@
* Represents a manifest file that can be scanned to find data files in a table.
*/
public interface ManifestFile {
Types.NestedField PATH = required(500, "manifest_path", Types.StringType.get());
Types.NestedField LENGTH = required(501, "manifest_length", Types.LongType.get());
Types.NestedField SPEC_ID = required(502, "partition_spec_id", Types.IntegerType.get());
Types.NestedField SNAPSHOT_ID = optional(503, "added_snapshot_id", Types.LongType.get());
Types.NestedField ADDED_FILES_COUNT = optional(504, "added_data_files_count", Types.IntegerType.get());
Types.NestedField EXISTING_FILES_COUNT = optional(505, "existing_data_files_count", Types.IntegerType.get());
Types.NestedField DELETED_FILES_COUNT = optional(506, "deleted_data_files_count", Types.IntegerType.get());
Types.NestedField PATH = required(500, "manifest_path", Types.StringType.get(), "Location URI with FS scheme");
Types.NestedField LENGTH = required(501, "manifest_length", Types.LongType.get(), "Total file size in bytes");
Types.NestedField SPEC_ID = required(502, "partition_spec_id", Types.IntegerType.get(), "Spec ID used to write");
Types.NestedField MANIFEST_CONTENT = optional(517, "content", Types.IntegerType.get(),
"Contents of the manifest: 0=data, 1=deletes");
Types.NestedField SEQUENCE_NUMBER = optional(515, "sequence_number", Types.LongType.get(),
"Sequence number when the manifest was added");
Types.NestedField MIN_SEQUENCE_NUMBER = optional(516, "min_sequence_number", Types.LongType.get(),
"Lowest sequence number in the manifest");
Types.NestedField SNAPSHOT_ID = optional(503, "added_snapshot_id", Types.LongType.get(),
"Snapshot ID that added the manifest");
Types.NestedField ADDED_FILES_COUNT = optional(504, "added_data_files_count", Types.IntegerType.get(),
"Added entry count");
Types.NestedField EXISTING_FILES_COUNT = optional(505, "existing_data_files_count", Types.IntegerType.get(),
"Existing entry count");
Types.NestedField DELETED_FILES_COUNT = optional(506, "deleted_data_files_count", Types.IntegerType.get(),
"Deleted entry count");
Types.NestedField ADDED_ROWS_COUNT = optional(512, "added_rows_count", Types.LongType.get(),
"Added rows count");
Types.NestedField EXISTING_ROWS_COUNT = optional(513, "existing_rows_count", Types.LongType.get(),
"Existing rows count");
Types.NestedField DELETED_ROWS_COUNT = optional(514, "deleted_rows_count", Types.LongType.get(),
"Deleted rows count");
Types.StructType PARTITION_SUMMARY_TYPE = Types.StructType.of(
required(509, "contains_null", Types.BooleanType.get()),
optional(510, "lower_bound", Types.BinaryType.get()), // null if no non-null values
optional(511, "upper_bound", Types.BinaryType.get())
required(509, "contains_null", Types.BooleanType.get(), "True if any file has a null partition value"),
optional(510, "lower_bound", Types.BinaryType.get(), "Partition lower bound for all files"),
optional(511, "upper_bound", Types.BinaryType.get(), "Partition upper bound for all files")
);
Types.NestedField PARTITION_SUMMARIES = optional(507, "partitions",
Types.ListType.ofRequired(508, PARTITION_SUMMARY_TYPE));
Types.NestedField ADDED_ROWS_COUNT = optional(512, "added_rows_count", Types.LongType.get());
Types.NestedField EXISTING_ROWS_COUNT = optional(513, "existing_rows_count", Types.LongType.get());
Types.NestedField DELETED_ROWS_COUNT = optional(514, "deleted_rows_count", Types.LongType.get());
Types.NestedField SEQUENCE_NUMBER = optional(515, "sequence_number", Types.LongType.get());
Types.NestedField MIN_SEQUENCE_NUMBER = optional(516, "min_sequence_number", Types.LongType.get());
// next ID to assign: 517
Types.ListType.ofRequired(508, PARTITION_SUMMARY_TYPE),
"Summary for each partition");
// next ID to assign: 518

Schema SCHEMA = new Schema(
PATH, LENGTH, SPEC_ID,
PATH, LENGTH, SPEC_ID, MANIFEST_CONTENT,
SEQUENCE_NUMBER, MIN_SEQUENCE_NUMBER, SNAPSHOT_ID,
ADDED_FILES_COUNT, EXISTING_FILES_COUNT, DELETED_FILES_COUNT,
ADDED_ROWS_COUNT, EXISTING_ROWS_COUNT, DELETED_ROWS_COUNT,
Expand All @@ -77,6 +89,11 @@ static Schema schema() {
*/
int partitionSpecId();

/**
* @return the content stored in the manifest; either DATA or DELETES
*/
ManifestContent content();

/**
* @return the sequence number of the commit that added the manifest file
*/
Expand Down
14 changes: 14 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/Types.java
Original file line number Diff line number Diff line change
Expand Up @@ -457,10 +457,24 @@ public boolean isOptional() {
return isOptional;
}

public NestedField asOptional() {
if (isOptional) {
return this;
}
return new NestedField(true, id, name, type, doc);
}

public boolean isRequired() {
return !isOptional;
}

public NestedField asRequired() {
if (!isOptional) {
return this;
}
return new NestedField(false, id, name, type, doc);
}

public int fieldId() {
return id;
}
Expand Down
10 changes: 9 additions & 1 deletion api/src/test/java/org/apache/iceberg/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ public static class TestManifestFile implements ManifestFile {
private final String path;
private final long length;
private final int specId;
private final ManifestContent content;
private final Long snapshotId;
private final Integer addedFiles;
private final Long addedRows;
Expand All @@ -178,6 +179,7 @@ public TestManifestFile(String path, long length, int specId, Long snapshotId,
this.path = path;
this.length = length;
this.specId = specId;
this.content = ManifestContent.DATA;
this.snapshotId = snapshotId;
this.addedFiles = addedFiles;
this.addedRows = null;
Expand All @@ -188,13 +190,14 @@ public TestManifestFile(String path, long length, int specId, Long snapshotId,
this.partitions = partitions;
}

public TestManifestFile(String path, long length, int specId, Long snapshotId,
public TestManifestFile(String path, long length, int specId, ManifestContent content, Long snapshotId,
Integer addedFiles, Long addedRows, Integer existingFiles,
Long existingRows, Integer deletedFiles, Long deletedRows,
List<PartitionFieldSummary> partitions) {
this.path = path;
this.length = length;
this.specId = specId;
this.content = content;
this.snapshotId = snapshotId;
this.addedFiles = addedFiles;
this.addedRows = addedRows;
Expand All @@ -220,6 +223,11 @@ public int partitionSpecId() {
return specId;
}

@Override
public ManifestContent content() {
return content;
}

@Override
public long sequenceNumber() {
return 0;
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/java/org/apache/iceberg/AllEntriesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,13 @@ protected long targetSplitSize(TableOperations ops) {
protected CloseableIterable<FileScanTask> planFiles(
TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats) {
CloseableIterable<ManifestFile> manifests = AllDataFilesTable.allManifestFiles(ops.current().snapshots());
Schema fileSchema = new Schema(schema().findType("data_file").asStructType().fields());
String schemaString = SchemaParser.toJson(schema());
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(rowFilter);

return CloseableIterable.transform(manifests, manifest -> new BaseFileScanTask(
DataFiles.fromManifest(manifest), schemaString, specString, ResidualEvaluator.unpartitioned(rowFilter)));
return CloseableIterable.transform(manifests, manifest -> new ManifestEntriesTable.ManifestReadTask(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh, this one wouldn't probably work correctly before if snapshot id inheritance was enabled.

ops.io(), manifest, fileSchema, schemaString, specString, residuals));
}
}
}
Loading