5 changes: 5 additions & 0 deletions api/src/main/java/org/apache/iceberg/ContentFile.java
@@ -29,6 +29,11 @@
* @param <F> the concrete Java class of a ContentFile instance.
*/
public interface ContentFile<F> {
/**
* @return id of the partition spec used for partition metadata
*/
int specId();

/**
* @return type of content stored in the file; one of DATA, POSITION_DELETES, or EQUALITY_DELETES
*/
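In practice the new accessor pairs with the table's spec map. A minimal usage sketch, not part of this diff — it assumes Table.specs() (the map from spec ID to PartitionSpec) and a standard table scan:

import java.io.IOException;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

public class SpecIdUsageSketch {
  // Looks up the partition spec each scanned file was written with, using the
  // new ContentFile#specId() accessor and the table's spec-ID-to-spec map.
  static void printFileSpecs(Table table) throws IOException {
    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
      for (FileScanTask task : tasks) {
        DataFile file = task.file();
        PartitionSpec spec = table.specs().get(file.specId()); // assumes Table.specs()
        System.out.println(file.path() + " -> spec " + file.specId() + ": " + spec);
      }
    }
  }
}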
5 changes: 5 additions & 0 deletions api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -311,6 +311,11 @@ public TestDataFile(String path, StructLike partition, long recordCount,
this.upperBounds = upperBounds;
}

@Override
public int specId() {
return 0;
}

@Override
public CharSequence path() {
return path;
7 changes: 6 additions & 1 deletion core/src/main/java/org/apache/iceberg/AllManifestsTable.java
@@ -128,8 +128,13 @@ protected CloseableIterable<FileScanTask> planFiles(
if (snap.manifestListLocation() != null) {
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
DataFile manifestListAsDataFile = DataFiles.builder(PartitionSpec.unpartitioned())
.withInputFile(ops.io().newInputFile(snap.manifestListLocation()))
.withRecordCount(1)
.withFormat(FileFormat.AVRO)
.build();
return new ManifestListReadTask(ops.io(), table().spec(), new BaseFileScanTask(
DataFiles.fromManifestList(ops.io().newInputFile(snap.manifestListLocation())), null,
manifestListAsDataFile, null,
schemaString, specString, residuals));
} else {
return StaticDataTask.of(
14 changes: 13 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -53,6 +53,7 @@ public PartitionData copy() {
private int[] fromProjectionPos;
private Types.StructType partitionType;

private int partitionSpecId = -1;
private FileContent content = FileContent.DATA;
private String filePath = null;
private FileFormat format = null;
@@ -108,11 +109,12 @@ public PartitionData copy() {
this.partitionData = new PartitionData(partitionType);
}

BaseFile(FileContent content, String filePath, FileFormat format,
BaseFile(int specId, FileContent content, String filePath, FileFormat format,
PartitionData partition, long fileSizeInBytes, long recordCount,
Map<Integer, Long> columnSizes, Map<Integer, Long> valueCounts, Map<Integer, Long> nullValueCounts,
Map<Integer, ByteBuffer> lowerBounds, Map<Integer, ByteBuffer> upperBounds, List<Long> splitOffsets,
ByteBuffer keyMetadata) {
this.partitionSpecId = specId;
this.content = content;
this.filePath = filePath;
this.format = format;
@@ -145,6 +147,7 @@ public PartitionData copy() {
* @param fullCopy whether to copy all fields or to drop column-level stats
*/
BaseFile(BaseFile<F> toCopy, boolean fullCopy) {
this.partitionSpecId = toCopy.partitionSpecId;
this.content = toCopy.content;
this.filePath = toCopy.filePath;
this.format = toCopy.format;
@@ -178,6 +181,15 @@ public PartitionData copy() {
BaseFile() {
}

@Override
public int specId() {
return partitionSpecId;
}

void setSpecId(int specId) {
this.partitionSpecId = specId;
}

protected abstract Schema getAvroSchema(Types.StructType partitionStruct);

@Override
85 changes: 10 additions & 75 deletions core/src/main/java/org/apache/iceberg/DataFiles.java
@@ -94,88 +94,27 @@ public static PartitionData copy(PartitionSpec spec, StructLike partition) {
return copyPartitionData(spec, partition, null);
}

public static DataFile fromStat(FileStatus stat, long rowCount) {
String location = stat.getPath().toString();
FileFormat format = FileFormat.fromFileName(location);
return new GenericDataFile(location, format, rowCount, stat.getLen());
}

public static DataFile fromStat(FileStatus stat, PartitionData partition, long rowCount) {
String location = stat.getPath().toString();
FileFormat format = FileFormat.fromFileName(location);
return new GenericDataFile(
location, format, partition, rowCount, stat.getLen());
}

public static DataFile fromStat(FileStatus stat, PartitionData partition, Metrics metrics,
EncryptionKeyMetadata keyMetadata, List<Long> splitOffsets) {
String location = stat.getPath().toString();
FileFormat format = FileFormat.fromFileName(location);
return new GenericDataFile(
location, format, partition, stat.getLen(), metrics, keyMetadata.buffer(), splitOffsets);
}

public static DataFile fromInputFile(InputFile file, PartitionData partition, long rowCount) {
if (file instanceof HadoopInputFile) {
return fromStat(((HadoopInputFile) file).getStat(), partition, rowCount);
}

String location = file.location();
FileFormat format = FileFormat.fromFileName(location);
return new GenericDataFile(
location, format, partition, rowCount, file.getLength());
}

public static DataFile fromInputFile(InputFile file, long rowCount) {
if (file instanceof HadoopInputFile) {
return fromStat(((HadoopInputFile) file).getStat(), rowCount);
}

String location = file.location();
FileFormat format = FileFormat.fromFileName(location);
return new GenericDataFile(location, format, rowCount, file.getLength());
}

public static DataFile fromEncryptedOutputFile(EncryptedOutputFile encryptedFile, PartitionData partition,
Metrics metrics, List<Long> splitOffsets) {
EncryptionKeyMetadata keyMetadata = encryptedFile.keyMetadata();
InputFile file = encryptedFile.encryptingOutputFile().toInputFile();
if (encryptedFile instanceof HadoopInputFile) {
return fromStat(((HadoopInputFile) file).getStat(), partition, metrics, keyMetadata, splitOffsets);
}

String location = file.location();
FileFormat format = FileFormat.fromFileName(location);
return new GenericDataFile(
location, format, partition, file.getLength(), metrics, keyMetadata.buffer(), splitOffsets);
}

public static DataFile fromManifest(ManifestFile manifest) {
Preconditions.checkArgument(
manifest.addedFilesCount() != null && manifest.existingFilesCount() != null,
"Cannot create data file from manifest: data file counts are missing.");

return new GenericDataFile(manifest.path(),
FileFormat.AVRO,
manifest.addedFilesCount() + manifest.existingFilesCount(),
manifest.length());
}

public static DataFile fromManifestList(InputFile manifestList) {
return new GenericDataFile(manifestList.location(), FileFormat.AVRO, 1, manifestList.getLength());
return DataFiles.builder(PartitionSpec.unpartitioned())
.withPath(manifest.path())
.withFormat(FileFormat.AVRO)
.withRecordCount(manifest.addedFilesCount() + manifest.existingFilesCount())
.withFileSizeInBytes(manifest.length())
.build();
}

public static Builder builder(PartitionSpec spec) {
return new Builder(spec);
}

static Builder builder() {
return new Builder();
}

public static class Builder {
private final PartitionSpec spec;
private final boolean isPartitioned;
private final int specId;
private PartitionData partitionData;
private String filePath = null;
private FileFormat format = null;
@@ -191,14 +130,9 @@ public static class Builder {
private ByteBuffer keyMetadata = null;
private List<Long> splitOffsets = null;

public Builder() {
this.spec = null;
this.partitionData = null;
this.isPartitioned = false;
}

public Builder(PartitionSpec spec) {
this.spec = spec;
this.specId = spec.specId();
this.isPartitioned = spec.fields().size() > 0;
this.partitionData = isPartitioned ? newPartitionData(spec) : null;
}
@@ -221,6 +155,7 @@ public void clear() {

public Builder copy(DataFile toCopy) {
if (isPartitioned) {
Preconditions.checkState(specId == toCopy.specId(), "Cannot copy a DataFile with a different spec");
this.partitionData = copyPartitionData(spec, toCopy.partition(), partitionData);
}
this.filePath = toCopy.path().toString();
@@ -338,7 +273,7 @@ public DataFile build() {
Preconditions.checkArgument(recordCount >= 0, "Record count is required");

return new GenericDataFile(
filePath, format, isPartitioned ? partitionData.copy() : null,
specId, filePath, format, isPartitioned ? partitionData.copy() : null,
fileSizeInBytes, new Metrics(
recordCount, columnSizes, valueCounts, nullValueCounts, lowerBounds, upperBounds),
keyMetadata, splitOffsets);
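With the no-argument builder removed above, every DataFile is now created against an explicit PartitionSpec, whose ID is stamped onto the built file. A minimal sketch of the builder path, with a placeholder path and made-up sizes:

import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;

public class BuilderSpecIdSketch {
  static DataFile unpartitionedExample() {
    DataFile file = DataFiles.builder(PartitionSpec.unpartitioned())
        .withPath("/warehouse/db/t/data/file-a.parquet") // placeholder path
        .withFormat(FileFormat.PARQUET)
        .withFileSizeInBytes(4096L)
        .withRecordCount(100L)
        .build();
    // The unpartitioned spec has ID 0, so the built file reports specId() == 0.
    return file;
  }
}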
15 changes: 4 additions & 11 deletions core/src/main/java/org/apache/iceberg/FileMetadata.java
@@ -38,13 +38,10 @@ public static Builder deleteFileBuilder(PartitionSpec spec) {
return new Builder(spec);
}

static Builder deleteFileBuilder() {
return new Builder();
}

public static class Builder {
private final PartitionSpec spec;
private final boolean isPartitioned;
private final int specId;
private FileContent content = null;
private PartitionData partitionData;
private String filePath = null;
@@ -60,14 +57,9 @@ public static class Builder {
private Map<Integer, ByteBuffer> upperBounds = null;
private ByteBuffer keyMetadata = null;

Builder() {
this.spec = null;
this.partitionData = null;
this.isPartitioned = false;
}

Builder(PartitionSpec spec) {
this.spec = spec;
this.specId = spec.specId();
this.isPartitioned = spec.fields().size() > 0;
this.partitionData = isPartitioned ? DataFiles.newPartitionData(spec) : null;
}
@@ -89,6 +81,7 @@ public void clear() {

public Builder copy(DeleteFile toCopy) {
if (isPartitioned) {
Preconditions.checkState(specId == toCopy.specId(), "Cannot copy a DeleteFile with a different spec");
this.partitionData = DataFiles.copyPartitionData(spec, toCopy.partition(), partitionData);
}
this.content = toCopy.content();
@@ -208,7 +201,7 @@ public DeleteFile build() {
Preconditions.checkArgument(recordCount >= 0, "Record count is required");

return new GenericDeleteFile(
content, filePath, format, isPartitioned ? DataFiles.copy(spec, partitionData) : null,
specId, content, filePath, format, isPartitioned ? DataFiles.copy(spec, partitionData) : null,
fileSizeInBytes, new Metrics(
recordCount, columnSizes, valueCounts, nullValueCounts, lowerBounds, upperBounds),
keyMetadata);
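Both builders now guard copy() the same way: copying a file into a builder for a different partitioned spec fails fast instead of silently mixing partition layouts. A hedged sketch of the contract:

import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.PartitionSpec;

public class CopyGuardSketch {
  // Rebuilding a file under another spec is no longer silent: for partitioned
  // specs, Builder.copy() throws IllegalStateException when the source file's
  // specId() differs from the builder's spec ID.
  static DataFile recopy(DataFile source, PartitionSpec targetSpec) {
    return DataFiles.builder(targetSpec)
        .copy(source) // fails fast if source.specId() != targetSpec.specId()
        .build();
  }
}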
20 changes: 2 additions & 18 deletions core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -34,26 +34,10 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
super(avroSchema);
}

GenericDataFile(String filePath, FileFormat format, long recordCount,
long fileSizeInBytes) {
this(filePath, format, null, recordCount, fileSizeInBytes);
}

GenericDataFile(String filePath, FileFormat format, PartitionData partition,
long recordCount, long fileSizeInBytes) {
super(FileContent.DATA, filePath, format, partition, fileSizeInBytes, recordCount,
null, null, null, null, null, null, null);
}

GenericDataFile(String filePath, FileFormat format, PartitionData partition,
long fileSizeInBytes, Metrics metrics, List<Long> splitOffsets) {
this(filePath, format, partition, fileSizeInBytes, metrics, null, splitOffsets);
}

GenericDataFile(String filePath, FileFormat format, PartitionData partition,
GenericDataFile(int specId, String filePath, FileFormat format, PartitionData partition,
long fileSizeInBytes, Metrics metrics,
ByteBuffer keyMetadata, List<Long> splitOffsets) {
super(FileContent.DATA, filePath, format, partition, fileSizeInBytes, metrics.recordCount(),
super(specId, FileContent.DATA, filePath, format, partition, fileSizeInBytes, metrics.recordCount(),
metrics.columnSizes(), metrics.valueCounts(), metrics.nullValueCounts(),
metrics.lowerBounds(), metrics.upperBounds(), splitOffsets, keyMetadata);
}
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
@@ -34,9 +34,9 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
super(avroSchema);
}

GenericDeleteFile(FileContent content, String filePath, FileFormat format, PartitionData partition,
GenericDeleteFile(int specId, FileContent content, String filePath, FileFormat format, PartitionData partition,
long fileSizeInBytes, Metrics metrics, ByteBuffer keyMetadata) {
super(content, filePath, format, partition, fileSizeInBytes, metrics.recordCount(),
super(specId, content, filePath, format, partition, fileSizeInBytes, metrics.recordCount(),
metrics.columnSizes(), metrics.valueCounts(), metrics.nullValueCounts(),
metrics.lowerBounds(), metrics.upperBounds(), null, keyMetadata);
}
core/src/main/java/org/apache/iceberg/InheritableMetadata.java
@@ -34,24 +34,30 @@ static InheritableMetadata empty() {
static InheritableMetadata fromManifest(ManifestFile manifest) {
Preconditions.checkArgument(manifest.snapshotId() != null,
"Cannot read from ManifestFile with null (unassigned) snapshot ID");
return new BaseInheritableMetadata(manifest.snapshotId(), manifest.sequenceNumber());
return new BaseInheritableMetadata(manifest.partitionSpecId(), manifest.snapshotId(), manifest.sequenceNumber());
}

static InheritableMetadata forCopy(long snapshotId) {
return new CopyMetadata(snapshotId);
}

static class BaseInheritableMetadata implements InheritableMetadata {
private final int specId;
private final long snapshotId;
private final long sequenceNumber;

private BaseInheritableMetadata(long snapshotId, long sequenceNumber) {
private BaseInheritableMetadata(int specId, long snapshotId, long sequenceNumber) {
this.specId = specId;
this.snapshotId = snapshotId;
this.sequenceNumber = sequenceNumber;
}

@Override
public <F extends ContentFile<F>> ManifestEntry<F> apply(ManifestEntry<F> manifestEntry) {
if (manifestEntry.file() instanceof BaseFile) {
BaseFile<?> file = (BaseFile<?>) manifestEntry.file();
file.setSpecId(specId);
}
if (manifestEntry.snapshotId() == null) {
manifestEntry.setSnapshotId(snapshotId);
}
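The effect of the hook above is that files materialized from a manifest report that manifest's partitionSpecId() even though no per-entry spec ID is stored. A sketch, assuming ManifestFiles.read(manifest, io) yields the manifest's data files as a CloseableIterable:

import java.io.IOException;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;

public class SpecIdInheritanceSketch {
  // Every file read back should report the spec ID stamped by
  // BaseInheritableMetadata.apply() from the manifest header.
  static void checkInheritedSpecIds(ManifestFile manifest, FileIO io) throws IOException {
    try (CloseableIterable<DataFile> files = ManifestFiles.read(manifest, io)) {
      for (DataFile file : files) {
        assert file.specId() == manifest.partitionSpecId();
      }
    }
  }
}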
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/StaticDataTask.java
@@ -42,7 +42,7 @@ static <T> DataTask of(InputFile metadata, Iterable<T> values, Function<T, Row>
private final StructLike[] rows;

private StaticDataTask(InputFile metadata, StructLike[] rows) {
this.metadataFile = DataFiles.builder()
this.metadataFile = DataFiles.builder(PartitionSpec.unpartitioned())
.withInputFile(metadata)
.withRecordCount(rows.length)
.withFormat(FileFormat.METADATA)
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/V1Metadata.java
@@ -364,6 +364,11 @@ public org.apache.avro.Schema getSchema() {
return avroSchema;
}

@Override
public int specId() {
return wrapped.specId();
}

@Override
public FileContent content() {
return wrapped.content();
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/V2Metadata.java
@@ -412,6 +412,11 @@ public void put(int i, Object v) {
throw new UnsupportedOperationException("Cannot read into IndexedDataFile");
}

@Override
public int specId() {
return wrapped.specId();
}

@Override
public FileContent content() {
return wrapped.content();
core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
@@ -68,10 +68,10 @@ public class TestManifestWriterVersions {
private static final List<Long> OFFSETS = ImmutableList.of(4L);

private static final DataFile DATA_FILE = new GenericDataFile(
PATH, FORMAT, PARTITION, 150972L, METRICS, null, OFFSETS);
0, PATH, FORMAT, PARTITION, 150972L, METRICS, null, OFFSETS);

private static final DeleteFile DELETE_FILE = new GenericDeleteFile(
FileContent.EQUALITY_DELETES, PATH, FORMAT, PARTITION, 22905L, METRICS, null);
0, FileContent.EQUALITY_DELETES, PATH, FORMAT, PARTITION, 22905L, METRICS, null);

@Rule
public TemporaryFolder temp = new TemporaryFolder();