diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergInputFormat.java
index 3718f8f544b1..3bbd3b014ba7 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergInputFormat.java
@@ -43,33 +43,41 @@
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mr.SerializationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class IcebergInputFormat implements InputFormat {
+public class IcebergInputFormat implements InputFormat {
 
   private static final Logger LOG = LoggerFactory.getLogger(IcebergInputFormat.class);
 
+  static final String REUSE_CONTAINERS = "iceberg.mr.reuse.containers";
+
   private Table table;
 
   @Override
-  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    //TODO: Change this to use whichever Catalog the table was made with i.e. HiveCatalog instead etc.
-    HadoopTables tables = new HadoopTables(job);
-    String tableDir = job.get("location");
+  public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {
+    table = findTable(conf);
+    CloseableIterable taskIterable = table.newScan().planTasks();
+    List tasks = (List) StreamSupport
+        .stream(taskIterable.spliterator(), false)
+        .collect(Collectors.toList());
+    return createSplits(tasks);
+  }
+  private Table findTable(JobConf conf) throws IOException {
+    HadoopTables tables = new HadoopTables(conf);
+    String tableDir = conf.get("location");
+    if (tableDir == null) {
+      throw new IllegalArgumentException("Table 'location' not set in JobConf");
+    }
     URI location = null;
     try {
       location = new URI(tableDir);
     } catch (URISyntaxException e) {
-      throw new IOException("Unable to create URI for table location: '" + tableDir + "'");
+      throw new IOException("Unable to create URI for table location: '" + tableDir + "'", e);
     }
     table = tables.load(location.getPath());
-
-    CloseableIterable taskIterable = table.newScan().planTasks();
-    List tasks = (List) StreamSupport
-        .stream(taskIterable.spliterator(), false)
-        .collect(Collectors.toList());
-    return createSplits(tasks);
+    return table;
   }
 
   private InputSplit[] createSplits(List tasks) {
@@ -86,17 +94,19 @@ public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter repo
   }
 
   public class IcebergRecordReader implements RecordReader {
 
-    private JobConf context;
+    private JobConf conf;
     private IcebergSplit split;
     private Iterator tasks;
    private CloseableIterable reader;
     private Iterator recordIterator;
     private Record currentRecord;
+    private boolean reuseContainers;
 
     public IcebergRecordReader(InputSplit split, JobConf conf) throws IOException {
       this.split = (IcebergSplit) split;
-      this.context = conf;
+      this.conf = conf;
+      this.reuseContainers = conf.getBoolean(REUSE_CONTAINERS, false);
       initialise();
     }
 
@@ -108,12 +118,10 @@ private void initialise() {
     private void nextTask() {
       FileScanTask currentTask = tasks.next();
       DataFile file = currentTask.file();
-      InputFile inputFile = HadoopInputFile.fromLocation(file.path(), context);
+      InputFile inputFile = HadoopInputFile.fromLocation(file.path(), conf);
       Schema tableSchema = table.schema();
-      boolean reuseContainers = true; // FIXME: read from config
-      IcebergReaderFactory readerFactory = new IcebergReaderFactory();
-      reader = readerFactory.createReader(file, currentTask, inputFile, tableSchema, reuseContainers);
+      reader = IcebergReaderFactory.createReader(file, currentTask, inputFile, tableSchema, reuseContainers);
       recordIterator = reader.iterator();
     }
 
@@ -126,6 +134,11 @@ public boolean next(Void key, IcebergWritable value) {
       }
 
       if (tasks.hasNext()) {
+        try {
+          reader.close();
+        } catch (IOException e) {
+          LOG.error("Error closing reader", e);
+        }
         nextTask();
         currentRecord = recordIterator.next();
         value.setRecord(currentRecord);
@@ -154,7 +167,7 @@ public long getPos() throws IOException {
 
     @Override
     public void close() throws IOException {
-
+      reader.close();
     }
 
     @Override
@@ -165,6 +178,8 @@ public float getProgress() throws IOException {
 
   private static class IcebergSplit implements InputSplit {
 
+    private static final String[] ANYWHERE = new String[]{"*"};
+
     private CombinedScanTask task;
 
     IcebergSplit(CombinedScanTask task) {
@@ -173,22 +188,26 @@ private static class IcebergSplit implements InputSplit {
 
     @Override
     public long getLength() throws IOException {
-      return 0;
+      return task.files().stream().mapToLong(FileScanTask::length).sum();
     }
 
     @Override
     public String[] getLocations() throws IOException {
-      return new String[0];
+      return ANYWHERE;
     }
 
     @Override
     public void write(DataOutput out) throws IOException {
-
+      byte[] data = SerializationUtil.serializeToBytes(this.task);
+      out.writeInt(data.length);
+      out.write(data);
     }
 
     @Override
     public void readFields(DataInput in) throws IOException {
-
+      byte[] data = new byte[in.readInt()];
+      in.readFully(data);
+      this.task = SerializationUtil.deserializeFromBytes(data);
     }
 
     public CombinedScanTask getTask() {
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergReaderFactory.java b/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergReaderFactory.java
index 5fbfd7f62c0c..37bb40f54bba 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergReaderFactory.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergReaderFactory.java
@@ -25,6 +25,7 @@
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
@@ -33,10 +34,10 @@
 
 class IcebergReaderFactory {
 
-  IcebergReaderFactory() {
+  private IcebergReaderFactory() {
   }
 
-  public CloseableIterable createReader(DataFile file, FileScanTask currentTask, InputFile inputFile,
+  public static CloseableIterable createReader(DataFile file, FileScanTask currentTask, InputFile inputFile,
       Schema tableSchema, boolean reuseContainers) {
     switch (file.format()) {
       case AVRO:
@@ -52,9 +53,9 @@ public CloseableIterable createReader(DataFile file, FileScanTask curren
     }
   }
 
-  // FIXME: use generic reader function
-  private CloseableIterable buildAvroReader(FileScanTask task, InputFile file, Schema schema, boolean reuseContainers) {
-    Avro.ReadBuilder builder = Avro.read(file)
+  private static CloseableIterable buildAvroReader(FileScanTask task, InputFile inputFile, Schema schema,
+      boolean reuseContainers) {
+    Avro.ReadBuilder builder = Avro.read(inputFile)
         .createReaderFunc(DataReader::create)
         .project(schema)
         .split(task.start(), task.length());
@@ -66,21 +67,20 @@ private CloseableIterable buildAvroReader(FileScanTask task, InputFile file, Sch
     return builder.build();
   }
 
-  // FIXME: use generic reader function
-  private CloseableIterable buildOrcReader(FileScanTask task, InputFile file, Schema schema, boolean reuseContainers) {
-    ORC.ReadBuilder builder = ORC.read(file)
-//        .createReaderFunc() // FIXME: implement
+  private static CloseableIterable buildOrcReader(FileScanTask task, InputFile inputFile, Schema schema,
+      boolean reuseContainers) {
+    ORC.ReadBuilder builder = ORC.read(inputFile)
+        .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, fileSchema))
         .project(schema)
         .split(task.start(), task.length());
 
     return builder.build();
   }
 
-  // FIXME: use generic reader function
-  private CloseableIterable buildParquetReader(FileScanTask task, InputFile file, Schema schema,
+  private static CloseableIterable buildParquetReader(FileScanTask task, InputFile inputFile, Schema schema,
       boolean reuseContainers) {
-    Parquet.ReadBuilder builder = Parquet.read(file)
-        .createReaderFunc(messageType -> GenericParquetReaders.buildReader(schema, messageType))
+    Parquet.ReadBuilder builder = Parquet.read(inputFile)
+        .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
         .project(schema)
         .split(task.start(), task.length());
 
diff --git a/mr/src/test/java/org/apache/iceberg/mr/mapred/TestIcebergInputFormat.java b/mr/src/test/java/org/apache/iceberg/mr/mapred/TestIcebergInputFormat.java
index 1e228fc3300c..cf1fb878a838 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/mapred/TestIcebergInputFormat.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/mapred/TestIcebergInputFormat.java
@@ -50,10 +49,9 @@ public class TestIcebergInputFormat {
 
   private File tableLocation;
   private Table table;
+  private IcebergInputFormat format = new IcebergInputFormat();
+  private JobConf conf = new JobConf();
 
-  //TODO flesh out with more tests of the IF itself
-  //TODO: do we still need the table data etc. if we're not testing from Hive?
-
   @Before
   public void before() throws IOException {
     tableLocation = java.nio.file.Files.createTempDirectory("temp").toFile();
@@ -75,30 +74,32 @@ public void before() throws IOException {
 
   @Test
   public void testGetSplits() throws IOException {
-    IcebergInputFormat format = new IcebergInputFormat();
-    JobConf conf = new JobConf();
     conf.set("location", "file:" + tableLocation);
     InputSplit[] splits = format.getSplits(conf, 1);
     assertEquals(splits.length, 1);
   }
 
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetSplitsNoLocation() throws IOException {
+    format.getSplits(conf, 1);
+  }
+
+  @Test(expected = IOException.class)
+  public void testGetSplitsInvalidLocationUri() throws IOException {
+    conf.set("location", "http:");
+    format.getSplits(conf, 1);
+  }
+
   @Test
   public void testGetRecordReader() throws IOException {
-    IcebergInputFormat format = new IcebergInputFormat();
-    JobConf conf = new JobConf();
     conf.set("location", "file:" + tableLocation);
     InputSplit[] splits = format.getSplits(conf, 1);
     RecordReader reader = format.getRecordReader(splits[0], conf, null);
     IcebergWritable value = (IcebergWritable) reader.createValue();
     List records = Lists.newArrayList();
-    boolean unfinished = true;
-    while (unfinished) {
-      if (reader.next(null, value)) {
-        records.add(value.getRecord().copy());
-      } else {
-        unfinished = false;
-      }
+    while (reader.next(null, value)) {
+      records.add(value.getRecord().copy());
     }
     assertEquals(3, records.size());
   }
 
diff --git a/mr/src/test/resources/test-table/metadata/v1.metadata.json b/mr/src/test/resources/test-table/metadata/v1.metadata.json
index d14ac4529e3f..0c08d1732619 100644
--- a/mr/src/test/resources/test-table/metadata/v1.metadata.json
+++ b/mr/src/test/resources/test-table/metadata/v1.metadata.json
@@ -1,7 +1,7 @@
 {
"format-version" : 1, "table-uuid" : "5ec03633-03bc-4c4b-8ef9-f799c143e3e7", - "location" : "/Users/cmathiesen/projects/opensource/forks/eg-iceberg-fork/incubator-iceberg/mr/src/test/resources/test-table", + "location" : "mr/src/test/resources/test-table", "last-updated-ms" : 1582645440292, "last-column-id" : 2, "schema" : { diff --git a/mr/src/test/resources/test-table/metadata/v2.metadata.json b/mr/src/test/resources/test-table/metadata/v2.metadata.json index ea938f9a0dbe..80a84f69913f 100644 --- a/mr/src/test/resources/test-table/metadata/v2.metadata.json +++ b/mr/src/test/resources/test-table/metadata/v2.metadata.json @@ -1,7 +1,7 @@ { "format-version" : 1, "table-uuid" : "5ec03633-03bc-4c4b-8ef9-f799c143e3e7", - "location" : "/Users/cmathiesen/projects/opensource/forks/eg-iceberg-fork/incubator-iceberg/mr/src/test/resources/test-table", + "location" : "mr/src/test/resources/test-table", "last-updated-ms" : 1582645443979, "last-column-id" : 2, "schema" : { @@ -38,7 +38,7 @@ "total-records" : "3", "total-data-files" : "1" }, - "manifest-list" : "/Users/cmathiesen/projects/opensource/forks/eg-iceberg-fork/incubator-iceberg/mr/src/test/resources/test-table/metadata/snap-7829799286772121706-1-1a3ffe32-d8da-47cf-9a8c-0e4c889a3a4c.avro" + "manifest-list" : "mr/src/test/resources/test-table/metadata/snap-7829799286772121706-1-1a3ffe32-d8da-47cf-9a8c-0e4c889a3a4c.avro" } ], "snapshot-log" : [ { "timestamp-ms" : 1582645443979,