diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergObjectInspectorGenerator.java b/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergObjectInspectorGenerator.java deleted file mode 100644 index f6838dbae9d2..000000000000 --- a/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergObjectInspectorGenerator.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.mr.mapred; - -import java.util.ArrayList; -import java.util.List; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.iceberg.Schema; -import org.apache.iceberg.types.Types; - -class IcebergObjectInspectorGenerator { - - protected ObjectInspector createObjectInspector(Schema schema) throws Exception { - List columnNames = setColumnNames(schema); - List columnTypes = IcebergSchemaToTypeInfo.getColumnTypes(schema); - - List columnOIs = new ArrayList<>(columnTypes.size()); - for (int i = 0; i < columnTypes.size(); i++) { - columnOIs.add(createObjectInspectorWorker(columnTypes.get(i))); - } - return ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnOIs, null); - } - - protected ObjectInspector createObjectInspectorWorker(TypeInfo typeInfo) throws Exception { - ObjectInspector.Category typeCategory = typeInfo.getCategory(); - - switch (typeCategory) { - case PRIMITIVE: - PrimitiveTypeInfo pti = (PrimitiveTypeInfo) typeInfo; - return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(pti); - case LIST: - ListTypeInfo ati = (ListTypeInfo) typeInfo; - return ObjectInspectorFactory - .getStandardListObjectInspector(createObjectInspectorWorker(ati.getListElementTypeInfo())); - case MAP: - MapTypeInfo mti = (MapTypeInfo) typeInfo; - return ObjectInspectorFactory.getStandardMapObjectInspector( - createObjectInspectorWorker(mti.getMapKeyTypeInfo()), - createObjectInspectorWorker(mti.getMapValueTypeInfo())); - case STRUCT: - StructTypeInfo sti = (StructTypeInfo) typeInfo; - List ois = new ArrayList<>(sti.getAllStructFieldTypeInfos().size()); - for (TypeInfo structTypeInfos : sti.getAllStructFieldTypeInfos()) { - ois.add(createObjectInspectorWorker(structTypeInfos)); - } - return ObjectInspectorFactory.getStandardStructObjectInspector(sti.getAllStructFieldNames(), ois); - default: - throw new SerDeException("Couldn't create Object Inspector for category: '" + typeCategory + "'"); - } - } - - protected List setColumnNames(Schema schema) { - List fields = schema.columns(); - List fieldsList = new ArrayList<>(fields.size()); - for (Types.NestedField field : fields) { - fieldsList.add(field.name()); - } - return fieldsList; - } - -} diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergSchemaToTypeInfo.java b/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergSchemaToTypeInfo.java deleted file mode 100644 index edc622f73a5f..000000000000 --- a/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergSchemaToTypeInfo.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.mr.mapred; - -import java.util.ArrayList; -import java.util.List; -import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import org.apache.iceberg.Schema; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.Types; - -/** - * Class to convert Iceberg types to Hive TypeInfo - */ -final class IcebergSchemaToTypeInfo { - - private IcebergSchemaToTypeInfo() { - } - - private static final ImmutableMap primitiveTypeToTypeInfo = ImmutableMap.builder() - .put(Types.BooleanType.get(), TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.BOOLEAN_TYPE_NAME)) - .put(Types.IntegerType.get(), TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.INT_TYPE_NAME)) - .put(Types.LongType.get(), TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.BIGINT_TYPE_NAME)) - .put(Types.FloatType.get(), TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.FLOAT_TYPE_NAME)) - .put(Types.DoubleType.get(), TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.DOUBLE_TYPE_NAME)) - .put(Types.BinaryType.get(), TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.BINARY_TYPE_NAME)) - .put(Types.StringType.get(), TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.STRING_TYPE_NAME)) - .put(Types.DateType.get(), TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.DATE_TYPE_NAME)) - .put(Types.TimestampType.withoutZone(), TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.TIMESTAMP_TYPE_NAME)) - .put(Types.TimestampType.withZone(), TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.TIMESTAMP_TYPE_NAME)) - .build(); - - public static List getColumnTypes(Schema schema) throws Exception { - List fields = schema.columns(); - List types = new ArrayList<>(fields.size()); - for (Types.NestedField field : fields) { - types.add(generateTypeInfo(field.type())); - } - return types; - } - - private static TypeInfo generateTypeInfo(Type type) throws Exception { - if (primitiveTypeToTypeInfo.containsKey(type)) { - return (TypeInfo) primitiveTypeToTypeInfo.get(type); - } - switch (type.typeId()) { - case UUID: - return TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.STRING_TYPE_NAME); - case FIXED: - return TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.BINARY_TYPE_NAME); - case TIME: - return TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.STRING_TYPE_NAME); - case DECIMAL: - Types.DecimalType dec = (Types.DecimalType) type; - HiveDecimalUtils.validateParameter(dec.precision(), dec.scale()); - return TypeInfoFactory.getDecimalTypeInfo(dec.precision(), dec.scale()); - case STRUCT: - return generateStructTypeInfo((Types.StructType) type); - case LIST: - return generateListTypeInfo((Types.ListType) type); - case MAP: - return generateMapTypeInfo((Types.MapType) type); - default: - throw new SerDeException("Can't map Iceberg type to Hive TypeInfo: '" + type.typeId() + "'"); - } - } - - private static TypeInfo generateMapTypeInfo(Types.MapType type) throws Exception { - Type keyType = type.keyType(); - Type valueType = type.valueType(); - return TypeInfoFactory.getMapTypeInfo(generateTypeInfo(keyType), generateTypeInfo(valueType)); - } - - private static TypeInfo generateStructTypeInfo(Types.StructType type) throws Exception { - List fields = type.fields(); - List fieldNames = new ArrayList<>(fields.size()); - List typeInfos = new ArrayList<>(fields.size()); - - for (Types.NestedField field : fields) { - fieldNames.add(field.name()); - typeInfos.add(generateTypeInfo(field.type())); - } - return TypeInfoFactory.getStructTypeInfo(fieldNames, typeInfos); - } - - private static TypeInfo generateListTypeInfo(Types.ListType type) throws Exception { - return TypeInfoFactory.getListTypeInfo(generateTypeInfo(type.elementType())); - } -} diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergSerDe.java b/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergSerDe.java index 3544a8fbd61f..871e0e677835 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergSerDe.java +++ b/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergSerDe.java @@ -21,15 +21,6 @@ import java.io.IOException; import java.io.UncheckedIOException; -import java.sql.Date; -import java.sql.Timestamp; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.time.OffsetDateTime; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.Properties; import javax.annotation.Nullable; import org.apache.hadoop.conf.Configuration; @@ -38,28 +29,25 @@ import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.Writable; -import org.apache.iceberg.Schema; import org.apache.iceberg.Table; -import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.Types; +import org.apache.iceberg.mr.mapred.serde.objectinspector.IcebergObjectInspector; public class IcebergSerDe extends AbstractSerDe { - private Schema schema; private ObjectInspector inspector; - private List row; @Override public void initialize(@Nullable Configuration configuration, Properties serDeProperties) throws SerDeException { - Table table = null; + final Table table; + try { table = TableResolver.resolveTableFromConfiguration(configuration, serDeProperties); } catch (IOException e) { throw new UncheckedIOException("Unable to resolve table from configuration: ", e); } - this.schema = table.schema(); + try { - this.inspector = new IcebergObjectInspectorGenerator().createObjectInspector(schema); + this.inspector = IcebergObjectInspector.create(table.schema()); } catch (Exception e) { throw new SerDeException(e); } @@ -67,7 +55,7 @@ public void initialize(@Nullable Configuration configuration, Properties serDePr @Override public Class getSerializedClass() { - return null; + return IcebergWritable.class; } @Override @@ -82,31 +70,7 @@ public SerDeStats getSerDeStats() { @Override public Object deserialize(Writable writable) { - IcebergWritable icebergWritable = (IcebergWritable) writable; - List fields = icebergWritable.schema().columns(); - - if (row == null || row.size() != fields.size()) { - row = new ArrayList(fields.size()); - } else { - row.clear(); - } - for (int i = 0; i < fields.size(); i++) { - Object obj = ((IcebergWritable) writable).record().get(i); - Type fieldType = fields.get(i).type(); - if (fieldType.equals(Types.DateType.get())) { - row.add(Date.valueOf((LocalDate) obj)); - } else if (fieldType.equals(Types.TimestampType.withoutZone())) { - row.add(Timestamp.valueOf((LocalDateTime) obj)); - } else if (fieldType.equals(Types.TimestampType.withZone())) { - LocalDateTime timestamp = ((OffsetDateTime) obj).toLocalDateTime(); - row.add(Timestamp.valueOf(timestamp)); - } else if (fieldType.equals(Types.TimeType.get())) { - row.add(((LocalTime) obj).toString()); - } else { - row.add(obj); - } - } - return Collections.unmodifiableList(row); + return ((IcebergWritable) writable).record(); } @Override diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergWritable.java b/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergWritable.java index 3f3772e27e9e..1eb67f9d3158 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergWritable.java +++ b/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergWritable.java @@ -33,6 +33,11 @@ public class IcebergWritable implements Writable { private Record record; private Schema schema; + public IcebergWritable(Record record, Schema schema) { + this.record = record; + this.schema = schema; + } + @SuppressWarnings("checkstyle:HiddenField") public void wrapRecord(Record record) { this.record = record; diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapred/TableResolver.java b/mr/src/main/java/org/apache/iceberg/mr/mapred/TableResolver.java index 600f7f48e770..8aa6857b7b0d 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/mapred/TableResolver.java +++ b/mr/src/main/java/org/apache/iceberg/mr/mapred/TableResolver.java @@ -20,11 +20,11 @@ package org.apache.iceberg.mr.mapred; import java.io.IOException; +import java.util.Optional; import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobConf; import org.apache.iceberg.Table; -import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.mr.InputFormatConfig; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -45,27 +45,26 @@ static Table resolveTableFromJob(JobConf conf) throws IOException { static Table resolveTableFromConfiguration(Configuration conf, Properties properties) throws IOException { String catalogName = properties.getProperty(InputFormatConfig.CATALOG_NAME, InputFormatConfig.HADOOP_TABLES); - String tableLocation = properties.getProperty(InputFormatConfig.TABLE_LOCATION); - String tableName = properties.getProperty(InputFormatConfig.TABLE_NAME); - Preconditions.checkNotNull(tableLocation, "Table location is not set."); - Preconditions.checkNotNull(tableName, "Table name is not set."); + switch (catalogName) { case InputFormatConfig.HADOOP_TABLES: + String tableLocation = properties.getProperty(InputFormatConfig.TABLE_LOCATION); + Preconditions.checkNotNull(tableLocation, "Table location is not set."); HadoopTables tables = new HadoopTables(conf); return tables.load(tableLocation); + case InputFormatConfig.HIVE_CATALOG: + String tableName = properties.getProperty(InputFormatConfig.TABLE_NAME); + Preconditions.checkNotNull(tableName, "Table name is not set."); //TODO Implement HiveCatalog return null; default: - throw new NoSuchTableException("Table does not exist at location: " + tableLocation); + throw new RuntimeException("Catalog " + catalogName + " not supported."); } } protected static String extractProperty(JobConf conf, String key) { - String value = conf.get(key); - if (value == null) { - throw new IllegalArgumentException("Property not set in JobConf: " + key); - } - return value; + return Optional.ofNullable(conf.get(key)) + .orElseThrow(() -> new IllegalArgumentException("Property not set in JobConf: " + key)); } } diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapred/serde/objectinspector/IcebergBinaryObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/mapred/serde/objectinspector/IcebergBinaryObjectInspector.java new file mode 100644 index 000000000000..85103c65307c --- /dev/null +++ b/mr/src/main/java/org/apache/iceberg/mr/mapred/serde/objectinspector/IcebergBinaryObjectInspector.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.mapred.serde.objectinspector; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.BytesWritable; + +public final class IcebergBinaryObjectInspector extends IcebergPrimitiveObjectInspector + implements BinaryObjectInspector { + + private static final IcebergBinaryObjectInspector INSTANCE = new IcebergBinaryObjectInspector(); + + public static IcebergBinaryObjectInspector get() { + return INSTANCE; + } + + private IcebergBinaryObjectInspector() { + super(TypeInfoFactory.binaryTypeInfo); + } + + @Override + public byte[] getPrimitiveJavaObject(Object o) { + return o == null ? null : ((ByteBuffer) o).array(); + } + + @Override + public BytesWritable getPrimitiveWritableObject(Object o) { + return o == null ? null : new BytesWritable(getPrimitiveJavaObject(o)); + } + + @Override + public Object copyObject(Object o) { + if (o == null) { + return null; + } + + byte[] bytes = (byte[]) o; + return Arrays.copyOf(bytes, bytes.length); + } + +} diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapred/serde/objectinspector/IcebergDateObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/mapred/serde/objectinspector/IcebergDateObjectInspector.java new file mode 100644 index 000000000000..2991540437c7 --- /dev/null +++ b/mr/src/main/java/org/apache/iceberg/mr/mapred/serde/objectinspector/IcebergDateObjectInspector.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.mapred.serde.objectinspector; + +import java.sql.Date; +import java.time.LocalDate; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; + +public final class IcebergDateObjectInspector extends IcebergPrimitiveObjectInspector implements DateObjectInspector { + + private static final IcebergDateObjectInspector INSTANCE = new IcebergDateObjectInspector(); + + public static IcebergDateObjectInspector get() { + return INSTANCE; + } + + private IcebergDateObjectInspector() { + super(TypeInfoFactory.dateTypeInfo); + } + + @Override + public Date getPrimitiveJavaObject(Object o) { + return o == null ? null : Date.valueOf((LocalDate) o); + } + + @Override + public DateWritable getPrimitiveWritableObject(Object o) { + Date date = getPrimitiveJavaObject(o); + return date == null ? null : new DateWritable(date); + } + + @Override + public Object copyObject(Object o) { + return o == null ? null : new Date(((Date) o).getTime()); + } + +} diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapred/serde/objectinspector/IcebergDecimalObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/mapred/serde/objectinspector/IcebergDecimalObjectInspector.java new file mode 100644 index 000000000000..5d31ce814509 --- /dev/null +++ b/mr/src/main/java/org/apache/iceberg/mr/mapred/serde/objectinspector/IcebergDecimalObjectInspector.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.mapred.serde.objectinspector; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import java.math.BigDecimal; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public final class IcebergDecimalObjectInspector extends IcebergPrimitiveObjectInspector + implements HiveDecimalObjectInspector { + + private static final Cache CACHE = Caffeine.newBuilder() + .expireAfterAccess(10, TimeUnit.MINUTES) + .build(); + + public static IcebergDecimalObjectInspector get(int precision, int scale) { + Preconditions.checkArgument(scale < precision); + Preconditions.checkArgument(precision <= HiveDecimal.MAX_PRECISION); + Preconditions.checkArgument(scale <= HiveDecimal.MAX_SCALE); + + Integer key = precision << 8 | scale; + return CACHE.get(key, k -> new IcebergDecimalObjectInspector(precision, scale)); + } + + private IcebergDecimalObjectInspector(int precision, int scale) { + super(new DecimalTypeInfo(precision, scale)); + } + + @Override + public int precision() { + return ((DecimalTypeInfo) getTypeInfo()).precision(); + } + + @Override + public int scale() { + return ((DecimalTypeInfo) getTypeInfo()).scale(); + } + + @Override + public HiveDecimal getPrimitiveJavaObject(Object o) { + return o == null ? null : HiveDecimal.create((BigDecimal) o); + } + + @Override + public HiveDecimalWritable getPrimitiveWritableObject(Object o) { + HiveDecimal decimal = getPrimitiveJavaObject(o); + return decimal == null ? null : new HiveDecimalWritable(decimal); + } + + @Override + public Object copyObject(Object o) { + if (o == null) { + return null; + } + + HiveDecimal decimal = (HiveDecimal) o; + return HiveDecimal.create(decimal.bigDecimalValue()); + } + +} diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapred/serde/objectinspector/IcebergObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/mapred/serde/objectinspector/IcebergObjectInspector.java new file mode 100644 index 000000000000..ca4875649415 --- /dev/null +++ b/mr/src/main/java/org/apache/iceberg/mr/mapred/serde/objectinspector/IcebergObjectInspector.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.mapred.serde.objectinspector; + +import java.util.List; +import javax.annotation.Nullable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; + +public final class IcebergObjectInspector extends TypeUtil.SchemaVisitor { + + public static ObjectInspector create(@Nullable Schema schema) { + if (schema == null) { + return IcebergRecordObjectInspector.empty(); + } + + return TypeUtil.visit(schema, new IcebergObjectInspector()); + } + + public static ObjectInspector create(Types.NestedField... fields) { + return create(new Schema(fields)); + } + + @Override + public ObjectInspector field(Types.NestedField field, ObjectInspector fieldObjectInspector) { + return fieldObjectInspector; + } + + @Override + public ObjectInspector list(Types.ListType listTypeInfo, ObjectInspector listObjectInspector) { + return ObjectInspectorFactory.getStandardListObjectInspector(listObjectInspector); + } + + @Override + public ObjectInspector map(Types.MapType mapType, + ObjectInspector keyObjectInspector, ObjectInspector valueObjectInspector) { + return ObjectInspectorFactory.getStandardMapObjectInspector(keyObjectInspector, valueObjectInspector); + } + + @Override + public ObjectInspector primitive(Type.PrimitiveType primitiveType) { + final PrimitiveTypeInfo primitiveTypeInfo; + + switch (primitiveType.typeId()) { + case BINARY: + return IcebergBinaryObjectInspector.get(); + case BOOLEAN: + primitiveTypeInfo = TypeInfoFactory.booleanTypeInfo; + break; + case DATE: + return IcebergDateObjectInspector.get(); + case DECIMAL: + Types.DecimalType type = (Types.DecimalType) primitiveType; + return IcebergDecimalObjectInspector.get(type.precision(), type.scale()); + case DOUBLE: + primitiveTypeInfo = TypeInfoFactory.doubleTypeInfo; + break; + case FLOAT: + primitiveTypeInfo = TypeInfoFactory.floatTypeInfo; + break; + case INTEGER: + primitiveTypeInfo = TypeInfoFactory.intTypeInfo; + break; + case LONG: + primitiveTypeInfo = TypeInfoFactory.longTypeInfo; + break; + case STRING: + primitiveTypeInfo = TypeInfoFactory.stringTypeInfo; + break; + case TIMESTAMP: + boolean adjustToUTC = ((Types.TimestampType) primitiveType).shouldAdjustToUTC(); + return IcebergTimestampObjectInspector.get(adjustToUTC); + + case FIXED: + case TIME: + case UUID: + default: + throw new IllegalArgumentException(primitiveType.typeId() + " type is not supported"); + } + + return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(primitiveTypeInfo); + } + + @Override + public ObjectInspector schema(Schema schema, ObjectInspector structObjectInspector) { + return structObjectInspector; + } + + @Override + public ObjectInspector struct(Types.StructType structType, List fieldObjectInspectors) { + return new IcebergRecordObjectInspector(structType, fieldObjectInspectors); + } + +} diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapred/serde/objectinspector/IcebergPrimitiveObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/mapred/serde/objectinspector/IcebergPrimitiveObjectInspector.java new file mode 100644 index 000000000000..53c3560c2dd7 --- /dev/null +++ b/mr/src/main/java/org/apache/iceberg/mr/mapred/serde/objectinspector/IcebergPrimitiveObjectInspector.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.mapred.serde.objectinspector; + +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; + +abstract class IcebergPrimitiveObjectInspector implements PrimitiveObjectInspector { + + private final PrimitiveTypeInfo typeInfo; + + protected IcebergPrimitiveObjectInspector(PrimitiveTypeInfo typeInfo) { + this.typeInfo = typeInfo; + } + + @Override + public Category getCategory() { + return typeInfo.getCategory(); + } + + @Override + public String getTypeName() { + return typeInfo.getTypeName(); + } + + @Override + public PrimitiveTypeInfo getTypeInfo() { + return typeInfo; + } + + @Override + public PrimitiveObjectInspector.PrimitiveCategory getPrimitiveCategory() { + return typeInfo.getPrimitiveCategory(); + } + + @Override + public Class getJavaPrimitiveClass() { + return typeInfo.getPrimitiveJavaClass(); + } + + @Override + public Class getPrimitiveWritableClass() { + return typeInfo.getPrimitiveWritableClass(); + } + + @Override + public boolean preferWritable() { + return false; + } + + @Override + public int precision() { + return 0; + } + + @Override + public int scale() { + return 0; + } + +} diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapred/serde/objectinspector/IcebergRecordObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/mapred/serde/objectinspector/IcebergRecordObjectInspector.java new file mode 100644 index 000000000000..7005e4239708 --- /dev/null +++ b/mr/src/main/java/org/apache/iceberg/mr/mapred/serde/objectinspector/IcebergRecordObjectInspector.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.mapred.serde.objectinspector; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; + +public final class IcebergRecordObjectInspector extends StructObjectInspector { + + private static final IcebergRecordObjectInspector EMPTY = + new IcebergRecordObjectInspector(Types.StructType.of(), Collections.emptyList()); + + private final List structFields; + + public IcebergRecordObjectInspector(Types.StructType structType, List objectInspectors) { + Preconditions.checkArgument(structType.fields().size() == objectInspectors.size()); + + this.structFields = Lists.newArrayListWithExpectedSize(structType.fields().size()); + + int position = 0; + + for (Types.NestedField field : structType.fields()) { + ObjectInspector oi = objectInspectors.get(position); + IcebergRecordStructField structField = new IcebergRecordStructField(field, oi, position); + structFields.add(structField); + position++; + } + } + + public static IcebergRecordObjectInspector empty() { + return EMPTY; + } + + @Override + public List getAllStructFieldRefs() { + return structFields; + } + + @Override + public StructField getStructFieldRef(String name) { + return ObjectInspectorUtils.getStandardStructFieldRef(name, structFields); + } + + @Override + public Object getStructFieldData(Object o, StructField structField) { + return ((Record) o).get(((IcebergRecordStructField) structField).position()); + } + + @Override + public List getStructFieldsDataAsList(Object o) { + Record record = (Record) o; + return structFields + .stream() + .map(f -> record.get(f.position())) + .collect(Collectors.toList()); + } + + @Override + public String getTypeName() { + return ObjectInspectorUtils.getStandardStructTypeName(this); + } + + @Override + public Category getCategory() { + return Category.STRUCT; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + IcebergRecordObjectInspector that = (IcebergRecordObjectInspector) o; + return structFields.equals(that.structFields); + } + + @Override + public int hashCode() { + return structFields.hashCode(); + } + + private static class IcebergRecordStructField implements StructField { + + private final Types.NestedField field; + private final ObjectInspector oi; + private final int position; + + IcebergRecordStructField(Types.NestedField field, ObjectInspector oi, int position) { + this.field = field; + this.oi = oi; + this.position = position; // position in the record + } + + @Override + public String getFieldName() { + return field.name(); + } + + @Override + public ObjectInspector getFieldObjectInspector() { + return oi; + } + + @Override + public int getFieldID() { + return field.fieldId(); + } + + @Override + public String getFieldComment() { + return field.doc(); + } + + int position() { + return position; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + IcebergRecordStructField that = (IcebergRecordStructField) o; + return field.equals(that.field) && oi.equals(that.oi); + } + + @Override + public int hashCode() { + return 31 * field.hashCode() + oi.hashCode(); + } + + } + +} diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapred/serde/objectinspector/IcebergTimestampObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/mapred/serde/objectinspector/IcebergTimestampObjectInspector.java new file mode 100644 index 000000000000..569267df8496 --- /dev/null +++ b/mr/src/main/java/org/apache/iceberg/mr/mapred/serde/objectinspector/IcebergTimestampObjectInspector.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.mapred.serde.objectinspector; + +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.util.function.Function; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; + +public final class IcebergTimestampObjectInspector extends IcebergPrimitiveObjectInspector + implements TimestampObjectInspector { + + private static final IcebergTimestampObjectInspector INSTANCE_WITH_ZONE = + new IcebergTimestampObjectInspector(o -> ((OffsetDateTime) o).toLocalDateTime()); + + private static final IcebergTimestampObjectInspector INSTANCE_WITHOUT_ZONE = + new IcebergTimestampObjectInspector(o -> (LocalDateTime) o); + + public static IcebergTimestampObjectInspector get(boolean adjustToUTC) { + return adjustToUTC ? INSTANCE_WITH_ZONE : INSTANCE_WITHOUT_ZONE; + } + + private final Function cast; + + private IcebergTimestampObjectInspector(Function cast) { + super(TypeInfoFactory.timestampTypeInfo); + this.cast = cast; + } + + @Override + public Timestamp getPrimitiveJavaObject(Object o) { + return o == null ? null : Timestamp.valueOf(cast.apply(o)); + } + + @Override + public TimestampWritable getPrimitiveWritableObject(Object o) { + Timestamp ts = getPrimitiveJavaObject(o); + return ts == null ? null : new TimestampWritable(ts); + } + + @Override + public Object copyObject(Object o) { + if (o == null) { + return null; + } + + Timestamp ts = (Timestamp) o; + Timestamp copy = new Timestamp(ts.getTime()); + copy.setNanos(ts.getNanos()); + return copy; + } + +} diff --git a/mr/src/test/java/org/apache/iceberg/mr/mapred/TestIcebergObjectInspectorGenerator.java b/mr/src/test/java/org/apache/iceberg/mr/mapred/TestIcebergObjectInspectorGenerator.java deleted file mode 100644 index ab05672746cc..000000000000 --- a/mr/src/test/java/org/apache/iceberg/mr/mapred/TestIcebergObjectInspectorGenerator.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.mr.mapred; - -import java.util.List; -import org.apache.iceberg.Schema; -import org.apache.iceberg.types.Types; -import org.junit.Test; - -import static org.apache.iceberg.types.Types.NestedField.optional; -import static org.junit.Assert.assertEquals; - -public class TestIcebergObjectInspectorGenerator { - - @Test - public void testGetColumnNames() throws Exception { - Schema schema = new Schema(optional(1, "name", Types.StringType.get()), - optional(2, "salary", Types.LongType.get())); - IcebergObjectInspectorGenerator oi = new IcebergObjectInspectorGenerator(); - - List fieldsNames = oi.setColumnNames(schema); - assertEquals(fieldsNames.size(), 2); - } -} diff --git a/mr/src/test/java/org/apache/iceberg/mr/mapred/TestIcebergSchemaToTypeInfo.java b/mr/src/test/java/org/apache/iceberg/mr/mapred/TestIcebergSchemaToTypeInfo.java deleted file mode 100644 index 794d3f540b20..000000000000 --- a/mr/src/test/java/org/apache/iceberg/mr/mapred/TestIcebergSchemaToTypeInfo.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.mr.mapred; - -import java.util.List; -import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import org.apache.iceberg.Schema; -import org.apache.iceberg.types.Types; -import org.junit.Test; - -import static org.apache.iceberg.types.Types.NestedField.optional; -import static org.apache.iceberg.types.Types.NestedField.required; -import static org.junit.Assert.assertEquals; - -public class TestIcebergSchemaToTypeInfo { - - @Test - public void testGeneratePrimitiveTypeInfo() throws Exception { - Schema schema = new Schema( - required(1, "id", Types.IntegerType.get()), - optional(2, "data", Types.StringType.get()), - required(8, "feature1", Types.BooleanType.get()), - required(12, "lat", Types.FloatType.get()), - required(15, "x", Types.LongType.get()), - required(16, "date", Types.DateType.get()), - required(17, "double", Types.DoubleType.get()), - required(18, "binary", Types.BinaryType.get())); - List types = IcebergSchemaToTypeInfo.getColumnTypes(schema); - - assertEquals("Converted TypeInfo should have the same number of columns.", 8, types.size()); - assertEquals("IntegerType converted incorrectly.", - TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.INT_TYPE_NAME), types.get(0)); - assertEquals("StringType converted incorrectly.", - TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.STRING_TYPE_NAME), types.get(1)); - assertEquals("BooleanType converted incorrectly.", - TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.BOOLEAN_TYPE_NAME), types.get(2)); - assertEquals("FloatType converted incorrectly.", - TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.FLOAT_TYPE_NAME), types.get(3)); - assertEquals("LongType converted incorrectly.", - TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.BIGINT_TYPE_NAME), types.get(4)); - assertEquals("DateType converted incorrectly.", - TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.DATE_TYPE_NAME), types.get(5)); - assertEquals("DoubleType converted incorrectly.", - TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.DOUBLE_TYPE_NAME), types.get(6)); - assertEquals("BinaryType converted incorrectly.", - TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.BINARY_TYPE_NAME), types.get(7)); - } - - @Test - public void testGenerateMapWithStringKeyTypeInfo() throws Exception { - TypeInfo expected = TypeInfoFactory.getMapTypeInfo( - TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.STRING_TYPE_NAME), - TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.STRING_TYPE_NAME)); - - Schema schema = new Schema( - optional(7, "properties", Types.MapType.ofOptional(18, 19, - Types.StringType.get(), - Types.StringType.get() - ), "string map of properties")); - - List types = IcebergSchemaToTypeInfo.getColumnTypes(schema); - - assertEquals("Converted TypeInfo should have the same number of columns.", 1, types.size()); - assertEquals("MapType converted incorrectly.", expected, types.get(0)); - } - - @Test - public void testGenerateMapWithIntKeyTypeInfo() throws Exception { - TypeInfo expected = TypeInfoFactory.getMapTypeInfo( - TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.INT_TYPE_NAME), - TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.STRING_TYPE_NAME)); - - Schema schema = new Schema( - optional(7, "properties", Types.MapType.ofOptional(18, 19, - Types.IntegerType.get(), - Types.StringType.get() - ), "string map of properties")); - - List types = IcebergSchemaToTypeInfo.getColumnTypes(schema); - - assertEquals("Converted TypeInfo should have the same number of columns.", 1, types.size()); - assertEquals("MapType converted incorrectly.", expected, types.get(0)); - } - - @Test - public void testGenerateListTypeInfo() throws Exception { - TypeInfo expected = TypeInfoFactory - .getListTypeInfo(TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.DOUBLE_TYPE_NAME)); - Schema schema = new Schema( - required(6, "doubles", Types.ListType.ofRequired(17, - Types.DoubleType.get() - ))); - List types = IcebergSchemaToTypeInfo.getColumnTypes(schema); - - assertEquals("Converted TypeInfo should have the same number of columns.", 1, types.size()); - assertEquals("ListType converted incorrectly.", expected, types.get(0)); - } -} diff --git a/mr/src/test/java/org/apache/iceberg/mr/mapred/TestIcebergSerDe.java b/mr/src/test/java/org/apache/iceberg/mr/mapred/TestIcebergSerDe.java index c81d1a263090..937ddba64da2 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/mapred/TestIcebergSerDe.java +++ b/mr/src/test/java/org/apache/iceberg/mr/mapred/TestIcebergSerDe.java @@ -19,63 +19,62 @@ package org.apache.iceberg.mr.mapred; -import java.sql.Date; -import java.sql.Timestamp; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; -import java.util.Arrays; -import java.util.List; +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.Properties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.mr.mapred.serde.objectinspector.IcebergObjectInspector; import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import static org.apache.iceberg.types.Types.NestedField.required; -import static org.junit.Assert.assertArrayEquals; public class TestIcebergSerDe { + private static final Schema schema = new Schema(required(1, "string_field", Types.StringType.get())); + + @Rule + public TemporaryFolder tmp = new TemporaryFolder(); + @Test - public void testDeserializeWritable() { - Schema schema = new Schema(required(1, "string_type", Types.StringType.get()), - required(2, "int_type", Types.IntegerType.get()), - required(3, "long_type", Types.LongType.get()), - required(4, "boolean_type", Types.BooleanType.get()), - required(5, "float_type", Types.FloatType.get()), - required(6, "double_type", Types.DoubleType.get()), - required(7, "binary_type", Types.BinaryType.get()), - required(8, "date_type", Types.DateType.get()), - required(9, "timestamp_with_zone_type", Types.TimestampType.withZone()), - required(10, "timestamp_without_zone_type", Types.TimestampType.withoutZone()), - required(11, "map_type", Types.MapType - .ofRequired(12, 13, Types.IntegerType.get(), Types.StringType.get())), - required(14, "list_type", Types.ListType.ofRequired(15, Types.LongType.get())) - ); - LocalDate localDate = LocalDate.of(2018, 11, 10); - LocalDateTime localDateTime = LocalDateTime.of(2018, 11, 10, 11, 55); - OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.UTC); - - Object[] input = Lists.newArrayList("foo", 5, 6L, true, 1.02F, 1.4D, new byte[] { (byte) 0xe0}, - localDate, offsetDateTime, localDateTime, ImmutableMap.of(22, "bar"), - Arrays.asList(1000L, 2000L, 3000L)).toArray(); - - //Inputs and outputs differ slightly because of Date/Timestamp conversions for Hive - Object[] expected = Lists.newArrayList("foo", 5, 6L, true, 1.02F, 1.4D, new byte[] { (byte) 0xe0}, - Date.valueOf(localDate), Timestamp.valueOf(offsetDateTime.toLocalDateTime()), Timestamp.valueOf(localDateTime), - ImmutableMap.of(22, "bar"), Arrays.asList(1000L, 2000L, 3000L)).toArray(); - - Record record = TestHelpers.createCustomRecord(schema, input); - IcebergWritable writable = new IcebergWritable(); - writable.wrapRecord(record); - writable.wrapSchema(schema); + public void testInitialize() throws IOException, SerDeException { + File location = tmp.newFolder(); + Assert.assertTrue(location.delete()); + + Configuration conf = new Configuration(); + + Properties properties = new Properties(); + properties.setProperty(InputFormatConfig.CATALOG_NAME, InputFormatConfig.HADOOP_TABLES); + properties.setProperty(InputFormatConfig.TABLE_LOCATION, location.toString()); + + HadoopTables tables = new HadoopTables(conf); + tables.create(schema, PartitionSpec.unpartitioned(), Collections.emptyMap(), location.toString()); IcebergSerDe serDe = new IcebergSerDe(); - List deserialized = (List) serDe.deserialize(writable); - assertArrayEquals("Test values from an Iceberg Record deserialize into expected Java objects.", - expected, deserialized.toArray()); + serDe.initialize(conf, properties); + + Assert.assertEquals(IcebergObjectInspector.create(schema), serDe.getObjectInspector()); } + + @Test + public void testDeserialize() { + IcebergSerDe serDe = new IcebergSerDe(); + + Record record = RandomGenericData.generate(schema, 1, 0).get(0); + IcebergWritable writable = new IcebergWritable(record, schema); + + Assert.assertEquals(record, serDe.deserialize(writable)); + } + } diff --git a/mr/src/test/java/org/apache/iceberg/mr/mapred/serde/objectinspector/TestIcebergBinaryObjectInspector.java b/mr/src/test/java/org/apache/iceberg/mr/mapred/serde/objectinspector/TestIcebergBinaryObjectInspector.java new file mode 100644 index 000000000000..5d88da53cd6c --- /dev/null +++ b/mr/src/test/java/org/apache/iceberg/mr/mapred/serde/objectinspector/TestIcebergBinaryObjectInspector.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.mapred.serde.objectinspector; + +import java.nio.ByteBuffer; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.BytesWritable; +import org.junit.Assert; +import org.junit.Test; + +public class TestIcebergBinaryObjectInspector { + + @Test + public void testIcebergBinaryObjectInspector() { + BinaryObjectInspector oi = IcebergBinaryObjectInspector.get(); + + Assert.assertEquals(ObjectInspector.Category.PRIMITIVE, oi.getCategory()); + Assert.assertEquals(PrimitiveObjectInspector.PrimitiveCategory.BINARY, oi.getPrimitiveCategory()); + + Assert.assertEquals(TypeInfoFactory.binaryTypeInfo, oi.getTypeInfo()); + Assert.assertEquals(TypeInfoFactory.binaryTypeInfo.getTypeName(), oi.getTypeName()); + + Assert.assertEquals(byte[].class, oi.getJavaPrimitiveClass()); + Assert.assertEquals(BytesWritable.class, oi.getPrimitiveWritableClass()); + + Assert.assertNull(oi.copyObject(null)); + Assert.assertNull(oi.getPrimitiveJavaObject(null)); + Assert.assertNull(oi.getPrimitiveWritableObject(null)); + + byte[] bytes = new byte[] {0, 1}; + ByteBuffer buffer = ByteBuffer.wrap(bytes); + + Assert.assertArrayEquals(bytes, oi.getPrimitiveJavaObject(buffer)); + Assert.assertEquals(new BytesWritable(bytes), oi.getPrimitiveWritableObject(buffer)); + + byte[] copy = (byte[]) oi.copyObject(bytes); + + Assert.assertArrayEquals(bytes, copy); + Assert.assertNotSame(bytes, copy); + + Assert.assertFalse(oi.preferWritable()); + } + +} diff --git a/mr/src/test/java/org/apache/iceberg/mr/mapred/serde/objectinspector/TestIcebergDateObjectInspector.java b/mr/src/test/java/org/apache/iceberg/mr/mapred/serde/objectinspector/TestIcebergDateObjectInspector.java new file mode 100644 index 000000000000..28962aa352f4 --- /dev/null +++ b/mr/src/test/java/org/apache/iceberg/mr/mapred/serde/objectinspector/TestIcebergDateObjectInspector.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.mapred.serde.objectinspector; + +import java.sql.Date; +import java.time.LocalDate; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.junit.Assert; +import org.junit.Test; + +public class TestIcebergDateObjectInspector { + + @Test + public void testIcebergDateObjectInspector() { + DateObjectInspector oi = IcebergDateObjectInspector.get(); + + Assert.assertEquals(ObjectInspector.Category.PRIMITIVE, oi.getCategory()); + Assert.assertEquals(PrimitiveObjectInspector.PrimitiveCategory.DATE, oi.getPrimitiveCategory()); + + Assert.assertEquals(TypeInfoFactory.dateTypeInfo, oi.getTypeInfo()); + Assert.assertEquals(TypeInfoFactory.dateTypeInfo.getTypeName(), oi.getTypeName()); + + Assert.assertEquals(Date.class, oi.getJavaPrimitiveClass()); + Assert.assertEquals(DateWritable.class, oi.getPrimitiveWritableClass()); + + Assert.assertNull(oi.copyObject(null)); + Assert.assertNull(oi.getPrimitiveJavaObject(null)); + Assert.assertNull(oi.getPrimitiveWritableObject(null)); + + LocalDate local = LocalDate.of(2020, 1, 1); + Date date = Date.valueOf("2020-01-01"); + + Assert.assertEquals(date, oi.getPrimitiveJavaObject(local)); + Assert.assertEquals(new DateWritable(date), oi.getPrimitiveWritableObject(local)); + + Date copy = (Date) oi.copyObject(date); + + Assert.assertEquals(date, copy); + Assert.assertNotSame(date, copy); + + Assert.assertFalse(oi.preferWritable()); + } + +} diff --git a/mr/src/test/java/org/apache/iceberg/mr/mapred/serde/objectinspector/TestIcebergDecimalObjectInspector.java b/mr/src/test/java/org/apache/iceberg/mr/mapred/serde/objectinspector/TestIcebergDecimalObjectInspector.java new file mode 100644 index 000000000000..77489a4478e0 --- /dev/null +++ b/mr/src/test/java/org/apache/iceberg/mr/mapred/serde/objectinspector/TestIcebergDecimalObjectInspector.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.mapred.serde.objectinspector; + +import java.math.BigDecimal; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.junit.Assert; +import org.junit.Test; + +public class TestIcebergDecimalObjectInspector { + + @Test + public void testCache() { + HiveDecimalObjectInspector oi = IcebergDecimalObjectInspector.get(38, 18); + + Assert.assertSame(oi, IcebergDecimalObjectInspector.get(38, 18)); + Assert.assertNotSame(oi, IcebergDecimalObjectInspector.get(28, 18)); + Assert.assertNotSame(oi, IcebergDecimalObjectInspector.get(38, 28)); + } + + @Test + public void testIcebergDecimalObjectInspector() { + HiveDecimalObjectInspector oi = IcebergDecimalObjectInspector.get(38, 18); + + Assert.assertEquals(ObjectInspector.Category.PRIMITIVE, oi.getCategory()); + Assert.assertEquals(PrimitiveObjectInspector.PrimitiveCategory.DECIMAL, oi.getPrimitiveCategory()); + + Assert.assertEquals(new DecimalTypeInfo(38, 18), oi.getTypeInfo()); + Assert.assertEquals(TypeInfoFactory.decimalTypeInfo.getTypeName(), oi.getTypeName()); + + Assert.assertEquals(38, oi.precision()); + Assert.assertEquals(18, oi.scale()); + + Assert.assertEquals(HiveDecimal.class, oi.getJavaPrimitiveClass()); + Assert.assertEquals(HiveDecimalWritable.class, oi.getPrimitiveWritableClass()); + + Assert.assertNull(oi.copyObject(null)); + Assert.assertNull(oi.getPrimitiveJavaObject(null)); + Assert.assertNull(oi.getPrimitiveWritableObject(null)); + + HiveDecimal one = HiveDecimal.create(BigDecimal.ONE); + + Assert.assertEquals(one, oi.getPrimitiveJavaObject(BigDecimal.ONE)); + Assert.assertEquals(new HiveDecimalWritable(one), oi.getPrimitiveWritableObject(BigDecimal.ONE)); + + HiveDecimal copy = (HiveDecimal) oi.copyObject(one); + + Assert.assertEquals(one, copy); + Assert.assertNotSame(one, copy); + + Assert.assertFalse(oi.preferWritable()); + } + +} diff --git a/mr/src/test/java/org/apache/iceberg/mr/mapred/serde/objectinspector/TestIcebergObjectInspector.java b/mr/src/test/java/org/apache/iceberg/mr/mapred/serde/objectinspector/TestIcebergObjectInspector.java new file mode 100644 index 000000000000..b280b05fb86d --- /dev/null +++ b/mr/src/test/java/org/apache/iceberg/mr/mapred/serde/objectinspector/TestIcebergObjectInspector.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.mapred.serde.objectinspector; + +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.Schema; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.iceberg.types.Types.NestedField.required; + + +public class TestIcebergObjectInspector { + + private final Schema schema = new Schema( + required(0, "binary_field", Types.BinaryType.get(), "binary comment"), + required(1, "boolean_field", Types.BooleanType.get(), "boolean comment"), + required(2, "date_field", Types.DateType.get(), "date comment"), + required(3, "decimal_field", Types.DecimalType.of(38, 18), "decimal comment"), + required(4, "double_field", Types.DoubleType.get(), "double comment"), + required(5, "float_field", Types.FloatType.get(), "float comment"), + required(6, "integer_field", Types.IntegerType.get(), "integer comment"), + required(7, "long_field", Types.LongType.get(), "long comment"), + required(8, "string_field", Types.StringType.get(), "string comment"), + required(9, "timestamp_field", Types.TimestampType.withoutZone(), "timestamp comment"), + required(10, "timestamptz_field", Types.TimestampType.withZone(), "timestamptz comment"), + required(11, "list_field", + Types.ListType.ofRequired(12, Types.StringType.get()), "list comment"), + required(13, "map_field", + Types.MapType.ofRequired(14, 15, Types.StringType.get(), Types.IntegerType.get()), + "map comment"), + required(16, "struct_field", Types.StructType.of( + Types.NestedField.required(17, "nested_field", Types.StringType.get(), "nested field comment")), + "struct comment" + ) + ); + + @Test + public void testIcebergObjectInspector() { + ObjectInspector oi = IcebergObjectInspector.create(schema); + Assert.assertNotNull(oi); + Assert.assertEquals(ObjectInspector.Category.STRUCT, oi.getCategory()); + + StructObjectInspector soi = (StructObjectInspector) oi; + + // binary + StructField binaryField = soi.getStructFieldRef("binary_field"); + Assert.assertEquals(0, binaryField.getFieldID()); + Assert.assertEquals("binary_field", binaryField.getFieldName()); + Assert.assertEquals("binary comment", binaryField.getFieldComment()); + Assert.assertEquals(IcebergBinaryObjectInspector.get(), binaryField.getFieldObjectInspector()); + + // boolean + StructField booleanField = soi.getStructFieldRef("boolean_field"); + Assert.assertEquals(1, booleanField.getFieldID()); + Assert.assertEquals("boolean_field", booleanField.getFieldName()); + Assert.assertEquals("boolean comment", booleanField.getFieldComment()); + Assert.assertEquals(getPrimitiveObjectInspector(boolean.class), booleanField.getFieldObjectInspector()); + + // date + StructField dateField = soi.getStructFieldRef("date_field"); + Assert.assertEquals(2, dateField.getFieldID()); + Assert.assertEquals("date_field", dateField.getFieldName()); + Assert.assertEquals("date comment", dateField.getFieldComment()); + Assert.assertEquals(IcebergDateObjectInspector.get(), dateField.getFieldObjectInspector()); + + // decimal + StructField decimalField = soi.getStructFieldRef("decimal_field"); + Assert.assertEquals(3, decimalField.getFieldID()); + Assert.assertEquals("decimal_field", decimalField.getFieldName()); + Assert.assertEquals("decimal comment", decimalField.getFieldComment()); + Assert.assertEquals(IcebergDecimalObjectInspector.get(38, 18), decimalField.getFieldObjectInspector()); + + // double + StructField doubleField = soi.getStructFieldRef("double_field"); + Assert.assertEquals(4, doubleField.getFieldID()); + Assert.assertEquals("double_field", doubleField.getFieldName()); + Assert.assertEquals("double comment", doubleField.getFieldComment()); + Assert.assertEquals(getPrimitiveObjectInspector(double.class), doubleField.getFieldObjectInspector()); + + // float + StructField floatField = soi.getStructFieldRef("float_field"); + Assert.assertEquals(5, floatField.getFieldID()); + Assert.assertEquals("float_field", floatField.getFieldName()); + Assert.assertEquals("float comment", floatField.getFieldComment()); + Assert.assertEquals(getPrimitiveObjectInspector(float.class), floatField.getFieldObjectInspector()); + + // integer + StructField integerField = soi.getStructFieldRef("integer_field"); + Assert.assertEquals(6, integerField.getFieldID()); + Assert.assertEquals("integer_field", integerField.getFieldName()); + Assert.assertEquals("integer comment", integerField.getFieldComment()); + Assert.assertEquals(getPrimitiveObjectInspector(int.class), integerField.getFieldObjectInspector()); + + // long + StructField longField = soi.getStructFieldRef("long_field"); + Assert.assertEquals(7, longField.getFieldID()); + Assert.assertEquals("long_field", longField.getFieldName()); + Assert.assertEquals("long comment", longField.getFieldComment()); + Assert.assertEquals(getPrimitiveObjectInspector(long.class), longField.getFieldObjectInspector()); + + // string + StructField stringField = soi.getStructFieldRef("string_field"); + Assert.assertEquals(8, stringField.getFieldID()); + Assert.assertEquals("string_field", stringField.getFieldName()); + Assert.assertEquals("string comment", stringField.getFieldComment()); + Assert.assertEquals(getPrimitiveObjectInspector(String.class), stringField.getFieldObjectInspector()); + + // timestamp without tz + StructField timestampField = soi.getStructFieldRef("timestamp_field"); + Assert.assertEquals(9, timestampField.getFieldID()); + Assert.assertEquals("timestamp_field", timestampField.getFieldName()); + Assert.assertEquals("timestamp comment", timestampField.getFieldComment()); + Assert.assertEquals(IcebergTimestampObjectInspector.get(false), timestampField.getFieldObjectInspector()); + + // timestamp with tz + StructField timestampTzField = soi.getStructFieldRef("timestamptz_field"); + Assert.assertEquals(10, timestampTzField.getFieldID()); + Assert.assertEquals("timestamptz_field", timestampTzField.getFieldName()); + Assert.assertEquals("timestamptz comment", timestampTzField.getFieldComment()); + Assert.assertEquals(IcebergTimestampObjectInspector.get(true), timestampTzField.getFieldObjectInspector()); + + // list + StructField listField = soi.getStructFieldRef("list_field"); + Assert.assertEquals(11, listField.getFieldID()); + Assert.assertEquals("list_field", listField.getFieldName()); + Assert.assertEquals("list comment", listField.getFieldComment()); + Assert.assertEquals(getListObjectInspector(String.class), listField.getFieldObjectInspector()); + + // map + StructField mapField = soi.getStructFieldRef("map_field"); + Assert.assertEquals(13, mapField.getFieldID()); + Assert.assertEquals("map_field", mapField.getFieldName()); + Assert.assertEquals("map comment", mapField.getFieldComment()); + Assert.assertEquals(getMapObjectInspector(String.class, int.class), mapField.getFieldObjectInspector()); + + // struct + StructField structField = soi.getStructFieldRef("struct_field"); + Assert.assertEquals(16, structField.getFieldID()); + Assert.assertEquals("struct_field", structField.getFieldName()); + Assert.assertEquals("struct comment", structField.getFieldComment()); + + ObjectInspector expectedObjectInspector = new IcebergRecordObjectInspector( + (Types.StructType) schema.findType(16), ImmutableList.of(getPrimitiveObjectInspector(String.class))); + Assert.assertEquals(expectedObjectInspector, structField.getFieldObjectInspector()); + } + + @Test + public void testIcebergObjectInspectorUnsupportedTypes() { + AssertHelpers.assertThrows( + "Hive does not support fixed type", IllegalArgumentException.class, "FIXED type is not supported", + () -> IcebergObjectInspector.create(required(1, "fixed_field", Types.FixedType.ofLength(1)))); + + AssertHelpers.assertThrows( + "Hive does not support time type", IllegalArgumentException.class, "TIME type is not supported", + () -> IcebergObjectInspector.create(required(1, "time_field", Types.TimeType.get()))); + + AssertHelpers.assertThrows( + "Hive does not support UUID type", IllegalArgumentException.class, "UUID type is not supported", + () -> IcebergObjectInspector.create(required(1, "uuid_field", Types.UUIDType.get()))); + } + + private static ObjectInspector getPrimitiveObjectInspector(Class clazz) { + PrimitiveTypeInfo typeInfo = (PrimitiveTypeInfo) TypeInfoFactory.getPrimitiveTypeInfoFromJavaPrimitive(clazz); + return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(typeInfo); + } + + private static ObjectInspector getListObjectInspector(Class clazz) { + return ObjectInspectorFactory.getStandardListObjectInspector(getPrimitiveObjectInspector(clazz)); + } + + private static ObjectInspector getMapObjectInspector(Class keyClazz, Class valueClazz) { + return ObjectInspectorFactory.getStandardMapObjectInspector( + getPrimitiveObjectInspector(keyClazz), getPrimitiveObjectInspector(valueClazz)); + } + +} diff --git a/mr/src/test/java/org/apache/iceberg/mr/mapred/serde/objectinspector/TestIcebergRecordObjectInspector.java b/mr/src/test/java/org/apache/iceberg/mr/mapred/serde/objectinspector/TestIcebergRecordObjectInspector.java new file mode 100644 index 000000000000..edfaa1722185 --- /dev/null +++ b/mr/src/test/java/org/apache/iceberg/mr/mapred/serde/objectinspector/TestIcebergRecordObjectInspector.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.mapred.serde.objectinspector; + +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.iceberg.types.Types.NestedField.required; + +public class TestIcebergRecordObjectInspector { + + @Test + public void testIcebergRecordObjectInspector() { + Schema schema = new Schema( + required(1, "integer_field", Types.IntegerType.get()), + required(2, "struct_field", Types.StructType.of( + Types.NestedField.required(3, "string_field", Types.StringType.get()))) + ); + + Record record = RandomGenericData.generate(schema, 1, 0L).get(0); + Record innerRecord = record.get(1, Record.class); + + StructObjectInspector soi = (StructObjectInspector) IcebergObjectInspector.create(schema); + Assert.assertEquals(ImmutableList.of(record.get(0), record.get(1)), soi.getStructFieldsDataAsList(record)); + + StructField integerField = soi.getStructFieldRef("integer_field"); + Assert.assertEquals(record.get(0), soi.getStructFieldData(record, integerField)); + + StructField structField = soi.getStructFieldRef("struct_field"); + Object innerData = soi.getStructFieldData(record, structField); + Assert.assertEquals(innerRecord, innerData); + + StructObjectInspector innerSoi = (StructObjectInspector) structField.getFieldObjectInspector(); + StructField stringField = innerSoi.getStructFieldRef("string_field"); + + Assert.assertEquals(ImmutableList.of(innerRecord.get(0)), innerSoi.getStructFieldsDataAsList(innerRecord)); + Assert.assertEquals(innerRecord.get(0), innerSoi.getStructFieldData(innerData, stringField)); + } + +} diff --git a/mr/src/test/java/org/apache/iceberg/mr/mapred/serde/objectinspector/TestIcebergTimestampObjectInspector.java b/mr/src/test/java/org/apache/iceberg/mr/mapred/serde/objectinspector/TestIcebergTimestampObjectInspector.java new file mode 100644 index 000000000000..a1f6c18b8dd6 --- /dev/null +++ b/mr/src/test/java/org/apache/iceberg/mr/mapred/serde/objectinspector/TestIcebergTimestampObjectInspector.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.mapred.serde.objectinspector; + +import java.sql.Timestamp; +import java.time.LocalDateTime; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.junit.Assert; +import org.junit.Test; + +public class TestIcebergTimestampObjectInspector { + + @Test + public void testIcebergTimestampObjectInspector() { + TimestampObjectInspector oi = IcebergTimestampObjectInspector.get(false); + + Assert.assertEquals(ObjectInspector.Category.PRIMITIVE, oi.getCategory()); + Assert.assertEquals(PrimitiveObjectInspector.PrimitiveCategory.TIMESTAMP, oi.getPrimitiveCategory()); + + Assert.assertEquals(TypeInfoFactory.timestampTypeInfo, oi.getTypeInfo()); + Assert.assertEquals(TypeInfoFactory.timestampTypeInfo.getTypeName(), oi.getTypeName()); + + Assert.assertEquals(Timestamp.class, oi.getJavaPrimitiveClass()); + Assert.assertEquals(TimestampWritable.class, oi.getPrimitiveWritableClass()); + + Assert.assertNull(oi.copyObject(null)); + Assert.assertNull(oi.getPrimitiveJavaObject(null)); + Assert.assertNull(oi.getPrimitiveWritableObject(null)); + + LocalDateTime local = LocalDateTime.of(2020, 1, 1, 0, 0); + Timestamp ts = Timestamp.valueOf("2020-01-01 00:00:00"); + + Assert.assertEquals(ts, oi.getPrimitiveJavaObject(local)); + Assert.assertEquals(new TimestampWritable(ts), oi.getPrimitiveWritableObject(local)); + + Timestamp copy = (Timestamp) oi.copyObject(ts); + + Assert.assertEquals(ts, copy); + Assert.assertNotSame(ts, copy); + + Assert.assertFalse(oi.preferWritable()); + } + +}