diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopInputFile.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopInputFile.java index ab196a619252..2456a5a2467d 100644 --- a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopInputFile.java +++ b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopInputFile.java @@ -76,6 +76,10 @@ public static HadoopInputFile fromStatus(FileStatus stat, Configuration conf) { } } + static HadoopInputFile fromFsPath(FileSystem fs, Path path, Configuration conf) { + return new HadoopInputFile(fs, path, conf); + } + private HadoopInputFile(FileSystem fs, Path path, Configuration conf) { this.fs = fs; this.path = path; diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopOutputFile.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopOutputFile.java index e01b7e9b8c39..163971fef4d2 100644 --- a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopOutputFile.java +++ b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopOutputFile.java @@ -31,20 +31,25 @@ */ public class HadoopOutputFile implements OutputFile { public static OutputFile fromPath(Path path, Configuration conf) { - return new HadoopOutputFile(path, conf); + return new HadoopOutputFile(Util.getFS(path, conf), path, conf); + } + + static OutputFile fromFsPath(FileSystem fs, Path path, Configuration conf) { + return new HadoopOutputFile(fs, path, conf); } private final Path path; private final Configuration conf; + private final FileSystem fs; - private HadoopOutputFile(Path path, Configuration conf) { + private HadoopOutputFile(FileSystem fs, Path path, Configuration conf) { this.path = path; this.conf = conf; + this.fs = fs; } @Override public PositionOutputStream create() { - FileSystem fs = Util.getFS(path, conf); try { return HadoopStreams.wrap(fs.create(path, false /* createOrOverwrite */)); } catch (FileAlreadyExistsException e) { diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java index 0bd6477c5d9c..7a6318a1608d 100644 --- a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java +++ b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java @@ -47,6 +47,7 @@ class HadoopTableOperations implements TableOperations { private final Configuration conf; private final Path location; + private final FileSystem metadataFs; private TableMetadata currentMetadata = null; private Integer version = null; private boolean shouldRefresh = true; @@ -54,6 +55,7 @@ class HadoopTableOperations implements TableOperations { HadoopTableOperations(Path location, Configuration conf) { this.conf = conf; this.location = location; + this.metadataFs = Util.getFS(location, conf); } public TableMetadata current() { @@ -67,10 +69,9 @@ public TableMetadata current() { public TableMetadata refresh() { int ver = version != null ? version : readVersionHint(); Path metadataFile = metadataFile(ver); - FileSystem fs = Util.getFS(metadataFile, conf); try { // don't check if the file exists if version is non-null because it was already checked - if (version == null && !fs.exists(metadataFile)) { + if (version == null && !metadataFs.exists(metadataFile)) { if (ver == 0) { // no v0 metadata means the table doesn't exist yet return null; @@ -78,7 +79,7 @@ public TableMetadata refresh() { throw new ValidationException("Metadata file is missing: %s", metadataFile); } - while (fs.exists(metadataFile(ver + 1))) { + while (metadataFs.exists(metadataFile(ver + 1))) { ver += 1; metadataFile = metadataFile(ver); } @@ -88,7 +89,7 @@ public TableMetadata refresh() { } this.version = ver; this.currentMetadata = TableMetadataParser.read(this, - HadoopInputFile.fromPath(metadataFile, conf)); + HadoopInputFile.fromFsPath(metadataFs, metadataFile, conf)); this.shouldRefresh = false; return currentMetadata; } @@ -105,14 +106,13 @@ public void commit(TableMetadata base, TableMetadata metadata) { } Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + getFileExtension(conf)); - TableMetadataParser.write(metadata, HadoopOutputFile.fromPath(tempMetadataFile, conf)); + TableMetadataParser.write(metadata, HadoopOutputFile.fromFsPath(metadataFs, tempMetadataFile, conf)); int nextVersion = (version != null ? version : 0) + 1; Path finalMetadataFile = metadataFile(nextVersion); - FileSystem fs = Util.getFS(tempMetadataFile, conf); try { - if (fs.exists(finalMetadataFile)) { + if (metadataFs.exists(finalMetadataFile)) { throw new CommitFailedException( "Version %d already exists: %s", nextVersion, finalMetadataFile); } @@ -123,7 +123,7 @@ public void commit(TableMetadata base, TableMetadata metadata) { try { // this rename operation is the atomic commit operation - if (!fs.rename(tempMetadataFile, finalMetadataFile)) { + if (!metadataFs.rename(tempMetadataFile, finalMetadataFile)) { throw new CommitFailedException( "Failed to commit changes using rename: %s", finalMetadataFile); } @@ -140,20 +140,19 @@ public void commit(TableMetadata base, TableMetadata metadata) { @Override public InputFile newInputFile(String path) { - return HadoopInputFile.fromPath(new Path(path), conf); + return HadoopInputFile.fromFsPath(metadataFs, new Path(path), conf); } @Override public OutputFile newMetadataFile(String filename) { - return HadoopOutputFile.fromPath(metadataPath(filename), conf); + return HadoopOutputFile.fromFsPath(metadataFs, metadataPath(filename), conf); } @Override public void deleteFile(String path) { Path toDelete = new Path(path); - FileSystem fs = Util.getFS(toDelete, conf); try { - fs.delete(toDelete, false /* not recursive */ ); + metadataFs.delete(toDelete, false /* not recursive */ ); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to delete file: %s", path); } @@ -178,9 +177,7 @@ private Path versionHintFile() { private void writeVersionHint(int version) { Path versionHintFile = versionHintFile(); - FileSystem fs = Util.getFS(versionHintFile, conf); - - try (FSDataOutputStream out = fs.create(versionHintFile, true /* overwrite */ )) { + try (FSDataOutputStream out = metadataFs.create(versionHintFile, true /* overwrite */ )) { out.write(String.valueOf(version).getBytes("UTF-8")); } catch (IOException e) {