diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index c526edef5c4f..3c64d59e5e93 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -42,6 +42,7 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -135,13 +136,14 @@ public List<InputSplit> getSplits(JobContext context) {
     InputFormatConfig.InMemoryDataModel model = conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
         InputFormatConfig.InMemoryDataModel.GENERIC);
     try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
+      Table serializableTable = SerializableTable.copyOf(table);
       tasksIterable.forEach(task -> {
         if (applyResidual && (model == InputFormatConfig.InMemoryDataModel.HIVE ||
             model == InputFormatConfig.InMemoryDataModel.PIG)) {
           // TODO: We do not support residual evaluation for HIVE and PIG in memory data model yet
           checkResiduals(task);
         }
-        splits.add(new IcebergSplit(conf, task, table.io(), table.encryption()));
+        splits.add(new IcebergSplit(serializableTable, conf, task));
       });
     } catch (IOException e) {
       throw new UncheckedIOException(String.format("Failed to close table scan: %s", scan), e);
@@ -204,8 +206,9 @@ public void initialize(InputSplit split, TaskAttemptContext newContext) {
       // For now IcebergInputFormat does its own split planning and does not accept FileSplit instances
       CombinedScanTask task = ((IcebergSplit) split).task();
       this.context = newContext;
-      this.io = ((IcebergSplit) split).io();
-      this.encryptionManager = ((IcebergSplit) split).encryptionManager();
+      Table table = ((IcebergSplit) split).table();
+      this.io = table.io();
+      this.encryptionManager = table.encryption();
       this.tasks = task.files().iterator();
       this.tableSchema = InputFormatConfig.tableSchema(conf);
       this.caseSensitive = conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT);
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
index 632224eea876..8bc332eaa431 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
@@ -26,9 +26,8 @@
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.hadoop.Util;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.util.SerializationUtil;
 
@@ -38,9 +37,8 @@ public class IcebergSplit extends InputSplit implements org.apache.hadoop.mapred
 
   public static final String[] ANYWHERE = new String[]{"*"};
 
+  private Table table;
   private CombinedScanTask task;
-  private FileIO io;
-  private EncryptionManager encryptionManager;
 
   private transient String[] locations;
   private transient Configuration conf;
@@ -49,11 +47,10 @@ public class IcebergSplit extends InputSplit implements org.apache.hadoop.mapred
   public IcebergSplit() {
   }
 
-  IcebergSplit(Configuration conf, CombinedScanTask task, FileIO io, EncryptionManager encryptionManager) {
+  IcebergSplit(Table table, Configuration conf, CombinedScanTask task) {
+    this.table = table;
     this.task = task;
     this.conf = conf;
-    this.io = io;
-    this.encryptionManager = encryptionManager;
   }
 
   public CombinedScanTask task() {
@@ -86,39 +83,27 @@ public String[] getLocations() {
 
   @Override
   public void write(DataOutput out) throws IOException {
+    byte[] tableData = SerializationUtil.serializeToBytes(table);
+    out.writeInt(tableData.length);
+    out.write(tableData);
+
     byte[] data = SerializationUtil.serializeToBytes(this.task);
     out.writeInt(data.length);
     out.write(data);
-
-    byte[] ioData = SerializationUtil.serializeToBytes(io);
-    out.writeInt(ioData.length);
-    out.write(ioData);
-
-    byte[] encryptionManagerData = SerializationUtil.serializeToBytes(encryptionManager);
-    out.writeInt(encryptionManagerData.length);
-    out.write(encryptionManagerData);
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
+    byte[] tableData = new byte[in.readInt()];
+    in.readFully(tableData);
+    this.table = SerializationUtil.deserializeFromBytes(tableData);
+
     byte[] data = new byte[in.readInt()];
     in.readFully(data);
     this.task = SerializationUtil.deserializeFromBytes(data);
-
-    byte[] ioData = new byte[in.readInt()];
-    in.readFully(ioData);
-    this.io = SerializationUtil.deserializeFromBytes(ioData);
-
-    byte[] encryptionManagerData = new byte[in.readInt()];
-    in.readFully(encryptionManagerData);
-    this.encryptionManager = SerializationUtil.deserializeFromBytes(encryptionManagerData);
-  }
-
-  public FileIO io() {
-    return io;
   }
 
-  public EncryptionManager encryptionManager() {
-    return encryptionManager;
+  public Table table() {
+    return table;
   }
 }
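
For context, this change replaces the two independently serialized split fields (FileIO and EncryptionManager) with a single serializable table payload, from which both are recovered on the task side. Note that getSplits() makes the serializable copy once per scan and shares it across all splits it creates. Below is a minimal sketch of the round trip, assuming a Table obtained from some catalog; the loadTable() helper is a hypothetical placeholder, not part of this patch:

    import org.apache.iceberg.SerializableTable;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.util.SerializationUtil;

    public class SerializableTableRoundTrip {
      public static void main(String[] args) {
        Table table = loadTable(); // hypothetical placeholder: obtain a Table from a catalog
        // Wrap the live table in a serializable copy, as getSplits() now does once per scan.
        Table serializableTable = SerializableTable.copyOf(table);
        // Byte round trip via the same SerializationUtil helpers that
        // IcebergSplit.write()/readFields() use (there, with a length prefix).
        byte[] bytes = SerializationUtil.serializeToBytes(serializableTable);
        Table deserialized = SerializationUtil.deserializeFromBytes(bytes);
        // io() and encryption() are derived from the one deserialized table,
        // so the split no longer ships them as separate payloads.
        System.out.println(deserialized.io() != null && deserialized.encryption() != null);
      }

      private static Table loadTable() {
        throw new UnsupportedOperationException("placeholder: load from a catalog");
      }
    }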