Merged
Changes from all commits
7 changes: 7 additions & 0 deletions api/src/main/java/com/netflix/iceberg/Table.java
@@ -19,6 +19,7 @@

package com.netflix.iceberg;

import com.netflix.iceberg.io.FileIO;
import java.util.Map;

/**
@@ -171,4 +172,10 @@ default AppendFiles newFastAppend() {
* @return a new {@link Transaction}
*/
Transaction newTransaction();

/**
* @return a {@link FileIO} to read and write table data and metadata files
*/
FileIO io();
Contributor Author: Should TableOperations#io continue to exist?

Contributor: Yes, because TableOperations is still how implementations are passed.

}
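As a hedged illustration of the new surface area (not code from this PR): client code can now resolve files through the table's own FileIO instead of building Hadoop paths. Only FileIO#newInputFile(String) is assumed here, since that is the call the Spark Reader below relies on; the helper class name and the location argument are hypothetical.

import com.netflix.iceberg.Table;
import com.netflix.iceberg.io.FileIO;
import com.netflix.iceberg.io.InputFile;

public class TableIoExample {
  // Resolve a file location through the table's own FileIO. BaseTable delegates
  // io() to operations().io(), so implementations are still supplied via TableOperations.
  static InputFile resolve(Table table, String location) {
    FileIO io = table.io();
    return io.newInputFile(location);
  }
}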
@@ -17,7 +17,7 @@
* under the License.
*/

package com.netflix.iceberg;
package com.netflix.iceberg.io;

import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;
@@ -22,6 +22,7 @@
import com.google.common.base.Objects;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.hadoop.HadoopFileIO;
import com.netflix.iceberg.io.FileIO;
import com.netflix.iceberg.io.OutputFile;
import com.netflix.iceberg.util.Tasks;
import org.apache.hadoop.conf.Configuration;
6 changes: 6 additions & 0 deletions core/src/main/java/com/netflix/iceberg/BaseTable.java
@@ -19,6 +19,7 @@

package com.netflix.iceberg;

import com.netflix.iceberg.io.FileIO;
import java.util.Map;

/**
@@ -135,6 +136,11 @@ public Transaction newTransaction() {
return BaseTransaction.newTransaction(ops);
}

@Override
public FileIO io() {
return operations().io();
}

@Override
public String toString() {
return name;
6 changes: 6 additions & 0 deletions core/src/main/java/com/netflix/iceberg/BaseTransaction.java
@@ -23,6 +23,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.netflix.iceberg.exceptions.CommitFailedException;
import com.netflix.iceberg.io.FileIO;
import com.netflix.iceberg.util.Tasks;
import java.util.List;
import java.util.Map;
@@ -365,5 +366,10 @@ public Rollback rollback() {
public Transaction newTransaction() {
throw new UnsupportedOperationException("Cannot create a transaction within a transaction");
}

@Override
public FileIO io() {
return transactionOps.io();
}
}
}
5 changes: 2 additions & 3 deletions core/src/main/java/com/netflix/iceberg/TableOperations.java
@@ -19,10 +19,9 @@

package com.netflix.iceberg;

import com.netflix.iceberg.io.FileIO;
import java.util.UUID;

import com.netflix.iceberg.io.OutputFile;

/**
* SPI interface to abstract table metadata access and updates.
*/
@@ -57,7 +56,7 @@ public interface TableOperations {
void commit(TableMetadata base, TableMetadata metadata);

/**
* @return a {@link com.netflix.iceberg.FileIO} to read and write table data and metadata files
* @return a {@link FileIO} to read and write table data and metadata files
*/
FileIO io();

@@ -1,6 +1,6 @@
package com.netflix.iceberg.hadoop;

import com.netflix.iceberg.FileIO;
import com.netflix.iceberg.io.FileIO;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;
@@ -9,8 +9,6 @@
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class HadoopFileIO implements FileIO {

@@ -19,7 +19,7 @@

package com.netflix.iceberg.hadoop;

import com.netflix.iceberg.FileIO;
import com.netflix.iceberg.io.FileIO;
import com.netflix.iceberg.TableMetadata;
import com.netflix.iceberg.TableMetadataParser;
import com.netflix.iceberg.TableOperations;
@@ -21,6 +21,7 @@

import com.google.common.collect.Maps;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.io.FileIO;
import java.util.Map;
import org.junit.rules.TemporaryFolder;

1 change: 1 addition & 0 deletions core/src/test/java/com/netflix/iceberg/TestTables.java
@@ -24,6 +24,7 @@
import com.netflix.iceberg.exceptions.AlreadyExistsException;
import com.netflix.iceberg.exceptions.CommitFailedException;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.io.FileIO;
import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;
import java.io.File;
@@ -59,8 +59,7 @@ public String shortName() {
public DataSourceReader createReader(DataSourceOptions options) {
Configuration conf = new Configuration(lazyBaseConf());
Table table = getTableAndResolveHadoopConfiguration(options, conf);

return new Reader(table, conf);
return new Reader(table);
}

@Override
@@ -92,7 +91,7 @@ public Optional<DataSourceWriter> createWriter(String jobId, StructType dfStruct
.toUpperCase(Locale.ENGLISH));
}

return Optional.of(new Writer(table, conf, format));
return Optional.of(new Writer(table, format));
}

protected Table findTable(DataSourceOptions options, Configuration conf) {
41 changes: 19 additions & 22 deletions spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java
@@ -22,6 +22,7 @@
import com.google.common.collect.Lists;
import com.netflix.iceberg.CombinedScanTask;
import com.netflix.iceberg.DataFile;
import com.netflix.iceberg.io.FileIO;
import com.netflix.iceberg.FileScanTask;
import com.netflix.iceberg.PartitionField;
import com.netflix.iceberg.PartitionSpec;
@@ -34,7 +35,6 @@
import com.netflix.iceberg.common.DynMethods;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.expressions.Expression;
import com.netflix.iceberg.hadoop.HadoopInputFile;
import com.netflix.iceberg.io.CloseableIterable;
import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.parquet.Parquet;
@@ -44,7 +44,6 @@
import com.netflix.iceberg.spark.data.SparkParquetReaders;
import com.netflix.iceberg.types.TypeUtil;
import com.netflix.iceberg.types.Types;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
@@ -67,7 +66,6 @@
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.util.SerializableConfiguration;
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
@@ -89,7 +87,7 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
private static final Filter[] NO_FILTERS = new Filter[0];

private final Table table;
private final SerializableConfiguration conf;
private final FileIO fileIo;
private StructType requestedSchema = null;
private List<Expression> filterExpressions = null;
private Filter[] pushedFilters = NO_FILTERS;
@@ -99,10 +97,10 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
private StructType type = null; // cached because Spark accesses it multiple times
private List<CombinedScanTask> tasks = null; // lazy cache of tasks

Reader(Table table, Configuration conf) {
Reader(Table table) {
this.table = table;
this.conf = new SerializableConfiguration(conf);
this.schema = table.schema();
this.fileIo = table.io();
}

private Schema lazySchema() {
@@ -135,7 +133,7 @@ public List<InputPartition<InternalRow>> planInputPartitions() {

List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
for (CombinedScanTask task : tasks()) {
readTasks.add(new ReadTask(task, tableSchemaString, expectedSchemaString, conf));
readTasks.add(new ReadTask(task, tableSchemaString, expectedSchemaString, fileIo));
}

return readTasks;
@@ -228,22 +226,22 @@ private static class ReadTask implements InputPartition<InternalRow>, Serializab
private final CombinedScanTask task;
private final String tableSchemaString;
private final String expectedSchemaString;
private final SerializableConfiguration conf;
private final FileIO fileIo;

private transient Schema tableSchema = null;
private transient Schema expectedSchema = null;

private ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString,
SerializableConfiguration conf) {
private ReadTask(
CombinedScanTask task, String tableSchemaString, String expectedSchemaString, FileIO fileIo) {
this.task = task;
this.tableSchemaString = tableSchemaString;
this.expectedSchemaString = expectedSchemaString;
this.conf = conf;
this.fileIo = fileIo;
}

@Override
public InputPartitionReader<InternalRow> createPartitionReader() {
return new TaskDataReader(task, lazyTableSchema(), lazyExpectedSchema(), conf.value());
return new TaskDataReader(task, lazyTableSchema(), lazyExpectedSchema(), fileIo);
}

private Schema lazyTableSchema() {
@@ -270,18 +268,18 @@ private static class TaskDataReader implements InputPartitionReader<InternalRow>
private final Iterator<FileScanTask> tasks;
private final Schema tableSchema;
private final Schema expectedSchema;
private final Configuration conf;
private final FileIO fileIo;

private Iterator<InternalRow> currentIterator = null;
private Closeable currentCloseable = null;
private InternalRow current = null;

public TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, Configuration conf) {
public TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO fileIo) {
this.fileIo = fileIo;
this.tasks = task.files().iterator();
this.tableSchema = tableSchema;
this.expectedSchema = expectedSchema;
this.conf = conf;
// open last because the schemas and conf must be set
// open last because the schemas and fileIo must be set
this.currentIterator = open(tasks.next());
}

@@ -346,17 +344,17 @@ private Iterator<InternalRow> open(FileScanTask task) {

// create joined rows and project from the joined schema to the final schema
iterSchema = TypeUtil.join(readSchema, partitionSchema);
iter = transform(open(task, readSchema, conf), joined::withLeft);
iter = transform(open(task, readSchema), joined::withLeft);

} else if (hasExtraFilterColumns) {
// add projection to the final schema
iterSchema = requiredSchema;
iter = open(task, requiredSchema, conf);
iter = open(task, requiredSchema);

} else {
// return the base iterator
iterSchema = finalSchema;
iter = open(task, finalSchema, conf);
iter = open(task, finalSchema);
}

// TODO: remove the projection by reporting the iterator's schema back to Spark
@@ -386,9 +384,8 @@ private static UnsafeProjection projection(Schema finalSchema, Schema readSchema
asScalaBufferConverter(attrs).asScala().toSeq());
}

private Iterator<InternalRow> open(FileScanTask task, Schema readSchema,
Configuration conf) {
InputFile location = HadoopInputFile.fromLocation(task.file().path(), conf);
private Iterator<InternalRow> open(FileScanTask task, Schema readSchema) {
InputFile location = fileIo.newInputFile(task.file().path().toString());
CloseableIterable<InternalRow> iter;
switch (task.file().format()) {
case PARQUET:
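For context, a minimal sketch of the pattern ReadTask now follows: the serializable partition carries the table's FileIO to executors in place of a SerializableConfiguration, which implies the concrete FileIO must itself serialize cleanly (note the custom serialization imports removed from HadoopFileIO above). The class and field names below are illustrative, not taken from the PR.

import com.netflix.iceberg.io.FileIO;
import com.netflix.iceberg.io.InputFile;
import java.io.Serializable;

class FileIoScanTask implements Serializable {
  private final String dataFileLocation;
  private final FileIO fileIo; // travels with the task, so the FileIO implementation must be serializable

  FileIoScanTask(String dataFileLocation, FileIO fileIo) {
    this.dataFileLocation = dataFileLocation;
    this.fileIo = fileIo;
  }

  // Same resolution step TaskDataReader#open performs: the data file is opened
  // through FileIO, with no Hadoop Configuration plumbed through the task.
  InputFile open() {
    return fileIo.newInputFile(dataFileLocation);
  }
}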