Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public enum HiveErrorCode
HIVE_TABLE_LOCK_NOT_ACQUIRED(40, EXTERNAL),
HIVE_VIEW_TRANSLATION_ERROR(41, EXTERNAL),
HIVE_PARTITION_NOT_FOUND(42, USER_ERROR),
HIVE_INVALID_TIMESTAMP_COERCION(43, EXTERNAL),
/**/;

private final ErrorCode errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.trino.plugin.hive.coercions.IntegerNumberUpscaleCoercer;
import io.trino.plugin.hive.coercions.TimestampCoercer.LongTimestampToVarcharCoercer;
import io.trino.plugin.hive.coercions.TimestampCoercer.ShortTimestampToVarcharCoercer;
import io.trino.plugin.hive.coercions.TimestampCoercer.VarcharToLongTimestampCoercer;
import io.trino.plugin.hive.coercions.TimestampCoercer.VarcharToShortTimestampCoercer;
import io.trino.plugin.hive.coercions.VarcharCoercer;
import io.trino.plugin.hive.coercions.VarcharToIntegerNumberCoercer;
import io.trino.plugin.hive.type.Category;
Expand Down Expand Up @@ -314,6 +316,12 @@ public static Optional<Function<Block, Block>> createCoercer(TypeManager typeMan
if (fromType instanceof VarcharType fromVarcharType && (toHiveType.equals(HIVE_BYTE) || toHiveType.equals(HIVE_SHORT) || toHiveType.equals(HIVE_INT) || toHiveType.equals(HIVE_LONG))) {
return Optional.of(new VarcharToIntegerNumberCoercer<>(fromVarcharType, toType));
}
if (fromType instanceof VarcharType varcharType && toType instanceof TimestampType timestampType) {
if (timestampType.isShort()) {
return Optional.of(new VarcharToShortTimestampCoercer(varcharType, timestampType));
}
return Optional.of(new VarcharToLongTimestampCoercer(varcharType, timestampType));
}
if (fromType instanceof VarcharType fromVarcharType && toType instanceof VarcharType toVarcharType) {
if (narrowerThan(toVarcharType, fromVarcharType)) {
return Optional.of(new VarcharCoercer(fromVarcharType, toVarcharType));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public static Optional<ConnectorPageSource> createHivePageSource(
Optional<BucketValidator> bucketValidator = createBucketValidator(path, bucketValidation, tableBucketNumber, regularAndInterimColumnMappings);

for (HivePageSourceFactory pageSourceFactory : pageSourceFactories) {
List<HiveColumnHandle> desiredColumns = toColumnHandles(regularAndInterimColumnMappings, true, typeManager);
List<HiveColumnHandle> desiredColumns = toColumnHandles(regularAndInterimColumnMappings, true, typeManager, getTimestampPrecision(session));

Optional<ReaderPageSource> readerWithProjections = pageSourceFactory.createPageSource(
configuration,
Expand Down Expand Up @@ -249,7 +249,7 @@ public static Optional<ConnectorPageSource> createHivePageSource(
}

for (HiveRecordCursorProvider provider : cursorProviders) {
List<HiveColumnHandle> desiredColumns = toColumnHandles(regularAndInterimColumnMappings, false, typeManager);
List<HiveColumnHandle> desiredColumns = toColumnHandles(regularAndInterimColumnMappings, false, typeManager, getTimestampPrecision(session));
Optional<ReaderRecordCursorWithProjections> readerWithProjections = provider.createRecordCursor(
configuration,
session,
Expand Down Expand Up @@ -544,7 +544,7 @@ public static List<ColumnMapping> extractRegularAndInterimColumnMappings(List<Co
.collect(toImmutableList());
}

public static List<HiveColumnHandle> toColumnHandles(List<ColumnMapping> regularColumnMappings, boolean doCoercion, TypeManager typeManager)
public static List<HiveColumnHandle> toColumnHandles(List<ColumnMapping> regularColumnMappings, boolean doCoercion, TypeManager typeManager, HiveTimestampPrecision timestampPrecision)
{
return regularColumnMappings.stream()
.map(columnMapping -> {
Expand All @@ -560,14 +560,14 @@ public static List<HiveColumnHandle> toColumnHandles(List<ColumnMapping> regular
projectedColumn.getDereferenceIndices(),
projectedColumn.getDereferenceNames(),
fromHiveType,
fromHiveType.getType(typeManager));
fromHiveType.getType(typeManager, timestampPrecision));
});

return new HiveColumnHandle(
columnHandle.getBaseColumnName(),
columnHandle.getBaseHiveColumnIndex(),
fromHiveTypeBase,
fromHiveTypeBase.getType(typeManager),
fromHiveTypeBase.getType(typeManager, timestampPrecision),
newColumnProjectionInfo,
columnHandle.getColumnType(),
columnHandle.getComment());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
*/
package io.trino.plugin.hive.coercions;

import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.type.LongTimestamp;
Expand All @@ -24,17 +26,23 @@
import java.time.chrono.IsoChronology;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException;

import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_TIMESTAMP_COERCION;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_NANOSECOND;
import static io.trino.spi.type.Timestamps.round;
import static io.trino.spi.type.Timestamps.roundDiv;
import static io.trino.spi.type.Varchars.truncateToLength;
import static java.lang.Math.floorDiv;
import static java.lang.Math.floorMod;
import static java.lang.Math.toIntExact;
import static java.time.ZoneOffset.UTC;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_TIME;
import static java.time.format.ResolverStyle.LENIENT;
import static org.joda.time.DateTimeConstants.SECONDS_PER_DAY;

public final class TimestampCoercer
{
Expand All @@ -44,8 +52,12 @@ public final class TimestampCoercer
.appendLiteral(' ')
.append(ISO_LOCAL_TIME)
.toFormatter()
.withResolverStyle(LENIENT)
.withChronology(IsoChronology.INSTANCE);

// Before 1900, Java Time and Joda Time are not consistent with java.sql.Date and java.util.Calendar
private static final long START_OF_MODERN_ERA_SECONDS = java.time.LocalDate.of(1900, 1, 1).toEpochDay() * SECONDS_PER_DAY;

private TimestampCoercer() {}

public static class ShortTimestampToVarcharCoercer
Expand All @@ -62,6 +74,9 @@ protected void applyCoercedValue(BlockBuilder blockBuilder, Block block, int pos
long epochMicros = fromType.getLong(block, position);
long epochSecond = floorDiv(epochMicros, MICROSECONDS_PER_SECOND);
int nanoFraction = floorMod(epochMicros, MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND;
if (epochSecond < START_OF_MODERN_ERA_SECONDS) {
throw new TrinoException(HIVE_INVALID_TIMESTAMP_COERCION, "Coercion on historical dates is not supported");
}
toType.writeSlice(
blockBuilder,
truncateToLength(
Expand All @@ -88,6 +103,9 @@ protected void applyCoercedValue(BlockBuilder blockBuilder, Block block, int pos
long microsFraction = floorMod(timestamp.getEpochMicros(), MICROSECONDS_PER_SECOND);
// Hive timestamp has nanoseconds precision, so no truncation here
long nanosFraction = (microsFraction * NANOSECONDS_PER_MICROSECOND) + (timestamp.getPicosOfMicro() / PICOSECONDS_PER_NANOSECOND);
if (epochSecond < START_OF_MODERN_ERA_SECONDS) {
throw new TrinoException(HIVE_INVALID_TIMESTAMP_COERCION, "Coercion on historical dates is not supported");
}

toType.writeSlice(
blockBuilder,
Expand All @@ -97,4 +115,63 @@ protected void applyCoercedValue(BlockBuilder blockBuilder, Block block, int pos
toType));
}
}

/**
 * Coerces a varchar value into a timestamp that is stored as a single {@code long}
 * (the variant selected by callers when {@code TimestampType.isShort()} is true).
 *
 * <p>The text is parsed with the shared {@code LOCAL_DATE_TIME} formatter. Text that cannot
 * be parsed produces a SQL {@code NULL}; a value earlier than 1900-01-01 fails the query
 * with {@code HIVE_INVALID_TIMESTAMP_COERCION}.
 */
public static class VarcharToShortTimestampCoercer
        extends TypeCoercer<VarcharType, TimestampType>
{
    public VarcharToShortTimestampCoercer(VarcharType fromType, TimestampType toType)
    {
        super(fromType, toType);
    }

    /**
     * Parses the varchar at {@code position} and appends the resulting epoch-microsecond
     * value (rounded to the precision of the target type) to {@code blockBuilder}.
     *
     * @throws TrinoException if the parsed timestamp is before 1900-01-01
     */
    @Override
    protected void applyCoercedValue(BlockBuilder blockBuilder, Block block, int position)
    {
        try {
            Slice value = fromType.getSlice(block, position);
            LocalDateTime dateTime = LOCAL_DATE_TIME.parse(value.toStringUtf8(), LocalDateTime::from);
            long epochSecond = dateTime.toEpochSecond(UTC);
            if (epochSecond < START_OF_MODERN_ERA_SECONDS) {
                throw new TrinoException(HIVE_INVALID_TIMESTAMP_COERCION, "Coercion on historical dates is not supported");
            }
            // Round the fraction exactly once, straight to the target precision. Rounding to
            // microseconds first and then again to the target precision double-rounds:
            // e.g. ".0004995" at precision 3 would become 500us and then 1ms instead of 0ms.
            long roundedNanos = round(dateTime.getNano(), 9 - toType.getPrecision());
            // roundedNanos is a whole number of microseconds here, so this division is exact.
            // It may equal one full second, which simply carries into the next second.
            long epochMicros = epochSecond * MICROSECONDS_PER_SECOND + roundDiv(roundedNanos, NANOSECONDS_PER_MICROSECOND);
            toType.writeLong(blockBuilder, epochMicros);
        }
        catch (DateTimeParseException exception) {
            // Hive treats an invalid string as null instead of propagating the exception.
            // Nothing is logged on purpose: a large table full of invalid values would flood the log.
            blockBuilder.appendNull();
        }
    }
}

/**
 * Coerces a varchar value into a timestamp represented as a {@link LongTimestamp}
 * (epoch microseconds plus picoseconds within that microsecond).
 *
 * <p>Unparseable text yields a SQL {@code NULL}; values before 1900-01-01 are rejected
 * with {@code HIVE_INVALID_TIMESTAMP_COERCION}.
 */
public static class VarcharToLongTimestampCoercer
        extends TypeCoercer<VarcharType, TimestampType>
{
    public VarcharToLongTimestampCoercer(VarcharType fromType, TimestampType toType)
    {
        super(fromType, toType);
    }

    @Override
    protected void applyCoercedValue(BlockBuilder blockBuilder, Block block, int position)
    {
        LocalDateTime parsed;
        try {
            String text = fromType.getSlice(block, position).toStringUtf8();
            parsed = LOCAL_DATE_TIME.parse(text, LocalDateTime::from);
        }
        catch (DateTimeParseException exception) {
            // Mirror Hive: a malformed string becomes null rather than failing the query.
            // Deliberately silent, since logging every bad row of a large table would swamp the log.
            blockBuilder.appendNull();
            return;
        }

        long secondsSinceEpoch = parsed.toEpochSecond(UTC);
        if (secondsSinceEpoch < START_OF_MODERN_ERA_SECONDS) {
            throw new TrinoException(HIVE_INVALID_TIMESTAMP_COERCION, "Coercion on historical dates is not supported");
        }

        // Split the nanosecond fraction into whole microseconds and the sub-microsecond remainder.
        int nanoOfSecond = parsed.getNano();
        int wholeMicros = nanoOfSecond / NANOSECONDS_PER_MICROSECOND;
        int leftoverNanos = nanoOfSecond % NANOSECONDS_PER_MICROSECOND;

        long epochMicros = secondsSinceEpoch * MICROSECONDS_PER_SECOND + wholeMicros;
        int picosOfMicro = leftoverNanos * PICOSECONDS_PER_NANOSECOND;
        toType.writeObject(blockBuilder, new LongTimestamp(epochMicros, picosOfMicro));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,45 @@
import io.trino.plugin.hive.HiveTimestampPrecision;
import io.trino.plugin.hive.coercions.TimestampCoercer.LongTimestampToVarcharCoercer;
import io.trino.plugin.hive.coercions.TimestampCoercer.ShortTimestampToVarcharCoercer;
import io.trino.plugin.hive.coercions.TimestampCoercer.VarcharToLongTimestampCoercer;
import io.trino.plugin.hive.coercions.TimestampCoercer.VarcharToShortTimestampCoercer;
import io.trino.plugin.hive.coercions.TypeCoercer;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarcharType;

import java.util.Optional;

import static io.trino.orc.metadata.OrcType.OrcTypeKind.STRING;
import static io.trino.orc.metadata.OrcType.OrcTypeKind.TIMESTAMP;
import static io.trino.orc.metadata.OrcType.OrcTypeKind.VARCHAR;
import static io.trino.spi.type.TimestampType.createTimestampType;
import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;

public final class OrcTypeTranslator
{
private OrcTypeTranslator() {}

/**
 * Selects the coercer that converts values read from an ORC column of kind
 * {@code fromOrcType} into the requested {@code toTrinoType}.
 *
 * <p>Handled pairs:
 * <ul>
 * <li>ORC {@code TIMESTAMP} to varchar: the source timestamp type is built from
 * {@code timestampPrecision}, and the short or long coercer is chosen by {@code isShort()}.</li>
 * <li>ORC {@code STRING}/{@code VARCHAR} to timestamp: the source is treated as an unbounded
 * varchar, and the short or long coercer is chosen by the target type's {@code isShort()}.</li>
 * </ul>
 *
 * @param fromOrcType kind of the column as stored in the ORC file
 * @param toTrinoType type the values should be coerced to
 * @param timestampPrecision precision used to build the source timestamp type for timestamp-to-varchar coercion
 * @return the matching coercer, or {@link Optional#empty()} when this pair is not handled here
 */
public static Optional<TypeCoercer<? extends Type, ? extends Type>> createCoercer(OrcTypeKind fromOrcType, Type toTrinoType, HiveTimestampPrecision timestampPrecision)
{
if (fromOrcType.equals(OrcTypeKind.TIMESTAMP) && toTrinoType instanceof VarcharType varcharType) {
if (fromOrcType == TIMESTAMP && toTrinoType instanceof VarcharType varcharType) {
TimestampType timestampType = createTimestampType(timestampPrecision.getPrecision());
if (timestampType.isShort()) {
return Optional.of(new ShortTimestampToVarcharCoercer(timestampType, varcharType));
}
return Optional.of(new LongTimestampToVarcharCoercer(timestampType, varcharType));
}
// Reverse direction: textual ORC columns read as timestamps.
if (isVarcharType(fromOrcType) && toTrinoType instanceof TimestampType timestampType) {
if (timestampType.isShort()) {
return Optional.of(new VarcharToShortTimestampCoercer(createUnboundedVarcharType(), timestampType));
}
return Optional.of(new VarcharToLongTimestampCoercer(createUnboundedVarcharType(), timestampType));
}
// No coercion is defined here for any other combination.
return Optional.empty();
}

/**
 * Tells whether the given ORC type kind is one of the textual kinds
 * ({@code STRING} or {@code VARCHAR}) accepted as a source for timestamp coercion.
 */
private static boolean isVarcharType(OrcTypeKind orcTypeKind)
{
    if (orcTypeKind == STRING) {
        return true;
    }
    return orcTypeKind == VARCHAR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ private boolean canCoerce(HiveType fromHiveType, HiveType toHiveType, HiveTimest
toHiveType.equals(HIVE_BYTE) ||
toHiveType.equals(HIVE_SHORT) ||
toHiveType.equals(HIVE_INT) ||
toHiveType.equals(HIVE_LONG);
toHiveType.equals(HIVE_LONG) ||
toHiveType.equals(HIVE_TIMESTAMP);
}
if (fromType instanceof CharType) {
return toType instanceof CharType;
Expand Down
Loading