Skip to content
This repository was archived by the owner on Jun 15, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

This file was deleted.

50 changes: 7 additions & 43 deletions mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergSerDe.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,6 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -38,36 +29,33 @@
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.Writable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.mr.mapred.serde.objectinspector.IcebergObjectInspector;

public class IcebergSerDe extends AbstractSerDe {

private Schema schema;
private ObjectInspector inspector;
private List<Object> row;

@Override
public void initialize(@Nullable Configuration configuration, Properties serDeProperties) throws SerDeException {
Table table = null;
final Table table;

try {
table = TableResolver.resolveTableFromConfiguration(configuration, serDeProperties);
} catch (IOException e) {
throw new UncheckedIOException("Unable to resolve table from configuration: ", e);
}
this.schema = table.schema();

try {
this.inspector = new IcebergObjectInspectorGenerator().createObjectInspector(schema);
this.inspector = IcebergObjectInspector.create(table.schema());
} catch (Exception e) {
throw new SerDeException(e);
}
}

@Override
public Class<? extends Writable> getSerializedClass() {
return null;
return IcebergWritable.class;
}

@Override
Expand All @@ -82,31 +70,7 @@ public SerDeStats getSerDeStats() {

@Override
public Object deserialize(Writable writable) {
IcebergWritable icebergWritable = (IcebergWritable) writable;
List<Types.NestedField> fields = icebergWritable.schema().columns();

if (row == null || row.size() != fields.size()) {
row = new ArrayList<Object>(fields.size());
} else {
row.clear();
}
for (int i = 0; i < fields.size(); i++) {
Object obj = ((IcebergWritable) writable).record().get(i);
Type fieldType = fields.get(i).type();
if (fieldType.equals(Types.DateType.get())) {
row.add(Date.valueOf((LocalDate) obj));
} else if (fieldType.equals(Types.TimestampType.withoutZone())) {
row.add(Timestamp.valueOf((LocalDateTime) obj));
} else if (fieldType.equals(Types.TimestampType.withZone())) {
LocalDateTime timestamp = ((OffsetDateTime) obj).toLocalDateTime();
row.add(Timestamp.valueOf(timestamp));
} else if (fieldType.equals(Types.TimeType.get())) {
row.add(((LocalTime) obj).toString());
} else {
row.add(obj);
}
}
return Collections.unmodifiableList(row);
return ((IcebergWritable) writable).record();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public class IcebergWritable implements Writable {
private Record record;
private Schema schema;

public IcebergWritable(Record record, Schema schema) {
this.record = record;
this.schema = schema;
}

@SuppressWarnings("checkstyle:HiddenField")
public void wrapRecord(Record record) {
this.record = record;
Expand Down
21 changes: 10 additions & 11 deletions mr/src/main/java/org/apache/iceberg/mr/mapred/TableResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
package org.apache.iceberg.mr.mapred;

import java.io.IOException;
import java.util.Optional;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand All @@ -45,27 +45,26 @@ static Table resolveTableFromJob(JobConf conf) throws IOException {

static Table resolveTableFromConfiguration(Configuration conf, Properties properties) throws IOException {
String catalogName = properties.getProperty(InputFormatConfig.CATALOG_NAME, InputFormatConfig.HADOOP_TABLES);
String tableLocation = properties.getProperty(InputFormatConfig.TABLE_LOCATION);
String tableName = properties.getProperty(InputFormatConfig.TABLE_NAME);
Preconditions.checkNotNull(tableLocation, "Table location is not set.");
Preconditions.checkNotNull(tableName, "Table name is not set.");

switch (catalogName) {
case InputFormatConfig.HADOOP_TABLES:
String tableLocation = properties.getProperty(InputFormatConfig.TABLE_LOCATION);
Preconditions.checkNotNull(tableLocation, "Table location is not set.");
HadoopTables tables = new HadoopTables(conf);
return tables.load(tableLocation);

case InputFormatConfig.HIVE_CATALOG:
String tableName = properties.getProperty(InputFormatConfig.TABLE_NAME);
Preconditions.checkNotNull(tableName, "Table name is not set.");
//TODO Implement HiveCatalog
return null;
default:
throw new NoSuchTableException("Table does not exist at location: " + tableLocation);
throw new RuntimeException("Catalog " + catalogName + " not supported.");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: throw NoSuchNamespacException

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, we can address that in https://github.com/apache/iceberg/pull/1103/files#diff-64ac74e3513bf6536c0a39f377c48ce5R62 (I'll make the change now, also spotted that the tests for this class could be fleshed out and tidied up a bit)

}
}

protected static String extractProperty(JobConf conf, String key) {
String value = conf.get(key);
if (value == null) {
throw new IllegalArgumentException("Property not set in JobConf: " + key);
}
return value;
return Optional.ofNullable(conf.get(key))
.orElseThrow(() -> new IllegalArgumentException("Property not set in JobConf: " + key));
}
}
Loading