Skip to content
This repository was archived by the owner on Jun 15, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,46 @@

import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.BytesWritable;
import org.apache.iceberg.util.ByteBuffers;

public final class IcebergBinaryObjectInspector extends IcebergPrimitiveObjectInspector
implements BinaryObjectInspector {
public abstract class IcebergBinaryObjectInspector extends AbstractPrimitiveJavaObjectInspector
implements BinaryObjectInspector {

private static final IcebergBinaryObjectInspector INSTANCE = new IcebergBinaryObjectInspector();
private static final IcebergBinaryObjectInspector BYTE_ARRAY = new IcebergBinaryObjectInspector() {
@Override
byte[] toByteArray(Object o) {
return (byte[]) o;
}
};

private static final IcebergBinaryObjectInspector BYTE_BUFFER = new IcebergBinaryObjectInspector() {
@Override
byte[] toByteArray(Object o) {
return ByteBuffers.toByteArray((ByteBuffer) o);
}
};

public static IcebergBinaryObjectInspector get() {
return INSTANCE;
public static IcebergBinaryObjectInspector byteArray() {
return BYTE_ARRAY;
}

public static IcebergBinaryObjectInspector byteBuffer() {
return BYTE_BUFFER;
}

private IcebergBinaryObjectInspector() {
super(TypeInfoFactory.binaryTypeInfo);
}

abstract byte[] toByteArray(Object object);

@Override
public byte[] getPrimitiveJavaObject(Object o) {
return o == null ? null : ((ByteBuffer) o).array();
return toByteArray(o);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import java.sql.Date;
import java.time.LocalDate;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.iceberg.util.DateTimeUtil;

public final class IcebergDateObjectInspector extends IcebergPrimitiveObjectInspector implements DateObjectInspector {
public final class IcebergDateObjectInspector extends AbstractPrimitiveJavaObjectInspector
implements DateObjectInspector {

private static final IcebergDateObjectInspector INSTANCE = new IcebergDateObjectInspector();

Expand All @@ -44,8 +47,7 @@ public Date getPrimitiveJavaObject(Object o) {

@Override
public DateWritable getPrimitiveWritableObject(Object o) {
Date date = getPrimitiveJavaObject(o);
return date == null ? null : new DateWritable(date);
return o == null ? null : new DateWritable(DateTimeUtil.daysFromDate((LocalDate) o));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public final class IcebergDecimalObjectInspector extends IcebergPrimitiveObjectInspector
public final class IcebergDecimalObjectInspector extends AbstractPrimitiveJavaObjectInspector
implements HiveDecimalObjectInspector {

private static final Cache<Integer, IcebergDecimalObjectInspector> CACHE = Caffeine.newBuilder()
Expand All @@ -49,16 +50,6 @@ private IcebergDecimalObjectInspector(int precision, int scale) {
super(new DecimalTypeInfo(precision, scale));
}

@Override
public int precision() {
return ((DecimalTypeInfo) getTypeInfo()).precision();
}

@Override
public int scale() {
return ((DecimalTypeInfo) getTypeInfo()).scale();
}

@Override
public HiveDecimal getPrimitiveJavaObject(Object o) {
return o == null ? null : HiveDecimal.create((BigDecimal) o);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public ObjectInspector primitive(Type.PrimitiveType primitiveType) {

switch (primitiveType.typeId()) {
case BINARY:
return IcebergBinaryObjectInspector.get();
return IcebergBinaryObjectInspector.byteBuffer();
case BOOLEAN:
primitiveTypeInfo = TypeInfoFactory.booleanTypeInfo;
break;
Expand All @@ -79,6 +79,8 @@ public ObjectInspector primitive(Type.PrimitiveType primitiveType) {
case DOUBLE:
primitiveTypeInfo = TypeInfoFactory.doubleTypeInfo;
break;
case FIXED:
return IcebergBinaryObjectInspector.byteArray();
case FLOAT:
primitiveTypeInfo = TypeInfoFactory.floatTypeInfo;
break;
Expand All @@ -89,15 +91,14 @@ public ObjectInspector primitive(Type.PrimitiveType primitiveType) {
primitiveTypeInfo = TypeInfoFactory.longTypeInfo;
break;
case STRING:
case UUID:
primitiveTypeInfo = TypeInfoFactory.stringTypeInfo;
break;
case TIMESTAMP:
boolean adjustToUTC = ((Types.TimestampType) primitiveType).shouldAdjustToUTC();
return IcebergTimestampObjectInspector.get(adjustToUTC);

case FIXED:
case TIME:
case UUID:
default:
throw new IllegalArgumentException(primitiveType.typeId() + " type is not supported");
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
Expand Down Expand Up @@ -157,12 +158,12 @@ public boolean equals(Object o) {
}

IcebergRecordStructField that = (IcebergRecordStructField) o;
return field.equals(that.field) && oi.equals(that.oi);
return field.equals(that.field) && oi.equals(that.oi) && position == that.position;
}

@Override
public int hashCode() {
return 31 * field.hashCode() + oi.hashCode();
return Objects.hash(field, oi, position);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,42 @@
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.util.function.Function;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

public final class IcebergTimestampObjectInspector extends IcebergPrimitiveObjectInspector
implements TimestampObjectInspector {
public abstract class IcebergTimestampObjectInspector extends AbstractPrimitiveJavaObjectInspector
implements TimestampObjectInspector {

private static final IcebergTimestampObjectInspector INSTANCE_WITH_ZONE =
new IcebergTimestampObjectInspector(o -> ((OffsetDateTime) o).toLocalDateTime());
private static final IcebergTimestampObjectInspector INSTANCE_WITH_ZONE = new IcebergTimestampObjectInspector() {
@Override
LocalDateTime toLocalDateTime(Object o) {
return ((OffsetDateTime) o).toLocalDateTime();
}
};

private static final IcebergTimestampObjectInspector INSTANCE_WITHOUT_ZONE =
new IcebergTimestampObjectInspector(o -> (LocalDateTime) o);
private static final IcebergTimestampObjectInspector INSTANCE_WITHOUT_ZONE = new IcebergTimestampObjectInspector() {
@Override
LocalDateTime toLocalDateTime(Object o) {
return (LocalDateTime) o;
}
};

public static IcebergTimestampObjectInspector get(boolean adjustToUTC) {
return adjustToUTC ? INSTANCE_WITH_ZONE : INSTANCE_WITHOUT_ZONE;
}

private final Function<Object, LocalDateTime> cast;

private IcebergTimestampObjectInspector(Function<Object, LocalDateTime> cast) {
private IcebergTimestampObjectInspector() {
super(TypeInfoFactory.timestampTypeInfo);
this.cast = cast;
}


abstract LocalDateTime toLocalDateTime(Object object);

@Override
public Timestamp getPrimitiveJavaObject(Object o) {
return o == null ? null : Timestamp.valueOf(cast.apply(o));
return o == null ? null : Timestamp.valueOf(toLocalDateTime(o));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
public class TestIcebergBinaryObjectInspector {

@Test
public void testIcebergBinaryObjectInspector() {
BinaryObjectInspector oi = IcebergBinaryObjectInspector.get();
public void testIcebergByteArrayObjectInspector() {
BinaryObjectInspector oi = IcebergBinaryObjectInspector.byteArray();

Assert.assertEquals(ObjectInspector.Category.PRIMITIVE, oi.getCategory());
Assert.assertEquals(PrimitiveObjectInspector.PrimitiveCategory.BINARY, oi.getPrimitiveCategory());
Expand All @@ -48,11 +48,49 @@ public void testIcebergBinaryObjectInspector() {
Assert.assertNull(oi.getPrimitiveWritableObject(null));

byte[] bytes = new byte[] {0, 1};
ByteBuffer buffer = ByteBuffer.wrap(bytes);

Assert.assertArrayEquals(bytes, oi.getPrimitiveJavaObject(bytes));
Assert.assertEquals(new BytesWritable(bytes), oi.getPrimitiveWritableObject(bytes));

byte[] copy = (byte[]) oi.copyObject(bytes);

Assert.assertArrayEquals(bytes, copy);
Assert.assertNotSame(bytes, copy);

Assert.assertFalse(oi.preferWritable());
}

@Test
public void testIcebergByteBufferObjectInspector() {
BinaryObjectInspector oi = IcebergBinaryObjectInspector.byteBuffer();

Assert.assertEquals(ObjectInspector.Category.PRIMITIVE, oi.getCategory());
Assert.assertEquals(PrimitiveObjectInspector.PrimitiveCategory.BINARY, oi.getPrimitiveCategory());

Assert.assertEquals(TypeInfoFactory.binaryTypeInfo, oi.getTypeInfo());
Assert.assertEquals(TypeInfoFactory.binaryTypeInfo.getTypeName(), oi.getTypeName());

Assert.assertEquals(byte[].class, oi.getJavaPrimitiveClass());
Assert.assertEquals(BytesWritable.class, oi.getPrimitiveWritableClass());

Assert.assertNull(oi.copyObject(null));
Assert.assertNull(oi.getPrimitiveJavaObject(null));
Assert.assertNull(oi.getPrimitiveWritableObject(null));

byte[] bytes = new byte[] {0, 1, 2, 3};

ByteBuffer buffer = ByteBuffer.wrap(bytes);
Assert.assertArrayEquals(bytes, oi.getPrimitiveJavaObject(buffer));
Assert.assertEquals(new BytesWritable(bytes), oi.getPrimitiveWritableObject(buffer));

ByteBuffer slice = ByteBuffer.wrap(bytes, 1, 2).slice();
Assert.assertArrayEquals(new byte[] {1, 2}, oi.getPrimitiveJavaObject(slice));
Assert.assertEquals(new BytesWritable(new byte[] {1, 2}), oi.getPrimitiveWritableObject(slice));

slice.position(1);
Assert.assertArrayEquals(new byte[] {2}, oi.getPrimitiveJavaObject(slice));
Assert.assertEquals(new BytesWritable(new byte[] {2}), oi.getPrimitiveWritableObject(slice));

byte[] copy = (byte[]) oi.copyObject(bytes);

Assert.assertArrayEquals(bytes, copy);
Expand Down
Loading