diff --git a/api/src/main/java/com/netflix/iceberg/Table.java b/api/src/main/java/com/netflix/iceberg/Table.java index fe19fa203abe..9acb50d45307 100644 --- a/api/src/main/java/com/netflix/iceberg/Table.java +++ b/api/src/main/java/com/netflix/iceberg/Table.java @@ -19,6 +19,7 @@ package com.netflix.iceberg; +import com.netflix.iceberg.io.FileIO; import java.util.Map; /** @@ -171,4 +172,10 @@ default AppendFiles newFastAppend() { * @return a new {@link Transaction} */ Transaction newTransaction(); + + /** + * @return a {@link FileIO} to read and write table data and metadata files + */ + FileIO io(); + } diff --git a/core/src/main/java/com/netflix/iceberg/FileIO.java b/api/src/main/java/com/netflix/iceberg/io/FileIO.java similarity index 98% rename from core/src/main/java/com/netflix/iceberg/FileIO.java rename to api/src/main/java/com/netflix/iceberg/io/FileIO.java index fdba7afba4d8..ed859b90fdd9 100644 --- a/core/src/main/java/com/netflix/iceberg/FileIO.java +++ b/api/src/main/java/com/netflix/iceberg/io/FileIO.java @@ -17,7 +17,7 @@ * under the License. */ -package com.netflix.iceberg; +package com.netflix.iceberg.io; import com.netflix.iceberg.io.InputFile; import com.netflix.iceberg.io.OutputFile; diff --git a/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java index b107d0b98120..3e9b420df485 100644 --- a/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java +++ b/core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java @@ -22,6 +22,7 @@ import com.google.common.base.Objects; import com.netflix.iceberg.exceptions.RuntimeIOException; import com.netflix.iceberg.hadoop.HadoopFileIO; +import com.netflix.iceberg.io.FileIO; import com.netflix.iceberg.io.OutputFile; import com.netflix.iceberg.util.Tasks; import org.apache.hadoop.conf.Configuration; diff --git a/core/src/main/java/com/netflix/iceberg/BaseTable.java b/core/src/main/java/com/netflix/iceberg/BaseTable.java index da11b55434f7..7d48ef2bc1c2 100644 --- a/core/src/main/java/com/netflix/iceberg/BaseTable.java +++ b/core/src/main/java/com/netflix/iceberg/BaseTable.java @@ -19,6 +19,7 @@ package com.netflix.iceberg; +import com.netflix.iceberg.io.FileIO; import java.util.Map; /** @@ -135,6 +136,11 @@ public Transaction newTransaction() { return BaseTransaction.newTransaction(ops); } + @Override + public FileIO io() { + return operations().io(); + } + @Override public String toString() { return name; diff --git a/core/src/main/java/com/netflix/iceberg/BaseTransaction.java b/core/src/main/java/com/netflix/iceberg/BaseTransaction.java index 1a56b7e16ee7..b7c3a32d49f3 100644 --- a/core/src/main/java/com/netflix/iceberg/BaseTransaction.java +++ b/core/src/main/java/com/netflix/iceberg/BaseTransaction.java @@ -23,6 +23,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.netflix.iceberg.exceptions.CommitFailedException; +import com.netflix.iceberg.io.FileIO; import com.netflix.iceberg.util.Tasks; import java.util.List; import java.util.Map; @@ -365,5 +366,10 @@ public Rollback rollback() { public Transaction newTransaction() { throw new UnsupportedOperationException("Cannot create a transaction within a transaction"); } + + @Override + public FileIO io() { + return transactionOps.io(); + } } } diff --git a/core/src/main/java/com/netflix/iceberg/TableOperations.java b/core/src/main/java/com/netflix/iceberg/TableOperations.java index 19fc3864dfbc..974d5e235ed1 100644 --- 
a/core/src/main/java/com/netflix/iceberg/TableOperations.java +++ b/core/src/main/java/com/netflix/iceberg/TableOperations.java @@ -19,10 +19,9 @@ package com.netflix.iceberg; +import com.netflix.iceberg.io.FileIO; import java.util.UUID; -import com.netflix.iceberg.io.OutputFile; - /** * SPI interface to abstract table metadata access and updates. */ @@ -57,7 +56,7 @@ public interface TableOperations { void commit(TableMetadata base, TableMetadata metadata); /** - * @return a {@link com.netflix.iceberg.FileIO} to read and write table data and metadata files + * @return a {@link FileIO} to read and write table data and metadata files */ FileIO io(); diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopFileIO.java index 586942cc24ce..7e1d004813c8 100644 --- a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopFileIO.java +++ b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopFileIO.java @@ -1,6 +1,6 @@ package com.netflix.iceberg.hadoop; -import com.netflix.iceberg.FileIO; +import com.netflix.iceberg.io.FileIO; import com.netflix.iceberg.exceptions.RuntimeIOException; import com.netflix.iceberg.io.InputFile; import com.netflix.iceberg.io.OutputFile; @@ -9,8 +9,6 @@ import org.apache.hadoop.fs.Path; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; public class HadoopFileIO implements FileIO { diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java index 4aa19f4cbcea..1a3b0fd49bd6 100644 --- a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java +++ b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java @@ -19,7 +19,7 @@ package com.netflix.iceberg.hadoop; -import com.netflix.iceberg.FileIO; +import com.netflix.iceberg.io.FileIO; import com.netflix.iceberg.TableMetadata; import com.netflix.iceberg.TableMetadataParser; import com.netflix.iceberg.TableOperations; diff --git a/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java b/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java index 1508ee804847..baa286ffb00b 100644 --- a/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java +++ b/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java @@ -21,6 +21,7 @@ import com.google.common.collect.Maps; import com.netflix.iceberg.exceptions.RuntimeIOException; +import com.netflix.iceberg.io.FileIO; import java.util.Map; import org.junit.rules.TemporaryFolder; diff --git a/core/src/test/java/com/netflix/iceberg/TestTables.java b/core/src/test/java/com/netflix/iceberg/TestTables.java index f1dbe4aae3ac..fbb58d40c370 100644 --- a/core/src/test/java/com/netflix/iceberg/TestTables.java +++ b/core/src/test/java/com/netflix/iceberg/TestTables.java @@ -24,6 +24,7 @@ import com.netflix.iceberg.exceptions.AlreadyExistsException; import com.netflix.iceberg.exceptions.CommitFailedException; import com.netflix.iceberg.exceptions.RuntimeIOException; +import com.netflix.iceberg.io.FileIO; import com.netflix.iceberg.io.InputFile; import com.netflix.iceberg.io.OutputFile; import java.io.File; diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java index cd1a0afe3622..1991d29cbbe5 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java +++ 
b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java @@ -59,8 +59,7 @@ public String shortName() { public DataSourceReader createReader(DataSourceOptions options) { Configuration conf = new Configuration(lazyBaseConf()); Table table = getTableAndResolveHadoopConfiguration(options, conf); - - return new Reader(table, conf); + return new Reader(table); } @Override @@ -92,7 +91,7 @@ public Optional createWriter(String jobId, StructType dfStruct .toUpperCase(Locale.ENGLISH)); } - return Optional.of(new Writer(table, conf, format)); + return Optional.of(new Writer(table, format)); } protected Table findTable(DataSourceOptions options, Configuration conf) { diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java index 4a008ee308d3..33b95c1730b3 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java @@ -22,6 +22,7 @@ import com.google.common.collect.Lists; import com.netflix.iceberg.CombinedScanTask; import com.netflix.iceberg.DataFile; +import com.netflix.iceberg.io.FileIO; import com.netflix.iceberg.FileScanTask; import com.netflix.iceberg.PartitionField; import com.netflix.iceberg.PartitionSpec; @@ -34,7 +35,6 @@ import com.netflix.iceberg.common.DynMethods; import com.netflix.iceberg.exceptions.RuntimeIOException; import com.netflix.iceberg.expressions.Expression; -import com.netflix.iceberg.hadoop.HadoopInputFile; import com.netflix.iceberg.io.CloseableIterable; import com.netflix.iceberg.io.InputFile; import com.netflix.iceberg.parquet.Parquet; @@ -44,7 +44,6 @@ import com.netflix.iceberg.spark.data.SparkParquetReaders; import com.netflix.iceberg.types.TypeUtil; import com.netflix.iceberg.types.Types; -import org.apache.hadoop.conf.Configuration; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.Attribute; import org.apache.spark.sql.catalyst.expressions.AttributeReference; @@ -67,7 +66,6 @@ import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.apache.spark.unsafe.types.UTF8String; -import org.apache.spark.util.SerializableConfiguration; import java.io.Closeable; import java.io.IOException; import java.io.Serializable; @@ -89,7 +87,7 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD private static final Filter[] NO_FILTERS = new Filter[0]; private final Table table; - private final SerializableConfiguration conf; + private final FileIO fileIo; private StructType requestedSchema = null; private List filterExpressions = null; private Filter[] pushedFilters = NO_FILTERS; @@ -99,10 +97,10 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD private StructType type = null; // cached because Spark accesses it multiple times private List tasks = null; // lazy cache of tasks - Reader(Table table, Configuration conf) { + Reader(Table table) { this.table = table; - this.conf = new SerializableConfiguration(conf); this.schema = table.schema(); + this.fileIo = table.io(); } private Schema lazySchema() { @@ -135,7 +133,7 @@ public List> planInputPartitions() { List> readTasks = Lists.newArrayList(); for (CombinedScanTask task : tasks()) { - readTasks.add(new ReadTask(task, tableSchemaString, expectedSchemaString, conf)); + readTasks.add(new ReadTask(task, tableSchemaString, expectedSchemaString, fileIo)); } return readTasks; @@ -228,22 
+226,22 @@ private static class ReadTask implements InputPartition, Serializab private final CombinedScanTask task; private final String tableSchemaString; private final String expectedSchemaString; - private final SerializableConfiguration conf; + private final FileIO fileIo; private transient Schema tableSchema = null; private transient Schema expectedSchema = null; - private ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString, - SerializableConfiguration conf) { + private ReadTask( + CombinedScanTask task, String tableSchemaString, String expectedSchemaString, FileIO fileIo) { this.task = task; this.tableSchemaString = tableSchemaString; this.expectedSchemaString = expectedSchemaString; - this.conf = conf; + this.fileIo = fileIo; } @Override public InputPartitionReader createPartitionReader() { - return new TaskDataReader(task, lazyTableSchema(), lazyExpectedSchema(), conf.value()); + return new TaskDataReader(task, lazyTableSchema(), lazyExpectedSchema(), fileIo); } private Schema lazyTableSchema() { @@ -270,18 +268,18 @@ private static class TaskDataReader implements InputPartitionReader private final Iterator tasks; private final Schema tableSchema; private final Schema expectedSchema; - private final Configuration conf; + private final FileIO fileIo; private Iterator currentIterator = null; private Closeable currentCloseable = null; private InternalRow current = null; - public TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, Configuration conf) { + public TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO fileIo) { + this.fileIo = fileIo; this.tasks = task.files().iterator(); this.tableSchema = tableSchema; this.expectedSchema = expectedSchema; - this.conf = conf; - // open last because the schemas and conf must be set + // open last because the schemas and fileIo must be set this.currentIterator = open(tasks.next()); } @@ -346,17 +344,17 @@ private Iterator open(FileScanTask task) { // create joined rows and project from the joined schema to the final schema iterSchema = TypeUtil.join(readSchema, partitionSchema); - iter = transform(open(task, readSchema, conf), joined::withLeft); + iter = transform(open(task, readSchema), joined::withLeft); } else if (hasExtraFilterColumns) { // add projection to the final schema iterSchema = requiredSchema; - iter = open(task, requiredSchema, conf); + iter = open(task, requiredSchema); } else { // return the base iterator iterSchema = finalSchema; - iter = open(task, finalSchema, conf); + iter = open(task, finalSchema); } // TODO: remove the projection by reporting the iterator's schema back to Spark @@ -386,9 +384,8 @@ private static UnsafeProjection projection(Schema finalSchema, Schema readSchema asScalaBufferConverter(attrs).asScala().toSeq()); } - private Iterator open(FileScanTask task, Schema readSchema, - Configuration conf) { - InputFile location = HadoopInputFile.fromLocation(task.file().path(), conf); + private Iterator open(FileScanTask task, Schema readSchema) { + InputFile location = fileIo.newInputFile(task.file().path().toString()); CloseableIterable iter; switch (task.file().format()) { case PARQUET: diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java index c9d3a7b31221..902ba80be880 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java @@ 
-28,6 +28,7 @@ import com.netflix.iceberg.DataFile; import com.netflix.iceberg.DataFiles; import com.netflix.iceberg.FileFormat; +import com.netflix.iceberg.io.FileIO; import com.netflix.iceberg.Metrics; import com.netflix.iceberg.PartitionSpec; import com.netflix.iceberg.Schema; @@ -35,8 +36,6 @@ import com.netflix.iceberg.TableProperties; import com.netflix.iceberg.avro.Avro; import com.netflix.iceberg.exceptions.RuntimeIOException; -import com.netflix.iceberg.hadoop.HadoopInputFile; -import com.netflix.iceberg.hadoop.HadoopOutputFile; import com.netflix.iceberg.io.FileAppender; import com.netflix.iceberg.io.InputFile; import com.netflix.iceberg.io.OutputFile; @@ -46,8 +45,6 @@ import com.netflix.iceberg.transforms.Transforms; import com.netflix.iceberg.types.Types.StringType; import com.netflix.iceberg.util.Tasks; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport; @@ -55,7 +52,6 @@ import org.apache.spark.sql.sources.v2.writer.DataWriter; import org.apache.spark.sql.sources.v2.writer.DataWriterFactory; import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; -import org.apache.spark.util.SerializableConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; @@ -89,18 +85,18 @@ class Writer implements DataSourceWriter { private static final Logger LOG = LoggerFactory.getLogger(Writer.class); private final Table table; - private final Configuration conf; private final FileFormat format; + private final FileIO fileIo; - Writer(Table table, Configuration conf, FileFormat format) { + Writer(Table table, FileFormat format) { this.table = table; - this.conf = conf; this.format = format; + this.fileIo = table.io(); } @Override public DataWriterFactory createWriterFactory() { - return new WriterFactory(table.spec(), format, dataLocation(), table.properties(), conf); + return new WriterFactory(table.spec(), format, dataLocation(), table.properties(), fileIo); } @Override @@ -122,13 +118,6 @@ public void commit(WriterCommitMessage[] messages) { @Override public void abort(WriterCommitMessage[] messages) { - FileSystem fs; - try { - fs = new Path(table.location()).getFileSystem(conf); - } catch (IOException e) { - throw new RuntimeIOException(e); - } - Tasks.foreach(files(messages)) .retry(propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) .exponentialBackoff( @@ -138,11 +127,7 @@ public void abort(WriterCommitMessage[] messages) { 2.0 /* exponential */ ) .throwFailureWhenFinished() .run(file -> { - try { - fs.delete(new Path(file.path().toString()), false /* not recursive */ ); - } catch (IOException e) { - throw new RuntimeIOException(e); - } + fileIo.deleteFile(file.path().toString()); }); } @@ -165,9 +150,10 @@ private int propertyAsInt(String property, int defaultValue) { } private String dataLocation() { - return table.properties().getOrDefault( - TableProperties.WRITE_NEW_DATA_LOCATION, - new Path(new Path(table.location()), "data").toString()); + return stripTrailingSlash( + table.properties().getOrDefault( + TableProperties.WRITE_NEW_DATA_LOCATION, + String.format("%s/data", table.location()))); } @Override @@ -202,18 +188,16 @@ private static class WriterFactory implements DataWriterFactory { private final FileFormat format; private final String dataLocation; private final Map properties; - private final SerializableConfiguration 
conf; private final String uuid = UUID.randomUUID().toString(); - - private transient Path dataPath = null; + private final FileIO fileIo; WriterFactory(PartitionSpec spec, FileFormat format, String dataLocation, - Map properties, Configuration conf) { + Map properties, FileIO fileIo) { this.spec = spec; this.format = format; this.dataLocation = dataLocation; this.properties = properties; - this.conf = new SerializableConfiguration(conf); + this.fileIo = fileIo; } @Override @@ -221,12 +205,10 @@ public DataWriter createDataWriter(int partitionId, long taskId, lo String filename = format.addExtension(String.format("%05d-%d-%s", partitionId, taskId, uuid)); AppenderFactory factory = new SparkAppenderFactory(); if (spec.fields().isEmpty()) { - return new UnpartitionedWriter(lazyDataPath(), filename, format, conf.value(), factory); - + return new UnpartitionedWriter(dataLocation, filename, format, factory, fileIo); } else { - Path baseDataPath = lazyDataPath(); // avoid calling this in the output path function - Function outputPathFunc = key -> - new Path(new Path(baseDataPath, key.toPath()), filename); + Function outputPathFunc = key -> + String.format("%s/%s/%s", dataLocation, key.toPath(), filename); boolean useObjectStorage = ( Boolean.parseBoolean(properties.get(OBJECT_STORE_ENABLED)) || @@ -235,43 +217,46 @@ public DataWriter createDataWriter(int partitionId, long taskId, lo if (useObjectStorage) { // try to get db and table portions of the path for context in the object store - String context = pathContext(baseDataPath); - String objectStore = properties.get(OBJECT_STORE_PATH); + String context = pathContext(new Path(dataLocation)); + String objectStore = stripTrailingSlash(properties.get(OBJECT_STORE_PATH)); Preconditions.checkNotNull(objectStore, "Cannot use object storage, missing location: " + OBJECT_STORE_PATH); - Path objectStorePath = new Path(objectStore); outputPathFunc = key -> { - String partitionAndFilename = key.toPath() + "/" + filename; + String partitionAndFilename = String.format("%s/%s", key.toPath(), filename); int hash = HASH_FUNC.apply(partitionAndFilename); - return new Path(objectStorePath, - String.format("%08x/%s/%s", hash, context, partitionAndFilename)); + return String.format( + "%s/%08x/%s/%s/%s", + objectStore, + hash, + context, + key.toPath(), + filename); }; } - return new PartitionedWriter(spec, format, conf.value(), factory, outputPathFunc); + return new PartitionedWriter(spec, format, factory, outputPathFunc, fileIo); } } private static String pathContext(Path dataPath) { Path parent = dataPath.getParent(); + String resolvedContext; if (parent != null) { // remove the data folder if (dataPath.getName().equals("data")) { - return pathContext(parent); + resolvedContext = pathContext(parent); + } else { + resolvedContext = String.format("%s/%s", parent.getName(), dataPath.getName()); } - - return parent.getName() + "/" + dataPath.getName(); + } else { + resolvedContext = dataPath.getName(); } - return dataPath.getName(); - } - - private Path lazyDataPath() { - if (dataPath == null) { - this.dataPath = new Path(dataLocation); - } - return dataPath; + Preconditions.checkState( + !resolvedContext.endsWith("/"), + "Path context must not end with a slash."); + return resolvedContext; } private class SparkAppenderFactory implements AppenderFactory { @@ -314,16 +299,20 @@ private interface AppenderFactory { } private static class UnpartitionedWriter implements DataWriter, Closeable { - private final Path file; - private final Configuration conf; + private final 
FileIO fileIo; + private final String file; private FileAppender appender = null; private Metrics metrics = null; - UnpartitionedWriter(Path dataPath, String filename, FileFormat format, - Configuration conf, AppenderFactory factory) { - this.file = new Path(dataPath, filename); - this.appender = factory.newAppender(HadoopOutputFile.fromPath(file, conf), format); - this.conf = conf; + UnpartitionedWriter( + String dataPath, + String filename, + FileFormat format, + AppenderFactory factory, + FileIO fileIo) { + this.file = String.format("%s/%s", dataPath, filename); + this.fileIo = fileIo; + this.appender = factory.newAppender(fileIo.newOutputFile(file), format); } @Override @@ -338,12 +327,11 @@ public WriterCommitMessage commit() throws IOException { close(); if (metrics.recordCount() == 0L) { - FileSystem fs = file.getFileSystem(conf); - fs.delete(file, false); + fileIo.deleteFile(file); return new TaskCommit(); } - InputFile inFile = HadoopInputFile.fromPath(file, conf); + InputFile inFile = fileIo.newInputFile(file); DataFile dataFile = DataFiles.fromInputFile(inFile, null, metrics); return new TaskCommit(dataFile); @@ -354,9 +342,7 @@ public void abort() throws IOException { Preconditions.checkArgument(appender != null, "Abort called on a closed writer: %s", this); close(); - - FileSystem fs = file.getFileSystem(conf); - fs.delete(file, false); + fileIo.deleteFile(file); } @Override @@ -374,24 +360,27 @@ private static class PartitionedWriter implements DataWriter { private final List completedFiles = Lists.newArrayList(); private final PartitionSpec spec; private final FileFormat format; - private final Configuration conf; private final AppenderFactory factory; - private final Function outputPathFunc; + private final Function outputPathFunc; private final PartitionKey key; + private final FileIO fileIo; private PartitionKey currentKey = null; private FileAppender currentAppender = null; - private Path currentPath = null; - - PartitionedWriter(PartitionSpec spec, FileFormat format, Configuration conf, - AppenderFactory factory, - Function outputPathFunc) { + private String currentPath = null; + + PartitionedWriter( + PartitionSpec spec, + FileFormat format, + AppenderFactory factory, + Function outputPathFunc, + FileIO fileIo) { this.spec = spec; this.format = format; - this.conf = conf; this.factory = factory; this.outputPathFunc = outputPathFunc; this.key = new PartitionKey(spec); + this.fileIo = fileIo; } @Override @@ -410,7 +399,7 @@ public void write(InternalRow row) throws IOException { this.currentKey = key.copy(); this.currentPath = outputPathFunc.apply(currentKey); - OutputFile file = HadoopOutputFile.fromPath(currentPath, conf); + OutputFile file = fileIo.newOutputFile(currentPath.toString()); this.currentAppender = factory.newAppender(file, format); } @@ -425,18 +414,16 @@ public WriterCommitMessage commit() throws IOException { @Override public void abort() throws IOException { - FileSystem fs = currentPath.getFileSystem(conf); - // clean up files created by this writer Tasks.foreach(completedFiles) .throwFailureWhenFinished() .noRetry() - .run(file -> fs.delete(new Path(file.path().toString())), IOException.class); + .run(file -> fileIo.deleteFile(file.path().toString())); if (currentAppender != null) { currentAppender.close(); this.currentAppender = null; - fs.delete(currentPath); + fileIo.deleteFile(currentPath); } } @@ -447,7 +434,7 @@ private void closeCurrent() throws IOException { Metrics metrics = currentAppender.metrics(); this.currentAppender = null; - InputFile 
inFile = HadoopInputFile.fromPath(currentPath, conf); + InputFile inFile = fileIo.newInputFile(currentPath); DataFile dataFile = DataFiles.builder(spec) .withInputFile(inFile) .withPartition(currentKey) @@ -459,4 +446,12 @@ private void closeCurrent() throws IOException { } } } + + private static String stripTrailingSlash(String path) { + String result = path; + while (result.endsWith("/")) { + result = result.substring(0, result.length() - 1); + } + return result; + } } diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java index 90b6dc8ec6ea..c18636f60f00 100644 --- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java +++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java @@ -21,7 +21,7 @@ import com.google.common.collect.Maps; import com.netflix.iceberg.BaseTable; -import com.netflix.iceberg.FileIO; +import com.netflix.iceberg.io.FileIO; import com.netflix.iceberg.Files; import com.netflix.iceberg.PartitionSpec; import com.netflix.iceberg.Schema; @@ -34,7 +34,6 @@ import com.netflix.iceberg.io.InputFile; import com.netflix.iceberg.io.OutputFile; import java.io.File; -import java.io.IOException; import java.util.Map; // TODO: Use the copy of this from core. @@ -73,7 +72,8 @@ private TestTable(TestTableOperations ops, String name) { this.ops = ops; } - TestTableOperations ops() { + @Override + public TestTableOperations operations() { return ops; } }
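For reviewers trying out the new surface area, a minimal sketch (not part of the change) of what `Table#io()` enables for callers that previously had to thread a Hadoop `Configuration` around. The copy helper, its paths, and the buffer loop are illustrative assumptions; only `io()`, `newInputFile`, `newOutputFile`, and `deleteFile` come from this diff, and `newStream()`/`create()` are assumed from the existing `InputFile`/`OutputFile` contracts.

```java
import com.netflix.iceberg.Table;
import com.netflix.iceberg.io.FileIO;
import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

class TableIoSketch {
  /** Copies one table file to another location using only the table's FileIO. */
  static void copy(Table table, String srcPath, String dstPath) throws IOException {
    FileIO io = table.io();                      // new in this PR: no Configuration needed
    InputFile src = io.newInputFile(srcPath);    // replaces HadoopInputFile.fromLocation(path, conf)
    OutputFile dst = io.newOutputFile(dstPath);  // replaces HadoopOutputFile.fromPath(path, conf)
    byte[] buffer = new byte[8192];
    try (InputStream in = src.newStream();
         OutputStream out = dst.create()) {
      int read;
      while ((read = in.read(buffer)) != -1) {
        out.write(buffer, 0, read);
      }
    }
    io.deleteFile(srcPath);                      // replaces FileSystem#delete(new Path(...), false)
  }
}
```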
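A sketch of what a non-Hadoop `FileIO` implementation could look like now that the interface lives in the `api` module. It assumes the interface consists of the three methods exercised in this diff and that implementations are Java-serializable, since the Spark `ReadTask` and `WriterFactory` now carry a `FileIO` instead of a `SerializableConfiguration`; `Files.localInput`/`Files.localOutput` are assumed to be the existing local-file helpers imported by `TestTables`.

```java
import com.netflix.iceberg.Files;
import com.netflix.iceberg.io.FileIO;
import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;
import java.io.File;
import java.io.Serializable;

public class LocalFileIO implements FileIO, Serializable {
  @Override
  public InputFile newInputFile(String path) {
    // delegate to the existing local-file InputFile implementation
    return Files.localInput(new File(path));
  }

  @Override
  public OutputFile newOutputFile(String path) {
    return Files.localOutput(new File(path));
  }

  @Override
  public void deleteFile(String path) {
    if (!new File(path).delete()) {
      throw new RuntimeException("Failed to delete: " + path);
    }
  }
}
```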
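The object-storage branch of `WriterFactory` now builds the output path as a plain string instead of a Hadoop `Path`. A worked illustration with hypothetical values (the hash function, property names, and overall layout are unchanged by this PR; only the string construction is new):

```java
public class ObjectStoreLayoutSketch {
  public static void main(String[] args) {
    // hypothetical inputs, mirroring the locals in WriterFactory#createDataWriter
    String objectStore = "s3://bucket/warehouse";   // OBJECT_STORE_PATH after stripTrailingSlash
    String context = "db/table";                    // pathContext(new Path(dataLocation))
    String partition = "ts_day=2018-12-01";         // key.toPath()
    String filename = "00001-3-4d5ef1aa.parquet";   // format.addExtension(...)
    int hash = 0x1a2b3c4d;                          // stands in for HASH_FUNC.apply(partition + "/" + filename)

    // same format string as the new outputPathFunc
    String path = String.format("%s/%08x/%s/%s/%s", objectStore, hash, context, partition, filename);
    System.out.println(path);
    // s3://bucket/warehouse/1a2b3c4d/db/table/ts_day=2018-12-01/00001-3-4d5ef1aa.parquet
  }
}
```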