From dbb0c52dba461abdbe533271dce8189f370d7628 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 10 Apr 2020 13:32:24 -0700 Subject: [PATCH 1/6] Move manifest factory methods to ManifestFiles. --- .../apache/iceberg/BaseMetastoreCatalog.java | 2 +- .../apache/iceberg/BaseRewriteManifests.java | 4 +- .../org/apache/iceberg/DataFilesTable.java | 2 +- .../java/org/apache/iceberg/FastAppend.java | 2 +- .../org/apache/iceberg/ManifestFiles.java | 91 +++++++++++++++++++ .../org/apache/iceberg/ManifestGroup.java | 2 +- .../org/apache/iceberg/ManifestReader.java | 56 +----------- .../org/apache/iceberg/ManifestWriter.java | 12 +-- .../iceberg/MergingSnapshotProducer.java | 6 +- .../org/apache/iceberg/RemoveSnapshots.java | 4 +- .../org/apache/iceberg/SnapshotProducer.java | 2 +- .../org/apache/iceberg/TableTestBase.java | 8 +- .../apache/iceberg/TestManifestReader.java | 2 + .../apache/iceberg/TestRewriteManifests.java | 14 +-- .../spark/source/TestSparkDataWrite.java | 8 +- 15 files changed, 127 insertions(+), 88 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/ManifestFiles.java diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java index e77dee20c48b..f56cc074c5e5 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java @@ -261,7 +261,7 @@ private static void deleteFiles(FileIO io, Set allManifests) { .executeWith(ThreadPools.getWorkerPool()) .onFailure((item, exc) -> LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc)) .run(manifest -> { - try (ManifestReader reader = ManifestReader.read(manifest, io)) { + try (ManifestReader reader = ManifestFiles.read(manifest, io)) { for (ManifestEntry entry : reader.entries()) { // intern the file path because the weak key map uses identity (==) instead of equals String path = entry.file().path().toString().intern(); diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java index 0eee04b68a6c..57ffc383aab9 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java @@ -154,7 +154,7 @@ public RewriteManifests addManifest(ManifestFile manifest) { } private ManifestFile copyManifest(ManifestFile manifest) { - try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), specsById)) { + try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), specsById)) { OutputFile newFile = newManifestOutput(); return ManifestWriter.copyManifest(reader, newFile, snapshotId(), summaryBuilder, ALLOWED_ENTRY_STATUSES); } catch (IOException e) { @@ -237,7 +237,7 @@ private void performRewrite(List currentManifests) { keptManifests.add(manifest); } else { rewrittenManifests.add(manifest); - try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) { + try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) { FilteredManifest filteredManifest = reader.select(Arrays.asList("*")); filteredManifest.liveEntries().forEach( entry -> appendEntry(entry, clusterByFunc.apply(entry.file()), manifest.partitionSpecId()) diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java index 77244583c6a6..53b6a97dd1c2 100644 --- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java +++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java @@ -131,7 +131,7 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask { @Override public CloseableIterable rows() { return CloseableIterable.transform( - ManifestReader.read(manifest, io).project(schema), + ManifestFiles.read(manifest, io).project(schema), file -> (GenericDataFile) file); } diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index 333883151787..fcda77f8938a 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -107,7 +107,7 @@ public FastAppend appendManifest(ManifestFile manifest) { } private ManifestFile copyManifest(ManifestFile manifest) { - try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) { + try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) { OutputFile newManifestPath = newManifestOutput(); return ManifestWriter.copyAppendManifest(reader, newManifestPath, snapshotId(), summaryBuilder); } catch (IOException e) { diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java new file mode 100644 index 000000000000..1e74541b053f --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.util.Map; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; + +public class ManifestFiles { + private ManifestFiles() { + } + + /** + * Returns a new {@link ManifestReader} for a {@link ManifestFile}. + *

+ * Note: Callers should use {@link ManifestFiles#read(ManifestFile, FileIO, Map)} to ensure + * the schema used by filters is the latest table schema. This should be used only when reading + * a manifest without filters. + * + * @param manifest a ManifestFile + * @param io a FileIO + * @return a manifest reader + */ + public static ManifestReader read(ManifestFile manifest, FileIO io) { + return read(manifest, io, null); + } + + /** + * Returns a new {@link ManifestReader} for a {@link ManifestFile}. + * + * @param manifest a {@link ManifestFile} + * @param io a {@link FileIO} + * @param specsById a Map from spec ID to partition spec + * @return a {@link ManifestReader} + */ + public static ManifestReader read(ManifestFile manifest, FileIO io, Map specsById) { + InputFile file = io.newInputFile(manifest.path()); + InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest); + return new ManifestReader(file, specsById, inheritableMetadata); + } + + /** + * Create a new {@link ManifestWriter}. + *

+ * Manifests created by this writer have all entry snapshot IDs set to null. + * All entries will inherit the snapshot ID that will be assigned to the manifest on commit. + * + * @param spec {@link PartitionSpec} used to produce {@link DataFile} partition tuples + * @param outputFile the destination file location + * @return a manifest writer + */ + public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) { + // always use a v1 writer for appended manifests because sequence number must be inherited + return write(1, spec, outputFile, null); + } + + /** + * Create a new {@link ManifestWriter} for the given format version. + * + * @param formatVersion a target format version + * @param spec a {@link PartitionSpec} + * @param outputFile an {@link OutputFile} where the manifest will be written + * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID + * @return a manifest writer + */ + static ManifestWriter write(int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) { + switch (formatVersion) { + case 1: + return new ManifestWriter.V1Writer(spec, outputFile, snapshotId); + } + throw new UnsupportedOperationException("Cannot write manifest for table version: " + formatVersion); + } +} diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java index b8dfed06a7cf..767fae734766 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java +++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java @@ -209,7 +209,7 @@ private Iterable> entries( Iterable> readers = Iterables.transform( matchingManifests, manifest -> { - ManifestReader reader = ManifestReader.read(manifest, io, specsById); + ManifestReader reader = ManifestFiles.read(manifest, io, specsById); FilteredManifest filtered = reader .filterRows(dataFilter) diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java index 886eb7793cb4..60dcaf8a23f6 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestReader.java +++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java @@ -45,8 +45,7 @@ /** * Reader for manifest files. *

- * The preferable way to create readers is using {@link #read(ManifestFile, FileIO, Map)} as - * it allows entries to inherit manifest metadata such as snapshot id. + * Create readers using {@link ManifestFiles#read(ManifestFile, FileIO, Map)}. */ public class ManifestReader extends CloseableGroup implements Filterable { private static final Logger LOG = LoggerFactory.getLogger(ManifestReader.class); @@ -62,63 +61,18 @@ public class ManifestReader extends CloseableGroup implements Filterable - * Note: Most callers should use {@link #read(ManifestFile, FileIO, Map)} to ensure that + * Note: Callers should use {@link ManifestFiles#read(ManifestFile, FileIO, Map)} to ensure that * manifest entries with partial metadata can inherit missing properties from the manifest metadata. - *

- * Note: Most callers should use {@link #read(InputFile, Map)} if all manifest entries - * contain full metadata and they want to ensure that the schema used by filters is the latest - * table schema. This should be used only when reading a manifest without filters. * * @param file an InputFile * @return a manifest reader + * @deprecated use {@link ManifestFiles#read(ManifestFile, FileIO, Map)}. */ + @Deprecated public static ManifestReader read(InputFile file) { return new ManifestReader(file, null, InheritableMetadataFactory.empty()); } - /** - * Returns a new {@link ManifestReader} for an {@link InputFile}. - *

- * Note: Most callers should use {@link #read(ManifestFile, FileIO, Map)} to ensure that - * manifest entries with partial metadata can inherit missing properties from the manifest metadata. - * - * @param file an InputFile - * @param specsById a Map from spec ID to partition spec - * @return a manifest reader - */ - public static ManifestReader read(InputFile file, Map specsById) { - return new ManifestReader(file, specsById, InheritableMetadataFactory.empty()); - } - - /** - * Returns a new {@link ManifestReader} for a {@link ManifestFile}. - *

- * Note: Most callers should use {@link #read(ManifestFile, FileIO, Map)} to ensure - * the schema used by filters is the latest table schema. This should be used only when reading - * a manifest without filters. - * - * @param manifest a ManifestFile - * @param io a FileIO - * @return a manifest reader - */ - public static ManifestReader read(ManifestFile manifest, FileIO io) { - return read(manifest, io, null); - } - - /** - * Returns a new {@link ManifestReader} for a {@link ManifestFile}. - * - * @param manifest a ManifestFile - * @param io a FileIO - * @param specsById a Map from spec ID to partition spec - * @return a manifest reader - */ - public static ManifestReader read(ManifestFile manifest, FileIO io, Map specsById) { - InputFile file = io.newInputFile(manifest.path()); - InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest); - return new ManifestReader(file, specsById, inheritableMetadata); - } - private final InputFile file; private final InheritableMetadata inheritableMetadata; private final Map metadata; @@ -129,7 +83,7 @@ public static ManifestReader read(ManifestFile manifest, FileIO io, Map cachedAdds = null; private List cachedDeletes = null; - private ManifestReader(InputFile file, Map specsById, + ManifestReader(InputFile file, Map specsById, InheritableMetadata inheritableMetadata) { this.file = file; this.inheritableMetadata = inheritableMetadata; diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index a6408ce12056..46be9cb0a6ab 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -94,15 +94,7 @@ static ManifestFile copyManifest(ManifestReader reader, OutputFile outputFile, l * @return a manifest writer */ public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) { - // always use a v1 writer for appended manifests because sequence number must be inherited - return write(1, spec, outputFile, null); - } - - static ManifestWriter write(int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) { - if (formatVersion == 1) { - return new V1Writer(spec, outputFile, snapshotId); - } - throw new UnsupportedOperationException("Cannot write manifest for table version: " + formatVersion); + return ManifestFiles.write(spec, outputFile); } private final OutputFile file; @@ -239,7 +231,7 @@ private static FileAppender newAppender(FileFormat format, PartitionSpec } } - private static class V1Writer extends ManifestWriter { + static class V1Writer extends ManifestWriter { V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId) { super(spec, file, snapshotId); } diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 4537c232d183..3d4cfe5ff029 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -227,7 +227,7 @@ protected void add(ManifestFile manifest) { } private ManifestFile copyManifest(ManifestFile manifest) { - try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) { + try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) { OutputFile newManifestPath = newManifestOutput(); return ManifestWriter.copyAppendManifest(reader, newManifestPath, snapshotId(), appendedManifestsSummary); } catch (IOException e) { @@ -481,7 +481,7 @@ private ManifestFile filterManifest(StrictMetricsEvaluator metricsEvaluator, return manifest; } - try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) { + try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) { // this is reused to compare file paths with the delete set CharSequenceWrapper pathWrapper = CharSequenceWrapper.wrap(""); @@ -655,7 +655,7 @@ private ManifestFile createManifest(int specId, List bin) throws I ManifestWriter writer = newManifestWriter(ops.current().spec()); try { for (ManifestFile manifest : bin) { - try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) { + try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) { for (ManifestEntry entry : reader.entries()) { if (entry.status() == Status.DELETED) { // suppress deletes from previous snapshots. only files deleted by this snapshot diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java index 6ba28504c31d..f44e3725e978 100644 --- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java @@ -337,7 +337,7 @@ private Set findFilesToDelete(Set manifestsToScan, Set LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc)) .run(manifest -> { // the manifest has deletes, scan it to find files to delete - try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) { + try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) { for (ManifestEntry entry : reader.entries()) { // if the snapshot ID of the DELETE entry is no longer valid, the data can be deleted if (entry.status() == ManifestEntry.Status.DELETED && @@ -357,7 +357,7 @@ private Set findFilesToDelete(Set manifestsToScan, Set LOG.warn("Failed to get added files: this may cause orphaned data files", exc)) .run(manifest -> { // the manifest has deletes, scan it to find files to delete - try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) { + try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) { for (ManifestEntry entry : reader.entries()) { // delete any ADDED file from manifests that were reverted if (entry.status() == ManifestEntry.Status.ADDED) { diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 3bf8364194b0..4a72eb1d0e8d 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -333,7 +333,7 @@ protected long snapshotId() { } private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) { - try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) { + try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) { PartitionSummary stats = new PartitionSummary(ops.current().spec(manifest.partitionSpecId())); int addedFiles = 0; long addedRows = 0L; diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java index 012d177d90b6..92719d862f8e 100644 --- a/core/src/test/java/org/apache/iceberg/TableTestBase.java +++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java @@ -207,7 +207,7 @@ void validateSnapshot(Snapshot old, Snapshot snap, DataFile... newFiles) { long id = snap.snapshotId(); Iterator newPaths = paths(newFiles).iterator(); - for (ManifestEntry entry : ManifestReader.read(manifest, FILE_IO).entries()) { + for (ManifestEntry entry : ManifestFiles.read(manifest, FILE_IO).entries()) { DataFile file = entry.file(); Assert.assertEquals("Path should match expected", newPaths.next(), file.path().toString()); Assert.assertEquals("File's snapshot ID should match", id, (long) entry.snapshotId()); @@ -239,7 +239,7 @@ List paths(DataFile... dataFiles) { static void validateManifest(ManifestFile manifest, Iterator ids, Iterator expectedFiles) { - for (ManifestEntry entry : ManifestReader.read(manifest, FILE_IO).entries()) { + for (ManifestEntry entry : ManifestFiles.read(manifest, FILE_IO).entries()) { DataFile file = entry.file(); DataFile expected = expectedFiles.next(); Assert.assertEquals("Path should match expected", @@ -255,7 +255,7 @@ static void validateManifestEntries(ManifestFile manifest, Iterator ids, Iterator expectedFiles, Iterator expectedStatuses) { - for (ManifestEntry entry : ManifestReader.read(manifest, FILE_IO).entries()) { + for (ManifestEntry entry : ManifestFiles.read(manifest, FILE_IO).entries()) { DataFile file = entry.file(); DataFile expected = expectedFiles.next(); final ManifestEntry.Status expectedStatus = expectedStatuses.next(); @@ -283,6 +283,6 @@ static Iterator files(DataFile... files) { } static Iterator files(ManifestFile manifest) { - return ManifestReader.read(manifest, FILE_IO).iterator(); + return ManifestFiles.read(manifest, FILE_IO).iterator(); } } diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java index d6d4c0c71b64..701d68741704 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java @@ -30,6 +30,7 @@ public class TestManifestReader extends TableTestBase { @Test + @SuppressWarnings("deprecation") public void testManifestReaderWithEmptyInheritableMetadata() throws IOException { ManifestFile manifest = writeManifest("manifest.avro", manifestEntry(Status.EXISTING, 1000L, FILE_A)); try (ManifestReader reader = ManifestReader.read(FILE_IO.newInputFile(manifest.path()))) { @@ -41,6 +42,7 @@ public void testManifestReaderWithEmptyInheritableMetadata() throws IOException } @Test + @SuppressWarnings("deprecation") public void testInvalidUsage() throws IOException { ManifestFile manifest = writeManifest(FILE_A, FILE_B); try (ManifestReader reader = ManifestReader.read(FILE_IO.newInputFile(manifest.path()))) { diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java index c1bd7a895b4a..1d85a2eb7af6 100644 --- a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java +++ b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java @@ -102,7 +102,7 @@ public void testRewriteManifestsGeneratedAndAppendedDirectly() throws IOExceptio // get the correct file order List files; List ids; - try (ManifestReader reader = ManifestReader.read(manifests.get(0), table.io())) { + try (ManifestReader reader = ManifestFiles.read(manifests.get(0), table.io())) { if (reader.iterator().next().path().equals(FILE_A.path())) { files = Arrays.asList(FILE_A, FILE_B); ids = Arrays.asList(manifestAppendId, fileAppendId); @@ -176,7 +176,7 @@ public void testReplaceManifestsConsolidate() throws IOException { // get the file order correct List files; List ids; - try (ManifestReader reader = ManifestReader.read(manifests.get(0), table.io())) { + try (ManifestReader reader = ManifestFiles.read(manifests.get(0), table.io())) { if (reader.iterator().next().path().equals(FILE_A.path())) { files = Arrays.asList(FILE_A, FILE_B); ids = Arrays.asList(appendIdA, appendIdB); @@ -218,7 +218,7 @@ public void testReplaceManifestsWithFilter() throws IOException { table.rewriteManifests() .clusterBy(file -> "file") .rewriteIf(manifest -> { - try (ManifestReader reader = ManifestReader.read(manifest, table.io())) { + try (ManifestReader reader = ManifestFiles.read(manifest, table.io())) { return !reader.iterator().next().path().equals(FILE_A.path()); } catch (IOException x) { throw new RuntimeIOException(x); @@ -232,7 +232,7 @@ public void testReplaceManifestsWithFilter() throws IOException { // get the file order correct List files; List ids; - try (ManifestReader reader = ManifestReader.read(manifests.get(0), table.io())) { + try (ManifestReader reader = ManifestFiles.read(manifests.get(0), table.io())) { if (reader.iterator().next().path().equals(FILE_B.path())) { files = Arrays.asList(FILE_B, FILE_C); ids = Arrays.asList(appendIdB, appendIdC); @@ -302,7 +302,7 @@ public void testConcurrentRewriteManifest() throws IOException { table.rewriteManifests() .clusterBy(file -> "file") .rewriteIf(manifest -> { - try (ManifestReader reader = ManifestReader.read(manifest, table.io())) { + try (ManifestReader reader = ManifestFiles.read(manifest, table.io())) { return !reader.iterator().next().path().equals(FILE_A.path()); } catch (IOException x) { throw new RuntimeIOException(x); @@ -322,7 +322,7 @@ public void testConcurrentRewriteManifest() throws IOException { // get the file order correct List files; List ids; - try (ManifestReader reader = ManifestReader.read(manifests.get(0), table.io())) { + try (ManifestReader reader = ManifestFiles.read(manifests.get(0), table.io())) { if (reader.iterator().next().path().equals(FILE_A.path())) { files = Arrays.asList(FILE_A, FILE_B); ids = Arrays.asList(appendIdA, appendIdB); @@ -885,7 +885,7 @@ public void testManifestReplacementCombinedWithRewrite() throws IOException { .addManifest(newManifest) .clusterBy(dataFile -> "const-value") .rewriteIf(manifest -> { - try (ManifestReader reader = ManifestReader.read(manifest, table.io())) { + try (ManifestReader reader = ManifestFiles.read(manifest, table.io())) { return !reader.iterator().next().path().equals(FILE_B.path()); } catch (IOException x) { throw new RuntimeIOException(x); diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java index f6bcb66a8458..eafe23e52dbb 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java @@ -28,7 +28,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.ManifestFile; -import org.apache.iceberg.ManifestReader; +import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -121,7 +121,7 @@ public void testBasicWrite() throws IOException { Assert.assertEquals("Number of rows should match", expected.size(), actual.size()); Assert.assertEquals("Result rows should match", expected, actual); for (ManifestFile manifest : table.currentSnapshot().manifests()) { - for (DataFile file : ManifestReader.read(manifest, table.io())) { + for (DataFile file : ManifestFiles.read(manifest, table.io())) { // TODO: avro not support split if (!format.equals(FileFormat.AVRO)) { Assert.assertNotNull("Split offsets not present", file.splitOffsets()); @@ -316,7 +316,7 @@ public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws List files = Lists.newArrayList(); for (ManifestFile manifest : table.currentSnapshot().manifests()) { - for (DataFile file : ManifestReader.read(manifest, table.io())) { + for (DataFile file : ManifestFiles.read(manifest, table.io())) { files.add(file); } } @@ -365,7 +365,7 @@ public void testPartitionedCreateWithTargetFileSizeViaOption() throws IOExceptio List files = Lists.newArrayList(); for (ManifestFile manifest : table.currentSnapshot().manifests()) { - for (DataFile file : ManifestReader.read(manifest, table.io())) { + for (DataFile file : ManifestFiles.read(manifest, table.io())) { files.add(file); } } From 689c3709389cf5f124f2b19670f27c82ac84c6b7 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Tue, 14 Apr 2020 18:38:30 -0700 Subject: [PATCH 2/6] Move copyManifest methods and add format version argument. --- .../apache/iceberg/BaseRewriteManifests.java | 3 +- .../java/org/apache/iceberg/FastAppend.java | 3 +- .../org/apache/iceberg/ManifestFiles.java | 52 +++++++++++++++++++ .../org/apache/iceberg/ManifestWriter.java | 48 ----------------- .../iceberg/MergingSnapshotProducer.java | 3 +- .../org/apache/iceberg/SnapshotProducer.java | 2 +- 6 files changed, 59 insertions(+), 52 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java index 57ffc383aab9..7cadf91077a2 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java @@ -156,7 +156,8 @@ public RewriteManifests addManifest(ManifestFile manifest) { private ManifestFile copyManifest(ManifestFile manifest) { try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), specsById)) { OutputFile newFile = newManifestOutput(); - return ManifestWriter.copyManifest(reader, newFile, snapshotId(), summaryBuilder, ALLOWED_ENTRY_STATUSES); + return ManifestFiles.copyManifest( + ops.current().formatVersion(), reader, newFile, snapshotId(), summaryBuilder, ALLOWED_ENTRY_STATUSES); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest); } diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index fcda77f8938a..f0dda9a19b64 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -109,7 +109,8 @@ public FastAppend appendManifest(ManifestFile manifest) { private ManifestFile copyManifest(ManifestFile manifest) { try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) { OutputFile newManifestPath = newManifestOutput(); - return ManifestWriter.copyAppendManifest(reader, newManifestPath, snapshotId(), summaryBuilder); + return ManifestFiles.copyAppendManifest( + ops.current().formatVersion(), reader, newManifestPath, snapshotId(), summaryBuilder); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest); } diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java index 1e74541b053f..90b72cdac35a 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java @@ -19,7 +19,12 @@ package org.apache.iceberg; +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; +import java.io.IOException; import java.util.Map; +import java.util.Set; +import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; @@ -88,4 +93,51 @@ static ManifestWriter write(int formatVersion, PartitionSpec spec, OutputFile ou } throw new UnsupportedOperationException("Cannot write manifest for table version: " + formatVersion); } + + static ManifestFile copyAppendManifest(int formatVersion, ManifestReader reader, OutputFile outputFile, + long snapshotId, SnapshotSummary.Builder summaryBuilder) { + return copyManifest( + formatVersion, reader, outputFile, snapshotId, summaryBuilder, Sets.newHashSet(ManifestEntry.Status.ADDED)); + } + + static ManifestFile copyManifest(int formatVersion, ManifestReader reader, OutputFile outputFile, long snapshotId, + SnapshotSummary.Builder summaryBuilder, + Set allowedEntryStatuses) { + ManifestWriter writer = write(formatVersion, reader.spec(), outputFile, snapshotId); + boolean threw = true; + try { + for (ManifestEntry entry : reader.entries()) { + Preconditions.checkArgument( + allowedEntryStatuses.contains(entry.status()), + "Invalid manifest entry status: %s (allowed statuses: %s)", + entry.status(), allowedEntryStatuses); + switch (entry.status()) { + case ADDED: + summaryBuilder.addedFile(reader.spec(), entry.file()); + writer.add(entry); + break; + case EXISTING: + writer.existing(entry); + break; + case DELETED: + summaryBuilder.deletedFile(reader.spec(), entry.file()); + writer.delete(entry); + break; + } + } + + threw = false; + + } finally { + try { + writer.close(); + } catch (IOException e) { + if (!threw) { + throw new RuntimeIOException(e, "Failed to close manifest: %s", outputFile); + } + } + } + + return writer.toManifestFile(); + } } diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index 46be9cb0a6ab..5a4568502d5a 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -20,9 +20,7 @@ package org.apache.iceberg; import com.google.common.base.Preconditions; -import com.google.common.collect.Sets; import java.io.IOException; -import java.util.Set; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.FileAppender; @@ -37,52 +35,6 @@ public abstract class ManifestWriter implements FileAppender { private static final Logger LOG = LoggerFactory.getLogger(ManifestWriter.class); static final long UNASSIGNED_SEQ = -1L; - static ManifestFile copyAppendManifest(ManifestReader reader, OutputFile outputFile, long snapshotId, - SnapshotSummary.Builder summaryBuilder) { - return copyManifest(reader, outputFile, snapshotId, summaryBuilder, Sets.newHashSet(ManifestEntry.Status.ADDED)); - } - - static ManifestFile copyManifest(ManifestReader reader, OutputFile outputFile, long snapshotId, - SnapshotSummary.Builder summaryBuilder, - Set allowedEntryStatuses) { - ManifestWriter writer = new V1Writer(reader.spec(), outputFile, snapshotId); - boolean threw = true; - try { - for (ManifestEntry entry : reader.entries()) { - Preconditions.checkArgument( - allowedEntryStatuses.contains(entry.status()), - "Invalid manifest entry status: %s (allowed statuses: %s)", - entry.status(), allowedEntryStatuses); - switch (entry.status()) { - case ADDED: - summaryBuilder.addedFile(reader.spec(), entry.file()); - writer.add(entry); - break; - case EXISTING: - writer.existing(entry); - break; - case DELETED: - summaryBuilder.deletedFile(reader.spec(), entry.file()); - writer.delete(entry); - break; - } - } - - threw = false; - - } finally { - try { - writer.close(); - } catch (IOException e) { - if (!threw) { - throw new RuntimeIOException(e, "Failed to close manifest: %s", outputFile); - } - } - } - - return writer.toManifestFile(); - } - /** * Create a new {@link ManifestWriter}. *

diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 3d4cfe5ff029..deefbe85dad3 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -229,7 +229,8 @@ protected void add(ManifestFile manifest) { private ManifestFile copyManifest(ManifestFile manifest) { try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) { OutputFile newManifestPath = newManifestOutput(); - return ManifestWriter.copyAppendManifest(reader, newManifestPath, snapshotId(), appendedManifestsSummary); + return ManifestFiles.copyAppendManifest( + ops.current().formatVersion(), reader, newManifestPath, snapshotId(), appendedManifestsSummary); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest); } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 4a72eb1d0e8d..0e60ab947ce4 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -318,7 +318,7 @@ protected OutputFile newManifestOutput() { } protected ManifestWriter newManifestWriter(PartitionSpec spec) { - return ManifestWriter.write(ops.current().formatVersion(), spec, newManifestOutput(), snapshotId()); + return ManifestFiles.write(ops.current().formatVersion(), spec, newManifestOutput(), snapshotId()); } protected long snapshotId() { From 72bd99d21840eb0f9c9367200f15301840221b33 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 15 Apr 2020 15:08:03 -0700 Subject: [PATCH 3/6] Fix tests added in #845. --- core/src/test/java/org/apache/iceberg/TestMergeAppend.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java index d7b3a48b002a..d4dcff9c5421 100644 --- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java +++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java @@ -730,7 +730,7 @@ public void testManifestEntryFieldIdsForChangedPartitionSpecForV1Table() { initialManifest, pending.manifests().get(1)); // field ids of manifest entries in two manifests with different specs of the same source field should be different - ManifestEntry entry = ManifestReader.read(pending.manifests().get(0), FILE_IO).entries().iterator().next(); + ManifestEntry entry = ManifestFiles.read(pending.manifests().get(0), FILE_IO).entries().iterator().next(); Types.NestedField field = ((PartitionData) entry.file().partition()).getPartitionType().fields().get(0); Assert.assertEquals(1000, field.fieldId()); Assert.assertEquals("id_bucket", field.name()); @@ -738,7 +738,7 @@ public void testManifestEntryFieldIdsForChangedPartitionSpecForV1Table() { Assert.assertEquals(1001, field.fieldId()); Assert.assertEquals("data_bucket", field.name()); - entry = ManifestReader.read(pending.manifests().get(1), FILE_IO).entries().iterator().next(); + entry = ManifestFiles.read(pending.manifests().get(1), FILE_IO).entries().iterator().next(); field = ((PartitionData) entry.file().partition()).getPartitionType().fields().get(0); Assert.assertEquals(1000, field.fieldId()); Assert.assertEquals("data_bucket", field.name()); From c565af50ae9bbaffd59c3acd39b0335c099ad97a Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 15 Apr 2020 15:44:40 -0700 Subject: [PATCH 4/6] Deprecate ManifestWriter.write method. --- core/src/main/java/org/apache/iceberg/ManifestReader.java | 2 +- core/src/main/java/org/apache/iceberg/ManifestWriter.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java index 60dcaf8a23f6..da2fb375efc4 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestReader.java +++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java @@ -66,7 +66,7 @@ public class ManifestReader extends CloseableGroup implements Filterable { * @param spec {@link PartitionSpec} used to produce {@link DataFile} partition tuples * @param outputFile the destination file location * @return a manifest writer + * @deprecated will be removed in 0.9.0; use {@link ManifestFiles#write(PartitionSpec, OutputFile)} instead. */ + @Deprecated public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) { return ManifestFiles.write(spec, outputFile); } From 3ebeb7bce6e7201292f3acdb924c61b8cfc56de3 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 15 Apr 2020 15:54:17 -0700 Subject: [PATCH 5/6] Add old ManifestReader.read method and remove deprecated uses. --- .../org/apache/iceberg/ManifestReader.java | 25 ++++++++++++++++--- .../org/apache/iceberg/TableTestBase.java | 6 ++--- .../apache/iceberg/TestManifestReader.java | 2 ++ .../org/apache/iceberg/TestTransaction.java | 2 +- .../apache/iceberg/spark/SparkTableUtil.scala | 4 +-- .../TestManifestFileSerialization.java | 2 +- .../source/TestForwardCompatibility.java | 3 ++- 7 files changed, 33 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java index da2fb375efc4..9ea3979b6b08 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestReader.java +++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java @@ -28,6 +28,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.function.Function; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.avro.AvroIterable; import org.apache.iceberg.exceptions.RuntimeIOException; @@ -70,7 +71,20 @@ public class ManifestReader extends CloseableGroup implements Filterable specLookup) { + return new ManifestReader(file, specLookup, InheritableMetadataFactory.empty()); } private final InputFile file; @@ -84,6 +98,11 @@ public static ManifestReader read(InputFile file) { private List cachedDeletes = null; ManifestReader(InputFile file, Map specsById, + InheritableMetadata inheritableMetadata) { + this(file, specsById::get, inheritableMetadata); + } + + private ManifestReader(InputFile file, Function specLookup, InheritableMetadata inheritableMetadata) { this.file = file; this.inheritableMetadata = inheritableMetadata; @@ -104,8 +123,8 @@ public static ManifestReader read(InputFile file) { specId = Integer.parseInt(specProperty); } - if (specsById != null) { - this.spec = specsById.get(specId); + if (specLookup != null) { + this.spec = specLookup.apply(specId); } else { Schema schema = SchemaParser.fromJson(metadata.get("schema")); this.spec = PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec")); diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java index 92719d862f8e..4398c2420b28 100644 --- a/core/src/test/java/org/apache/iceberg/TableTestBase.java +++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java @@ -130,7 +130,7 @@ ManifestFile writeManifest(DataFile... files) throws IOException { Assert.assertTrue(manifestFile.delete()); OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath()); - ManifestWriter writer = ManifestWriter.write(table.spec(), outputFile); + ManifestWriter writer = ManifestFiles.write(table.spec(), outputFile); try { for (DataFile file : files) { writer.add(file); @@ -147,7 +147,7 @@ ManifestFile writeManifest(String fileName, ManifestEntry... entries) throws IOE Assert.assertTrue(manifestFile.delete()); OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath()); - ManifestWriter writer = ManifestWriter.write(table.spec(), outputFile); + ManifestWriter writer = ManifestFiles.write(table.spec(), outputFile); try { for (ManifestEntry entry : entries) { writer.addEntry(entry); @@ -164,7 +164,7 @@ ManifestFile writeManifestWithName(String name, DataFile... files) throws IOExce Assert.assertTrue(manifestFile.delete()); OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath()); - ManifestWriter writer = ManifestWriter.write(table.spec(), outputFile); + ManifestWriter writer = ManifestFiles.write(table.spec(), outputFile); try { for (DataFile file : files) { writer.add(file); diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java index 701d68741704..76ede625ffa0 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java @@ -54,6 +54,7 @@ public void testInvalidUsage() throws IOException { } @Test + @SuppressWarnings("deprecation") public void testManifestReaderWithPartitionMetadata() throws IOException { ManifestFile manifest = writeManifest("manifest.avro", manifestEntry(Status.EXISTING, 123L, FILE_A)); try (ManifestReader reader = ManifestReader.read(FILE_IO.newInputFile(manifest.path()))) { @@ -69,6 +70,7 @@ public void testManifestReaderWithPartitionMetadata() throws IOException { } @Test + @SuppressWarnings("deprecation") public void testManifestReaderWithUpdatedPartitionMetadataForV1Table() throws IOException { PartitionSpec spec = PartitionSpec.builderFor(table.schema()) .bucket("id", 8) diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java index 13a298b37a16..8f0fedbf018a 100644 --- a/core/src/test/java/org/apache/iceberg/TestTransaction.java +++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java @@ -487,7 +487,7 @@ public void testTransactionRetryAndAppendManifests() throws Exception { // create a manifest append OutputFile manifestLocation = Files.localOutput("/tmp/" + UUID.randomUUID().toString() + ".avro"); - ManifestWriter writer = ManifestWriter.write(table.spec(), manifestLocation); + ManifestWriter writer = ManifestFiles.write(table.spec(), manifestLocation); try { writer.add(FILE_D); } finally { diff --git a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala index eaf405a00fb1..eb4a72685256 100644 --- a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala +++ b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala @@ -22,7 +22,7 @@ package org.apache.iceberg.spark import com.google.common.collect.Maps import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, PathFilter} -import org.apache.iceberg.{DataFile, DataFiles, FileFormat, ManifestFile, ManifestWriter} +import org.apache.iceberg.{DataFile, DataFiles, FileFormat, ManifestFile, ManifestFiles} import org.apache.iceberg.{Metrics, MetricsConfig, PartitionSpec, Table, TableProperties} import org.apache.iceberg.exceptions.NoSuchTableException import org.apache.iceberg.hadoop.{HadoopFileIO, HadoopInputFile, SerializableConfiguration} @@ -324,7 +324,7 @@ object SparkTableUtil { val ctx = TaskContext.get() val location = new Path(basePath, s"stage-${ctx.stageId()}-task-${ctx.taskAttemptId()}-manifest") val outputFile = io.newOutputFile(FileFormat.AVRO.addExtension(location.toString)) - val writer = ManifestWriter.write(spec, outputFile) + val writer = ManifestFiles.write(spec, outputFile) try { files.foreach(writer.add) } finally { diff --git a/spark/src/test/java/org/apache/iceberg/TestManifestFileSerialization.java b/spark/src/test/java/org/apache/iceberg/TestManifestFileSerialization.java index df1a7ea20126..00cec599dcf3 100644 --- a/spark/src/test/java/org/apache/iceberg/TestManifestFileSerialization.java +++ b/spark/src/test/java/org/apache/iceberg/TestManifestFileSerialization.java @@ -155,7 +155,7 @@ private ManifestFile writeManifest(DataFile... files) throws IOException { Assert.assertTrue(manifestFile.delete()); OutputFile outputFile = FILE_IO.newOutputFile(manifestFile.getCanonicalPath()); - ManifestWriter writer = ManifestWriter.write(SPEC, outputFile); + ManifestWriter writer = ManifestFiles.write(SPEC, outputFile); try { for (DataFile file : files) { writer.add(file); diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java index 23d6dcb42d16..23e7926c4580 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java @@ -35,6 +35,7 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.ManifestWriter; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionSpecParser; @@ -184,7 +185,7 @@ public void testSparkCanReadUnknownTransform() throws IOException { .build(); OutputFile manifestFile = localOutput(FileFormat.AVRO.addExtension(temp.newFile().toString())); - ManifestWriter manifestWriter = ManifestWriter.write(FAKE_SPEC, manifestFile); + ManifestWriter manifestWriter = ManifestFiles.write(FAKE_SPEC, manifestFile); try { manifestWriter.add(file); } finally { From ff8e18238534ad350b8f929d4e9bbfda9f5f7756 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 15 Apr 2020 16:34:11 -0700 Subject: [PATCH 6/6] Fix tests. --- core/src/main/java/org/apache/iceberg/ManifestReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java index 9ea3979b6b08..367c2c38ca4b 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestReader.java +++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java @@ -99,7 +99,7 @@ public static ManifestReader read(InputFile file, Function specsById, InheritableMetadata inheritableMetadata) { - this(file, specsById::get, inheritableMetadata); + this(file, specsById != null ? specsById::get : null, inheritableMetadata); } private ManifestReader(InputFile file, Function specLookup,