diff --git a/api/src/main/java/com/netflix/iceberg/Files.java b/api/src/main/java/com/netflix/iceberg/Files.java index f7397511e02f..e85825afb2d7 100644 --- a/api/src/main/java/com/netflix/iceberg/Files.java +++ b/api/src/main/java/com/netflix/iceberg/Files.java @@ -29,6 +29,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.file.Paths; public class Files { @@ -37,7 +38,7 @@ public static OutputFile localOutput(File file) { } public static OutputFile localOutput(String file) { - return localOutput(new File(file)); + return localOutput(Paths.get(file).toAbsolutePath().toFile()); } private static class LocalOutputFile implements OutputFile { @@ -53,6 +54,14 @@ public PositionOutputStream create() { throw new AlreadyExistsException("File already exists: %s", file); } + // create missing parent directories; a null parent means the path is relative to the working directory + if (file.getParentFile() != null && !file.getParentFile().isDirectory() && !file.getParentFile().mkdirs()) { + throw new RuntimeIOException( + String.format( + "Failed to create the file's directory at %s.", + file.getParentFile().getAbsolutePath())); + } + try { return new PositionFileOutputStream(new RandomAccessFile(file, "rw")); } catch (FileNotFoundException e) { diff --git a/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java index 81452d45311f..79a1337e33ab 100644 --- a/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java +++ b/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java @@ -20,10 +20,8 @@ package com.netflix.iceberg; import com.google.common.base.Objects; -import com.google.common.base.Preconditions; import com.netflix.iceberg.exceptions.RuntimeIOException; -import com.netflix.iceberg.hadoop.HadoopOutputFile; -import com.netflix.iceberg.io.InputFile; +import com.netflix.iceberg.hadoop.HadoopFileIO; import com.netflix.iceberg.io.OutputFile; import com.netflix.iceberg.util.Tasks; import org.apache.hadoop.conf.Configuration; @@ -53,6 +51,7 @@ public abstract class BaseMetastoreTableOperations implements TableOperations { private static final String HIVE_LOCATION_FOLDER_NAME = "empty"; private final Configuration conf; + private final FileIO fileIo; private TableMetadata currentMetadata = null; private String currentMetadataLocation = null; @@ -62,6 +61,7 @@ public abstract class BaseMetastoreTableOperations implements TableOperations { protected BaseMetastoreTableOperations(Configuration conf) { this.conf = conf; + this.fileIo = new HadoopFileIO(conf); } @Override @@ -88,22 +88,18 @@ public String hiveTableLocation() { return String.format("%s/%s", baseLocation, HIVE_LOCATION_FOLDER_NAME); } - public String dataLocation() { - return String.format("%s/%s", baseLocation, DATA_FOLDER_NAME); - } - protected String writeNewMetadata(TableMetadata metadata, int version) { if (baseLocation == null) { baseLocation = metadata.location(); } - String newFilename = newTableMetadataFilename(baseLocation, version); - OutputFile newMetadataLocation = HadoopOutputFile.fromPath(new Path(newFilename), conf); + String newTableMetadataFilePath = newTableMetadataFilePath(baseLocation, version); + OutputFile newMetadataLocation = fileIo.newOutputFile(newTableMetadataFilePath); // write the new metadata TableMetadataParser.write(metadata, newMetadataLocation); - return newFilename; + return newTableMetadataFilePath; } protected void refreshFromMetadataLocation(String newLocation) { @@ -129,24 +125,13 @@ protected void refreshFromMetadataLocation(String newLocation,
int numRetries) { } @Override - public InputFile newInputFile(String path) { - return fromLocation(path, conf); + public String metadataFileLocation(String fileName) { + return String.format("%s/%s/%s", baseLocation, METADATA_FOLDER_NAME, fileName); } @Override - public OutputFile newMetadataFile(String filename) { - return HadoopOutputFile.fromPath( - new Path(newMetadataLocation(baseLocation, filename)), conf); - } - - @Override - public void deleteFile(String file) { - Path path = new Path(file); - try { - getFS(path, conf).delete(path, false /* should be a file, not recursive */ ); - } catch (IOException e) { - throw new RuntimeIOException(e); - } + public FileIO io() { + return fileIo; } @Override @@ -154,7 +139,7 @@ public long newSnapshotId() { return System.currentTimeMillis(); } - private String newTableMetadataFilename(String baseLocation, int newVersion) { + private String newTableMetadataFilePath(String baseLocation, int newVersion) { return String.format("%s/%s/%05d-%s%s", baseLocation, METADATA_FOLDER_NAME, @@ -163,22 +148,6 @@ private String newTableMetadataFilename(String baseLocation, int newVersion) { getFileExtension(this.conf)); } - private static String newMetadataLocation(String baseLocation, String filename) { - return String.format("%s/%s/%s", baseLocation, METADATA_FOLDER_NAME, filename); - } - - private static String parseBaseLocation(String metadataLocation) { - int lastSlash = metadataLocation.lastIndexOf('/'); - int secondToLastSlash = metadataLocation.lastIndexOf('/', lastSlash); - - // verify that the metadata file was contained in a "metadata" folder - String parentFolderName = metadataLocation.substring(secondToLastSlash + 1, lastSlash); - Preconditions.checkArgument(METADATA_FOLDER_NAME.equals(parentFolderName), - "Invalid metadata location, not in metadata/ folder: %s", metadataLocation); - - return metadataLocation.substring(0, secondToLastSlash); - } - private static int parseVersion(String metadataLocation) { int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0 int versionEnd = metadataLocation.indexOf('-', versionStart); diff --git a/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java b/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java index 945ddbbb59b6..36a873a2b06c 100644 --- a/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java +++ b/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java @@ -50,7 +50,7 @@ class BaseSnapshot implements Snapshot { String... manifestFiles) { this(ops, snapshotId, null, System.currentTimeMillis(), Lists.transform(Arrays.asList(manifestFiles), - path -> new GenericManifestFile(ops.newInputFile(path), 0))); + path -> new GenericManifestFile(ops.io().newInputFile(path), 0))); } BaseSnapshot(TableOperations ops, @@ -139,7 +139,7 @@ private void cacheChanges() { // accumulate adds and deletes from all manifests. // because manifests can be reused in newer snapshots, filter the changes by snapshot id. 
for (String manifest : Iterables.transform(manifests, ManifestFile::path)) { - try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest))) { + try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest))) { for (ManifestEntry add : reader.addedFiles()) { if (add.snapshotId() == snapshotId) { adds.add(add.file().copy()); diff --git a/core/src/main/java/com/netflix/iceberg/BaseTableScan.java b/core/src/main/java/com/netflix/iceberg/BaseTableScan.java index ad207807c6e0..8915461b0e0c 100644 --- a/core/src/main/java/com/netflix/iceberg/BaseTableScan.java +++ b/core/src/main/java/com/netflix/iceberg/BaseTableScan.java @@ -34,7 +34,6 @@ import com.netflix.iceberg.expressions.Expression; import com.netflix.iceberg.expressions.Expressions; import com.netflix.iceberg.expressions.InclusiveManifestEvaluator; -import com.netflix.iceberg.expressions.Projections; import com.netflix.iceberg.expressions.ResidualEvaluator; import com.netflix.iceberg.io.CloseableIterable; import com.netflix.iceberg.types.TypeUtil; @@ -177,7 +176,7 @@ public CloseableIterable planFiles() { Iterable> readers = Iterables.transform( matchingManifests, manifest -> { - ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path())); + ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path())); toClose.add(reader); String schemaString = SchemaParser.toJson(reader.spec().schema()); String specString = PartitionSpecParser.toJson(reader.spec()); diff --git a/core/src/main/java/com/netflix/iceberg/BaseTransaction.java b/core/src/main/java/com/netflix/iceberg/BaseTransaction.java index a860117ca54b..1a56b7e16ee7 100644 --- a/core/src/main/java/com/netflix/iceberg/BaseTransaction.java +++ b/core/src/main/java/com/netflix/iceberg/BaseTransaction.java @@ -23,8 +23,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.netflix.iceberg.exceptions.CommitFailedException; -import com.netflix.iceberg.io.InputFile; -import com.netflix.iceberg.io.OutputFile; import com.netflix.iceberg.util.Tasks; import java.util.List; import java.util.Map; @@ -263,18 +261,13 @@ public void commit(TableMetadata base, TableMetadata metadata) { } @Override - public InputFile newInputFile(String path) { - return ops.newInputFile(path); + public FileIO io() { + return ops.io(); } @Override - public OutputFile newMetadataFile(String filename) { - return ops.newMetadataFile(filename); - } - - @Override - public void deleteFile(String path) { - ops.deleteFile(path); + public String metadataFileLocation(String fileName) { + return ops.metadataFileLocation(fileName); } @Override diff --git a/core/src/main/java/com/netflix/iceberg/FileIO.java b/core/src/main/java/com/netflix/iceberg/FileIO.java new file mode 100644 index 000000000000..fdba7afba4d8 --- /dev/null +++ b/core/src/main/java/com/netflix/iceberg/FileIO.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.netflix.iceberg; + +import com.netflix.iceberg.io.InputFile; +import com.netflix.iceberg.io.OutputFile; + +import java.io.Serializable; + +/** + * Pluggable module for reading, writing, and deleting files. + *
<p>
+ * Both table metadata files and data files can be written and read by this module. Implementations + * must be serializable because a client such as Spark may initialize an instance once and send it + * to a separate module, possibly on another node, that then interacts with the underlying streams. + */ +public interface FileIO extends Serializable { + + /** + * Get an {@link InputFile} instance to read bytes from the file at the given path. + */ + InputFile newInputFile(String path); + + /** + * Get an {@link OutputFile} instance to write bytes to the file at the given path. + */ + OutputFile newOutputFile(String path); + + /** + * Delete the file at the given path. + */ + void deleteFile(String path); +} diff --git a/core/src/main/java/com/netflix/iceberg/ManifestGroup.java b/core/src/main/java/com/netflix/iceberg/ManifestGroup.java index 19d993f919a5..d05ceca35bcc 100644 --- a/core/src/main/java/com/netflix/iceberg/ManifestGroup.java +++ b/core/src/main/java/com/netflix/iceberg/ManifestGroup.java @@ -107,7 +107,7 @@ public CloseableIterable entries() { Iterable> readers = Iterables.transform( matchingManifests, manifest -> { - ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path())); + ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path())); FilteredManifest filtered = reader.filterRows(dataFilter).select(columns); toClose.add(reader); return Iterables.filter( diff --git a/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java b/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java index 8878d4c0ec4b..156f9eaceed7 100644 --- a/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java +++ b/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java @@ -316,7 +316,7 @@ private ManifestFile filterManifest(Expression deleteExpression, return manifest; } - try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) { + try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()))) { Expression inclusiveExpr = Projections .inclusive(reader.spec()) .project(deleteExpression); @@ -463,7 +463,7 @@ private ManifestFile createManifest(int specId, List bin) throws I try { for (ManifestFile manifest : bin) { - try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) { + try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()))) { for (ManifestEntry entry : reader.entries()) { if (entry.status() == Status.DELETED) { // suppress deletes from previous snapshots.
only files deleted by this snapshot diff --git a/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java b/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java index 9ce69810b8af..541cc5fee093 100644 --- a/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java @@ -50,7 +50,7 @@ class RemoveSnapshots implements ExpireSnapshots { private final Consumer defaultDelete = new Consumer() { @Override public void accept(String file) { - ops.deleteFile(file); + ops.io().deleteFile(file); } }; @@ -164,7 +164,7 @@ public void commit() { ).run(manifest -> { // even if the manifest is still used, it may contain files that can be deleted // TODO: eliminate manifests with no deletes without scanning - try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) { + try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()))) { for (ManifestEntry entry : reader.entries()) { // if the snapshot ID of the DELETE entry is no longer valid, the data can be deleted if (entry.status() == ManifestEntry.Status.DELETED && diff --git a/core/src/main/java/com/netflix/iceberg/SnapshotParser.java b/core/src/main/java/com/netflix/iceberg/SnapshotParser.java index a5ce08c527ba..d73da8aae5ea 100644 --- a/core/src/main/java/com/netflix/iceberg/SnapshotParser.java +++ b/core/src/main/java/com/netflix/iceberg/SnapshotParser.java @@ -22,12 +22,9 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.netflix.iceberg.exceptions.RuntimeIOException; import com.netflix.iceberg.util.JsonUtil; -import com.netflix.iceberg.util.Tasks; -import com.netflix.iceberg.util.ThreadPools; import java.io.IOException; import java.io.StringWriter; import java.util.List; @@ -92,13 +89,13 @@ static Snapshot fromJson(TableOperations ops, JsonNode node) { if (node.has(MANIFEST_LIST)) { // the manifest list is stored in a manifest list file String manifestList = JsonUtil.getString(MANIFEST_LIST, node); - return new BaseSnapshot(ops, versionId, parentId, timestamp, ops.newInputFile(manifestList)); + return new BaseSnapshot(ops, versionId, parentId, timestamp, ops.io().newInputFile(manifestList)); } else { // fall back to an embedded manifest list. 
pass in the manifest's InputFile so length can be // loaded lazily, if it is needed List manifests = Lists.transform(JsonUtil.getStringList(MANIFESTS, node), - location -> new GenericManifestFile(ops.newInputFile(location), 0)); + location -> new GenericManifestFile(ops.io().newInputFile(location), 0)); return new BaseSnapshot(ops, versionId, parentId, timestamp, manifests); } } diff --git a/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java b/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java index 54c0483d2fa9..ce9d59c7501d 100644 --- a/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java +++ b/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java @@ -132,7 +132,7 @@ public Snapshot apply() { return new BaseSnapshot(ops, snapshotId(), parentSnapshotId, System.currentTimeMillis(), - ops.newInputFile(manifestList.location())); + ops.io().newInputFile(manifestList.location())); } else { return new BaseSnapshot(ops, @@ -188,16 +188,17 @@ protected void cleanAll() { } protected void deleteFile(String path) { - ops.deleteFile(path); + ops.io().deleteFile(path); } protected OutputFile manifestListPath() { - return ops.newMetadataFile(FileFormat.AVRO.addExtension( - String.format("snap-%d-%s", snapshotId(), commitUUID))); + return ops.io().newOutputFile(ops.metadataFileLocation(FileFormat.AVRO.addExtension( + String.format("snap-%d-%s", snapshotId(), commitUUID)))); } protected OutputFile manifestPath(int i) { - return ops.newMetadataFile(FileFormat.AVRO.addExtension(commitUUID + "-m" + i)); + return ops.io().newOutputFile( + ops.metadataFileLocation(FileFormat.AVRO.addExtension(commitUUID + "-m" + i))); } protected long snapshotId() { @@ -208,7 +209,7 @@ protected long snapshotId() { } private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) { - try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) { + try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()))) { PartitionSummary stats = new PartitionSummary(ops.current().spec(manifest.partitionSpecId())); int addedFiles = 0; int existingFiles = 0; diff --git a/core/src/main/java/com/netflix/iceberg/TableOperations.java b/core/src/main/java/com/netflix/iceberg/TableOperations.java index a0c94b8fb3c9..e9d4388953b9 100644 --- a/core/src/main/java/com/netflix/iceberg/TableOperations.java +++ b/core/src/main/java/com/netflix/iceberg/TableOperations.java @@ -19,7 +19,6 @@ package com.netflix.iceberg; -import com.netflix.iceberg.io.InputFile; import com.netflix.iceberg.io.OutputFile; /** @@ -56,27 +55,18 @@ public interface TableOperations { void commit(TableMetadata base, TableMetadata metadata); /** - * Create a new {@link InputFile} for a path. - * - * @param path a string file path - * @return an InputFile instance for the path + * @return a {@link com.netflix.iceberg.FileIO} to read and write table data and metadata files */ - InputFile newInputFile(String path); + FileIO io(); /** - * Create a new {@link OutputFile} in the table's metadata store. - * - * @param filename a string file name, not a full path - * @return an OutputFile instance for the path - */ - OutputFile newMetadataFile(String filename); - - /** - * Delete a file. - * - * @param path path to the file + * Given the name of a metadata file, obtain the full path of that file using an appropriate base + * location of the implementation's choosing. + *
<p>
+ * The file may not exist yet; in that case, the returned path is the location where the file + * would be created, for example by passing it to {@link FileIO#newOutputFile(String)}. */ - void deleteFile(String path); + String metadataFileLocation(String fileName); /** * Create a new ID for a Snapshot diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopFileIO.java new file mode 100644 index 000000000000..586942cc24ce --- /dev/null +++ b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopFileIO.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.netflix.iceberg.hadoop; + +import com.netflix.iceberg.FileIO; +import com.netflix.iceberg.exceptions.RuntimeIOException; +import com.netflix.iceberg.io.InputFile; +import com.netflix.iceberg.io.OutputFile; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; + +public class HadoopFileIO implements FileIO { + + private final SerializableConfiguration hadoopConf; + + public HadoopFileIO(Configuration hadoopConf) { + this.hadoopConf = new SerializableConfiguration(hadoopConf); + } + + @Override + public InputFile newInputFile(String path) { + return HadoopInputFile.fromLocation(path, hadoopConf.get()); + } + + @Override + public OutputFile newOutputFile(String path) { + return HadoopOutputFile.fromPath(new Path(path), hadoopConf.get()); + } + + @Override + public void deleteFile(String path) { + Path toDelete = new Path(path); + FileSystem fs = Util.getFS(toDelete, hadoopConf.get()); + try { + fs.delete(toDelete, false /* not recursive */); + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to delete file: %s", path); + } + } +} diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java index 875643e3f0b7..d953056e9a8b 100644 --- a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java +++ b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java @@ -19,14 +19,13 @@ package com.netflix.iceberg.hadoop; +import com.netflix.iceberg.FileIO; import com.netflix.iceberg.TableMetadata; import com.netflix.iceberg.TableMetadataParser; import com.netflix.iceberg.TableOperations; import com.netflix.iceberg.exceptions.CommitFailedException; import com.netflix.iceberg.exceptions.RuntimeIOException; import com.netflix.iceberg.exceptions.ValidationException; -import com.netflix.iceberg.io.InputFile; -import com.netflix.iceberg.io.OutputFile; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -53,6 +52,7 @@ public class HadoopTableOperations implements TableOperations { private TableMetadata currentMetadata = null; private Integer version = null; private boolean shouldRefresh = true; + private HadoopFileIO defaultFileIo = null; protected HadoopTableOperations(Path location, Configuration conf) { this.conf = conf; @@ -91,7 +91,7 @@ public TableMetadata refresh() { } this.version = ver; this.currentMetadata = TableMetadataParser.read(this, - HadoopInputFile.fromPath(metadataFile, conf)); + io().newInputFile(metadataFile.toString())); this.shouldRefresh = false; return currentMetadata; } @@ -108,7 +108,7 @@ public void commit(TableMetadata base, TableMetadata metadata) { } Path tempMetadataFile =
metadataPath(UUID.randomUUID().toString() + getFileExtension(conf)); - TableMetadataParser.write(metadata, HadoopOutputFile.fromPath(tempMetadataFile, conf)); + TableMetadataParser.write(metadata, io().newOutputFile(tempMetadataFile.toString())); int nextVersion = (version != null ? version : 0) + 1; Path finalMetadataFile = metadataFile(nextVersion); @@ -142,24 +142,16 @@ public void commit(TableMetadata base, TableMetadata metadata) { } @Override - public InputFile newInputFile(String path) { - return HadoopInputFile.fromPath(new Path(path), conf); - } - - @Override - public OutputFile newMetadataFile(String filename) { - return HadoopOutputFile.fromPath(metadataPath(filename), conf); + public FileIO io() { + if (defaultFileIo == null) { + defaultFileIo = new HadoopFileIO(conf); + } + return defaultFileIo; } @Override - public void deleteFile(String path) { - Path toDelete = new Path(path); - FileSystem fs = getFS(toDelete, conf); - try { - fs.delete(toDelete, false /* not recursive */ ); - } catch (IOException e) { - throw new RuntimeIOException(e, "Failed to delete file: %s", path); - } + public String metadataFileLocation(String fileName) { + return metadataPath(fileName).toString(); } @Override @@ -194,7 +186,7 @@ private void writeVersionHint(int version) { private int readVersionHint() { Path versionHintFile = versionHintFile(); try { - FileSystem fs = versionHintFile.getFileSystem(conf); + FileSystem fs = Util.getFS(versionHintFile, conf); if (!fs.exists(versionHintFile)) { return 0; } diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/SerializableConfiguration.java b/core/src/main/java/com/netflix/iceberg/hadoop/SerializableConfiguration.java new file mode 100644 index 000000000000..30c756378d4e --- /dev/null +++ b/core/src/main/java/com/netflix/iceberg/hadoop/SerializableConfiguration.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.netflix.iceberg.hadoop; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import org.apache.hadoop.conf.Configuration; + +/** + * Wraps a {@link Configuration} object in a {@link Serializable} layer. 
+ */ +public class SerializableConfiguration implements Serializable { + + private transient Configuration hadoopConf; + + public SerializableConfiguration(Configuration hadoopConf) { + this.hadoopConf = hadoopConf; + } + + private void writeObject(ObjectOutputStream out) throws IOException { + out.defaultWriteObject(); + hadoopConf.write(out); + } + + private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException { + in.defaultReadObject(); + hadoopConf = new Configuration(false); + hadoopConf.readFields(in); + } + + public Configuration get() { + return hadoopConf; + } +} diff --git a/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java b/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java index 27a01fc49f3c..1508ee804847 100644 --- a/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java +++ b/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java @@ -19,21 +19,22 @@ package com.netflix.iceberg; +import com.google.common.collect.Maps; import com.netflix.iceberg.exceptions.RuntimeIOException; -import com.netflix.iceberg.io.InputFile; -import com.netflix.iceberg.io.OutputFile; +import java.util.Map; import org.junit.rules.TemporaryFolder; -import java.io.File; import java.io.IOException; -import static com.netflix.iceberg.Files.localInput; - class LocalTableOperations implements TableOperations { private final TemporaryFolder temp; + private final FileIO io; + + private final Map createdMetadataFilePaths = Maps.newHashMap(); LocalTableOperations(TemporaryFolder temp) { this.temp = temp; + this.io = new TestTables.LocalFileIO(); } @Override @@ -52,25 +53,19 @@ public void commit(TableMetadata base, TableMetadata metadata) { } @Override - public InputFile newInputFile(String path) { - return localInput(path); - } - - @Override - public OutputFile newMetadataFile(String filename) { - try { - File metadataFile = temp.newFile(filename); - metadataFile.delete(); - metadataFile.deleteOnExit(); - return Files.localOutput(metadataFile); - } catch (IOException e) { - throw new RuntimeIOException(e); - } + public FileIO io() { + return io; } @Override - public void deleteFile(String path) { - new File(path).delete(); + public String metadataFileLocation(String fileName) { + return createdMetadataFilePaths.computeIfAbsent(fileName, name -> { + try { + return temp.newFile(name).getAbsolutePath(); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + }); } @Override diff --git a/core/src/test/java/com/netflix/iceberg/TestTables.java b/core/src/test/java/com/netflix/iceberg/TestTables.java index fcb9bfcecc6e..e6aea02fd008 100644 --- a/core/src/test/java/com/netflix/iceberg/TestTables.java +++ b/core/src/test/java/com/netflix/iceberg/TestTables.java @@ -27,6 +27,7 @@ import com.netflix.iceberg.io.InputFile; import com.netflix.iceberg.io.OutputFile; import java.io.File; +import java.io.IOException; import java.util.Map; import static com.netflix.iceberg.TableMetadata.newTableMetadata; @@ -173,14 +174,34 @@ public void commit(TableMetadata base, TableMetadata metadata) { } } + @Override + public FileIO io() { + return new LocalFileIO(); + } + + @Override + public String metadataFileLocation(String fileName) { + return new File(metadata, fileName).getAbsolutePath(); + } + + @Override + public long newSnapshotId() { + long nextSnapshotId = lastSnapshotId + 1; + this.lastSnapshotId = nextSnapshotId; + return nextSnapshotId; + } + } + + static class LocalFileIO implements FileIO { + + @Override + public InputFile
newInputFile(String path) { return Files.localInput(path); } @Override - public OutputFile newMetadataFile(String filename) { - return Files.localOutput(new File(metadata, filename)); + public OutputFile newOutputFile(String path) { + return Files.localOutput(path); } @Override @@ -189,12 +210,5 @@ public void deleteFile(String path) { throw new RuntimeIOException("Failed to delete file: " + path); } } - - @Override - public long newSnapshotId() { - long nextSnapshotId = lastSnapshotId + 1; - this.lastSnapshotId = nextSnapshotId; - return nextSnapshotId; - } } } diff --git a/data/src/main/java/com/netflix/iceberg/data/TableScanIterable.java b/data/src/main/java/com/netflix/iceberg/data/TableScanIterable.java index a506b4e4270a..4f59556ebb64 100644 --- a/data/src/main/java/com/netflix/iceberg/data/TableScanIterable.java +++ b/data/src/main/java/com/netflix/iceberg/data/TableScanIterable.java @@ -79,7 +79,7 @@ public Iterator iterator() { } private CloseableIterable open(FileScanTask task) { - InputFile input = ops.newInputFile(task.file().path().toString()); + InputFile input = ops.io().newInputFile(task.file().path().toString()); // TODO: join to partition data from the manifest file switch (task.file().format()) { diff --git a/hive/src/main/java/com/netflix/iceberg/hive/HiveTableOperations.java b/hive/src/main/java/com/netflix/iceberg/hive/HiveTableOperations.java index 38a0cb0defe5..0199e7fc1269 100644 --- a/hive/src/main/java/com/netflix/iceberg/hive/HiveTableOperations.java +++ b/hive/src/main/java/com/netflix/iceberg/hive/HiveTableOperations.java @@ -161,7 +161,7 @@ public void commit(TableMetadata base, TableMetadata metadata) { } finally { if (threw) { // if anything went wrong, clean up the uncommitted metadata file - deleteFile(newMetadataLocation); + io().deleteFile(newMetadataLocation); } unlock(lockId); } diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java index a7ff513951ef..90b6dc8ec6ea 100644 --- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java +++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java @@ -21,6 +21,7 @@ import com.google.common.collect.Maps; import com.netflix.iceberg.BaseTable; +import com.netflix.iceberg.FileIO; import com.netflix.iceberg.Files; import com.netflix.iceberg.PartitionSpec; import com.netflix.iceberg.Schema; @@ -33,6 +34,7 @@ import com.netflix.iceberg.io.InputFile; import com.netflix.iceberg.io.OutputFile; import java.io.File; +import java.io.IOException; import java.util.Map; // TODO: Use the copy of this from core. 
@@ -153,16 +155,34 @@ public void commit(TableMetadata base, TableMetadata metadata) { } } + @Override + public FileIO io() { + return new LocalFileIO(); + } + + @Override + public String metadataFileLocation(String fileName) { + return new File(new File(current.location(), "metadata"), fileName).getAbsolutePath(); + } + + @Override + public long newSnapshotId() { + long nextSnapshotId = lastSnapshotId + 1; + this.lastSnapshotId = nextSnapshotId; + return nextSnapshotId; + } + } + + static class LocalFileIO implements FileIO { + @Override public InputFile newInputFile(String path) { return Files.localInput(path); } @Override - public OutputFile newMetadataFile(String filename) { - File metadata = new File(current.location(), "metadata"); - metadata.mkdirs(); - return Files.localOutput(new File(metadata, filename)); + public OutputFile newOutputFile(String path) { + return Files.localOutput(new File(path)); } @Override @@ -171,12 +191,5 @@ public void deleteFile(String path) { throw new RuntimeIOException("Failed to delete file: " + path); } } - - @Override - public long newSnapshotId() { - long nextSnapshotId = lastSnapshotId + 1; - this.lastSnapshotId = nextSnapshotId; - return nextSnapshotId; - } } }
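For reviewers, a minimal usage sketch of how the new pieces compose (not part of the patch; the helper class and method names below are made up, while FileIO, TableOperations, InputFile, and OutputFile are the types introduced or touched above). It mirrors the pattern used by SnapshotUpdate.manifestListPath(): resolve a metadata file name to a full location, then open it through the table's FileIO.

import com.netflix.iceberg.FileIO;
import com.netflix.iceberg.TableOperations;
import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;

class FileIOUsageSketch {
  // Resolve a metadata file name against the table's base location, then open an
  // output stream through the table's pluggable I/O module.
  static OutputFile newMetadataOutput(TableOperations ops, String fileName) {
    FileIO io = ops.io();
    String location = ops.metadataFileLocation(fileName); // e.g. <base>/metadata/<fileName>
    return io.newOutputFile(location);
  }

  // Reads and deletes go through the same module, so a single FileIO
  // implementation controls all file access for a table.
  static InputFile read(TableOperations ops, String location) {
    return ops.io().newInputFile(location);
  }

  static void delete(TableOperations ops, String location) {
    ops.io().deleteFile(location);
  }
}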
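A second sketch covers the serialization contract that motivates SerializableConfiguration: Hadoop's Configuration is not Serializable, so HadoopFileIO wraps it and relies on the custom writeObject/readObject shown in the patch. The class name and main method here are illustrative only.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import com.netflix.iceberg.hadoop.HadoopFileIO;
import org.apache.hadoop.conf.Configuration;

class FileIOSerializationSketch {
  public static void main(String[] args) throws Exception {
    HadoopFileIO io = new HadoopFileIO(new Configuration());

    // Java-serialize the FileIO, as Spark would when shipping it from the driver to executors.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(io); // SerializableConfiguration.writeObject persists the wrapped conf
    }

    try (ObjectInputStream in = new ObjectInputStream(
        new ByteArrayInputStream(bytes.toByteArray()))) {
      // readObject rebuilds the Configuration, so the copy is immediately usable for
      // newInputFile/newOutputFile/deleteFile on the remote side.
      HadoopFileIO copy = (HadoopFileIO) in.readObject();
      System.out.println("round-tripped FileIO: " + copy);
    }
  }
}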