mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java

@@ -42,6 +42,7 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -135,13 +136,14 @@ public List<InputSplit> getSplits(JobContext context) {
     InputFormatConfig.InMemoryDataModel model = conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
         InputFormatConfig.InMemoryDataModel.GENERIC);
     try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
+      Table serializableTable = SerializableTable.copyOf(table);
[Review comment on the line above, from aokolnychyi (Contributor, Author), Aug 17, 2021]
@kbendick, I moved it outside the loop.
       tasksIterable.forEach(task -> {
         if (applyResidual && (model == InputFormatConfig.InMemoryDataModel.HIVE ||
             model == InputFormatConfig.InMemoryDataModel.PIG)) {
           // TODO: We do not support residual evaluation for HIVE and PIG in memory data model yet
           checkResiduals(task);
         }
-        splits.add(new IcebergSplit(conf, task, table.io(), table.encryption()));
+        splits.add(new IcebergSplit(serializableTable, conf, task));
       });
     } catch (IOException e) {
       throw new UncheckedIOException(String.format("Failed to close table scan: %s", scan), e);
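Why the copy is hoisted: SerializableTable.copyOf builds a self-contained, Java-serializable snapshot of the table (including its FileIO and EncryptionManager), so one copy can be shared by every split instead of serializing FileIO and EncryptionManager once per split. A minimal sketch of the round trip the splits rely on, assuming a table loaded from a catalog elsewhere (the helper class and method names are illustrative, not part of this PR):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;

final class SerializableTableSketch {
  private SerializableTableSketch() {
  }

  // Round-trips a table through Java serialization, as each split does when it
  // is shipped to a task. The input table is assumed to come from a catalog load.
  static Table roundTrip(Table table) throws Exception {
    Table copy = SerializableTable.copyOf(table); // one copy, shared by all splits
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(copy);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (Table) in.readObject(); // still answers io(), encryption(), schema(), etc.
    }
  }
}
```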
@@ -204,8 +206,9 @@ public void initialize(InputSplit split, TaskAttemptContext newContext) {
     // For now IcebergInputFormat does its own split planning and does not accept FileSplit instances
     CombinedScanTask task = ((IcebergSplit) split).task();
     this.context = newContext;
-    this.io = ((IcebergSplit) split).io();
-    this.encryptionManager = ((IcebergSplit) split).encryptionManager();
+    Table table = ((IcebergSplit) split).table();
+    this.io = table.io();
+    this.encryptionManager = table.encryption();
     this.tasks = task.files().iterator();
     this.tableSchema = InputFormatConfig.tableSchema(conf);
     this.caseSensitive = conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT);
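On the read side, recovering the table is enough to rebuild the reader's plumbing, since the deserialized table still exposes its FileIO and EncryptionManager. A hedged sketch of what the record reader needs those two handles for; the Iceberg calls are from its public API, but the helper class itself is illustrative:

```java
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.io.InputFile;

final class ReaderPlumbingSketch {
  private ReaderPlumbingSketch() {
  }

  // Resolves a task's data file to a readable, decrypted InputFile using only
  // the table recovered from the split.
  static InputFile open(Table table, FileScanTask task) {
    InputFile raw = table.io().newInputFile(task.file().path().toString());
    return table.encryption()
        .decrypt(EncryptedFiles.encryptedInput(raw, task.file().keyMetadata()));
  }
}
```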
mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java (43 changes: 14 additions & 29 deletions)
@@ -26,9 +26,8 @@
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.hadoop.Util;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.util.SerializationUtil;

@@ -38,9 +37,8 @@ public class IcebergSplit extends InputSplit implements org.apache.hadoop.mapred

   public static final String[] ANYWHERE = new String[]{"*"};

+  private Table table;
   private CombinedScanTask task;
-  private FileIO io;
-  private EncryptionManager encryptionManager;

   private transient String[] locations;
   private transient Configuration conf;
@@ -49,11 +47,10 @@
   public IcebergSplit() {
   }

-  IcebergSplit(Configuration conf, CombinedScanTask task, FileIO io, EncryptionManager encryptionManager) {
+  IcebergSplit(Table table, Configuration conf, CombinedScanTask task) {
+    this.table = table;
     this.task = task;
     this.conf = conf;
-    this.io = io;
-    this.encryptionManager = encryptionManager;
   }

   public CombinedScanTask task() {
@@ -86,39 +83,27 @@ public String[] getLocations() {

   @Override
   public void write(DataOutput out) throws IOException {
+    byte[] tableData = SerializationUtil.serializeToBytes(table);
+    out.writeInt(tableData.length);
+    out.write(tableData);
+
     byte[] data = SerializationUtil.serializeToBytes(this.task);
     out.writeInt(data.length);
     out.write(data);
-
-    byte[] ioData = SerializationUtil.serializeToBytes(io);
-    out.writeInt(ioData.length);
-    out.write(ioData);
-
-    byte[] encryptionManagerData = SerializationUtil.serializeToBytes(encryptionManager);
-    out.writeInt(encryptionManagerData.length);
-    out.write(encryptionManagerData);
   }

   @Override
   public void readFields(DataInput in) throws IOException {
+    byte[] tableData = new byte[in.readInt()];
+    in.readFully(tableData);
+    this.table = SerializationUtil.deserializeFromBytes(tableData);
+
     byte[] data = new byte[in.readInt()];
     in.readFully(data);
     this.task = SerializationUtil.deserializeFromBytes(data);
-
-    byte[] ioData = new byte[in.readInt()];
-    in.readFully(ioData);
-    this.io = SerializationUtil.deserializeFromBytes(ioData);
-
-    byte[] encryptionManagerData = new byte[in.readInt()];
-    in.readFully(encryptionManagerData);
-    this.encryptionManager = SerializationUtil.deserializeFromBytes(encryptionManagerData);
   }

-  public FileIO io() {
-    return io;
-  }
-
-  public EncryptionManager encryptionManager() {
-    return encryptionManager;
+  public Table table() {
+    return table;
   }
 }
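Both write() and readFields() keep the same length-prefixed blob layout as before, just with a single table blob replacing the separate FileIO and EncryptionManager blobs. A generic sketch of that pattern (the Blobs helper is hypothetical; SerializationUtil is the same Iceberg utility used in the diff):

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.iceberg.util.SerializationUtil;

final class Blobs {
  private Blobs() {
  }

  // Writes an object as a length-prefixed byte blob so the reader knows how
  // large a buffer to allocate before deserializing.
  static void write(DataOutput out, Object obj) throws IOException {
    byte[] data = SerializationUtil.serializeToBytes(obj);
    out.writeInt(data.length);
    out.write(data);
  }

  // Reads one blob back; calls must mirror the order used in write().
  static <T> T read(DataInput in) throws IOException {
    byte[] data = new byte[in.readInt()];
    in.readFully(data);
    return SerializationUtil.deserializeFromBytes(data);
  }
}
```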