Changes to IcebergInputFormat:

@@ -43,33 +43,41 @@
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
+ import org.apache.iceberg.mr.SerializationUtil;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;

- public class IcebergInputFormat implements InputFormat {
+ public class IcebergInputFormat<T> implements InputFormat<Void, T> {
+ private static final Logger LOG = LoggerFactory.getLogger(IcebergInputFormat.class);

+ static final String REUSE_CONTAINERS = "iceberg.mr.reuse.containers";

private Table table;

@Override
- public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- //TODO: Change this to use whichever Catalog the table was made with i.e. HiveCatalog instead etc.
- HadoopTables tables = new HadoopTables(job);
- String tableDir = job.get("location");
+ public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {
+ table = findTable(conf);
+ CloseableIterable taskIterable = table.newScan().planTasks();
+ List<CombinedScanTask> tasks = (List<CombinedScanTask>) StreamSupport
+ .stream(taskIterable.spliterator(), false)
+ .collect(Collectors.toList());
+ return createSplits(tasks);
+ }
+
+ private Table findTable(JobConf conf) throws IOException {
+ HadoopTables tables = new HadoopTables(conf);
+ String tableDir = conf.get("location");
if (tableDir == null) {
throw new IllegalArgumentException("Table 'location' not set in JobConf");
}
URI location = null;
try {
location = new URI(tableDir);
} catch (URISyntaxException e) {
throw new IOException("Unable to create URI for table location: '" + tableDir + "'");
throw new IOException("Unable to create URI for table location: '" + tableDir + "'", e);
}
table = tables.load(location.getPath());

- CloseableIterable taskIterable = table.newScan().planTasks();
- List<CombinedScanTask> tasks = (List<CombinedScanTask>) StreamSupport
- .stream(taskIterable.spliterator(), false)
- .collect(Collectors.toList());
- return createSplits(tasks);
+ return table;
}

private InputSplit[] createSplits(List<CombinedScanTask> tasks) {
@@ -86,17 +94,19 @@ public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter repo
}

public class IcebergRecordReader implements RecordReader<Void, IcebergWritable> {
- private JobConf context;
+ private JobConf conf;
private IcebergSplit split;

private Iterator<FileScanTask> tasks;
private CloseableIterable<Record> reader;
private Iterator<Record> recordIterator;
private Record currentRecord;
+ private boolean reuseContainers;

public IcebergRecordReader(InputSplit split, JobConf conf) throws IOException {
this.split = (IcebergSplit) split;
- this.context = conf;
+ this.conf = conf;
+ this.reuseContainers = conf.getBoolean(REUSE_CONTAINERS, false);
initialise();
}

@@ -108,12 +118,10 @@ private void initialise() {
private void nextTask() {
FileScanTask currentTask = tasks.next();
DataFile file = currentTask.file();
- InputFile inputFile = HadoopInputFile.fromLocation(file.path(), context);
+ InputFile inputFile = HadoopInputFile.fromLocation(file.path(), conf);
Schema tableSchema = table.schema();
- boolean reuseContainers = true; // FIXME: read from config

- IcebergReaderFactory readerFactory = new IcebergReaderFactory();
- reader = readerFactory.createReader(file, currentTask, inputFile, tableSchema, reuseContainers);
+ reader = IcebergReaderFactory.createReader(file, currentTask, inputFile, tableSchema, reuseContainers);
recordIterator = reader.iterator();
}

@@ -126,6 +134,11 @@ public boolean next(Void key, IcebergWritable value) {
}

if (tasks.hasNext()) {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ LOG.error("Error closing reader", e);
+ }
nextTask();
currentRecord = recordIterator.next();
value.setRecord(currentRecord);
@@ -154,7 +167,7 @@ public long getPos() throws IOException {

@Override
public void close() throws IOException {

+ reader.close();
}

@Override
@@ -165,6 +178,8 @@ public float getProgress() throws IOException {

private static class IcebergSplit implements InputSplit {

+ private static final String[] ANYWHERE = new String[]{"*"};

private CombinedScanTask task;

IcebergSplit(CombinedScanTask task) {
@@ -173,22 +188,26 @@ private static class IcebergSplit implements InputSplit {

@Override
public long getLength() throws IOException {
- return 0;
+ return task.files().stream().mapToLong(FileScanTask::length).sum();
}

@Override
public String[] getLocations() throws IOException {
- return new String[0];
+ return ANYWHERE;
}

@Override
public void write(DataOutput out) throws IOException {

+ byte[] data = SerializationUtil.serializeToBytes(this.task);
+ out.writeInt(data.length);
+ out.write(data);
}

@Override
public void readFields(DataInput in) throws IOException {

+ byte[] data = new byte[in.readInt()];
+ in.readFully(data);
+ this.task = SerializationUtil.deserializeFromBytes(data);
}

public CombinedScanTask getTask() {
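For orientation, a minimal driver sketch for the mapred classes in this patch, mirroring the usage in TestIcebergInputFormat further down. It assumes the driver sits in the same package as IcebergInputFormat and IcebergWritable (so no imports are shown for those), and the table path, class name, and printing of each record are illustrative only, not part of the change:

import java.io.IOException;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;

public class IcebergInputFormatDriver {
  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf();
    conf.set("location", "file:/path/to/iceberg/table");  // table root; findTable() reads this key
    conf.setBoolean("iceberg.mr.reuse.containers", true); // optional; the reader defaults this to false

    IcebergInputFormat format = new IcebergInputFormat();
    InputSplit[] splits = format.getSplits(conf, 1);

    for (InputSplit split : splits) {
      RecordReader reader = format.getRecordReader(split, conf, null);
      IcebergWritable value = (IcebergWritable) reader.createValue();
      try {
        while (reader.next(null, value)) {
          System.out.println(value.getRecord()); // one generic Iceberg Record per call
        }
      } finally {
        reader.close(); // closes the underlying file reader, as added in this patch
      }
    }
  }
}

The "iceberg.mr.reuse.containers" key matches the REUSE_CONTAINERS constant introduced above; when it is not set, the record reader falls back to false.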
Changes to IcebergReaderFactory:
@@ -25,6 +25,7 @@
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataReader;
+ import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
@@ -33,10 +34,10 @@

class IcebergReaderFactory {

- IcebergReaderFactory() {
+ private IcebergReaderFactory() {
}

- public CloseableIterable<Record> createReader(DataFile file, FileScanTask currentTask, InputFile inputFile,
+ public static CloseableIterable<Record> createReader(DataFile file, FileScanTask currentTask, InputFile inputFile,
Schema tableSchema, boolean reuseContainers) {
switch (file.format()) {
case AVRO:
@@ -52,9 +53,9 @@ public CloseableIterable<Record> createReader(DataFile file, FileScanTask curren
}
}

- // FIXME: use generic reader function
- private CloseableIterable buildAvroReader(FileScanTask task, InputFile file, Schema schema, boolean reuseContainers) {
- Avro.ReadBuilder builder = Avro.read(file)
+ private static CloseableIterable buildAvroReader(FileScanTask task, InputFile inputFile, Schema schema,
+ boolean reuseContainers) {
+ Avro.ReadBuilder builder = Avro.read(inputFile)
.createReaderFunc(DataReader::create)
.project(schema)
.split(task.start(), task.length());
@@ -66,21 +67,20 @@ private CloseableIterable buildAvroReader(FileScanTask task, InputFile file, Sch
return builder.build();
}

- // FIXME: use generic reader function
- private CloseableIterable buildOrcReader(FileScanTask task, InputFile file, Schema schema, boolean reuseContainers) {
- ORC.ReadBuilder builder = ORC.read(file)
- // .createReaderFunc() // FIXME: implement
+ private static CloseableIterable buildOrcReader(FileScanTask task, InputFile inputFile, Schema schema,
+ boolean reuseContainers) {
+ ORC.ReadBuilder builder = ORC.read(inputFile)
+ .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, fileSchema))
.project(schema)
.split(task.start(), task.length());

return builder.build();
}

- // FIXME: use generic reader function
- private CloseableIterable buildParquetReader(FileScanTask task, InputFile file, Schema schema,
+ private static CloseableIterable buildParquetReader(FileScanTask task, InputFile inputFile, Schema schema,
boolean reuseContainers) {
- Parquet.ReadBuilder builder = Parquet.read(file)
- .createReaderFunc(messageType -> GenericParquetReaders.buildReader(schema, messageType))
+ Parquet.ReadBuilder builder = Parquet.read(inputFile)
+ .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
.project(schema)
.split(task.start(), task.length());

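A sketch of how the now-static, package-private createReader entry point might be exercised outside of IcebergRecordReader. The helper class, method name, and the record-counting body are illustrative; the parameter types and the explicit close of the returned iterable follow the diff above, and the caller is assumed to obtain file, task, inputFile, and tableSchema from a planned table scan:

import java.io.IOException;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;

class ReaderFactoryUsage {
  // file, task, inputFile, and tableSchema are assumed to come from a planned scan,
  // as in IcebergRecordReader#nextTask() above.
  static long countRecords(DataFile file, FileScanTask task, InputFile inputFile, Schema tableSchema)
      throws IOException {
    CloseableIterable<Record> records =
        IcebergReaderFactory.createReader(file, task, inputFile, tableSchema, false);
    try {
      long count = 0;
      for (Record rec : records) { // iteration is format-agnostic; Avro/ORC/Parquet dispatch happened in the factory
        count++;
      }
      return count;
    } finally {
      records.close(); // mirrors the explicit close added to IcebergRecordReader
    }
  }
}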
Changes to TestIcebergInputFormat:
@@ -50,10 +50,9 @@ public class TestIcebergInputFormat {

private File tableLocation;
private Table table;
+ private IcebergInputFormat format = new IcebergInputFormat();
+ private JobConf conf = new JobConf();

- //TODO flesh out with more tests of the IF itself
- //TODO: do we still need the table data etc. if we're not testing from Hive?

@Before
public void before() throws IOException {
tableLocation = java.nio.file.Files.createTempDirectory("temp").toFile();
@@ -75,30 +74,32 @@ public void before() throws IOException {

@Test
public void testGetSplits() throws IOException {
- IcebergInputFormat format = new IcebergInputFormat();
- JobConf conf = new JobConf();
conf.set("location", "file:" + tableLocation);
InputSplit[] splits = format.getSplits(conf, 1);
assertEquals(splits.length, 1);
}

+ @Test(expected = IllegalArgumentException.class)
+ public void testGetSplitsNoLocation() throws IOException {
+ format.getSplits(conf, 1);
+ }
+
+ @Test(expected = IOException.class)
+ public void testGetSplitsInvalidLocationUri() throws IOException {
+ conf.set("location", "http:");
+ format.getSplits(conf, 1);
+ }

@Test
public void testGetRecordReader() throws IOException {
- IcebergInputFormat format = new IcebergInputFormat();
- JobConf conf = new JobConf();
conf.set("location", "file:" + tableLocation);
InputSplit[] splits = format.getSplits(conf, 1);
RecordReader reader = format.getRecordReader(splits[0], conf, null);
IcebergWritable value = (IcebergWritable) reader.createValue();

List<Record> records = Lists.newArrayList();
- boolean unfinished = true;
- while (unfinished) {
- if (reader.next(null, value)) {
- records.add(value.getRecord().copy());
- } else {
- unfinished = false;
- }
+ while (reader.next(null, value)) {
+ records.add(value.getRecord().copy());
}
assertEquals(3, records.size());
}
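The REUSE_CONTAINERS flag added in this patch is read in the record-reader constructor but never toggled by the tests above. A hedged sketch of an additional test in the same style, assuming the test class sits in the same package as IcebergInputFormat (the test name is illustrative, the record count of 3 reuses the existing fixture, and the rest follows the diff):

@Test
public void testGetRecordReaderWithContainerReuse() throws IOException {
  conf.set("location", "file:" + tableLocation);
  conf.setBoolean(IcebergInputFormat.REUSE_CONTAINERS, true); // package-private constant from the diff
  InputSplit[] splits = format.getSplits(conf, 1);
  RecordReader reader = format.getRecordReader(splits[0], conf, null);
  IcebergWritable value = (IcebergWritable) reader.createValue();

  int count = 0;
  while (reader.next(null, value)) {
    count++; // with container reuse the record object may be recycled between calls
  }
  assertEquals(3, count);
}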
Changes to mr/src/test/resources/test-table/metadata/v1.metadata.json (1 addition, 1 deletion):
@@ -1,7 +1,7 @@
{
"format-version" : 1,
"table-uuid" : "5ec03633-03bc-4c4b-8ef9-f799c143e3e7",
"location" : "/Users/cmathiesen/projects/opensource/forks/eg-iceberg-fork/incubator-iceberg/mr/src/test/resources/test-table",
"location" : "mr/src/test/resources/test-table",
"last-updated-ms" : 1582645440292,
"last-column-id" : 2,
"schema" : {
Changes to mr/src/test/resources/test-table/metadata/v2.metadata.json (2 additions, 2 deletions):
@@ -1,7 +1,7 @@
{
"format-version" : 1,
"table-uuid" : "5ec03633-03bc-4c4b-8ef9-f799c143e3e7",
"location" : "/Users/cmathiesen/projects/opensource/forks/eg-iceberg-fork/incubator-iceberg/mr/src/test/resources/test-table",
"location" : "mr/src/test/resources/test-table",
"last-updated-ms" : 1582645443979,
"last-column-id" : 2,
"schema" : {
@@ -38,7 +38,7 @@
"total-records" : "3",
"total-data-files" : "1"
},
"manifest-list" : "/Users/cmathiesen/projects/opensource/forks/eg-iceberg-fork/incubator-iceberg/mr/src/test/resources/test-table/metadata/snap-7829799286772121706-1-1a3ffe32-d8da-47cf-9a8c-0e4c889a3a4c.avro"
"manifest-list" : "mr/src/test/resources/test-table/metadata/snap-7829799286772121706-1-1a3ffe32-d8da-47cf-9a8c-0e4c889a3a4c.avro"
} ],
"snapshot-log" : [ {
"timestamp-ms" : 1582645443979,