Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@
<dependency>
<groupId>io.prestosql.hive</groupId>
<artifactId>hive-apache</artifactId>
<version>3.0.0-1</version>
<version>3.1.1-1-SNAPSHOT</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
import io.prestosql.hadoop.TextLineLengthLimitExceededException;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.RecordCursor;
import io.prestosql.spi.type.DecimalType;
import io.prestosql.spi.type.Decimals;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.TypeManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.Date;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.common.type.Timestamp;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
Expand All @@ -44,12 +47,10 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.math.BigInteger;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.LongUnaryOperator;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -90,6 +91,7 @@ class GenericHiveRecordCursor<K, V extends Writable>
private final V value;

private final Deserializer deserializer;
private final LongUnaryOperator timestampConverter;

private final Type[] types;
private final HiveType[] hiveTypes;
Expand All @@ -107,7 +109,6 @@ class GenericHiveRecordCursor<K, V extends Writable>
private final boolean[] nulls;

private final long totalBytes;
private final DateTimeZone hiveStorageTimeZone;

private long completedBytes;
private Object rowData;
Expand All @@ -121,6 +122,7 @@ public GenericHiveRecordCursor(
Properties splitSchema,
List<HiveColumnHandle> columns,
DateTimeZone hiveStorageTimeZone,
ConnectorSession session,
TypeManager typeManager)
{
requireNonNull(path, "path is null");
Expand All @@ -129,17 +131,25 @@ public GenericHiveRecordCursor(
requireNonNull(splitSchema, "splitSchema is null");
requireNonNull(columns, "columns is null");
requireNonNull(hiveStorageTimeZone, "hiveStorageTimeZone is null");
requireNonNull(session, "session is null");

this.path = path;
this.recordReader = recordReader;
this.totalBytes = totalBytes;
this.key = recordReader.createKey();
this.value = recordReader.createValue();
this.hiveStorageTimeZone = hiveStorageTimeZone;

this.deserializer = getDeserializer(configuration, splitSchema);
this.rowInspector = getTableObjectInspector(deserializer);

if (session.isLegacyTimestamp()) {
// convert using the real time zone for the underlying data
timestampConverter = millis -> hiveStorageTimeZone.convertLocalToUTC(millis, false);
}
else {
timestampConverter = LongUnaryOperator.identity();
}

int size = columns.size();

this.types = new Type[size];
Expand Down Expand Up @@ -280,35 +290,19 @@ private void parseLongColumn(int column)
else {
Object fieldValue = ((PrimitiveObjectInspector) fieldInspectors[column]).getPrimitiveJavaObject(fieldData);
checkState(fieldValue != null, "fieldValue should not be null");
longs[column] = getLongExpressedValue(fieldValue, hiveStorageTimeZone);
longs[column] = getLongExpressedValue(fieldValue);
nulls[column] = false;
}
}

private static long getLongExpressedValue(Object value, DateTimeZone hiveTimeZone)
private long getLongExpressedValue(Object value)
{
if (value instanceof Date) {
long storageTime = ((Date) value).getTime();
// convert date from VM current time zone to UTC
long utcMillis = storageTime + DateTimeZone.getDefault().getOffset(storageTime);
return TimeUnit.MILLISECONDS.toDays(utcMillis);
return ((Date) value).toEpochDay();
}
if (value instanceof Timestamp) {
// The Hive SerDe parses timestamps using the default time zone of
// this JVM, but the data might have been written using a different
// time zone. We need to convert it to the configured time zone.

// the timestamp that Hive parsed using the JVM time zone
long parsedJvmMillis = ((Timestamp) value).getTime();

// remove the JVM time zone correction from the timestamp
DateTimeZone jvmTimeZone = DateTimeZone.getDefault();
long hiveMillis = jvmTimeZone.convertUTCToLocal(parsedJvmMillis);

// convert to UTC using the real time zone for the underlying data
long utcMillis = hiveTimeZone.convertLocalToUTC(hiveMillis, false);

return utcMillis;
long millis = ((Timestamp) value).toEpochMilli();
return timestampConverter.applyAsLong(millis);
}
if (value instanceof Float) {
return floatToRawIntBits(((Float) value));
Expand Down Expand Up @@ -451,7 +445,7 @@ private void parseObjectColumn(int column)
nulls[column] = true;
}
else {
objects[column] = getBlockObject(types[column], fieldData, fieldInspectors[column]);
objects[column] = getBlockObject(types[column], fieldData, fieldInspectors[column], timestampConverter);
nulls[column] = false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public Optional<RecordCursor> createRecordCursor(
schema,
columns,
hiveStorageTimeZone,
session,
typeManager));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,22 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.viewfs.ViewFileSystem;
import org.apache.hadoop.hive.common.type.Date;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.common.type.HiveVarchar;
import org.apache.hadoop.hive.common.type.Timestamp;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ProtectMode;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.DateWritableV2;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
Expand All @@ -86,20 +88,16 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.joda.time.DateTimeZone;

import java.io.IOException;
import java.math.BigInteger;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Strings.padEnd;
import static com.google.common.io.BaseEncoding.base16;
Expand Down Expand Up @@ -154,7 +152,6 @@
import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.writableTimestampObjectInspector;
import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory.getCharTypeInfo;
import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory.getVarcharTypeInfo;
import static org.joda.time.DateTimeZone.UTC;

public final class HiveWriteUtils
{
Expand Down Expand Up @@ -315,12 +312,10 @@ public static Object getField(Type type, Block block, int position)
return type.getSlice(block, position).getBytes();
}
if (DateType.DATE.equals(type)) {
long days = type.getLong(block, position);
return new Date(UTC.getMillisKeepLocal(DateTimeZone.getDefault(), TimeUnit.DAYS.toMillis(days)));
return Date.ofEpochDay(toIntExact(type.getLong(block, position)));
}
if (TimestampType.TIMESTAMP.equals(type)) {
long millisUtc = type.getLong(block, position);
return new Timestamp(millisUtc);
return Timestamp.ofEpochMilli(type.getLong(block, position));
}
if (type instanceof DecimalType) {
DecimalType decimalType = (DecimalType) type;
Expand Down Expand Up @@ -933,7 +928,7 @@ public void setField(Block block, int position)
private static class DateFieldSetter
extends FieldSetter
{
private final DateWritable value = new DateWritable();
private final DateWritableV2 value = new DateWritableV2();

public DateFieldSetter(SettableStructObjectInspector rowInspector, Object row, StructField field)
{
Expand All @@ -951,7 +946,7 @@ public void setField(Block block, int position)
private static class TimestampFieldSetter
extends FieldSetter
{
private final TimestampWritable value = new TimestampWritable();
private final TimestampWritableV2 value = new TimestampWritableV2();

public TimestampFieldSetter(SettableStructObjectInspector rowInspector, Object row, StructField field)
{
Expand All @@ -961,8 +956,7 @@ public TimestampFieldSetter(SettableStructObjectInspector rowInspector, Object r
@Override
public void setField(Block block, int position)
{
long millisUtc = TimestampType.TIMESTAMP.getLong(block, position);
value.setTime(millisUtc);
value.set(Timestamp.ofEpochMilli(TimestampType.TIMESTAMP.getLong(block, position)));
rowInspector.setStructFieldData(row, field, value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.prestosql.plugin.hive;

import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.type.TypeManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -51,9 +52,10 @@ public S3SelectRecordCursor(
Properties splitSchema,
List<HiveColumnHandle> columns,
DateTimeZone hiveStorageTimeZone,
ConnectorSession session,
TypeManager typeManager)
{
super(configuration, path, recordReader, totalBytes, updateSplitSchema(splitSchema, columns), columns, hiveStorageTimeZone, typeManager);
super(configuration, path, recordReader, totalBytes, updateSplitSchema(splitSchema, columns), columns, hiveStorageTimeZone, session, typeManager);
}

// since s3select only returns the required column, not the whole columns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public Optional<RecordCursor> createRecordCursor(
IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(typeManager);
String ionSqlQuery = queryBuilder.buildSql(columns, effectivePredicate);
S3SelectLineRecordReader recordReader = new S3SelectCsvRecordReader(configuration, clientConfig, path, start, length, schema, ionSqlQuery, s3ClientFactory);
return Optional.of(new S3SelectRecordCursor(configuration, path, recordReader, length, schema, columns, hiveStorageTimeZone, typeManager));
return Optional.of(new S3SelectRecordCursor<>(configuration, path, recordReader, length, schema, columns, hiveStorageTimeZone, session, typeManager));
}

// unsupported serdes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1307,7 +1307,8 @@ private PrivilegeBag buildPrivilegeBag(
new HiveObjectRef(TABLE, databaseName, tableName, null, null),
grantee.getName(),
fromPrestoPrincipalType(grantee.getType()),
privilegeGrantInfo));
privilegeGrantInfo,
"SQL"));
}
return new PrivilegeBag(privilegeBagBuilder.build());
}
Expand Down
Loading