diff --git a/java/core/src/java/org/apache/orc/OrcUtils.java b/java/core/src/java/org/apache/orc/OrcUtils.java index 5811c59e98..85a4b831ea 100644 --- a/java/core/src/java/org/apache/orc/OrcUtils.java +++ b/java/core/src/java/org/apache/orc/OrcUtils.java @@ -157,6 +157,9 @@ private static void appendOrcTypes(List result, TypeDescription t case TIMESTAMP: type.setKind(OrcProto.Type.Kind.TIMESTAMP); break; + case TIMESTAMP_INSTANT: + type.setKind(OrcProto.Type.Kind.TIMESTAMP_INSTANT); + break; case DATE: type.setKind(OrcProto.Type.Kind.DATE); break; @@ -305,6 +308,9 @@ TypeDescription convertTypeFromProtobuf(List types, case TIMESTAMP: result = TypeDescription.createTimestamp(); break; + case TIMESTAMP_INSTANT: + result = TypeDescription.createTimestampInstant(); + break; case DATE: result = TypeDescription.createDate(); break; diff --git a/java/core/src/java/org/apache/orc/TypeDescription.java b/java/core/src/java/org/apache/orc/TypeDescription.java index 9b49ff053c..cace19c40b 100644 --- a/java/core/src/java/org/apache/orc/TypeDescription.java +++ b/java/core/src/java/org/apache/orc/TypeDescription.java @@ -120,7 +120,8 @@ public enum Category { LIST("array", false), MAP("map", false), STRUCT("struct", false), - UNION("uniontype", false); + UNION("uniontype", false), + TIMESTAMP_INSTANT("timestamp with local time zone", false); Category(String name, boolean isPrimitive) { this.name = name; @@ -179,6 +180,10 @@ public static TypeDescription createTimestamp() { return new TypeDescription(Category.TIMESTAMP); } + public static TypeDescription createTimestampInstant() { + return new TypeDescription(Category.TIMESTAMP_INSTANT); + } + public static TypeDescription createBinary() { return new TypeDescription(Category.BINARY); } @@ -211,18 +216,31 @@ public String toString() { } static Category parseCategory(StringPosition source) { - int start = source.position; + StringBuilder word = new StringBuilder(); + boolean hadSpace = true; while (source.position < source.length) 
{ char ch = source.value.charAt(source.position); - if (!Character.isLetter(ch)) { + if (Character.isLetter(ch)) { + word.append(Character.toLowerCase(ch)); + hadSpace = false; + } else if (ch == ' ') { + if (!hadSpace) { + hadSpace = true; + word.append(ch); + } + } else { break; } source.position += 1; } - if (source.position != start) { - String word = source.value.substring(start, source.position).toLowerCase(); + String catString = word.toString(); + // if there were trailing spaces, remove them. + if (hadSpace) { + catString = catString.trim(); + } + if (!catString.isEmpty()) { for (Category cat : Category.values()) { - if (cat.getName().equals(word)) { + if (cat.getName().equals(catString)) { return cat; } } @@ -349,6 +367,7 @@ static TypeDescription parseType(StringPosition source) { case SHORT: case STRING: case TIMESTAMP: + case TIMESTAMP_INSTANT: break; case CHAR: case VARCHAR: @@ -650,6 +669,7 @@ private ColumnVector createColumn(RowBatchVersion version, int maxSize) { case DATE: return new LongColumnVector(maxSize); case TIMESTAMP: + case TIMESTAMP_INSTANT: return new TimestampColumnVector(maxSize); case FLOAT: case DOUBLE: diff --git a/java/core/src/java/org/apache/orc/TypeDescriptionPrettyPrint.java b/java/core/src/java/org/apache/orc/TypeDescriptionPrettyPrint.java index 0714224e74..8653287a19 100644 --- a/java/core/src/java/org/apache/orc/TypeDescriptionPrettyPrint.java +++ b/java/core/src/java/org/apache/orc/TypeDescriptionPrettyPrint.java @@ -92,6 +92,7 @@ static void printType(PrintStream output, case SHORT: case STRING: case TIMESTAMP: + case TIMESTAMP_INSTANT: break; case DECIMAL: diff --git a/java/core/src/java/org/apache/orc/impl/ColumnStatisticsImpl.java b/java/core/src/java/org/apache/orc/impl/ColumnStatisticsImpl.java index 579173727e..b73cafe4cf 100644 --- a/java/core/src/java/org/apache/orc/impl/ColumnStatisticsImpl.java +++ b/java/core/src/java/org/apache/orc/impl/ColumnStatisticsImpl.java @@ -1427,12 +1427,12 @@ public String 
toString() { StringBuilder buf = new StringBuilder(super.toString()); if (getNumberOfValues() != 0) { buf.append(" min: "); - buf.append(minimum); + buf.append(getMinimum()); buf.append(" max: "); - buf.append(maximum); + buf.append(getMaximum()); if (hasSum) { buf.append(" sum: "); - buf.append(sum); + buf.append(getSum()); } } return buf.toString(); @@ -1632,7 +1632,7 @@ public int hashCode() { } } - private static final class TimestampStatisticsImpl extends ColumnStatisticsImpl + private static class TimestampStatisticsImpl extends ColumnStatisticsImpl implements TimestampColumnStatistics { private Long minimum = null; private Long maximum = null; @@ -1791,6 +1791,30 @@ public int hashCode() { } } + private static final class TimestampInstantStatisticsImpl extends TimestampStatisticsImpl { + TimestampInstantStatisticsImpl() { + } + + TimestampInstantStatisticsImpl(OrcProto.ColumnStatistics stats) { + super(stats); + } + + @Override + public void updateTimestamp(Timestamp value) { + updateTimestamp(value.getTime()); + } + + @Override + public Timestamp getMinimum() { + return getMinimumUTC(); + } + + @Override + public Timestamp getMaximum() { + return getMaximumUTC(); + } + } + protected long count = 0; private boolean hasNull = false; private long bytesOnDisk = 0; @@ -1973,6 +1997,8 @@ public static ColumnStatisticsImpl create(TypeDescription schema) { return new DateStatisticsImpl(); case TIMESTAMP: return new TimestampStatisticsImpl(); + case TIMESTAMP_INSTANT: + return new TimestampInstantStatisticsImpl(); case BINARY: return new BinaryStatisticsImpl(); default: @@ -2002,7 +2028,10 @@ public static ColumnStatisticsImpl deserialize(TypeDescription schema, } else if (stats.hasDateStatistics()) { return new DateStatisticsImpl(stats); } else if (stats.hasTimestampStatistics()) { - return new TimestampStatisticsImpl(stats); + return schema == null || + schema.getCategory() == TypeDescription.Category.TIMESTAMP ? 
+ new TimestampStatisticsImpl(stats) : + new TimestampInstantStatisticsImpl(stats); } else if(stats.hasBinaryStatistics()) { return new BinaryStatisticsImpl(stats); } else { diff --git a/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java index bd2e24c57e..a6c158b470 100644 --- a/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java +++ b/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,9 +20,14 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.sql.Date; -import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.time.format.DateTimeParseException; import java.util.EnumMap; -import java.util.Map; +import java.util.TimeZone; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; @@ -37,7 +42,6 @@ import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.orc.OrcProto; -import org.apache.orc.StripeInformation; import org.apache.orc.TypeDescription; import org.apache.orc.TypeDescription.Category; import org.apache.orc.impl.reader.StripePlanner; @@ -52,10 +56,11 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory { */ public static class ConvertTreeReader extends TreeReader { - private TreeReader convertTreeReader; + TreeReader fromReader; - ConvertTreeReader(int columnId) throws IOException { + ConvertTreeReader(int columnId, TreeReader fromReader) throws IOException { super(columnId, null); + 
this.fromReader = fromReader; } // The ordering of types here is used to determine which numeric types @@ -80,11 +85,7 @@ private static void registerNumericType(TypeDescription.Category kind, int level numericTypes.put(kind, level); } - protected void setConvertTreeReader(TreeReader convertTreeReader) { - this.convertTreeReader = convertTreeReader; - } - - protected TreeReader getStringGroupTreeReader(int columnId, + static TreeReader getStringGroupTreeReader(int columnId, TypeDescription fileType, Context context) throws IOException { switch (fileType.getCategory()) { case STRING: @@ -226,44 +227,6 @@ protected HiveDecimal parseDecimalFromString(String string) { } } - /** - * @param string - * @return the Timestamp parsed, or null if there was a parse error. - */ - protected Timestamp parseTimestampFromString(String string) { - try { - Timestamp value = Timestamp.valueOf(string); - return value; - } catch (IllegalArgumentException e) { - return null; - } - } - - /** - * @param string - * @return the Date parsed, or null if there was a parse error. - */ - protected Date parseDateFromString(String string) { - try { - Date value = Date.valueOf(string); - return value; - } catch (IllegalArgumentException e) { - return null; - } - } - - protected String stringFromBytesColumnVectorEntry( - BytesColumnVector bytesColVector, int elementNum) { - String string; - - string = new String( - bytesColVector.vector[elementNum], - bytesColVector.start[elementNum], bytesColVector.length[elementNum], - StandardCharsets.UTF_8); - - return string; - } - private static final double MIN_LONG_AS_DOUBLE = -0x1p63; /* * We cannot store Long.MAX_VALUE as a double without losing precision. Instead, we store @@ -283,31 +246,31 @@ public boolean doubleCanFitInLong(double doubleValue) { @Override void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { // Pass-thru. 
- convertTreeReader.checkEncoding(encoding); + fromReader.checkEncoding(encoding); } @Override void startStripe(StripePlanner planner) throws IOException { // Pass-thru. - convertTreeReader.startStripe(planner); + fromReader.startStripe(planner); } @Override public void seek(PositionProvider[] index) throws IOException { // Pass-thru. - convertTreeReader.seek(index); + fromReader.seek(index); } @Override public void seek(PositionProvider index) throws IOException { // Pass-thru. - convertTreeReader.seek(index); + fromReader.seek(index); } @Override void skipRows(long items) throws IOException { // Pass-thru. - convertTreeReader.skipRows(items); + fromReader.skipRows(items); } /** @@ -399,66 +362,33 @@ protected boolean integerDownCastNeeded(TypeDescription fileType, TypeDescriptio } } - public static class AnyIntegerTreeReader extends ConvertTreeReader { - - private TypeDescription.Category fileTypeCategory; - private TreeReader anyIntegerTreeReader; - - AnyIntegerTreeReader(int columnId, TypeDescription fileType, - Context context) throws IOException { - super(columnId); - this.fileTypeCategory = fileType.getCategory(); - switch (fileTypeCategory) { - case BOOLEAN: - anyIntegerTreeReader = new BooleanTreeReader(columnId); - break; - case BYTE: - anyIntegerTreeReader = new ByteTreeReader(columnId); - break; - case SHORT: - anyIntegerTreeReader = new ShortTreeReader(columnId, context); - break; - case INT: - anyIntegerTreeReader = new IntTreeReader(columnId, context); - break; - case LONG: - anyIntegerTreeReader = new LongTreeReader(columnId, context); - break; - default: - throw new RuntimeException("Unexpected type kind " + fileType.getCategory().name()); - } - setConvertTreeReader(anyIntegerTreeReader); - } - - protected String getString(long longValue) { - if (fileTypeCategory == TypeDescription.Category.BOOLEAN) { - return longValue == 0 ? 
"FALSE" : "TRUE"; - } else { - return Long.toString(longValue); - } - } - - @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - final int batchSize) throws IOException { - anyIntegerTreeReader.nextVector(previousVector, isNull, batchSize); + private static TreeReader createFromInteger(int columnId, + TypeDescription fileType, + Context context) throws IOException { + switch (fileType.getCategory()) { + case BOOLEAN: + return new BooleanTreeReader(columnId); + case BYTE: + return new ByteTreeReader(columnId); + case SHORT: + return new ShortTreeReader(columnId, context); + case INT: + return new IntTreeReader(columnId, context); + case LONG: + return new LongTreeReader(columnId, context); + default: + throw new RuntimeException("Unexpected type kind " + fileType); } } public static class AnyIntegerFromAnyIntegerTreeReader extends ConvertTreeReader { - - private AnyIntegerTreeReader anyIntegerAsLongTreeReader; - private final TypeDescription readerType; private final boolean downCastNeeded; AnyIntegerFromAnyIntegerTreeReader(int columnId, TypeDescription fileType, TypeDescription readerType, Context context) throws IOException { - super(columnId); + super(columnId, createFromInteger(columnId, fileType, context)); this.readerType = readerType; - anyIntegerAsLongTreeReader = new AnyIntegerTreeReader(columnId, fileType, context); - setConvertTreeReader(anyIntegerAsLongTreeReader); downCastNeeded = integerDownCastNeeded(fileType, readerType); } @@ -466,14 +396,12 @@ public static class AnyIntegerFromAnyIntegerTreeReader extends ConvertTreeReader public void nextVector(ColumnVector previousVector, boolean[] isNull, final int batchSize) throws IOException { - anyIntegerAsLongTreeReader.nextVector(previousVector, isNull, batchSize); + fromReader.nextVector(previousVector, isNull, batchSize); LongColumnVector resultColVector = (LongColumnVector) previousVector; if (downCastNeeded) { if (resultColVector.isRepeating) { if 
(resultColVector.noNulls || !resultColVector.isNull[0]) { downCastAnyInteger(resultColVector, 0, readerType); - } else { - // Result remains null. } } else if (resultColVector.noNulls){ for (int i = 0; i < batchSize; i++) { @@ -483,8 +411,6 @@ public void nextVector(ColumnVector previousVector, for (int i = 0; i < batchSize; i++) { if (!resultColVector.isNull[i]) { downCastAnyInteger(resultColVector, i, readerType); - } else { - // Result remains null. } } } @@ -492,65 +418,18 @@ public void nextVector(ColumnVector previousVector, } } - public static class AnyIntegerFromFloatTreeReader extends ConvertTreeReader { - - private FloatTreeReader floatTreeReader; - - private final TypeDescription readerType; - private DoubleColumnVector doubleColVector; - private LongColumnVector longColVector; - - AnyIntegerFromFloatTreeReader(int columnId, TypeDescription readerType) - throws IOException { - super(columnId); - this.readerType = readerType; - floatTreeReader = new FloatTreeReader(columnId); - setConvertTreeReader(floatTreeReader); - } - - @Override - public void setConvertVectorElement(int elementNum) throws IOException { - double doubleValue = doubleColVector.vector[elementNum]; - if (!doubleCanFitInLong(doubleValue)) { - longColVector.isNull[elementNum] = true; - longColVector.noNulls = false; - } else { - // UNDONE: Does the overflow check above using double really work here for float? - float floatValue = (float) doubleValue; - downCastAnyInteger(longColVector, elementNum, (long) floatValue, readerType); - } - } - - @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - final int batchSize) throws IOException { - if (doubleColVector == null) { - // Allocate column vector for file; cast column vector for reader. 
- doubleColVector = new DoubleColumnVector(); - longColVector = (LongColumnVector) previousVector; - } - // Read present/isNull stream - floatTreeReader.nextVector(doubleColVector, isNull, batchSize); - - convertVector(doubleColVector, longColVector, batchSize); - } - } - public static class AnyIntegerFromDoubleTreeReader extends ConvertTreeReader { - - private DoubleTreeReader doubleTreeReader; - private final TypeDescription readerType; private DoubleColumnVector doubleColVector; private LongColumnVector longColVector; - AnyIntegerFromDoubleTreeReader(int columnId, TypeDescription readerType) + AnyIntegerFromDoubleTreeReader(int columnId, TypeDescription fileType, + TypeDescription readerType) throws IOException { - super(columnId); + super(columnId, fileType.getCategory() == Category.DOUBLE ? + new DoubleTreeReader(columnId) : + new FloatTreeReader(columnId)); this.readerType = readerType; - doubleTreeReader = new DoubleTreeReader(columnId); - setConvertTreeReader(doubleTreeReader); } @Override @@ -574,16 +453,13 @@ public void nextVector(ColumnVector previousVector, longColVector = (LongColumnVector) previousVector; } // Read present/isNull stream - doubleTreeReader.nextVector(doubleColVector, isNull, batchSize); + fromReader.nextVector(doubleColVector, isNull, batchSize); convertVector(doubleColVector, longColVector, batchSize); } } public static class AnyIntegerFromDecimalTreeReader extends ConvertTreeReader { - - private DecimalTreeReader decimalTreeReader; - private final int precision; private final int scale; private final TypeDescription readerType; @@ -592,12 +468,11 @@ public static class AnyIntegerFromDecimalTreeReader extends ConvertTreeReader { AnyIntegerFromDecimalTreeReader(int columnId, TypeDescription fileType, TypeDescription readerType, Context context) throws IOException { - super(columnId); + super(columnId, new DecimalTreeReader(columnId, fileType.getPrecision(), + fileType.getScale(), context)); this.precision = fileType.getPrecision(); 
this.scale = fileType.getScale(); this.readerType = readerType; - decimalTreeReader = new DecimalTreeReader(columnId, precision, scale, context); - setConvertTreeReader(decimalTreeReader); } @Override @@ -662,31 +537,26 @@ public void nextVector(ColumnVector previousVector, longColVector = (LongColumnVector) previousVector; } // Read present/isNull stream - decimalTreeReader.nextVector(decimalColVector, isNull, batchSize); + fromReader.nextVector(decimalColVector, isNull, batchSize); convertVector(decimalColVector, longColVector, batchSize); } } public static class AnyIntegerFromStringGroupTreeReader extends ConvertTreeReader { - - private TreeReader stringGroupTreeReader; - private final TypeDescription readerType; private BytesColumnVector bytesColVector; private LongColumnVector longColVector; AnyIntegerFromStringGroupTreeReader(int columnId, TypeDescription fileType, TypeDescription readerType, Context context) throws IOException { - super(columnId); + super(columnId, getStringGroupTreeReader(columnId, fileType, context)); this.readerType = readerType; - stringGroupTreeReader = getStringGroupTreeReader(columnId, fileType, context); - setConvertTreeReader(stringGroupTreeReader); } @Override public void setConvertVectorElement(int elementNum) throws IOException { - String string = stringFromBytesColumnVectorEntry(bytesColVector, elementNum); + String string = SerializationUtils.bytesVectorToString(bytesColVector, elementNum); long longValue = parseLongFromString(string); if (!getIsParseError()) { downCastAnyInteger(longColVector, elementNum, longValue, readerType); @@ -706,30 +576,26 @@ public void nextVector(ColumnVector previousVector, longColVector = (LongColumnVector) previousVector; } // Read present/isNull stream - stringGroupTreeReader.nextVector(bytesColVector, isNull, batchSize); + fromReader.nextVector(bytesColVector, isNull, batchSize); convertVector(bytesColVector, longColVector, batchSize); } } public static class AnyIntegerFromTimestampTreeReader 
extends ConvertTreeReader { - - private TimestampTreeReader timestampTreeReader; - private final TypeDescription readerType; private TimestampColumnVector timestampColVector; private LongColumnVector longColVector; AnyIntegerFromTimestampTreeReader(int columnId, TypeDescription readerType, - Context context) throws IOException { - super(columnId); + Context context, + boolean instantType) throws IOException { + super(columnId, new TimestampTreeReader(columnId, context, instantType)); this.readerType = readerType; - timestampTreeReader = new TimestampTreeReader(columnId, context); - setConvertTreeReader(timestampTreeReader); } @Override - public void setConvertVectorElement(int elementNum) throws IOException { + public void setConvertVectorElement(int elementNum) { // Use TimestampWritable's getSeconds. long longValue = TimestampUtils.millisToSeconds( timestampColVector.asScratchTimestamp(elementNum).getTime()); @@ -746,226 +612,19 @@ public void nextVector(ColumnVector previousVector, longColVector = (LongColumnVector) previousVector; } // Read present/isNull stream - timestampTreeReader.nextVector(timestampColVector, isNull, batchSize); + fromReader.nextVector(timestampColVector, isNull, batchSize); convertVector(timestampColVector, longColVector, batchSize); } } - public static class FloatFromAnyIntegerTreeReader extends ConvertTreeReader { - - private AnyIntegerTreeReader anyIntegerAsLongTreeReader; - - private LongColumnVector longColVector; - private DoubleColumnVector doubleColVector; - - FloatFromAnyIntegerTreeReader(int columnId, TypeDescription fileType, - Context context) throws IOException { - super(columnId); - anyIntegerAsLongTreeReader = - new AnyIntegerTreeReader(columnId, fileType, context); - setConvertTreeReader(anyIntegerAsLongTreeReader); - } - - @Override - public void setConvertVectorElement(int elementNum) throws IOException { - float floatValue = (float) longColVector.vector[elementNum]; - if (!Float.isNaN(floatValue)) { - 
doubleColVector.vector[elementNum] = floatValue; - } else { - doubleColVector.vector[elementNum] = Double.NaN; - doubleColVector.noNulls = false; - doubleColVector.isNull[elementNum] = true; - } - } - - @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - final int batchSize) throws IOException { - if (longColVector == null) { - // Allocate column vector for file; cast column vector for reader. - longColVector = new LongColumnVector(); - doubleColVector = (DoubleColumnVector) previousVector; - } - // Read present/isNull stream - anyIntegerAsLongTreeReader.nextVector(longColVector, isNull, batchSize); - - convertVector(longColVector, doubleColVector, batchSize); - } - } - - public static class FloatFromDoubleTreeReader extends ConvertTreeReader { - - private DoubleTreeReader doubleTreeReader; - - FloatFromDoubleTreeReader(int columnId) throws IOException { - super(columnId); - doubleTreeReader = new DoubleTreeReader(columnId); - setConvertTreeReader(doubleTreeReader); - } - - @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - final int batchSize) throws IOException { - doubleTreeReader.nextVector(previousVector, isNull, batchSize); - - DoubleColumnVector resultColVector = (DoubleColumnVector) previousVector; - double[] resultVector = resultColVector.vector; - if (resultColVector.isRepeating) { - if (resultColVector.noNulls || !resultColVector.isNull[0]) { - resultVector[0] = (float) resultVector[0]; - } else { - // Remains null. - } - } else if (resultColVector.noNulls){ - for (int i = 0; i < batchSize; i++) { - resultVector[i] = (float) resultVector[i]; - } - } else { - for (int i = 0; i < batchSize; i++) { - if (!resultColVector.isNull[i]) { - resultVector[i] = (float) resultVector[i]; - } else { - // Remains null. 
- } - } - } - } - } - - public static class FloatFromDecimalTreeReader extends ConvertTreeReader { - - private DecimalTreeReader decimalTreeReader; - - private final int precision; - private final int scale; - private DecimalColumnVector decimalColVector; - private DoubleColumnVector doubleColVector; - - FloatFromDecimalTreeReader(int columnId, TypeDescription fileType, - TypeDescription readerType, Context context) throws IOException { - super(columnId); - this.precision = fileType.getPrecision(); - this.scale = fileType.getScale(); - decimalTreeReader = new DecimalTreeReader(columnId, precision, scale, context); - setConvertTreeReader(decimalTreeReader); - } - - @Override - public void setConvertVectorElement(int elementNum) throws IOException { - doubleColVector.vector[elementNum] = - (float) decimalColVector.vector[elementNum].doubleValue(); - } - - @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - final int batchSize) throws IOException { - if (decimalColVector == null) { - // Allocate column vector for file; cast column vector for reader. 
- decimalColVector = new DecimalColumnVector(precision, scale); - doubleColVector = (DoubleColumnVector) previousVector; - } - // Read present/isNull stream - decimalTreeReader.nextVector(decimalColVector, isNull, batchSize); - - convertVector(decimalColVector, doubleColVector, batchSize); - } - } - - public static class FloatFromStringGroupTreeReader extends ConvertTreeReader { - - private TreeReader stringGroupTreeReader; - - private BytesColumnVector bytesColVector; - private DoubleColumnVector doubleColVector; - - FloatFromStringGroupTreeReader(int columnId, TypeDescription fileType, Context context) - throws IOException { - super(columnId); - stringGroupTreeReader = getStringGroupTreeReader(columnId, fileType, context); - setConvertTreeReader(stringGroupTreeReader); - } - - @Override - public void setConvertVectorElement(int elementNum) throws IOException { - String string = stringFromBytesColumnVectorEntry(bytesColVector, elementNum); - float floatValue = parseFloatFromString(string); - if (!getIsParseError()) { - doubleColVector.vector[elementNum] = floatValue; - } else { - doubleColVector.vector[elementNum] = Double.NaN; - doubleColVector.noNulls = false; - doubleColVector.isNull[elementNum] = true; - } - } - - @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - final int batchSize) throws IOException { - if (bytesColVector == null) { - // Allocate column vector for file; cast column vector for reader. 
- bytesColVector = new BytesColumnVector(); - doubleColVector = (DoubleColumnVector) previousVector; - } - // Read present/isNull stream - stringGroupTreeReader.nextVector(bytesColVector, isNull, batchSize); - - convertVector(bytesColVector, doubleColVector, batchSize); - } - } - - public static class FloatFromTimestampTreeReader extends ConvertTreeReader { - - private TimestampTreeReader timestampTreeReader; - - private TimestampColumnVector timestampColVector; - private DoubleColumnVector doubleColVector; - - FloatFromTimestampTreeReader(int columnId, Context context) throws IOException { - super(columnId); - timestampTreeReader = new TimestampTreeReader(columnId, context); - setConvertTreeReader(timestampTreeReader); - } - - @Override - public void setConvertVectorElement(int elementNum) throws IOException { - doubleColVector.vector[elementNum] = (float) TimestampUtils.getDouble( - timestampColVector.asScratchTimestamp(elementNum)); - } - - @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - final int batchSize) throws IOException { - if (timestampColVector == null) { - // Allocate column vector for file; cast column vector for reader. 
- timestampColVector = new TimestampColumnVector(); - doubleColVector = (DoubleColumnVector) previousVector; - } - // Read present/isNull stream - timestampTreeReader.nextVector(timestampColVector, isNull, batchSize); - - convertVector(timestampColVector, doubleColVector, batchSize); - } - } - public static class DoubleFromAnyIntegerTreeReader extends ConvertTreeReader { - - private AnyIntegerTreeReader anyIntegerAsLongTreeReader; - private LongColumnVector longColVector; private DoubleColumnVector doubleColVector; DoubleFromAnyIntegerTreeReader(int columnId, TypeDescription fileType, Context context) throws IOException { - super(columnId); - anyIntegerAsLongTreeReader = - new AnyIntegerTreeReader(columnId, fileType, context); - setConvertTreeReader(anyIntegerAsLongTreeReader); + super(columnId, createFromInteger(columnId, fileType, context)); } @Override @@ -991,72 +650,23 @@ public void nextVector(ColumnVector previousVector, doubleColVector = (DoubleColumnVector) previousVector; } // Read present/isNull stream - anyIntegerAsLongTreeReader.nextVector(longColVector, isNull, batchSize); + fromReader.nextVector(longColVector, isNull, batchSize); convertVector(longColVector, doubleColVector, batchSize); } } - public static class DoubleFromFloatTreeReader extends ConvertTreeReader { - - private FloatTreeReader floatTreeReader; - - DoubleFromFloatTreeReader(int columnId) throws IOException { - super(columnId); - floatTreeReader = new FloatTreeReader(columnId); - setConvertTreeReader(floatTreeReader); - } - - @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - final int batchSize) throws IOException { - // we get the DoubleColumnVector produced by float tree reader first, then iterate through - // the elements and make double -> float -> string -> double conversion to preserve the - // precision. 
When float tree reader reads float and assign it to double, java's widening - // conversion adds more precision which will break all comparisons. - // Example: float f = 74.72 - // double d = f ---> 74.72000122070312 - // Double.parseDouble(String.valueOf(f)) ---> 74.72 - floatTreeReader.nextVector(previousVector, isNull, batchSize); - - DoubleColumnVector doubleColumnVector = (DoubleColumnVector) previousVector; - if (doubleColumnVector.isRepeating) { - if (doubleColumnVector.noNulls || !doubleColumnVector.isNull[0]) { - final float f = (float) doubleColumnVector.vector[0]; - doubleColumnVector.vector[0] = Double.parseDouble(String.valueOf(f)); - } - } else if (doubleColumnVector.noNulls){ - for (int i = 0; i < batchSize; i++) { - final float f = (float) doubleColumnVector.vector[i]; - doubleColumnVector.vector[i] = Double.parseDouble(String.valueOf(f)); - } - } else { - for (int i = 0; i < batchSize; i++) { - if (!doubleColumnVector.isNull[i]) { - final float f = (float) doubleColumnVector.vector[i]; - doubleColumnVector.vector[i] = Double.parseDouble(String.valueOf(f)); - } - } - } - } - } - public static class DoubleFromDecimalTreeReader extends ConvertTreeReader { - - private DecimalTreeReader decimalTreeReader; - private final int precision; private final int scale; private DecimalColumnVector decimalColVector; private DoubleColumnVector doubleColVector; DoubleFromDecimalTreeReader(int columnId, TypeDescription fileType, Context context) throws IOException { - super(columnId); + super(columnId, new DecimalTreeReader(columnId, fileType.getPrecision(), + fileType.getScale(), context)); this.precision = fileType.getPrecision(); this.scale = fileType.getScale(); - decimalTreeReader = new DecimalTreeReader(columnId, precision, scale, context); - setConvertTreeReader(decimalTreeReader); } @Override @@ -1075,29 +685,24 @@ public void nextVector(ColumnVector previousVector, doubleColVector = (DoubleColumnVector) previousVector; } // Read present/isNull stream - 
decimalTreeReader.nextVector(decimalColVector, isNull, batchSize); + fromReader.nextVector(decimalColVector, isNull, batchSize); convertVector(decimalColVector, doubleColVector, batchSize); } } public static class DoubleFromStringGroupTreeReader extends ConvertTreeReader { - - private TreeReader stringGroupTreeReader; - private BytesColumnVector bytesColVector; private DoubleColumnVector doubleColVector; DoubleFromStringGroupTreeReader(int columnId, TypeDescription fileType, Context context) throws IOException { - super(columnId); - stringGroupTreeReader = getStringGroupTreeReader(columnId, fileType, context); - setConvertTreeReader(stringGroupTreeReader); + super(columnId, getStringGroupTreeReader(columnId, fileType, context)); } @Override public void setConvertVectorElement(int elementNum) throws IOException { - String string = stringFromBytesColumnVectorEntry(bytesColVector, elementNum); + String string = SerializationUtils.bytesVectorToString(bytesColVector, elementNum); double doubleValue = parseDoubleFromString(string); if (!getIsParseError()) { doubleColVector.vector[elementNum] = doubleValue; @@ -1117,23 +722,19 @@ public void nextVector(ColumnVector previousVector, doubleColVector = (DoubleColumnVector) previousVector; } // Read present/isNull stream - stringGroupTreeReader.nextVector(bytesColVector, isNull, batchSize); + fromReader.nextVector(bytesColVector, isNull, batchSize); convertVector(bytesColVector, doubleColVector, batchSize); } } public static class DoubleFromTimestampTreeReader extends ConvertTreeReader { - - private TimestampTreeReader timestampTreeReader; - private TimestampColumnVector timestampColVector; private DoubleColumnVector doubleColVector; - DoubleFromTimestampTreeReader(int columnId, Context context) throws IOException { - super(columnId); - timestampTreeReader = new TimestampTreeReader(columnId, context); - setConvertTreeReader(timestampTreeReader); + DoubleFromTimestampTreeReader(int columnId, Context context, + boolean 
instantType) throws IOException { + super(columnId, new TimestampTreeReader(columnId, context, instantType)); } @Override @@ -1152,25 +753,41 @@ public void nextVector(ColumnVector previousVector, doubleColVector = (DoubleColumnVector) previousVector; } // Read present/isNull stream - timestampTreeReader.nextVector(timestampColVector, isNull, batchSize); + fromReader.nextVector(timestampColVector, isNull, batchSize); convertVector(timestampColVector, doubleColVector, batchSize); } } - public static class DecimalFromAnyIntegerTreeReader extends ConvertTreeReader { + public static class FloatFromDoubleTreeReader extends ConvertTreeReader { + FloatFromDoubleTreeReader(int columnId, Context context) throws IOException { + super(columnId, new DoubleTreeReader(columnId)); + } - private AnyIntegerTreeReader anyIntegerAsLongTreeReader; + @Override + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + final int batchSize) throws IOException { + // Read present/isNull stream + fromReader.nextVector(previousVector, isNull, batchSize); + DoubleColumnVector vector = (DoubleColumnVector) previousVector; + if (previousVector.isRepeating) { + vector.vector[0] = (float) vector.vector[0]; + } else { + for(int i=0; i < batchSize; ++i) { + vector.vector[i] = (float) vector.vector[i]; + } + } + } + } + public static class DecimalFromAnyIntegerTreeReader extends ConvertTreeReader { private LongColumnVector longColVector; private ColumnVector decimalColVector; DecimalFromAnyIntegerTreeReader(int columnId, TypeDescription fileType, Context context) throws IOException { - super(columnId); - anyIntegerAsLongTreeReader = - new AnyIntegerTreeReader(columnId, fileType, context); - setConvertTreeReader(anyIntegerAsLongTreeReader); + super(columnId, createFromInteger(columnId, fileType, context)); } @Override @@ -1195,77 +812,22 @@ public void nextVector(ColumnVector previousVector, decimalColVector = previousVector; } // Read present/isNull stream - 
anyIntegerAsLongTreeReader.nextVector(longColVector, isNull, batchSize); + fromReader.nextVector(longColVector, isNull, batchSize); convertVector(longColVector, decimalColVector, batchSize); } } - public static class DecimalFromFloatTreeReader extends ConvertTreeReader { - - private FloatTreeReader floatTreeReader; - - private DoubleColumnVector doubleColVector; - private ColumnVector decimalColVector; - - DecimalFromFloatTreeReader(int columnId, TypeDescription readerType) - throws IOException { - super(columnId); - floatTreeReader = new FloatTreeReader(columnId); - setConvertTreeReader(floatTreeReader); - } - - @Override - public void setConvertVectorElement(int elementNum) throws IOException { - float floatValue = (float) doubleColVector.vector[elementNum]; - if (!Float.isNaN(floatValue)) { - HiveDecimal decimalValue = - HiveDecimal.create(Float.toString(floatValue)); - if (decimalValue != null) { - // The DecimalColumnVector will enforce precision and scale and set the entry to null when out of bounds. - if (decimalColVector instanceof Decimal64ColumnVector) { - ((Decimal64ColumnVector) decimalColVector).set(elementNum, decimalValue); - } else { - ((DecimalColumnVector) decimalColVector).set(elementNum, decimalValue); - } - } else { - decimalColVector.noNulls = false; - decimalColVector.isNull[elementNum] = true; - } - } else { - decimalColVector.noNulls = false; - decimalColVector.isNull[elementNum] = true; - } - } - - @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - final int batchSize) throws IOException { - if (doubleColVector == null) { - // Allocate column vector for file; cast column vector for reader. 
- doubleColVector = new DoubleColumnVector(); - decimalColVector = previousVector; - } - // Read present/isNull stream - floatTreeReader.nextVector(doubleColVector, isNull, batchSize); - - convertVector(doubleColVector, decimalColVector, batchSize); - } - } - public static class DecimalFromDoubleTreeReader extends ConvertTreeReader { - - private DoubleTreeReader doubleTreeReader; - private DoubleColumnVector doubleColVector; private ColumnVector decimalColVector; - DecimalFromDoubleTreeReader(int columnId, TypeDescription readerType) + DecimalFromDoubleTreeReader(int columnId, TypeDescription fileType, + TypeDescription readerType) throws IOException { - super(columnId); - doubleTreeReader = new DoubleTreeReader(columnId); - setConvertTreeReader(doubleTreeReader); + super(columnId, fileType.getCategory() == Category.DOUBLE ? + new DoubleTreeReader(columnId) : + new FloatTreeReader(columnId)); } @Override @@ -1294,29 +856,24 @@ public void nextVector(ColumnVector previousVector, decimalColVector = previousVector; } // Read present/isNull stream - doubleTreeReader.nextVector(doubleColVector, isNull, batchSize); + fromReader.nextVector(doubleColVector, isNull, batchSize); convertVector(doubleColVector, decimalColVector, batchSize); } } public static class DecimalFromStringGroupTreeReader extends ConvertTreeReader { - - private TreeReader stringGroupTreeReader; - private BytesColumnVector bytesColVector; private ColumnVector decimalColVector; DecimalFromStringGroupTreeReader(int columnId, TypeDescription fileType, TypeDescription readerType, Context context) throws IOException { - super(columnId); - stringGroupTreeReader = getStringGroupTreeReader(columnId, fileType, context); - setConvertTreeReader(stringGroupTreeReader); + super(columnId, getStringGroupTreeReader(columnId, fileType, context)); } @Override public void setConvertVectorElement(int elementNum) throws IOException { - String string = stringFromBytesColumnVectorEntry(bytesColVector, elementNum); + String 
string = SerializationUtils.bytesVectorToString(bytesColVector, elementNum); HiveDecimal value = parseDecimalFromString(string); if (value != null) { // The DecimalColumnVector will enforce precision and scale and set the entry to null when out of bounds. @@ -1341,30 +898,26 @@ public void nextVector(ColumnVector previousVector, decimalColVector = previousVector; } // Read present/isNull stream - stringGroupTreeReader.nextVector(bytesColVector, isNull, batchSize); + fromReader.nextVector(bytesColVector, isNull, batchSize); convertVector(bytesColVector, decimalColVector, batchSize); } } public static class DecimalFromTimestampTreeReader extends ConvertTreeReader { - - private TimestampTreeReader timestampTreeReader; - private TimestampColumnVector timestampColVector; private ColumnVector decimalColVector; - DecimalFromTimestampTreeReader(int columnId, Context context) throws IOException { - super(columnId); - timestampTreeReader = new TimestampTreeReader(columnId, context); - setConvertTreeReader(timestampTreeReader); + DecimalFromTimestampTreeReader(int columnId, Context context, + boolean instantType) throws IOException { + super(columnId, new TimestampTreeReader(columnId, context, instantType)); } @Override public void setConvertVectorElement(int elementNum) throws IOException { - double doubleValue = TimestampUtils.getDouble( - timestampColVector.asScratchTimestamp(elementNum)); - HiveDecimal value = HiveDecimal.create(Double.toString(doubleValue)); + long seconds = timestampColVector.time[elementNum] / 1000; + long nanos = timestampColVector.nanos[elementNum]; + HiveDecimal value = HiveDecimal.create(String.format("%d.%09d", seconds, nanos)); if (value != null) { // The DecimalColumnVector will enforce precision and scale and set the entry to null when out of bounds. 
if (decimalColVector instanceof Decimal64ColumnVector) { @@ -1372,9 +925,6 @@ public void setConvertVectorElement(int elementNum) throws IOException { } else { ((DecimalColumnVector) decimalColVector).set(elementNum, value); } - } else { - decimalColVector.noNulls = false; - decimalColVector.isNull[elementNum] = true; } } @@ -1388,16 +938,13 @@ public void nextVector(ColumnVector previousVector, decimalColVector = previousVector; } // Read present/isNull stream - timestampTreeReader.nextVector(timestampColVector, isNull, batchSize); + fromReader.nextVector(timestampColVector, isNull, batchSize); convertVector(timestampColVector, decimalColVector, batchSize); } } public static class DecimalFromDecimalTreeReader extends ConvertTreeReader { - - private DecimalTreeReader decimalTreeReader; - private DecimalColumnVector fileDecimalColVector; private int filePrecision; private int fileScale; @@ -1405,11 +952,10 @@ public static class DecimalFromDecimalTreeReader extends ConvertTreeReader { DecimalFromDecimalTreeReader(int columnId, TypeDescription fileType, TypeDescription readerType, Context context) throws IOException { - super(columnId); + super(columnId, new DecimalTreeReader(columnId, fileType.getPrecision(), + fileType.getScale(), context)); filePrecision = fileType.getPrecision(); fileScale = fileType.getScale(); - decimalTreeReader = new DecimalTreeReader(columnId, filePrecision, fileScale, context); - setConvertTreeReader(decimalTreeReader); } @Override @@ -1433,34 +979,27 @@ public void nextVector(ColumnVector previousVector, decimalColVector = previousVector; } // Read present/isNull stream - decimalTreeReader.nextVector(fileDecimalColVector, isNull, batchSize); + fromReader.nextVector(fileDecimalColVector, isNull, batchSize); convertVector(fileDecimalColVector, decimalColVector, batchSize); } } public static class StringGroupFromAnyIntegerTreeReader extends ConvertTreeReader { - - private AnyIntegerTreeReader anyIntegerAsLongTreeReader; - - private final 
TypeDescription readerType; - private LongColumnVector longColVector; - private BytesColumnVector bytesColVector; + protected final TypeDescription readerType; + protected LongColumnVector longColVector; + protected BytesColumnVector bytesColVector; StringGroupFromAnyIntegerTreeReader(int columnId, TypeDescription fileType, TypeDescription readerType, Context context) throws IOException { - super(columnId); + super(columnId, createFromInteger(columnId, fileType, context)); this.readerType = readerType; - anyIntegerAsLongTreeReader = - new AnyIntegerTreeReader(columnId, fileType, context); - setConvertTreeReader(anyIntegerAsLongTreeReader); } @Override public void setConvertVectorElement(int elementNum) { - long longValue = longColVector.vector[elementNum]; - String string = anyIntegerAsLongTreeReader.getString(longValue); - byte[] bytes = string.getBytes(StandardCharsets.UTF_8); + byte[] bytes = Long.toString(longColVector.vector[elementNum]) + .getBytes(StandardCharsets.UTF_8); assignStringGroupVectorEntry(bytesColVector, elementNum, readerType, bytes); } @@ -1470,76 +1009,44 @@ public void nextVector(ColumnVector previousVector, final int batchSize) throws IOException { if (longColVector == null) { // Allocate column vector for file; cast column vector for reader. 
- longColVector = new LongColumnVector(); - bytesColVector = (BytesColumnVector) previousVector; - } - // Read present/isNull stream - anyIntegerAsLongTreeReader.nextVector(longColVector, isNull, batchSize); - - convertVector(longColVector, bytesColVector, batchSize); - } - } - - public static class StringGroupFromFloatTreeReader extends ConvertTreeReader { - - private FloatTreeReader floatTreeReader; - - private final TypeDescription readerType; - private DoubleColumnVector doubleColVector; - private BytesColumnVector bytesColVector; - - - StringGroupFromFloatTreeReader(int columnId, TypeDescription readerType, - Context context) throws IOException { - super(columnId); - this.readerType = readerType; - floatTreeReader = new FloatTreeReader(columnId); - setConvertTreeReader(floatTreeReader); - } - - @Override - public void setConvertVectorElement(int elementNum) { - float floatValue = (float) doubleColVector.vector[elementNum]; - if (!Float.isNaN(floatValue)) { - String string = String.valueOf(floatValue); - byte[] bytes = string.getBytes(StandardCharsets.UTF_8); - assignStringGroupVectorEntry(bytesColVector, elementNum, readerType, bytes); - } else { - bytesColVector.noNulls = false; - bytesColVector.isNull[elementNum] = true; - } - } - - @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - final int batchSize) throws IOException { - if (doubleColVector == null) { - // Allocate column vector for file; cast column vector for reader. 
- doubleColVector = new DoubleColumnVector(); + longColVector = new LongColumnVector(); bytesColVector = (BytesColumnVector) previousVector; } // Read present/isNull stream - floatTreeReader.nextVector(doubleColVector, isNull, batchSize); + fromReader.nextVector(longColVector, isNull, batchSize); - convertVector(doubleColVector, bytesColVector, batchSize); + convertVector(longColVector, bytesColVector, batchSize); } } - public static class StringGroupFromDoubleTreeReader extends ConvertTreeReader { + public static class StringGroupFromBooleanTreeReader extends StringGroupFromAnyIntegerTreeReader { + + StringGroupFromBooleanTreeReader(int columnId, TypeDescription fileType, + TypeDescription readerType, + Context context) throws IOException { + super(columnId, fileType, readerType, context); + } - private DoubleTreeReader doubleTreeReader; + @Override + public void setConvertVectorElement(int elementNum) { + byte[] bytes = (longColVector.vector[elementNum] != 0 ? "TRUE" : "FALSE") + .getBytes(StandardCharsets.UTF_8); + assignStringGroupVectorEntry(bytesColVector, elementNum, readerType, bytes); + } + } + public static class StringGroupFromDoubleTreeReader extends ConvertTreeReader { private final TypeDescription readerType; private DoubleColumnVector doubleColVector; private BytesColumnVector bytesColVector; - StringGroupFromDoubleTreeReader(int columnId, TypeDescription readerType, + StringGroupFromDoubleTreeReader(int columnId, TypeDescription fileType, + TypeDescription readerType, Context context) throws IOException { - super(columnId); + super(columnId, fileType.getCategory() == Category.DOUBLE ? 
+ new DoubleTreeReader(columnId) : + new FloatTreeReader(columnId)); this.readerType = readerType; - doubleTreeReader = new DoubleTreeReader(columnId); - setConvertTreeReader(doubleTreeReader); } @Override @@ -1565,7 +1072,7 @@ public void nextVector(ColumnVector previousVector, bytesColVector = (BytesColumnVector) previousVector; } // Read present/isNull stream - doubleTreeReader.nextVector(doubleColVector, isNull, batchSize); + fromReader.nextVector(doubleColVector, isNull, batchSize); convertVector(doubleColVector, bytesColVector, batchSize); } @@ -1574,9 +1081,6 @@ public void nextVector(ColumnVector previousVector, public static class StringGroupFromDecimalTreeReader extends ConvertTreeReader { - - private DecimalTreeReader decimalTreeReader; - private int precision; private int scale; private final TypeDescription readerType; @@ -1586,12 +1090,11 @@ public static class StringGroupFromDecimalTreeReader extends ConvertTreeReader { StringGroupFromDecimalTreeReader(int columnId, TypeDescription fileType, TypeDescription readerType, Context context) throws IOException { - super(columnId); + super(columnId, new DecimalTreeReader(columnId, fileType.getPrecision(), + fileType.getScale(), context)); this.precision = fileType.getPrecision(); this.scale = fileType.getScale(); this.readerType = readerType; - decimalTreeReader = new DecimalTreeReader(columnId, precision, scale, context); - setConvertTreeReader(decimalTreeReader); scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES]; } @@ -1617,32 +1120,90 @@ public void nextVector(ColumnVector previousVector, bytesColVector = (BytesColumnVector) previousVector; } // Read present/isNull stream - decimalTreeReader.nextVector(decimalColVector, isNull, batchSize); + fromReader.nextVector(decimalColVector, isNull, batchSize); convertVector(decimalColVector, bytesColVector, batchSize); } } - public static class StringGroupFromTimestampTreeReader extends ConvertTreeReader { + /** + * The format for converting 
from/to string/date. + * Eg. "2019-07-09" + */ + static final DateTimeFormatter DATE_FORMAT = + new DateTimeFormatterBuilder().appendPattern("uuuu-MM-dd") + .toFormatter(); + + /** + * The format for converting from/to string/timestamp. + * Eg. "2019-07-09 13:11:00" + */ + static final DateTimeFormatter TIMESTAMP_FORMAT = + new DateTimeFormatterBuilder() + .append(DATE_FORMAT) + .appendPattern(" HH:mm:ss[.S]") + .toFormatter(); + + /** + * The format for converting from/to string/timestamp with local time zone. + * Eg. "2019-07-09 13:11:00 America/Los_Angeles" + */ + static final DateTimeFormatter INSTANT_TIMESTAMP_FORMAT = + new DateTimeFormatterBuilder() + .append(TIMESTAMP_FORMAT) + .appendPattern(" VV") + .toFormatter(); - private TimestampTreeReader timestampTreeReader; + /** + * Create an Instant from an entry in a TimestampColumnVector. + * It assumes that vector.isRepeating and null values have been handled + * before we get called. + * @param vector the timestamp column vector + * @param element the element number + * @return a timestamp Instant + */ + static Instant timestampToInstant(TimestampColumnVector vector, int element) { + return Instant.ofEpochSecond(vector.time[element] / 1000, + vector.nanos[element]); + } + + /** + * Convert a decimal to an Instant using seconds & nanos. 
+ * @param vector the decimal64 column vector + * @param element the element number to use + * @return the timestamp instant + */ + static Instant decimalToInstant(DecimalColumnVector vector, int element) { + // copy the value so that we can mutate it + HiveDecimalWritable value = new HiveDecimalWritable(vector.vector[element]); + long seconds = value.longValue(); + value.mutateFractionPortion(); + value.mutateScaleByPowerOfTen(9); + int nanos = (int) value.longValue(); + return Instant.ofEpochSecond(seconds, nanos); + } + public static class StringGroupFromTimestampTreeReader extends ConvertTreeReader { private final TypeDescription readerType; + private final ZoneId local; + private final DateTimeFormatter formatter; private TimestampColumnVector timestampColVector; private BytesColumnVector bytesColVector; StringGroupFromTimestampTreeReader(int columnId, TypeDescription readerType, - Context context) throws IOException { - super(columnId); + Context context, + boolean instantType) throws IOException { + super(columnId, new TimestampTreeReader(columnId, context, instantType)); this.readerType = readerType; - timestampTreeReader = new TimestampTreeReader(columnId, context); - setConvertTreeReader(timestampTreeReader); + local = context.getUseUTCTimestamp() ? ZoneId.of("UTC") + : ZoneId.systemDefault(); + formatter = instantType ? 
INSTANT_TIMESTAMP_FORMAT : TIMESTAMP_FORMAT; } @Override public void setConvertVectorElement(int elementNum) throws IOException { - String string = - timestampColVector.asScratchTimestamp(elementNum).toString(); + String string = timestampToInstant(timestampColVector, elementNum).atZone(local) + .format(formatter); byte[] bytes = string.getBytes(StandardCharsets.UTF_8); assignStringGroupVectorEntry(bytesColVector, elementNum, readerType, bytes); } @@ -1657,16 +1218,13 @@ public void nextVector(ColumnVector previousVector, bytesColVector = (BytesColumnVector) previousVector; } // Read present/isNull stream - timestampTreeReader.nextVector(timestampColVector, isNull, batchSize); + fromReader.nextVector(timestampColVector, isNull, batchSize); convertVector(timestampColVector, bytesColVector, batchSize); } } public static class StringGroupFromDateTreeReader extends ConvertTreeReader { - - private DateTreeReader dateTreeReader; - private final TypeDescription readerType; private LongColumnVector longColVector; private BytesColumnVector bytesColVector; @@ -1674,10 +1232,8 @@ public static class StringGroupFromDateTreeReader extends ConvertTreeReader { StringGroupFromDateTreeReader(int columnId, TypeDescription readerType, Context context) throws IOException { - super(columnId); + super(columnId, new DateTreeReader(columnId, context)); this.readerType = readerType; - dateTreeReader = new DateTreeReader(columnId, context); - setConvertTreeReader(dateTreeReader); date = new Date(0); } @@ -1699,31 +1255,26 @@ public void nextVector(ColumnVector previousVector, bytesColVector = (BytesColumnVector) previousVector; } // Read present/isNull stream - dateTreeReader.nextVector(longColVector, isNull, batchSize); + fromReader.nextVector(longColVector, isNull, batchSize); convertVector(longColVector, bytesColVector, batchSize); } } public static class StringGroupFromStringGroupTreeReader extends ConvertTreeReader { - - private TreeReader stringGroupTreeReader; - private final 
TypeDescription readerType; StringGroupFromStringGroupTreeReader(int columnId, TypeDescription fileType, TypeDescription readerType, Context context) throws IOException { - super(columnId); + super(columnId, getStringGroupTreeReader(columnId, fileType, context)); this.readerType = readerType; - stringGroupTreeReader = getStringGroupTreeReader(columnId, fileType, context); - setConvertTreeReader(stringGroupTreeReader); } @Override public void nextVector(ColumnVector previousVector, boolean[] isNull, final int batchSize) throws IOException { - stringGroupTreeReader.nextVector(previousVector, isNull, batchSize); + fromReader.nextVector(previousVector, isNull, batchSize); BytesColumnVector resultColVector = (BytesColumnVector) previousVector; @@ -1750,19 +1301,14 @@ public void nextVector(ColumnVector previousVector, } public static class StringGroupFromBinaryTreeReader extends ConvertTreeReader { - - private BinaryTreeReader binaryTreeReader; - private final TypeDescription readerType; private BytesColumnVector inBytesColVector; private BytesColumnVector outBytesColVector; StringGroupFromBinaryTreeReader(int columnId, TypeDescription readerType, Context context) throws IOException { - super(columnId); + super(columnId, new BinaryTreeReader(columnId, context)); this.readerType = readerType; - binaryTreeReader = new BinaryTreeReader(columnId, context); - setConvertTreeReader(binaryTreeReader); } @Override @@ -1795,32 +1341,33 @@ public void nextVector(ColumnVector previousVector, outBytesColVector = (BytesColumnVector) previousVector; } // Read present/isNull stream - binaryTreeReader.nextVector(inBytesColVector, isNull, batchSize); + fromReader.nextVector(inBytesColVector, isNull, batchSize); convertVector(inBytesColVector, outBytesColVector, batchSize); } } public static class TimestampFromAnyIntegerTreeReader extends ConvertTreeReader { - - private AnyIntegerTreeReader anyIntegerAsLongTreeReader; - private LongColumnVector longColVector; private TimestampColumnVector 
timestampColVector; + private final boolean useUtc; + private final TimeZone local; TimestampFromAnyIntegerTreeReader(int columnId, TypeDescription fileType, - Context context) throws IOException { - super(columnId); - anyIntegerAsLongTreeReader = - new AnyIntegerTreeReader(columnId, fileType, context); - setConvertTreeReader(anyIntegerAsLongTreeReader); + Context context, + boolean isInstant) throws IOException { + super(columnId, createFromInteger(columnId, fileType, context)); + this.useUtc = isInstant || context.getUseUTCTimestamp(); + local = TimeZone.getDefault(); } @Override public void setConvertVectorElement(int elementNum) { - long longValue = longColVector.vector[elementNum]; - // UNDONE: What does the boolean setting need to be? - timestampColVector.set(elementNum, new Timestamp(longValue)); + long millis = longColVector.vector[elementNum] * 1000; + timestampColVector.time[elementNum] = useUtc + ? millis + : SerializationUtils.convertFromUtc(local, millis); + timestampColVector.nanos[elementNum] = 0; } @Override @@ -1833,70 +1380,38 @@ public void nextVector(ColumnVector previousVector, timestampColVector = (TimestampColumnVector) previousVector; } // Read present/isNull stream - anyIntegerAsLongTreeReader.nextVector(longColVector, isNull, batchSize); + fromReader.nextVector(longColVector, isNull, batchSize); convertVector(longColVector, timestampColVector, batchSize); } } - public static class TimestampFromFloatTreeReader extends ConvertTreeReader { - - private FloatTreeReader floatTreeReader; - - private DoubleColumnVector doubleColVector; - private TimestampColumnVector timestampColVector; - - TimestampFromFloatTreeReader(int columnId, TypeDescription fileType, - Context context) throws IOException { - super(columnId); - floatTreeReader = new FloatTreeReader(columnId); - setConvertTreeReader(floatTreeReader); - } - - @Override - public void setConvertVectorElement(int elementNum) { - float floatValue = (float) doubleColVector.vector[elementNum]; - 
Timestamp timestampValue = TimestampUtils.doubleToTimestamp(floatValue); - // The TimestampColumnVector will set the entry to null when a null timestamp is passed in. - timestampColVector.set(elementNum, timestampValue); - } - - @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - final int batchSize) throws IOException { - if (doubleColVector == null) { - // Allocate column vector for file; cast column vector for reader. - doubleColVector = new DoubleColumnVector(); - timestampColVector = (TimestampColumnVector) previousVector; - } - // Read present/isNull stream - floatTreeReader.nextVector(doubleColVector, isNull, batchSize); - - convertVector(doubleColVector, timestampColVector, batchSize); - } - } - public static class TimestampFromDoubleTreeReader extends ConvertTreeReader { - - private DoubleTreeReader doubleTreeReader; - private DoubleColumnVector doubleColVector; private TimestampColumnVector timestampColVector; + private final boolean useUtc; + private final TimeZone local; TimestampFromDoubleTreeReader(int columnId, TypeDescription fileType, - Context context) throws IOException { - super(columnId); - doubleTreeReader = new DoubleTreeReader(columnId); - setConvertTreeReader(doubleTreeReader); + TypeDescription readerType, Context context) throws IOException { + super(columnId, fileType.getCategory() == Category.DOUBLE ? + new DoubleTreeReader(columnId) : + new FloatTreeReader(columnId)); + useUtc = readerType.getCategory() == Category.TIMESTAMP_INSTANT || + context.getUseUTCTimestamp(); + local = TimeZone.getDefault(); } @Override public void setConvertVectorElement(int elementNum) { - double doubleValue = doubleColVector.vector[elementNum]; - Timestamp timestampValue = TimestampUtils.doubleToTimestamp(doubleValue); - // The TimestampColumnVector will set the entry to null when a null timestamp is passed in. 
- timestampColVector.set(elementNum, timestampValue); + double seconds = doubleColVector.vector[elementNum]; + if (!useUtc) { + seconds = SerializationUtils.convertFromUtc(local, seconds); + } + long wholeSec = (long) Math.floor(seconds); + timestampColVector.time[elementNum] = wholeSec * 1000; + timestampColVector.nanos[elementNum] = + 1_000_000 * (int) Math.round((seconds - wholeSec) * 1000); } @Override @@ -1909,37 +1424,42 @@ public void nextVector(ColumnVector previousVector, timestampColVector = (TimestampColumnVector) previousVector; } // Read present/isNull stream - doubleTreeReader.nextVector(doubleColVector, isNull, batchSize); + fromReader.nextVector(doubleColVector, isNull, batchSize); convertVector(doubleColVector, timestampColVector, batchSize); } } public static class TimestampFromDecimalTreeReader extends ConvertTreeReader { - - private DecimalTreeReader decimalTreeReader; - private final int precision; private final int scale; private DecimalColumnVector decimalColVector; private TimestampColumnVector timestampColVector; + private final boolean useUtc; + private final TimeZone local; TimestampFromDecimalTreeReader(int columnId, TypeDescription fileType, - Context context) throws IOException { - super(columnId); + Context context, + boolean isInstant) throws IOException { + super(columnId, new DecimalTreeReader(columnId, fileType.getPrecision(), + fileType.getScale(), context)); this.precision = fileType.getPrecision(); this.scale = fileType.getScale(); - decimalTreeReader = new DecimalTreeReader(columnId, precision, scale, context); - setConvertTreeReader(decimalTreeReader); + useUtc = isInstant || context.getUseUTCTimestamp(); + local = TimeZone.getDefault(); } @Override public void setConvertVectorElement(int elementNum) { - Timestamp timestampValue = - TimestampUtils.decimalToTimestamp( - decimalColVector.vector[elementNum].getHiveDecimal()); - // The TimestampColumnVector will set the entry to null when a null timestamp is passed in. 
- timestampColVector.set(elementNum, timestampValue); + Instant t = decimalToInstant(decimalColVector, elementNum); + if (!useUtc) { + timestampColVector.time[elementNum] = + SerializationUtils.convertFromUtc(local, t.getEpochSecond() * 1000); + timestampColVector.nanos[elementNum] = t.getNano(); + } else { + timestampColVector.time[elementNum] = t.getEpochSecond() * 1000; + timestampColVector.nanos[elementNum] = t.getNano(); + } } @Override @@ -1952,34 +1472,39 @@ public void nextVector(ColumnVector previousVector, timestampColVector = (TimestampColumnVector) previousVector; } // Read present/isNull stream - decimalTreeReader.nextVector(decimalColVector, isNull, batchSize); + fromReader.nextVector(decimalColVector, isNull, batchSize); convertVector(decimalColVector, timestampColVector, batchSize); } } public static class TimestampFromStringGroupTreeReader extends ConvertTreeReader { - - private TreeReader stringGroupTreeReader; - private BytesColumnVector bytesColVector; private TimestampColumnVector timestampColVector; + private final DateTimeFormatter formatter; - TimestampFromStringGroupTreeReader(int columnId, TypeDescription fileType, Context context) + TimestampFromStringGroupTreeReader(int columnId, TypeDescription fileType, + Context context, boolean isInstant) throws IOException { - super(columnId); - stringGroupTreeReader = getStringGroupTreeReader(columnId, fileType, context); - setConvertTreeReader(stringGroupTreeReader); + super(columnId, getStringGroupTreeReader(columnId, fileType, context)); + if (isInstant) { + formatter = INSTANT_TIMESTAMP_FORMAT; + } else { + formatter = TIMESTAMP_FORMAT.withZone(context.getUseUTCTimestamp() ? 
+ ZoneId.of("UTC") : + ZoneId.systemDefault()); + } } @Override public void setConvertVectorElement(int elementNum) throws IOException { - String stringValue = - stringFromBytesColumnVectorEntry(bytesColVector, elementNum); - Timestamp timestampValue = parseTimestampFromString(stringValue); - if (timestampValue != null) { - timestampColVector.set(elementNum, timestampValue); - } else { + String str = SerializationUtils.bytesVectorToString(bytesColVector, + elementNum); + try { + Instant instant = Instant.from(formatter.parse(str)); + timestampColVector.time[elementNum] = instant.getEpochSecond() * 1000; + timestampColVector.nanos[elementNum] = instant.getNano(); + } catch (DateTimeParseException exception) { timestampColVector.noNulls = false; timestampColVector.isNull[elementNum] = true; } @@ -1995,31 +1520,33 @@ public void nextVector(ColumnVector previousVector, timestampColVector = (TimestampColumnVector) previousVector; } // Read present/isNull stream - stringGroupTreeReader.nextVector(bytesColVector, isNull, batchSize); + fromReader.nextVector(bytesColVector, isNull, batchSize); convertVector(bytesColVector, timestampColVector, batchSize); } } public static class TimestampFromDateTreeReader extends ConvertTreeReader { - - private DateTreeReader dateTreeReader; - private LongColumnVector longColVector; private TimestampColumnVector timestampColVector; + private final boolean useUtc; + private final TimeZone local = TimeZone.getDefault(); - TimestampFromDateTreeReader(int columnId, TypeDescription fileType, + TimestampFromDateTreeReader(int columnId, TypeDescription readerType, Context context) throws IOException { - super(columnId); - dateTreeReader = new DateTreeReader(columnId, context); - setConvertTreeReader(dateTreeReader); + super(columnId, new DateTreeReader(columnId, context)); + useUtc = readerType.getCategory() == Category.TIMESTAMP_INSTANT || + context.getUseUTCTimestamp(); } @Override public void setConvertVectorElement(int elementNum) { - long 
millis = - DateWritable.daysToMillis((int) longColVector.vector[elementNum]); - timestampColVector.set(elementNum, new Timestamp(millis)); + long days = longColVector.vector[elementNum]; + long millis = days * 24 * 60 * 60 * 1000; + timestampColVector.time[elementNum] = useUtc ? + millis : + SerializationUtils.convertFromUtc(local, millis); + timestampColVector.nanos[elementNum] = 0; } @Override @@ -2032,31 +1559,26 @@ public void nextVector(ColumnVector previousVector, timestampColVector = (TimestampColumnVector) previousVector; } // Read present/isNull stream - dateTreeReader.nextVector(longColVector, isNull, batchSize); + fromReader.nextVector(longColVector, isNull, batchSize); convertVector(longColVector, timestampColVector, batchSize); } } public static class DateFromStringGroupTreeReader extends ConvertTreeReader { - - private TreeReader stringGroupTreeReader; - private BytesColumnVector bytesColVector; private LongColumnVector longColVector; DateFromStringGroupTreeReader(int columnId, TypeDescription fileType, Context context) throws IOException { - super(columnId); - stringGroupTreeReader = getStringGroupTreeReader(columnId, fileType, context); - setConvertTreeReader(stringGroupTreeReader); + super(columnId, getStringGroupTreeReader(columnId, fileType, context)); } @Override - public void setConvertVectorElement(int elementNum) throws IOException { + public void setConvertVectorElement(int elementNum) { String stringValue = - stringFromBytesColumnVectorEntry(bytesColVector, elementNum); - Date dateValue = parseDateFromString(stringValue); + SerializationUtils.bytesVectorToString(bytesColVector, elementNum); + Date dateValue = SerializationUtils.parseDateFromString(stringValue); if (dateValue != null) { longColVector.vector[elementNum] = DateWritable.dateToDays(dateValue); } else { @@ -2075,31 +1597,31 @@ public void nextVector(ColumnVector previousVector, longColVector = (LongColumnVector) previousVector; } // Read present/isNull stream - 
stringGroupTreeReader.nextVector(bytesColVector, isNull, batchSize); + fromReader.nextVector(bytesColVector, isNull, batchSize); convertVector(bytesColVector, longColVector, batchSize); } } public static class DateFromTimestampTreeReader extends ConvertTreeReader { - - private TimestampTreeReader timestampTreeReader; - private TimestampColumnVector timestampColVector; private LongColumnVector longColVector; + private final ZoneId local; - DateFromTimestampTreeReader(int columnId, Context context) throws IOException { - super(columnId); - timestampTreeReader = new TimestampTreeReader(columnId, context); - setConvertTreeReader(timestampTreeReader); + DateFromTimestampTreeReader(int columnId, Context context, + boolean instantType) throws IOException { + super(columnId, new TimestampTreeReader(columnId, context, instantType)); + boolean useUtc = instantType || context.getUseUTCTimestamp(); + local = useUtc ? ZoneId.of("UTC") : ZoneId.systemDefault(); } @Override public void setConvertVectorElement(int elementNum) throws IOException { - Date dateValue = - DateWritable.timeToDate(TimestampUtils.millisToSeconds( - timestampColVector.asScratchTimestamp(elementNum).getTime())); - longColVector.vector[elementNum] = DateWritable.dateToDays(dateValue); + LocalDate day = LocalDate.from( + Instant.ofEpochSecond(timestampColVector.time[elementNum] / 1000, + timestampColVector.nanos[elementNum]) + .atZone(local)); + longColVector.vector[elementNum] = day.toEpochDay(); } @Override @@ -2112,31 +1634,12 @@ public void nextVector(ColumnVector previousVector, longColVector = (LongColumnVector) previousVector; } // Read present/isNull stream - timestampTreeReader.nextVector(timestampColVector, isNull, batchSize); + fromReader.nextVector(timestampColVector, isNull, batchSize); convertVector(timestampColVector, longColVector, batchSize); } } - public static class BinaryFromStringGroupTreeReader extends ConvertTreeReader { - - private TreeReader stringGroupTreeReader; - - 
BinaryFromStringGroupTreeReader(int columnId, TypeDescription fileType, Context context) - throws IOException { - super(columnId); - stringGroupTreeReader = getStringGroupTreeReader(columnId, fileType, context); - setConvertTreeReader(stringGroupTreeReader); - } - - @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - final int batchSize) throws IOException { - super.nextVector(previousVector, isNull, batchSize); - } - } - private static TreeReader createAnyIntegerConvertTreeReader(int columnId, TypeDescription fileType, TypeDescription readerType, @@ -2159,9 +1662,6 @@ private static TreeReader createAnyIntegerConvertTreeReader(int columnId, context); case FLOAT: - return new FloatFromAnyIntegerTreeReader(columnId, fileType, - context); - case DOUBLE: return new DoubleFromAnyIntegerTreeReader(columnId, fileType, context); @@ -2176,54 +1676,9 @@ private static TreeReader createAnyIntegerConvertTreeReader(int columnId, context); case TIMESTAMP: - return new TimestampFromAnyIntegerTreeReader(columnId, fileType, context); - - // Not currently supported conversion(s): - case BINARY: - case DATE: - - case STRUCT: - case LIST: - case MAP: - case UNION: - default: - throw new IllegalArgumentException("Unsupported type " + - readerType.getCategory()); - } - } - - private static TreeReader createFloatConvertTreeReader(int columnId, - TypeDescription fileType, - TypeDescription readerType, - Context context) throws IOException { - - // CONVERT from FLOAT to schema type. 
- switch (readerType.getCategory()) { - - case BOOLEAN: - case BYTE: - case SHORT: - case INT: - case LONG: - return new AnyIntegerFromFloatTreeReader(columnId, readerType); - - case FLOAT: - throw new IllegalArgumentException("No conversion of type " + - readerType.getCategory() + " to self needed"); - - case DOUBLE: - return new DoubleFromFloatTreeReader(columnId); - - case DECIMAL: - return new DecimalFromFloatTreeReader(columnId, readerType); - - case STRING: - case CHAR: - case VARCHAR: - return new StringGroupFromFloatTreeReader(columnId, readerType, context); - - case TIMESTAMP: - return new TimestampFromFloatTreeReader(columnId, readerType, context); + case TIMESTAMP_INSTANT: + return new TimestampFromAnyIntegerTreeReader(columnId, fileType, context, + readerType.getCategory() == Category.TIMESTAMP_INSTANT); // Not currently supported conversion(s): case BINARY: @@ -2252,25 +1707,25 @@ private static TreeReader createDoubleConvertTreeReader(int columnId, case SHORT: case INT: case LONG: - return new AnyIntegerFromDoubleTreeReader(columnId, readerType); + return new AnyIntegerFromDoubleTreeReader(columnId, fileType, readerType); case FLOAT: - return new FloatFromDoubleTreeReader(columnId); + return new FloatFromDoubleTreeReader(columnId, context); case DOUBLE: - throw new IllegalArgumentException("No conversion of type " + - readerType.getCategory() + " to self needed"); + return new FloatTreeReader(columnId); case DECIMAL: - return new DecimalFromDoubleTreeReader(columnId, readerType); + return new DecimalFromDoubleTreeReader(columnId, fileType, readerType); case STRING: case CHAR: case VARCHAR: - return new StringGroupFromDoubleTreeReader(columnId, readerType, context); + return new StringGroupFromDoubleTreeReader(columnId, fileType, readerType, context); case TIMESTAMP: - return new TimestampFromDoubleTreeReader(columnId, readerType, context); + case TIMESTAMP_INSTANT: + return new TimestampFromDoubleTreeReader(columnId, fileType, readerType, context); // 
Not currently supported conversion(s): case BINARY: @@ -2302,8 +1757,6 @@ private static TreeReader createDecimalConvertTreeReader(int columnId, return new AnyIntegerFromDecimalTreeReader(columnId, fileType, readerType, context); case FLOAT: - return new FloatFromDecimalTreeReader(columnId, fileType, readerType, context); - case DOUBLE: return new DoubleFromDecimalTreeReader(columnId, fileType, context); @@ -2313,7 +1766,9 @@ private static TreeReader createDecimalConvertTreeReader(int columnId, return new StringGroupFromDecimalTreeReader(columnId, fileType, readerType, context); case TIMESTAMP: - return new TimestampFromDecimalTreeReader(columnId, fileType, context); + case TIMESTAMP_INSTANT: + return new TimestampFromDecimalTreeReader(columnId, fileType, context, + readerType.getCategory() == Category.TIMESTAMP_INSTANT); case DECIMAL: return new DecimalFromDecimalTreeReader(columnId, fileType, readerType, context); @@ -2348,8 +1803,6 @@ private static TreeReader createStringConvertTreeReader(int columnId, return new AnyIntegerFromStringGroupTreeReader(columnId, fileType, readerType, context); case FLOAT: - return new FloatFromStringGroupTreeReader(columnId, fileType, context); - case DOUBLE: return new DoubleFromStringGroupTreeReader(columnId, fileType, context); @@ -2367,118 +1820,12 @@ private static TreeReader createStringConvertTreeReader(int columnId, readerType.getCategory() + " to self needed"); case BINARY: - return new BinaryFromStringGroupTreeReader(columnId, fileType, context); - - case TIMESTAMP: - return new TimestampFromStringGroupTreeReader(columnId, fileType, context); - - case DATE: - return new DateFromStringGroupTreeReader(columnId, fileType, context); - - // Not currently supported conversion(s): - - case STRUCT: - case LIST: - case MAP: - case UNION: - default: - throw new IllegalArgumentException("Unsupported type " + - readerType.getCategory()); - } - } - - private static TreeReader createCharConvertTreeReader(int columnId, - 
TypeDescription fileType, - TypeDescription readerType, - Context context) throws IOException { - - // CONVERT from CHAR to schema type. - switch (readerType.getCategory()) { - - case BOOLEAN: - case BYTE: - case SHORT: - case INT: - case LONG: - return new AnyIntegerFromStringGroupTreeReader(columnId, fileType, readerType, context); - - case FLOAT: - return new FloatFromStringGroupTreeReader(columnId, fileType, context); - - case DOUBLE: - return new DoubleFromStringGroupTreeReader(columnId, fileType, context); - - case DECIMAL: - return new DecimalFromStringGroupTreeReader(columnId, fileType, readerType, context); - - case STRING: - return new StringGroupFromStringGroupTreeReader(columnId, fileType, readerType, context); - - case VARCHAR: - return new StringGroupFromStringGroupTreeReader(columnId, fileType, readerType, context); - - case CHAR: - return new StringGroupFromStringGroupTreeReader(columnId, fileType, readerType, context); - - case BINARY: - return new BinaryFromStringGroupTreeReader(columnId, fileType, context); - - case TIMESTAMP: - return new TimestampFromStringGroupTreeReader(columnId, fileType, context); - - case DATE: - return new DateFromStringGroupTreeReader(columnId, fileType, context); - - // Not currently supported conversion(s): - - case STRUCT: - case LIST: - case MAP: - case UNION: - default: - throw new IllegalArgumentException("Unsupported type " + - readerType.getCategory()); - } - } - - private static TreeReader createVarcharConvertTreeReader(int columnId, - TypeDescription fileType, - TypeDescription readerType, - Context context) throws IOException { - - // CONVERT from VARCHAR to schema type. 
- switch (readerType.getCategory()) { - - case BOOLEAN: - case BYTE: - case SHORT: - case INT: - case LONG: - return new AnyIntegerFromStringGroupTreeReader(columnId, fileType, readerType, context); - - case FLOAT: - return new FloatFromStringGroupTreeReader(columnId, fileType, context); - - case DOUBLE: - return new DoubleFromStringGroupTreeReader(columnId, fileType, context); - - case DECIMAL: - return new DecimalFromStringGroupTreeReader(columnId, fileType, readerType, context); - - case STRING: - return new StringGroupFromStringGroupTreeReader(columnId, fileType, readerType, context); - - case CHAR: - return new StringGroupFromStringGroupTreeReader(columnId, fileType, readerType, context); - - case VARCHAR: - return new StringGroupFromStringGroupTreeReader(columnId, fileType, readerType, context); - - case BINARY: - return new BinaryFromStringGroupTreeReader(columnId, fileType, context); + return new BinaryTreeReader(columnId, context); case TIMESTAMP: - return new TimestampFromStringGroupTreeReader(columnId, fileType, context); + case TIMESTAMP_INSTANT: + return new TimestampFromStringGroupTreeReader(columnId, fileType, context, + readerType.getCategory() == Category.TIMESTAMP_INSTANT); case DATE: return new DateFromStringGroupTreeReader(columnId, fileType, context); @@ -2499,7 +1846,7 @@ private static TreeReader createTimestampConvertTreeReader(int columnId, TypeDescription fileType, TypeDescription readerType, Context context) throws IOException { - + boolean isInstant = fileType.getCategory() == Category.TIMESTAMP_INSTANT; // CONVERT from TIMESTAMP to schema type. 
switch (readerType.getCategory()) { @@ -2508,28 +1855,28 @@ private static TreeReader createTimestampConvertTreeReader(int columnId, case SHORT: case INT: case LONG: - return new AnyIntegerFromTimestampTreeReader(columnId, readerType, context); + return new AnyIntegerFromTimestampTreeReader(columnId, readerType, + context, isInstant); case FLOAT: - return new FloatFromTimestampTreeReader(columnId, context); - case DOUBLE: - return new DoubleFromTimestampTreeReader(columnId, context); + return new DoubleFromTimestampTreeReader(columnId, context, isInstant); case DECIMAL: - return new DecimalFromTimestampTreeReader(columnId, context); + return new DecimalFromTimestampTreeReader(columnId, context, isInstant); case STRING: case CHAR: case VARCHAR: - return new StringGroupFromTimestampTreeReader(columnId, readerType, context); + return new StringGroupFromTimestampTreeReader(columnId, readerType, + context, isInstant); case TIMESTAMP: - throw new IllegalArgumentException("No conversion of type " + - readerType.getCategory() + " to self needed"); + case TIMESTAMP_INSTANT: + return new TimestampTreeReader(columnId, context, isInstant); case DATE: - return new DateFromTimestampTreeReader(columnId, context); + return new DateFromTimestampTreeReader(columnId, context, isInstant); // Not currently supported conversion(s): case BINARY: @@ -2545,7 +1892,6 @@ private static TreeReader createTimestampConvertTreeReader(int columnId, } private static TreeReader createDateConvertTreeReader(int columnId, - TypeDescription fileType, TypeDescription readerType, Context context) throws IOException { @@ -2558,6 +1904,7 @@ private static TreeReader createDateConvertTreeReader(int columnId, return new StringGroupFromDateTreeReader(columnId, readerType, context); case TIMESTAMP: + case TIMESTAMP_INSTANT: return new TimestampFromDateTreeReader(columnId, readerType, context); case DATE: @@ -2586,7 +1933,6 @@ private static TreeReader createDateConvertTreeReader(int columnId, } private static 
TreeReader createBinaryConvertTreeReader(int columnId, - TypeDescription fileType, TypeDescription readerType, Context context) throws IOException { @@ -2611,6 +1957,7 @@ private static TreeReader createBinaryConvertTreeReader(int columnId, case LONG: case DOUBLE: case TIMESTAMP: + case TIMESTAMP_INSTANT: case DECIMAL: case STRUCT: case LIST: @@ -2765,8 +2112,6 @@ public static TreeReader createConvertTreeReader(TypeDescription readerType, return createAnyIntegerConvertTreeReader(columnId, fileType, readerType, context); case FLOAT: - return createFloatConvertTreeReader(columnId, fileType, readerType, context); - case DOUBLE: return createDoubleConvertTreeReader(columnId, fileType, readerType, context); @@ -2774,22 +2119,19 @@ public static TreeReader createConvertTreeReader(TypeDescription readerType, return createDecimalConvertTreeReader(columnId, fileType, readerType, context); case STRING: - return createStringConvertTreeReader(columnId, fileType, readerType, context); - case CHAR: - return createCharConvertTreeReader(columnId, fileType, readerType, context); - case VARCHAR: - return createVarcharConvertTreeReader(columnId, fileType, readerType, context); + return createStringConvertTreeReader(columnId, fileType, readerType, context); case TIMESTAMP: + case TIMESTAMP_INSTANT: return createTimestampConvertTreeReader(columnId, fileType, readerType, context); case DATE: - return createDateConvertTreeReader(columnId, fileType, readerType, context); + return createDateConvertTreeReader(columnId, readerType, context); case BINARY: - return createBinaryConvertTreeReader(columnId, fileType, readerType, context); + return createBinaryConvertTreeReader(columnId, readerType, context); // UNDONE: Complex conversions... 
case STRUCT: @@ -2850,6 +2192,7 @@ public static boolean canConvert(TypeDescription fileType, TypeDescription reade } case TIMESTAMP: + case TIMESTAMP_INSTANT: switch (readerType.getCategory()) { // Not currently supported conversion(s): case BINARY: @@ -2886,6 +2229,7 @@ public static boolean canConvert(TypeDescription fileType, TypeDescription reade case LONG: case DOUBLE: case TIMESTAMP: + case TIMESTAMP_INSTANT: case DECIMAL: return false; default: diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java index d1311b9a81..fb15aa2bce 100644 --- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java +++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java @@ -352,7 +352,8 @@ public static ColumnStatistics[] deserializeStats( List fileStats) { ColumnStatistics[] result = new ColumnStatistics[fileStats.size()]; for(int i=0; i < result.length; ++i) { - result[i] = ColumnStatisticsImpl.deserialize(schema, fileStats.get(i)); + TypeDescription subschema = schema == null ? 
null : schema.findSubtype(i); + result[i] = ColumnStatisticsImpl.deserialize(subschema, fileStats.get(i)); } return result; } diff --git a/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java b/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java index 1d4cc67989..d93c9bd598 100644 --- a/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java +++ b/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java @@ -452,6 +452,7 @@ void buildConversion(TypeDescription fileType, case FLOAT: case STRING: case TIMESTAMP: + case TIMESTAMP_INSTANT: case BINARY: case DATE: // these are always a match diff --git a/java/core/src/java/org/apache/orc/impl/SerializationUtils.java b/java/core/src/java/org/apache/orc/impl/SerializationUtils.java index 1852e5e9e5..06ba7111ac 100644 --- a/java/core/src/java/org/apache/orc/impl/SerializationUtils.java +++ b/java/core/src/java/org/apache/orc/impl/SerializationUtils.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -18,6 +18,7 @@ package org.apache.orc.impl; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.orc.CompressionCodec; import org.apache.orc.OrcFile; import org.apache.orc.OrcProto; @@ -30,6 +31,8 @@ import java.io.InputStream; import java.io.OutputStream; import java.math.BigInteger; +import java.nio.charset.StandardCharsets; +import java.sql.Date; import java.util.TimeZone; public final class SerializationUtils { @@ -1320,6 +1323,17 @@ public boolean isSafeSubtract(long left, long right) { return (left ^ right) >= 0 || (left ^ (left - right)) >= 0; } + /** + * Convert a UTC time to a local timezone + * @param local the local timezone + * @param time the number of seconds since 1970 + * @return the converted timestamp + */ + public static double convertFromUtc(TimeZone local, double time) { + int offset = local.getOffset((long) (time*1000) - local.getRawOffset()); + return time - offset / 1000.0; + } + public static long convertFromUtc(TimeZone local, long time) { int offset = local.getOffset(time - local.getRawOffset()); return time - offset; @@ -1377,4 +1391,58 @@ StreamOptions getCustomizedCodec(StreamOptions base, } return base; } + + /** + * Find the relative offset when moving between timezones at a particular + * point in time. + * + * This is a function of ORC v0 and v1 writing timestamps relative to the + * local timezone. Therefore, when we read, we need to convert from the + * writer's timezone to the reader's timezone. 
+ * + * @param writer the timezone we are moving from + * @param reader the timezone we are moving to + * @param millis the point in time + * @return the change in milliseconds + */ + public static long convertBetweenTimezones(TimeZone writer, TimeZone reader, + long millis) { + final long writerOffset = writer.getOffset(millis); + final long readerOffset = reader.getOffset(millis); + long adjustedMillis = millis + writerOffset - readerOffset; + // If the timezone adjustment moves the millis across a DST boundary, we + // need to reevaluate the offsets. + long adjustedReader = reader.getOffset(adjustedMillis); + return writerOffset - adjustedReader; + } + + /** + * Convert a bytes vector element into a String. + * @param vector the vector to use + * @param elementNum the element number to stringify + * @return a string or null if the value was null + */ + public static String bytesVectorToString(BytesColumnVector vector, + int elementNum) { + if (vector.isRepeating) { + elementNum = 0; + } + return vector.noNulls || !vector.isNull[elementNum] ? + new String(vector.vector[elementNum], vector.start[elementNum], + vector.length[elementNum], StandardCharsets.UTF_8) : null; + } + + /** + * Parse a date from a string. + * @param string the date to parse (YYYY-MM-DD) + * @return the Date parsed, or null if there was a parse error. 
+ */ + public static Date parseDateFromString(String string) { + try { + Date value = Date.valueOf(string); + return value; + } catch (IllegalArgumentException e) { + return null; + } + } } diff --git a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java index dae736a02f..712702a5a6 100644 --- a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java +++ b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java @@ -893,33 +893,36 @@ public static class TimestampTreeReader extends TreeReader { private Map baseTimestampMap; protected long base_timestamp; private final TimeZone readerTimeZone; + private final boolean instantType; private TimeZone writerTimeZone; private boolean hasSameTZRules; private ThreadLocal threadLocalDateFormat; - TimestampTreeReader(int columnId, Context context) throws IOException { - this(columnId, null, null, null, null, context); + TimestampTreeReader(int columnId, Context context, + boolean instantType) throws IOException { + this(columnId, null, null, null, null, context, instantType); } protected TimestampTreeReader(int columnId, InStream presentStream, InStream dataStream, - InStream nanosStream, OrcProto.ColumnEncoding encoding, Context context) - throws IOException { + InStream nanosStream, + OrcProto.ColumnEncoding encoding, + Context context, + boolean instantType) throws IOException { super(columnId, presentStream, context); + this.instantType = instantType; this.threadLocalDateFormat = new ThreadLocal<>(); this.threadLocalDateFormat.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")); this.baseTimestampMap = new HashMap<>(); - if (context.getUseUTCTimestamp()) { + if (instantType || context.getUseUTCTimestamp()) { this.readerTimeZone = TimeZone.getTimeZone("UTC"); } else { this.readerTimeZone = TimeZone.getDefault(); } if (context.getWriterTimezone() == null || context.getWriterTimezone().isEmpty()) { - this.writerTimeZone = readerTimeZone; + 
this.base_timestamp = getBaseTimestamp(readerTimeZone.getID()); } else { - this.writerTimeZone = TimeZone.getTimeZone(context.getWriterTimezone()); + this.base_timestamp = getBaseTimestamp(context.getWriterTimezone()); } - this.hasSameTZRules = writerTimeZone.hasSameRules(readerTimeZone); - this.base_timestamp = getBaseTimestamp(readerTimeZone.getID()); if (encoding != null) { checkEncoding(encoding); @@ -930,7 +933,6 @@ protected TimestampTreeReader(int columnId, InStream presentStream, InStream dat if (nanosStream != null) { this.nanos = createIntegerReader(encoding.getKind(), nanosStream, false, context); } - base_timestamp = getBaseTimestamp(context.getWriterTimezone()); } } @@ -953,7 +955,9 @@ void startStripe(StripePlanner planner) throws IOException { nanos = createIntegerReader(kind, planner.getStream(new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), false, context); - base_timestamp = getBaseTimestamp(planner.getWriterTimezone()); + if (!instantType) { + base_timestamp = getBaseTimestamp(planner.getWriterTimezone()); + } } protected long getBaseTimestamp(String timeZoneId) throws IOException { @@ -962,24 +966,28 @@ protected long getBaseTimestamp(String timeZoneId) throws IOException { timeZoneId = readerTimeZone.getID(); } - if (!baseTimestampMap.containsKey(timeZoneId)) { + if (writerTimeZone == null || !timeZoneId.equals(writerTimeZone.getID())) { writerTimeZone = TimeZone.getTimeZone(timeZoneId); hasSameTZRules = writerTimeZone.hasSameRules(readerTimeZone); - threadLocalDateFormat.get().setTimeZone(writerTimeZone); - try { - long epoch = threadLocalDateFormat.get() - .parse(TimestampTreeWriter.BASE_TIMESTAMP_STRING).getTime() / - TimestampTreeWriter.MILLIS_PER_SECOND; - baseTimestampMap.put(timeZoneId, epoch); - return epoch; - } catch (ParseException e) { - throw new IOException("Unable to create base timestamp", e); - } finally { - threadLocalDateFormat.get().setTimeZone(readerTimeZone); + if (!baseTimestampMap.containsKey(timeZoneId)) { + 
threadLocalDateFormat.get().setTimeZone(writerTimeZone); + try { + long epoch = threadLocalDateFormat.get() + .parse(TimestampTreeWriter.BASE_TIMESTAMP_STRING).getTime() / + TimestampTreeWriter.MILLIS_PER_SECOND; + baseTimestampMap.put(timeZoneId, epoch); + return epoch; + } catch (ParseException e) { + throw new IOException("Unable to create base timestamp", e); + } finally { + threadLocalDateFormat.get().setTimeZone(readerTimeZone); + } + } else { + return baseTimestampMap.get(timeZoneId); } } - return baseTimestampMap.get(timeZoneId); + return base_timestamp; } @Override @@ -1015,19 +1023,10 @@ public void nextVector(ColumnVector previousVector, // If reader and writer time zones have different rules, adjust the timezone difference // between reader and writer taking day light savings into account. if (!hasSameTZRules) { - offset = writerTimeZone.getOffset(millis) - readerTimeZone.getOffset(millis); - } - long adjustedMillis = millis + offset; - // Sometimes the reader timezone might have changed after adding the adjustedMillis. - // To account for that change, check for any difference in reader timezone after - // adding adjustedMillis. If so use the new offset (offset at adjustedMillis point of time). 
- if (!hasSameTZRules && - (readerTimeZone.getOffset(millis) != readerTimeZone.getOffset(adjustedMillis))) { - long newOffset = - writerTimeZone.getOffset(millis) - readerTimeZone.getOffset(adjustedMillis); - adjustedMillis = millis + newOffset; + offset = SerializationUtils.convertBetweenTimezones(writerTimeZone, + readerTimeZone, millis); } - result.time[i] = adjustedMillis; + result.time[i] = millis + offset; result.nanos[i] = newNanos; if (result.isRepeating && i != 0 && (result.time[0] != result.time[i] || @@ -2351,7 +2350,9 @@ public static TreeReader createTreeReader(TypeDescription readerType, case BINARY: return new BinaryTreeReader(fileType.getId(), context); case TIMESTAMP: - return new TimestampTreeReader(fileType.getId(), context); + return new TimestampTreeReader(fileType.getId(), context, false); + case TIMESTAMP_INSTANT: + return new TimestampTreeReader(fileType.getId(), context, true); case DATE: return new DateTreeReader(fileType.getId(), context); case DECIMAL: diff --git a/java/core/src/java/org/apache/orc/impl/mask/MaskFactory.java b/java/core/src/java/org/apache/orc/impl/mask/MaskFactory.java index e1be9bd694..77e44113a8 100644 --- a/java/core/src/java/org/apache/orc/impl/mask/MaskFactory.java +++ b/java/core/src/java/org/apache/orc/impl/mask/MaskFactory.java @@ -59,6 +59,7 @@ public DataMask build(TypeDescription schema, case VARCHAR: return buildStringMask(schema); case TIMESTAMP: + case TIMESTAMP_INSTANT: return buildTimestampMask(schema); case DATE: return buildDateMask(schema); diff --git a/java/core/src/java/org/apache/orc/impl/reader/ReaderEncryption.java b/java/core/src/java/org/apache/orc/impl/reader/ReaderEncryption.java index fe54c49b0f..c647c107ea 100644 --- a/java/core/src/java/org/apache/orc/impl/reader/ReaderEncryption.java +++ b/java/core/src/java/org/apache/orc/impl/reader/ReaderEncryption.java @@ -40,8 +40,12 @@ public class ReaderEncryption { // A value of variants.length means no encryption private final 
ReaderEncryptionVariant[] columnVariants; - public ReaderEncryption() throws IOException { - this(null, null, null, null, null); + public ReaderEncryption() { + keyProvider = null; + keys = new ReaderEncryptionKey[0]; + masks = new MaskDescriptionImpl[0]; + variants = new ReaderEncryptionVariant[0]; + columnVariants = null; } public ReaderEncryption(OrcProto.Footer footer, diff --git a/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java index 3ba2dbeb9c..00313b57ef 100644 --- a/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java +++ b/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java @@ -39,18 +39,19 @@ public class TimestampTreeWriter extends TreeWriterBase { public static final int MILLIS_PER_SECOND = 1000; public static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00"; + private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); private final IntegerWriter seconds; private final IntegerWriter nanos; private final boolean isDirectV2; - private boolean useUTCTimestamp; + private final boolean alwaysUTC; private final TimeZone localTimezone; - private final long baseEpochSecsLocalTz; - private final long baseEpochSecsUTC; + private final long epoch; public TimestampTreeWriter(TypeDescription schema, WriterEncryptionVariant encryption, - WriterContext writer) throws IOException { + WriterContext writer, + boolean instantType) throws IOException { super(schema, encryption, writer); this.isDirectV2 = isNewWriteFormat(writer); this.seconds = createIntegerWriter(writer.createStream( @@ -62,22 +63,21 @@ public TimestampTreeWriter(TypeDescription schema, if (rowIndexPosition != null) { recordPosition(rowIndexPosition); } - this.useUTCTimestamp = writer.getUseUTCTimestamp(); + this.alwaysUTC = instantType || writer.getUseUTCTimestamp(); DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - this.localTimezone = 
TimeZone.getDefault(); - dateFormat.setTimeZone(this.localTimezone); try { - this.baseEpochSecsLocalTz = dateFormat - .parse(TimestampTreeWriter.BASE_TIMESTAMP_STRING).getTime() / - TimestampTreeWriter.MILLIS_PER_SECOND; - } catch (ParseException e) { - throw new IOException("Unable to create base timestamp tree writer", e); - } - dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); - try { - this.baseEpochSecsUTC = dateFormat - .parse(TimestampTreeWriter.BASE_TIMESTAMP_STRING).getTime() / - TimestampTreeWriter.MILLIS_PER_SECOND; + if (this.alwaysUTC) { + dateFormat.setTimeZone(UTC); + localTimezone = null; + epoch = dateFormat.parse(TimestampTreeWriter.BASE_TIMESTAMP_STRING).getTime() / + TimestampTreeWriter.MILLIS_PER_SECOND; + + } else { + localTimezone = TimeZone.getDefault(); + dateFormat.setTimeZone(localTimezone); + epoch = dateFormat.parse(TimestampTreeWriter.BASE_TIMESTAMP_STRING).getTime() / + TimestampTreeWriter.MILLIS_PER_SECOND; + } } catch (ParseException e) { throw new IOException("Unable to create base timestamp tree writer", e); } @@ -86,11 +86,8 @@ public TimestampTreeWriter(TypeDescription schema, @Override OrcProto.ColumnEncoding.Builder getEncoding() { OrcProto.ColumnEncoding.Builder result = super.getEncoding(); - if (isDirectV2) { - result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2); - } else { - result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT); - } + result.setKind(isDirectV2 ? OrcProto.ColumnEncoding.Kind.DIRECT_V2 + : OrcProto.ColumnEncoding.Kind.DIRECT); return result; } @@ -109,7 +106,7 @@ public void writeBatch(ColumnVector vector, int offset, if (millis < 0 && newNanos > 999_999) { millis -= MILLIS_PER_SECOND; } - long utc = vec.isUTC() ? + long utc = vec.isUTC() || alwaysUTC ? 
millis : SerializationUtils.convertToUtc(localTimezone, millis); indexStatistics.updateTimestamp(utc); if (createBloomFilter) { @@ -120,7 +117,7 @@ public void writeBatch(ColumnVector vector, int offset, } final long nano = formatNanos(vec.nanos[0]); for (int i = 0; i < length; ++i) { - seconds.write(secs - (useUTCTimestamp ? baseEpochSecsUTC : baseEpochSecsLocalTz)); + seconds.write(secs - epoch); nanos.write(nano); } } @@ -135,13 +132,9 @@ public void writeBatch(ColumnVector vector, int offset, if (millis < 0 && newNanos > 999_999) { millis -= MILLIS_PER_SECOND; } - long utc = vec.isUTC() ? + long utc = vec.isUTC() || alwaysUTC ? millis : SerializationUtils.convertToUtc(localTimezone, millis); - if (useUTCTimestamp) { - seconds.write(secs - baseEpochSecsUTC); - } else { - seconds.write(secs - baseEpochSecsLocalTz); - } + seconds.write(secs - epoch); nanos.write(formatNanos(newNanos)); indexStatistics.updateTimestamp(utc); if (createBloomFilter) { @@ -163,7 +156,7 @@ public void writeStripe(int requiredIndexEntries) throws IOException { } } - private static long formatNanos(int nanos) { + static long formatNanos(int nanos) { if (nanos == 0) { return 0; } else if (nanos % 100 != 0) { diff --git a/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java index 680cf8cebf..db81ae4f27 100644 --- a/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java +++ b/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java @@ -164,7 +164,9 @@ static TreeWriter createSubtree(TypeDescription schema, case BINARY: return new BinaryTreeWriter(schema, encryption, streamFactory); case TIMESTAMP: - return new TimestampTreeWriter(schema, encryption, streamFactory); + return new TimestampTreeWriter(schema, encryption, streamFactory, false); + case TIMESTAMP_INSTANT: + return new TimestampTreeWriter(schema, encryption, streamFactory, true); case DATE: return new DateTreeWriter(schema, encryption, 
streamFactory); case DECIMAL: diff --git a/java/core/src/test/org/apache/orc/TestTypeDescription.java b/java/core/src/test/org/apache/orc/TestTypeDescription.java index 6a48746030..6428993652 100644 --- a/java/core/src/test/org/apache/orc/TestTypeDescription.java +++ b/java/core/src/test/org/apache/orc/TestTypeDescription.java @@ -110,12 +110,14 @@ public void testParserSimple() { .addField("u", TypeDescription.createUnion() .addUnionChild(TypeDescription.createTimestamp()) .addUnionChild(TypeDescription.createVarchar() - .withMaxLength(100)))); + .withMaxLength(100)))) + .addField("tz", TypeDescription.createTimestampInstant()) + .addField("ts", TypeDescription.createTimestamp()); String expectedStr = "struct," + "map:map,str:struct>>"; + "varchar(100)>>,tz:timestamp with local time zone,ts:timestamp>"; assertEquals(expectedStr, expected.toString()); TypeDescription actual = TypeDescription.fromString(expectedStr); assertEquals(expected, actual); @@ -131,9 +133,9 @@ public void testParserUpper() { assertEquals("MY_FIELD", type.getFieldNames().get(0)); assertEquals(TypeDescription.Category.INT, type.getChildren().get(0).getCategory()); - type = TypeDescription.fromString("UNIONTYPE"); + type = TypeDescription.fromString("UNIONTYPE< TIMESTAMP WITH LOCAL TIME ZONE >"); assertEquals(TypeDescription.Category.UNION, type.getCategory()); - assertEquals(TypeDescription.Category.STRING, + assertEquals(TypeDescription.Category.TIMESTAMP_INSTANT, type.getChildren().get(0).getCategory()); } diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java index 2eb2d23fe5..69e1a405cc 100644 --- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java +++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java @@ -82,6 +82,7 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.TimeZone; import java.util.function.IntFunction; import static 
junit.framework.TestCase.assertNotNull; @@ -1472,7 +1473,7 @@ public void testDate2038() throws Exception { private static void setUnion(VectorizedRowBatch batch, int rowId, Timestamp ts, Integer tag, Integer i, String s, - HiveDecimalWritable dec) { + HiveDecimalWritable dec, Timestamp instant) { UnionColumnVector union = (UnionColumnVector) batch.cols[1]; if (ts != null) { TimestampColumnVector timestampColVector = (TimestampColumnVector) batch.cols[0]; @@ -1510,6 +1511,12 @@ private static void setUnion(VectorizedRowBatch batch, int rowId, batch.cols[2].isNull[rowId] = true; batch.cols[2].noNulls = false; } + if (instant == null) { + batch.cols[3].isNull[rowId] = true; + batch.cols[3].noNulls = false; + } else { + ((TimestampColumnVector) batch.cols[3]).set(rowId, instant); + } } /** @@ -1676,9 +1683,14 @@ public void testDecimal64Reading() throws Exception { */ @Test public void testUnionAndTimestamp() throws Exception { + final TimeZone original = TimeZone.getDefault(); + TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles")); TypeDescription schema = TypeDescription.fromString( - "struct," + - "decimal:decimal(38,18)>"); + "struct," + + "decimal:decimal(38,18)," + + "instant:timestamp with local time zone>" + ); HiveDecimal maxValue = HiveDecimal.create("10000000000000000000"); Writer writer = OrcFile.createWriter(testFilePath, OrcFile.writerOptions(conf) @@ -1691,16 +1703,19 @@ public void testUnionAndTimestamp() throws Exception { VectorizedRowBatch batch = schema.createRowBatch(); batch.size = 6; setUnion(batch, 0, Timestamp.valueOf("2000-03-12 15:00:00"), 0, 42, null, - new HiveDecimalWritable("12345678.6547456")); + new HiveDecimalWritable("12345678.6547456"), + Timestamp.valueOf("2014-12-12 6:00:00")); setUnion(batch, 1, Timestamp.valueOf("2000-03-20 12:00:00.123456789"), - 1, null, "hello", new HiveDecimalWritable("-5643.234")); + 1, null, "hello", new HiveDecimalWritable("-5643.234"), + Timestamp.valueOf("1996-12-11 11:00:00")); - 
setUnion(batch, 2, null, null, null, null, null); - setUnion(batch, 3, null, 0, null, null, null); - setUnion(batch, 4, null, 1, null, null, null); + setUnion(batch, 2, null, null, null, null, null, null); + setUnion(batch, 3, null, 0, null, null, null, null); + setUnion(batch, 4, null, 1, null, null, null, null); setUnion(batch, 5, Timestamp.valueOf("1970-01-01 00:00:00"), 0, 200000, - null, new HiveDecimalWritable("10000000000000000000")); + null, new HiveDecimalWritable("10000000000000000000"), + Timestamp.valueOf("2011-07-01 09:00:00")); writer.addRowBatch(batch); batch.reset(); @@ -1711,10 +1726,10 @@ public void testUnionAndTimestamp() throws Exception { HiveDecimal.create(new BigInteger(64, rand), rand.nextInt(18)); if ((i & 1) == 0) { setUnion(batch, batch.size++, ts, 0, i*i, null, - new HiveDecimalWritable(dec)); + new HiveDecimalWritable(dec), null); } else { setUnion(batch, batch.size++, ts, 1, null, Integer.toString(i*i), - new HiveDecimalWritable(dec)); + new HiveDecimalWritable(dec), null); } if (maxValue.compareTo(dec) < 0) { maxValue = dec; @@ -1729,42 +1744,77 @@ public void testUnionAndTimestamp() throws Exception { batch.cols[c].setRepeating(true); } ((UnionColumnVector) batch.cols[1]).fields[0].isRepeating = true; - setUnion(batch, 0, null, 0, 1732050807, null, null); + setUnion(batch, 0, null, 0, 1732050807, null, null, null); for(int i=0; i < 5; ++i) { writer.addRowBatch(batch); } batch.reset(); batch.size = 3; - setUnion(batch, 0, null, 0, 0, null, null); - setUnion(batch, 1, null, 0, 10, null, null); - setUnion(batch, 2, null, 0, 138, null, null); + setUnion(batch, 0, null, 0, 0, null, null, null); + setUnion(batch, 1, null, 0, 10, null, null, null); + setUnion(batch, 2, null, 0, 138, null, null, null); writer.addRowBatch(batch); + // check the stats on the writer side + ColumnStatistics[] stats = writer.getStatistics(); + assertEquals("1996-12-11 11:00:00.0", + ((TimestampColumnStatistics) stats[6]).getMinimum().toString()); + 
assertEquals("1996-12-11 11:00:00.0", + ((TimestampColumnStatistics) stats[6]).getMinimumUTC().toString()); + assertEquals("2014-12-12 06:00:00.0", + ((TimestampColumnStatistics) stats[6]).getMaximum().toString()); + assertEquals("2014-12-12 06:00:00.0", + ((TimestampColumnStatistics) stats[6]).getMaximumUTC().toString()); + writer.close(); + + TimeZone.setDefault(TimeZone.getTimeZone("America/New_York")); Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs)); + stats = reader.getStatistics(); + + // check the timestamp statistics + assertEquals("1970-01-01 00:00:00.0", + ((TimestampColumnStatistics) stats[1]).getMinimum().toString()); + assertEquals("1969-12-31 19:00:00.0", + ((TimestampColumnStatistics) stats[1]).getMinimumUTC().toString()); + assertEquals("2037-05-05 12:34:56.203", + ((TimestampColumnStatistics) stats[1]).getMaximum().toString()); + assertEquals("2037-05-05 08:34:56.203", + ((TimestampColumnStatistics) stats[1]).getMaximumUTC().toString()); + + // check the instant statistics + assertEquals("1996-12-11 14:00:00.0", + ((TimestampColumnStatistics) stats[6]).getMinimum().toString()); + assertEquals("1996-12-11 14:00:00.0", + ((TimestampColumnStatistics) stats[6]).getMinimumUTC().toString()); + assertEquals("2014-12-12 09:00:00.0", + ((TimestampColumnStatistics) stats[6]).getMaximum().toString()); + assertEquals("2014-12-12 09:00:00.0", + ((TimestampColumnStatistics) stats[6]).getMaximumUTC().toString()); + schema = writer.getSchema(); - assertEquals(5, schema.getMaximumId()); - boolean[] expected = new boolean[] {false, false, false, false, false, false}; + assertEquals(6, schema.getMaximumId()); + boolean[] expected = new boolean[] {false, false, false, false, false, false, false}; boolean[] included = OrcUtils.includeColumns("", schema); assertEquals(true, Arrays.equals(expected, included)); - expected = new boolean[] {false, true, false, false, false, true}; + expected = new boolean[] {false, true, false, 
false, false, true, false}; included = OrcUtils.includeColumns("time,decimal", schema); assertEquals(true, Arrays.equals(expected, included)); - expected = new boolean[] {false, false, true, true, true, false}; + expected = new boolean[] {false, false, true, true, true, false, false}; included = OrcUtils.includeColumns("union", schema); assertEquals(true, Arrays.equals(expected, included)); Assert.assertEquals(false, reader.getMetadataKeys().iterator().hasNext()); Assert.assertEquals(5077, reader.getNumberOfRows()); - DecimalColumnStatistics stats = + DecimalColumnStatistics decStats = (DecimalColumnStatistics) reader.getStatistics()[5]; - assertEquals(71, stats.getNumberOfValues()); - assertEquals(HiveDecimal.create("-5643.234"), stats.getMinimum()); - assertEquals(maxValue, stats.getMaximum()); + assertEquals(71, decStats.getNumberOfValues()); + assertEquals(HiveDecimal.create("-5643.234"), decStats.getMinimum()); + assertEquals(maxValue, decStats.getMaximum()); // TODO: fix this // assertEquals(null,stats.getSum()); int stripeCount = 0; @@ -1798,18 +1848,22 @@ public void testUnionAndTimestamp() throws Exception { LongColumnVector longs = (LongColumnVector) union.fields[0]; BytesColumnVector strs = (BytesColumnVector) union.fields[1]; DecimalColumnVector decs = (DecimalColumnVector) batch.cols[2]; + TimestampColumnVector instant = (TimestampColumnVector) batch.cols[3]; - assertEquals("struct,decimal:decimal(38,18)>", + assertEquals("struct,decimal:decimal(38,18)," + + "instant:timestamp with local time zone>", schema.toString()); assertEquals("2000-03-12 15:00:00.0", ts.asScratchTimestamp(0).toString()); assertEquals(0, union.tags[0]); assertEquals(42, longs.vector[0]); assertEquals("12345678.6547456", decs.vector[0].toString()); + assertEquals("2014-12-12 09:00:00.0", instant.asScratchTimestamp(0).toString()); assertEquals("2000-03-20 12:00:00.123456789", ts.asScratchTimestamp(1).toString()); assertEquals(1, union.tags[1]); assertEquals("hello", 
strs.toString(1)); assertEquals("-5643.234", decs.vector[1].toString()); + assertEquals("1996-12-11 14:00:00.0", instant.asScratchTimestamp(1).toString()); assertEquals(false, ts.noNulls); assertEquals(false, union.noNulls); @@ -1838,6 +1892,7 @@ public void testUnionAndTimestamp() throws Exception { assertEquals(200000, longs.vector[5]); assertEquals(false, decs.isNull[5]); assertEquals("10000000000000000000", decs.vector[5].toString()); + assertEquals("2011-07-01 12:00:00.0", instant.asScratchTimestamp(5).toString()); rand = new Random(42); for(int i=1970; i < 2038; ++i) { @@ -1896,6 +1951,7 @@ public void testUnionAndTimestamp() throws Exception { assertEquals("hello", strs.toString(0)); assertEquals(new HiveDecimalWritable(HiveDecimal.create("-5643.234")), decs.vector[0]); rows.close(); + TimeZone.setDefault(original); } /** diff --git a/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java b/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java index 736c71909e..e07425d5bb 100644 --- a/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java +++ b/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java @@ -17,15 +17,23 @@ */ package org.apache.orc.impl; -import static junit.framework.TestCase.assertSame; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoField; import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; +import java.util.TimeZone; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ 
-68,7 +76,7 @@ public void setup() throws Exception { conf = new Configuration(); options = new Reader.Options(conf); fs = FileSystem.getLocal(conf); - testFilePath = new Path(workDir, "TestOrcFile." + + testFilePath = new Path(workDir, "TestSchemaEvolution." + testCaseName.getMethodName() + ".orc"); fs.delete(testFilePath, false); } @@ -328,7 +336,7 @@ public void testVarcharImplicitConversion() throws IOException { @Test public void testFloatToDoubleEvolution() throws Exception { - testFilePath = new Path(workDir, "TestOrcFile." + + testFilePath = new Path(workDir, "TestSchemaEvolution." + testCaseName.getMethodName() + ".orc"); TypeDescription schema = TypeDescription.createFloat(); Writer writer = OrcFile.createWriter(testFilePath, @@ -349,13 +357,13 @@ public void testFloatToDoubleEvolution() throws Exception { RecordReader rows = reader.rows(reader.options().schema(schemaOnRead)); batch = schemaOnRead.createRowBatch(); rows.nextBatch(batch); - assertEquals(74.72, ((DoubleColumnVector) batch.cols[0]).vector[0], 0.00000000001); + assertEquals(74.72, ((DoubleColumnVector) batch.cols[0]).vector[0], 0.00001); rows.close(); } @Test public void testFloatToDecimalEvolution() throws Exception { - testFilePath = new Path(workDir, "TestOrcFile." + + testFilePath = new Path(workDir, "TestSchemaEvolution." + testCaseName.getMethodName() + ".orc"); TypeDescription schema = TypeDescription.createFloat(); Writer writer = OrcFile.createWriter(testFilePath, @@ -382,7 +390,7 @@ public void testFloatToDecimalEvolution() throws Exception { @Test public void testFloatToDecimal64Evolution() throws Exception { - testFilePath = new Path(workDir, "TestOrcFile." + + testFilePath = new Path(workDir, "TestSchemaEvolution." 
+ testCaseName.getMethodName() + ".orc"); TypeDescription schema = TypeDescription.createFloat(); Writer writer = OrcFile.createWriter(testFilePath, @@ -409,7 +417,7 @@ public void testFloatToDecimal64Evolution() throws Exception { @Test public void testDoubleToDecimalEvolution() throws Exception { - testFilePath = new Path(workDir, "TestOrcFile." + + testFilePath = new Path(workDir, "TestSchemaEvolution." + testCaseName.getMethodName() + ".orc"); TypeDescription schema = TypeDescription.createDouble(); Writer writer = OrcFile.createWriter(testFilePath, @@ -436,7 +444,7 @@ public void testDoubleToDecimalEvolution() throws Exception { @Test public void testDoubleToDecimal64Evolution() throws Exception { - testFilePath = new Path(workDir, "TestOrcFile." + + testFilePath = new Path(workDir, "TestSchemaEvolution." + testCaseName.getMethodName() + ".orc"); TypeDescription schema = TypeDescription.createDouble(); Writer writer = OrcFile.createWriter(testFilePath, @@ -463,7 +471,7 @@ public void testDoubleToDecimal64Evolution() throws Exception { @Test public void testLongToDecimalEvolution() throws Exception { - testFilePath = new Path(workDir, "TestOrcFile." + + testFilePath = new Path(workDir, "TestSchemaEvolution." + testCaseName.getMethodName() + ".orc"); TypeDescription schema = TypeDescription.createLong(); Writer writer = OrcFile.createWriter(testFilePath, @@ -490,7 +498,7 @@ public void testLongToDecimalEvolution() throws Exception { @Test public void testLongToDecimal64Evolution() throws Exception { - testFilePath = new Path(workDir, "TestOrcFile." + + testFilePath = new Path(workDir, "TestSchemaEvolution." + testCaseName.getMethodName() + ".orc"); TypeDescription schema = TypeDescription.createLong(); Writer writer = OrcFile.createWriter(testFilePath, @@ -517,7 +525,7 @@ public void testLongToDecimal64Evolution() throws Exception { @Test public void testDecimalToDecimalEvolution() throws Exception { - testFilePath = new Path(workDir, "TestOrcFile." 
+ + testFilePath = new Path(workDir, "TestSchemaEvolution." + testCaseName.getMethodName() + ".orc"); TypeDescription schema = TypeDescription.createDecimal().withPrecision(38).withScale(0); Writer writer = OrcFile.createWriter(testFilePath, @@ -544,7 +552,7 @@ public void testDecimalToDecimalEvolution() throws Exception { @Test public void testDecimalToDecimal64Evolution() throws Exception { - testFilePath = new Path(workDir, "TestOrcFile." + + testFilePath = new Path(workDir, "TestSchemaEvolution." + testCaseName.getMethodName() + ".orc"); TypeDescription schema = TypeDescription.createDecimal().withPrecision(38).withScale(2); Writer writer = OrcFile.createWriter(testFilePath, @@ -571,7 +579,7 @@ public void testDecimalToDecimal64Evolution() throws Exception { @Test public void testStringToDecimalEvolution() throws Exception { - testFilePath = new Path(workDir, "TestOrcFile." + + testFilePath = new Path(workDir, "TestSchemaEvolution." + testCaseName.getMethodName() + ".orc"); TypeDescription schema = TypeDescription.createString(); Writer writer = OrcFile.createWriter(testFilePath, @@ -599,7 +607,7 @@ public void testStringToDecimalEvolution() throws Exception { @Test public void testStringToDecimal64Evolution() throws Exception { - testFilePath = new Path(workDir, "TestOrcFile." + + testFilePath = new Path(workDir, "TestSchemaEvolution." + testCaseName.getMethodName() + ".orc"); TypeDescription schema = TypeDescription.createString(); Writer writer = OrcFile.createWriter(testFilePath, @@ -627,7 +635,7 @@ public void testStringToDecimal64Evolution() throws Exception { @Test public void testTimestampToDecimalEvolution() throws Exception { - testFilePath = new Path(workDir, "TestOrcFile." + + testFilePath = new Path(workDir, "TestSchemaEvolution." 
+ testCaseName.getMethodName() + ".orc"); TypeDescription schema = TypeDescription.createTimestamp(); Writer writer = OrcFile.createWriter(testFilePath, @@ -654,7 +662,7 @@ public void testTimestampToDecimalEvolution() throws Exception { @Test public void testTimestampToDecimal64Evolution() throws Exception { - testFilePath = new Path(workDir, "TestOrcFile." + + testFilePath = new Path(workDir, "TestSchemaEvolution." + testCaseName.getMethodName() + ".orc"); TypeDescription schema = TypeDescription.createTimestamp(); Writer writer = OrcFile.createWriter(testFilePath, @@ -1569,21 +1577,6 @@ public void testFileIncludeWithNoEvolution() { assertFalse(fileInclude[3]); } - static void createStream(Map streams, - int id, - OrcProto.Stream.Kind kind, - int... values) throws IOException { - StreamName name = new StreamName(id, kind); - BufferChunkList ranges = new BufferChunkList(); - byte[] buffer = new byte[values.length]; - for(int i=0; i < values.length; ++i) { - buffer[i] = (byte) values[i]; - } - ranges.add(new BufferChunk(ByteBuffer.wrap(buffer), 0)); - streams.put(name, InStream.create(name.toString(), ranges.get(), 0, - values.length)); - } - static ByteBuffer createBuffer(int... values) { ByteBuffer result = ByteBuffer.allocate(values.length); for(int v: values) { @@ -1657,4 +1650,620 @@ public void testPositionalEvolution() throws IOException { assertEquals(3, evo.getFileType(3).getId()); assertEquals(null, evo.getFileType(4)); } + + // These are helper methods that pull some of the common code into one + // place. 
+ + static String decimalTimestampToString(long centiseconds, ZoneId zone) { + long sec = centiseconds / 100; + int nano = (int) ((centiseconds % 100) * 10_000_000); + return timestampToString(sec, nano, zone); + } + + static String doubleTimestampToString(double seconds, ZoneId zone) { + long sec = (long) seconds; + int nano = 1_000_000 * (int) Math.round((seconds - sec) * 1000); + return timestampToString(sec, nano, zone); + } + + static String timestampToString(long seconds, int nanos, ZoneId zone) { + return timestampToString(Instant.ofEpochSecond(seconds, nanos), zone); + } + + static String timestampToString(Instant time, ZoneId zone) { + return time.atZone(zone) + .format(ConvertTreeReaderFactory.INSTANT_TIMESTAMP_FORMAT); + } + + static void writeTimestampDataFile(Path path, + Configuration conf, + ZoneId writerZone, + DateTimeFormatter formatter, + String[] values) throws IOException { + TimeZone oldDefault = TimeZone.getDefault(); + try { + TimeZone.setDefault(TimeZone.getTimeZone(writerZone)); + TypeDescription fileSchema = + TypeDescription.fromString("struct"); + Writer writer = OrcFile.createWriter(path, + OrcFile.writerOptions(conf).setSchema(fileSchema).stripeSize(10000)); + VectorizedRowBatch batch = fileSchema.createRowBatch(1024); + TimestampColumnVector t1 = (TimestampColumnVector) batch.cols[0]; + TimestampColumnVector t2 = (TimestampColumnVector) batch.cols[1]; + for (int r = 0; r < values.length; ++r) { + int row = batch.size++; + Instant t = Instant.from(formatter.parse(values[r])); + t1.time[row] = t.getEpochSecond() * 1000; + t1.nanos[row] = t.getNano(); + t2.time[row] = t1.time[row]; + t2.nanos[row] = t1.nanos[row]; + if (batch.size == 1024) { + writer.addRowBatch(batch); + batch.reset(); + } + } + if (batch.size != 0) { + writer.addRowBatch(batch); + } + writer.close(); + } finally { + TimeZone.setDefault(oldDefault); + } + } + + /** + * Tests the various conversions from timestamp and timestamp with local + * timezone. 
+ * + * It writes an ORC file with timestamp and timestamp with local time zone + * and then reads it back in with each of the relevant types. + * + * This test tests both with and without the useUtc flag. + * + * It uses Australia/Sydney and America/New_York because they both have + * DST and they move in opposite directions on different days. Thus, we + * end up with four sets of offsets. + * + * Picking the 27th of the month puts it around when DST changes. + */ + @Test + public void testEvolutionFromTimestamp() throws Exception { + // The number of rows in the file that we test with. + final int VALUES = 1024; + // The different timezones that we'll use for this test. + final ZoneId UTC = ZoneId.of("UTC"); + final ZoneId WRITER_ZONE = ZoneId.of("America/New_York"); + final ZoneId READER_ZONE = ZoneId.of("Australia/Sydney"); + + final TimeZone oldDefault = TimeZone.getDefault(); + + // generate the timestamps to use + String[] timeStrings = new String[VALUES]; + for(int r=0; r < timeStrings.length; ++r) { + timeStrings[r] = String.format("%04d-%02d-27 23:45:56.7", + 2000 + (r / 12), (r % 12) + 1); + } + + final DateTimeFormatter WRITER_FORMAT = + ConvertTreeReaderFactory.TIMESTAMP_FORMAT.withZone(WRITER_ZONE); + + writeTimestampDataFile(testFilePath, conf, WRITER_ZONE, WRITER_FORMAT, timeStrings); + + try { + TimeZone.setDefault(TimeZone.getTimeZone(READER_ZONE)); + OrcFile.ReaderOptions options = OrcFile.readerOptions(conf); + Reader.Options rowOptions = new Reader.Options(); + + try (Reader reader = OrcFile.createReader(testFilePath, options)) { + + // test conversion to long + TypeDescription readerSchema = TypeDescription.fromString("struct"); + try (RecordReader rows = reader.rows(rowOptions.schema(readerSchema))) { + VectorizedRowBatch batch = readerSchema.createRowBatch(VALUES); + LongColumnVector t1 = (LongColumnVector) batch.cols[0]; + LongColumnVector t2 = (LongColumnVector) batch.cols[1]; + int current = 0; + for (int r = 0; r < VALUES; ++r) { + if
(current == batch.size) { + assertEquals("row " + r, true, rows.nextBatch(batch)); + current = 0; + } + assertEquals("row " + r, (timeStrings[r] + " " + + READER_ZONE.getId()).replace(".7 ", ".0 "), + timestampToString(t1.vector[current], 0, READER_ZONE)); + assertEquals("row " + r, (timeStrings[r] + " " + + WRITER_ZONE.getId()).replace(".7 ", ".0 "), + timestampToString(t2.vector[current], 0, WRITER_ZONE)); + current += 1; + } + assertEquals(false, rows.nextBatch(batch)); + } + + // test conversion to decimal + readerSchema = TypeDescription.fromString("struct"); + try (RecordReader rows = reader.rows(rowOptions.schema(readerSchema))) { + VectorizedRowBatch batch = readerSchema.createRowBatchV2(); + Decimal64ColumnVector t1 = (Decimal64ColumnVector) batch.cols[0]; + Decimal64ColumnVector t2 = (Decimal64ColumnVector) batch.cols[1]; + int current = 0; + for (int r = 0; r < VALUES; ++r) { + if (current == batch.size) { + assertEquals("row " + r, true, rows.nextBatch(batch)); + current = 0; + } + assertEquals("row " + r, timeStrings[r] + " " + READER_ZONE.getId(), + decimalTimestampToString(t1.vector[current], READER_ZONE)); + assertEquals("row " + r, timeStrings[r] + " " + WRITER_ZONE.getId(), + decimalTimestampToString(t2.vector[current], WRITER_ZONE)); + current += 1; + } + assertEquals(false, rows.nextBatch(batch)); + } + + // test conversion to double + readerSchema = TypeDescription.fromString("struct"); + try (RecordReader rows = reader.rows(rowOptions.schema(readerSchema))) { + VectorizedRowBatch batch = readerSchema.createRowBatchV2(); + DoubleColumnVector t1 = (DoubleColumnVector) batch.cols[0]; + DoubleColumnVector t2 = (DoubleColumnVector) batch.cols[1]; + int current = 0; + for (int r = 0; r < VALUES; ++r) { + if (current == batch.size) { + assertEquals("row " + r, true, rows.nextBatch(batch)); + current = 0; + } + assertEquals("row " + r, timeStrings[r] + " " + READER_ZONE.getId(), + doubleTimestampToString(t1.vector[current], READER_ZONE)); + 
assertEquals("row " + r, timeStrings[r] + " " + WRITER_ZONE.getId(), + doubleTimestampToString(t2.vector[current], WRITER_ZONE)); + current += 1; + } + assertEquals(false, rows.nextBatch(batch)); + } + + // test conversion to date + readerSchema = TypeDescription.fromString("struct"); + try (RecordReader rows = reader.rows(rowOptions.schema(readerSchema))) { + VectorizedRowBatch batch = readerSchema.createRowBatchV2(); + LongColumnVector t1 = (LongColumnVector) batch.cols[0]; + LongColumnVector t2 = (LongColumnVector) batch.cols[1]; + int current = 0; + for (int r = 0; r < VALUES; ++r) { + if (current == batch.size) { + assertEquals("row " + r, true, rows.nextBatch(batch)); + current = 0; + } + String date = timeStrings[r].substring(0, 10); + assertEquals("row " + r, date, + ConvertTreeReaderFactory.DATE_FORMAT.format( + LocalDate.ofEpochDay(t1.vector[current]))); + // NYC -> Sydney moves forward a day for instant + assertEquals("row " + r, date.replace("-27", "-28"), + ConvertTreeReaderFactory.DATE_FORMAT.format( + LocalDate.ofEpochDay(t2.vector[current]))); + current += 1; + } + assertEquals(false, rows.nextBatch(batch)); + } + + // test conversion to string + readerSchema = TypeDescription.fromString("struct"); + try (RecordReader rows = reader.rows(rowOptions.schema(readerSchema))) { + VectorizedRowBatch batch = readerSchema.createRowBatch(VALUES); + BytesColumnVector bytesT1 = (BytesColumnVector) batch.cols[0]; + BytesColumnVector bytesT2 = (BytesColumnVector) batch.cols[1]; + int current = 0; + for (int r = 0; r < VALUES; ++r) { + if (current == batch.size) { + assertEquals("row " + r, true, rows.nextBatch(batch)); + current = 0; + } + assertEquals("row " + r, timeStrings[r], bytesT1.toString(current)); + Instant t = Instant.from(WRITER_FORMAT.parse(timeStrings[r])); + assertEquals("row " + r, + timestampToString(Instant.from(WRITER_FORMAT.parse(timeStrings[r])), + READER_ZONE), + bytesT2.toString(current)); + current += 1; + } + assertEquals(false, 
rows.nextBatch(batch)); + } + + // test conversion between timestamps + readerSchema = TypeDescription.fromString("struct"); + try (RecordReader rows = reader.rows(rowOptions.schema(readerSchema))) { + VectorizedRowBatch batch = readerSchema.createRowBatch(VALUES); + TimestampColumnVector timeT1 = (TimestampColumnVector) batch.cols[0]; + TimestampColumnVector timeT2 = (TimestampColumnVector) batch.cols[1]; + int current = 0; + for (int r = 0; r < VALUES; ++r) { + if (current == batch.size) { + assertEquals("row " + r, true, rows.nextBatch(batch)); + current = 0; + } + assertEquals("row " + r, timeStrings[r] + " " + READER_ZONE.getId(), + timestampToString(timeT1.time[current] / 1000, timeT1.nanos[current], READER_ZONE)); + assertEquals("row " + r, + timestampToString(Instant.from(WRITER_FORMAT.parse(timeStrings[r])), READER_ZONE), + timestampToString(timeT2.time[current] / 1000, timeT2.nanos[current], READER_ZONE)); + current += 1; + } + assertEquals(false, rows.nextBatch(batch)); + } + } + + // Now test using UTC as local + options.useUTCTimestamp(true); + try (Reader reader = OrcFile.createReader(testFilePath, options)) { + DateTimeFormatter UTC_FORMAT = + ConvertTreeReaderFactory.TIMESTAMP_FORMAT.withZone(UTC); + + // test conversion to int in UTC + TypeDescription readerSchema = + TypeDescription.fromString("struct"); + try (RecordReader rows = reader.rows(rowOptions.schema(readerSchema))) { + VectorizedRowBatch batch = readerSchema.createRowBatch(VALUES); + LongColumnVector t1 = (LongColumnVector) batch.cols[0]; + LongColumnVector t2 = (LongColumnVector) batch.cols[1]; + int current = 0; + for (int r = 0; r < VALUES; ++r) { + if (current == batch.size) { + assertEquals("row " + r, true, rows.nextBatch(batch)); + current = 0; + } + assertEquals("row " + r, (timeStrings[r] + " " + + UTC.getId()).replace(".7 ", ".0 "), + timestampToString(t1.vector[current], 0, UTC)); + assertEquals("row " + r, (timeStrings[r] + " " + + WRITER_ZONE.getId()).replace(".7 ", ".0 "), 
+ timestampToString(t2.vector[current], 0, WRITER_ZONE)); + current += 1; + } + assertEquals(false, rows.nextBatch(batch)); + } + + // test conversion to decimal + readerSchema = TypeDescription.fromString("struct"); + try (RecordReader rows = reader.rows(rowOptions.schema(readerSchema))) { + VectorizedRowBatch batch = readerSchema.createRowBatchV2(); + Decimal64ColumnVector t1 = (Decimal64ColumnVector) batch.cols[0]; + Decimal64ColumnVector t2 = (Decimal64ColumnVector) batch.cols[1]; + int current = 0; + for (int r = 0; r < VALUES; ++r) { + if (current == batch.size) { + assertEquals("row " + r, true, rows.nextBatch(batch)); + current = 0; + } + assertEquals("row " + r, timeStrings[r] + " " + UTC.getId(), + decimalTimestampToString(t1.vector[current], UTC)); + assertEquals("row " + r, timeStrings[r] + " " + WRITER_ZONE.getId(), + decimalTimestampToString(t2.vector[current], WRITER_ZONE)); + current += 1; + } + assertEquals(false, rows.nextBatch(batch)); + } + + // test conversion to double + readerSchema = TypeDescription.fromString("struct"); + try (RecordReader rows = reader.rows(rowOptions.schema(readerSchema))) { + VectorizedRowBatch batch = readerSchema.createRowBatchV2(); + DoubleColumnVector t1 = (DoubleColumnVector) batch.cols[0]; + DoubleColumnVector t2 = (DoubleColumnVector) batch.cols[1]; + int current = 0; + for (int r = 0; r < VALUES; ++r) { + if (current == batch.size) { + assertEquals("row " + r, true, rows.nextBatch(batch)); + current = 0; + } + assertEquals("row " + r, timeStrings[r] + " " + UTC.getId(), + doubleTimestampToString(t1.vector[current], UTC)); + assertEquals("row " + r, timeStrings[r] + " " + WRITER_ZONE.getId(), + doubleTimestampToString(t2.vector[current], WRITER_ZONE)); + current += 1; + } + assertEquals(false, rows.nextBatch(batch)); + } + + // test conversion to date + readerSchema = TypeDescription.fromString("struct"); + try (RecordReader rows = reader.rows(rowOptions.schema(readerSchema))) { + VectorizedRowBatch batch = 
readerSchema.createRowBatchV2(); + LongColumnVector t1 = (LongColumnVector) batch.cols[0]; + LongColumnVector t2 = (LongColumnVector) batch.cols[1]; + int current = 0; + for (int r = 0; r < VALUES; ++r) { + if (current == batch.size) { + assertEquals("row " + r, true, rows.nextBatch(batch)); + current = 0; + } + String date = timeStrings[r].substring(0, 10); + assertEquals("row " + r, date, + ConvertTreeReaderFactory.DATE_FORMAT.format( + LocalDate.ofEpochDay(t1.vector[current]))); + // NYC -> UTC still moves forward a day + assertEquals("row " + r, date.replace("-27", "-28"), + ConvertTreeReaderFactory.DATE_FORMAT.format( + LocalDate.ofEpochDay(t2.vector[current]))); + current += 1; + } + assertEquals(false, rows.nextBatch(batch)); + } + + // test conversion to string in UTC + readerSchema = TypeDescription.fromString("struct"); + try (RecordReader rows = reader.rows(rowOptions.schema(readerSchema))) { + VectorizedRowBatch batch = readerSchema.createRowBatch(VALUES); + BytesColumnVector bytesT1 = (BytesColumnVector) batch.cols[0]; + BytesColumnVector bytesT2 = (BytesColumnVector) batch.cols[1]; + int current = 0; + for (int r = 0; r < VALUES; ++r) { + if (current == batch.size) { + assertEquals("row " + r, true, rows.nextBatch(batch)); + current = 0; + } + assertEquals("row " + r, timeStrings[r], bytesT1.toString(current)); + assertEquals("row " + r, + timestampToString(Instant.from(WRITER_FORMAT.parse(timeStrings[r])), + UTC), + bytesT2.toString(current)); + current += 1; + } + assertEquals(false, rows.nextBatch(batch)); + } + + // test conversion between timestamps in UTC + readerSchema = TypeDescription.fromString("struct"); + try (RecordReader rows = reader.rows(rowOptions.schema(readerSchema))) { + VectorizedRowBatch batch = readerSchema.createRowBatch(VALUES); + TimestampColumnVector timeT1 = (TimestampColumnVector) batch.cols[0]; + TimestampColumnVector timeT2 = (TimestampColumnVector) batch.cols[1]; + int current = 0; + for (int r = 0; r < VALUES; ++r) { + 
if (current == batch.size) { + assertEquals("row " + r, true, rows.nextBatch(batch)); + current = 0; + } + assertEquals("row " + r, timeStrings[r] + " UTC", + timestampToString(timeT1.time[current] / 1000, timeT1.nanos[current], UTC)); + assertEquals("row " + r, + timestampToString(Instant.from(WRITER_FORMAT.parse(timeStrings[r])), UTC), + timestampToString(timeT2.time[current] / 1000, timeT2.nanos[current], UTC)); + current += 1; + } + assertEquals(false, rows.nextBatch(batch)); + } + } + } finally { + TimeZone.setDefault(oldDefault); + } + } + + static void writeEvolutionToTimestamp(Path path, + Configuration conf, + ZoneId writerZone, + String[] values) throws IOException { + TypeDescription fileSchema = + TypeDescription.fromString("struct"); + ZoneId UTC = ZoneId.of("UTC"); + DateTimeFormatter WRITER_FORMAT = ConvertTreeReaderFactory.TIMESTAMP_FORMAT + .withZone(writerZone); + DateTimeFormatter UTC_FORMAT = ConvertTreeReaderFactory.TIMESTAMP_FORMAT + .withZone(UTC); + DateTimeFormatter UTC_DATE = ConvertTreeReaderFactory.DATE_FORMAT + .withZone(UTC); + Writer writer = OrcFile.createWriter(path, + OrcFile.writerOptions(conf).setSchema(fileSchema).stripeSize(10000)); + VectorizedRowBatch batch = fileSchema.createRowBatchV2(); + int batchSize = batch.getMaxSize(); + LongColumnVector l1 = (LongColumnVector) batch.cols[0]; + LongColumnVector l2 = (LongColumnVector) batch.cols[1]; + Decimal64ColumnVector d1 = (Decimal64ColumnVector) batch.cols[2]; + Decimal64ColumnVector d2 = (Decimal64ColumnVector) batch.cols[3]; + DoubleColumnVector dbl1 = (DoubleColumnVector) batch.cols[4]; + DoubleColumnVector dbl2 = (DoubleColumnVector) batch.cols[5]; + LongColumnVector dt1 = (LongColumnVector) batch.cols[6]; + LongColumnVector dt2 = (LongColumnVector) batch.cols[7]; + BytesColumnVector s1 = (BytesColumnVector) batch.cols[8]; + BytesColumnVector s2 = (BytesColumnVector) batch.cols[9]; + for (int r = 0; r < values.length; ++r) { + int row = batch.size++; + Instant utcTime = 
Instant.from(UTC_FORMAT.parse(values[r])); + Instant writerTime = Instant.from(WRITER_FORMAT.parse(values[r])); + l1.vector[row] = utcTime.getEpochSecond(); + l2.vector[row] = writerTime.getEpochSecond(); + // balance out the 2 digits of scale + d1.vector[row] = utcTime.toEpochMilli() / 10; + d2.vector[row] = writerTime.toEpochMilli() / 10; + // convert to double + dbl1.vector[row] = utcTime.toEpochMilli() / 1000.0; + dbl2.vector[row] = writerTime.toEpochMilli() / 1000.0; + // convert to date + dt1.vector[row] = UTC_DATE.parse(values[r].substring(0, 10)) + .getLong(ChronoField.EPOCH_DAY); + dt2.vector[row] = dt1.vector[row]; + // set the strings + s1.setVal(row, values[r].getBytes(StandardCharsets.UTF_8)); + String withZone = values[r] + " " + writerZone.getId(); + s2.setVal(row, withZone.getBytes(StandardCharsets.UTF_8)); + + if (batch.size == batchSize) { + writer.addRowBatch(batch); + batch.reset(); + } + } + if (batch.size != 0) { + writer.addRowBatch(batch); + } + writer.close(); + } + + /** + * Tests the various conversions to timestamp. + * + * It writes an ORC file with two longs, two decimals, and two strings and + * then reads it back with the types converted to timestamp and timestamp + * with local time zone. + * + * This test is run both with and without setting the useUtc flag. + * + * It uses Australia/Sydney and America/New_York because they both have + * DST and they move in opposite directions on different days. Thus, we + * end up with four sets of offsets. + */ + @Test + public void testEvolutionToTimestamp() throws Exception { + // The number of rows in the file that we test with. + final int VALUES = 1024; + // The different timezones that we'll use for this test. 
+ final ZoneId WRITER_ZONE = ZoneId.of("America/New_York"); + final ZoneId READER_ZONE = ZoneId.of("Australia/Sydney"); + + final TimeZone oldDefault = TimeZone.getDefault(); + final ZoneId UTC = ZoneId.of("UTC"); + + // generate the timestamps to use + String[] timeStrings = new String[VALUES]; + for(int r=0; r < timeStrings.length; ++r) { + timeStrings[r] = String.format("%04d-%02d-27 12:34:56.1", + 1960 + (r / 12), (r % 12) + 1); + } + + writeEvolutionToTimestamp(testFilePath, conf, WRITER_ZONE, timeStrings); + + try { + TimeZone.setDefault(TimeZone.getTimeZone(READER_ZONE)); + + // test timestamp, timestamp with local time zone to long + TypeDescription readerSchema = TypeDescription.fromString( + "struct"); + VectorizedRowBatch batch = readerSchema.createRowBatchV2(); + TimestampColumnVector l1 = (TimestampColumnVector) batch.cols[0]; + TimestampColumnVector l2 = (TimestampColumnVector) batch.cols[1]; + TimestampColumnVector d1 = (TimestampColumnVector) batch.cols[2]; + TimestampColumnVector d2 = (TimestampColumnVector) batch.cols[3]; + TimestampColumnVector dbl1 = (TimestampColumnVector) batch.cols[4]; + TimestampColumnVector dbl2 = (TimestampColumnVector) batch.cols[5]; + TimestampColumnVector dt1 = (TimestampColumnVector) batch.cols[6]; + TimestampColumnVector dt2 = (TimestampColumnVector) batch.cols[7]; + TimestampColumnVector s1 = (TimestampColumnVector) batch.cols[8]; + TimestampColumnVector s2 = (TimestampColumnVector) batch.cols[9]; + OrcFile.ReaderOptions options = OrcFile.readerOptions(conf); + Reader.Options rowOptions = new Reader.Options().schema(readerSchema); + + try (Reader reader = OrcFile.createReader(testFilePath, options); + RecordReader rows = reader.rows(rowOptions)) { + int current = 0; + for (int r = 0; r < VALUES; ++r) { + if (current == batch.size) { + assertEquals("row " + r, true, rows.nextBatch(batch)); + current = 0; + } + + String expected1 = timeStrings[r] + " " + READER_ZONE.getId(); + String expected2 = timeStrings[r] + " " + 
WRITER_ZONE.getId(); + String midnight = timeStrings[r].substring(0, 10) + " 00:00:00.0"; + String expectedDate1 = midnight + " " + READER_ZONE.getId(); + String expectedDate2 = midnight + " " + UTC.getId(); + + assertEquals("row " + r, expected1.replace(".1 ", ".0 "), + timestampToString(l1.time[current] / 1000, l1.nanos[current], READER_ZONE)); + + assertEquals("row " + r, expected2.replace(".1 ", ".0 "), + timestampToString(l2.time[current] / 1000, l2.nanos[current], WRITER_ZONE)); + + assertEquals("row " + r, expected1, + timestampToString(d1.time[current] / 1000, d1.nanos[current], READER_ZONE)); + + assertEquals("row " + r, expected2, + timestampToString(d2.time[current] / 1000, d2.nanos[current], WRITER_ZONE)); + + assertEquals("row " + r, expected1, + timestampToString(dbl1.time[current] / 1000, dbl1.nanos[current], READER_ZONE)); + + assertEquals("row " + r, expected2, + timestampToString(dbl2.time[current] / 1000, dbl2.nanos[current], WRITER_ZONE)); + + assertEquals("row " + r, expectedDate1, + timestampToString(dt1.time[current] / 1000, dt1.nanos[current], READER_ZONE)); + + assertEquals("row " + r, expectedDate2, + timestampToString(dt2.time[current] / 1000, dt2.nanos[current], UTC)); + + assertEquals("row " + r, expected1, + timestampToString(s1.time[current] / 1000, s1.nanos[current], READER_ZONE)); + + assertEquals("row " + r, expected2, + timestampToString(s2.time[current] / 1000, s2.nanos[current], WRITER_ZONE)); + current += 1; + } + assertEquals(false, rows.nextBatch(batch)); + } + + // try the tests with useUtc set on + options.useUTCTimestamp(true); + try (Reader reader = OrcFile.createReader(testFilePath, options); + RecordReader rows = reader.rows(rowOptions)) { + int current = 0; + for (int r = 0; r < VALUES; ++r) { + if (current == batch.size) { + assertEquals("row " + r, true, rows.nextBatch(batch)); + current = 0; + } + + String expected1 = timeStrings[r] + " " + UTC.getId(); + String expected2 = timeStrings[r] + " " + 
WRITER_ZONE.getId(); + String midnight = timeStrings[r].substring(0, 10) + " 00:00:00.0"; + String expectedDate = midnight + " " + UTC.getId(); + + assertEquals("row " + r, expected1.replace(".1 ", ".0 "), + timestampToString(l1.time[current] / 1000, l1.nanos[current], UTC)); + + assertEquals("row " + r, expected2.replace(".1 ", ".0 "), + timestampToString(l2.time[current] / 1000, l2.nanos[current], WRITER_ZONE)); + + assertEquals("row " + r, expected1, + timestampToString(d1.time[current] / 1000, d1.nanos[current], UTC)); + + assertEquals("row " + r, expected2, + timestampToString(d2.time[current] / 1000, d2.nanos[current], WRITER_ZONE)); + + assertEquals("row " + r, expected1, + timestampToString(dbl1.time[current] / 1000, dbl1.nanos[current], UTC)); + + assertEquals("row " + r, expected2, + timestampToString(dbl2.time[current] / 1000, dbl2.nanos[current], WRITER_ZONE)); + + assertEquals("row " + r, expectedDate, + timestampToString(dt1.time[current] / 1000, dt1.nanos[current], UTC)); + + assertEquals("row " + r, expectedDate, + timestampToString(dt2.time[current] / 1000, dt2.nanos[current], UTC)); + + assertEquals("row " + r, expected1, + timestampToString(s1.time[current] / 1000, s1.nanos[current], UTC)); + + assertEquals("row " + r, expected2, + timestampToString(s2.time[current] / 1000, s2.nanos[current], WRITER_ZONE)); + current += 1; + } + assertEquals(false, rows.nextBatch(batch)); + } + } finally { + TimeZone.setDefault(oldDefault); + } + } } diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java index 4c3c0d3897..ea49788478 100644 --- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java +++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java @@ -543,6 +543,7 @@ public static WritableComparable nextValue(ColumnVector vector, case DATE: return nextDate(vector, row, previous); case TIMESTAMP: + case 
TIMESTAMP_INSTANT: return nextTimestamp(vector, row, previous); case STRUCT: return nextStruct(vector, row, schema, previous); diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordWriter.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordWriter.java index 59f89f7dd9..eaf1ce229e 100644 --- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordWriter.java +++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordWriter.java @@ -221,6 +221,7 @@ public static void setColumn(TypeDescription schema, setLongValue(vector, row, ((DateWritable) value).getDays()); break; case TIMESTAMP: + case TIMESTAMP_INSTANT: ((TimestampColumnVector) vector).set(row, (OrcTimestamp) value); break; case DECIMAL: diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java index 51e0d60ef6..d24d3a7321 100644 --- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java +++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java @@ -189,6 +189,7 @@ public static WritableComparable createValue(TypeDescription type) { case DATE: return new DateWritable(); case TIMESTAMP: + case TIMESTAMP_INSTANT: return new OrcTimestamp(); case DECIMAL: return new HiveDecimalWritable(); diff --git a/java/tools/src/java/org/apache/orc/tools/PrintData.java b/java/tools/src/java/org/apache/orc/tools/PrintData.java index 5d74a21655..419d13c99f 100644 --- a/java/tools/src/java/org/apache/orc/tools/PrintData.java +++ b/java/tools/src/java/org/apache/orc/tools/PrintData.java @@ -151,6 +151,7 @@ static void printValue(JSONWriter writer, ColumnVector vector, (int) ((LongColumnVector) vector).vector[row]).toString()); break; case TIMESTAMP: + case TIMESTAMP_INSTANT: writer.value(((TimestampColumnVector) vector) .asScratchTimestamp(row).toString()); break; diff --git a/java/tools/src/java/org/apache/orc/tools/convert/CsvReader.java 
b/java/tools/src/java/org/apache/orc/tools/convert/CsvReader.java index ce32336f19..62c458d18e 100644 --- a/java/tools/src/java/org/apache/orc/tools/convert/CsvReader.java +++ b/java/tools/src/java/org/apache/orc/tools/convert/CsvReader.java @@ -315,6 +315,7 @@ Converter buildConverter(IntWritable startOffset, TypeDescription schema) { case VARCHAR: return new BytesConverter(startOffset); case TIMESTAMP: + case TIMESTAMP_INSTANT: return new TimestampConverter(startOffset); case STRUCT: return new StructConverter(startOffset, schema); diff --git a/java/tools/src/java/org/apache/orc/tools/convert/JsonReader.java b/java/tools/src/java/org/apache/orc/tools/convert/JsonReader.java index c020d11310..1c058fab7c 100644 --- a/java/tools/src/java/org/apache/orc/tools/convert/JsonReader.java +++ b/java/tools/src/java/org/apache/orc/tools/convert/JsonReader.java @@ -276,6 +276,7 @@ static JsonConverter createConverter(TypeDescription schema) { case DECIMAL: return new DecimalColumnConverter(); case TIMESTAMP: + case TIMESTAMP_INSTANT: return new TimestampColumnConverter(); case BINARY: return new BinaryColumnConverter(); diff --git a/java/tools/src/java/org/apache/orc/tools/json/HiveType.java b/java/tools/src/java/org/apache/orc/tools/json/HiveType.java index 6222acac51..58c4a3ec93 100644 --- a/java/tools/src/java/org/apache/orc/tools/json/HiveType.java +++ b/java/tools/src/java/org/apache/orc/tools/json/HiveType.java @@ -31,7 +31,7 @@ enum Kind { NULL(0), BOOLEAN(1), BYTE(1), SHORT(2), INT(3), LONG(4), DECIMAL(5), FLOAT(6), DOUBLE(7), - BINARY(1), DATE(1), TIMESTAMP(1), STRING(2), + BINARY(1), DATE(1), TIMESTAMP(1), TIMESTAMP_INSTANT(1), STRING(2), STRUCT(1, false), LIST(1, false), UNION(8, false); diff --git a/java/tools/src/java/org/apache/orc/tools/json/JsonSchemaFinder.java b/java/tools/src/java/org/apache/orc/tools/json/JsonSchemaFinder.java index 8b53ee1706..dac75d7e1c 100644 --- a/java/tools/src/java/org/apache/orc/tools/json/JsonSchemaFinder.java +++ 
b/java/tools/src/java/org/apache/orc/tools/json/JsonSchemaFinder.java @@ -286,6 +286,8 @@ HiveType makeHiveType(TypeDescription schema) { return new StringType(HiveType.Kind.STRING); case TIMESTAMP: return new StringType(HiveType.Kind.TIMESTAMP); + case TIMESTAMP_INSTANT: + return new StringType(HiveType.Kind.TIMESTAMP_INSTANT); case DATE: return new StringType(HiveType.Kind.DATE); case BINARY: diff --git a/java/tools/src/java/org/apache/orc/tools/json/StringType.java b/java/tools/src/java/org/apache/orc/tools/json/StringType.java index 32cb73df31..3d9a97c327 100644 --- a/java/tools/src/java/org/apache/orc/tools/json/StringType.java +++ b/java/tools/src/java/org/apache/orc/tools/json/StringType.java @@ -38,6 +38,8 @@ public String toString() { return "string"; case TIMESTAMP: return "timestamp"; + case TIMESTAMP_INSTANT: + return "timestamp with local time zone"; case DATE: return "date"; default: @@ -67,6 +69,8 @@ public TypeDescription getSchema() { return TypeDescription.createString(); case TIMESTAMP: return TypeDescription.createTimestamp(); + case TIMESTAMP_INSTANT: + return TypeDescription.createTimestampInstant(); case DATE: return TypeDescription.createDate(); default: diff --git a/proto/orc_proto.proto b/proto/orc_proto.proto index a2f4c245b6..337854b5e1 100644 --- a/proto/orc_proto.proto +++ b/proto/orc_proto.proto @@ -212,6 +212,7 @@ message Type { DATE = 15; VARCHAR = 16; CHAR = 17; + TIMESTAMP_INSTANT = 18; } optional Kind kind = 1; repeated uint32 subtypes = 2 [packed=true]; diff --git a/site/_docs/types.md b/site/_docs/types.md index a14adf4bfa..dbe843be75 100644 --- a/site/_docs/types.md +++ b/site/_docs/types.md @@ -29,6 +29,7 @@ ORC provides a rich set of scalar and compound types: * binary * Date/time * timestamp + * timestamp with local time zone * date * Compound types * struct @@ -62,3 +63,20 @@ create table Foobar ( ![ORC column structure](/img/TreeWriters.png) +# Timestamps + +ORC includes two different forms of timestamps from the SQL 
world: + +* **Timestamp** is a date and time without a time zone, which does not change based on the time zone of the reader. +* **Timestamp with local time zone** is a fixed instant in time, which does change based on the time zone of the reader. + +Unless your application uses UTC consistently, **timestamp with +local time zone** is strongly preferred over **timestamp** for most +use cases. When users say an event is at 10:00, it is always in +reference to a certain timezone and means a point in time, rather than +10:00 in an arbitrary time zone. + +| Type | Value in America/Los_Angeles | Value in America/New_York | +| ----------- | ---------------------------- | ------------------------- | +| **timestamp** | 2014-12-12 6:00:00 | 2014-12-12 6:00:00 | +| **timestamp with local time zone** | 2014-12-12 6:00:00 | 2014-12-12 9:00:00 | \ No newline at end of file