Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/145074.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
area: ES|QL
issues: []
pr: 145074
summary: Fix ORC type support gaps
type: enhancement
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
Expand Down Expand Up @@ -118,11 +120,11 @@ public SourceMetadata metadata(StorageObject object) throws IOException {
* {@code false} uses getMinimum/getMaximum which applies a local-timezone shift. Without
* it, predicates against TIMESTAMP_INSTANT columns cause false stripe exclusions.
* <p>
* This is safe because ESQL maps all date types to DATETIME using UTC epoch millis,
* and the ORC files use TIMESTAMP_INSTANT (UTC-anchored) columns. If we ever support
* files with plain TIMESTAMP columns (writer-local timezone), this flag would incorrectly
* treat their statistics as UTC too — at that point we'd need per-column evaluation by
* bypassing SearchArgument and reading stripe statistics directly (the Trino approach).
* This is safe because the ORC files use TIMESTAMP_INSTANT (UTC-anchored) columns. If we
* ever support files with plain TIMESTAMP columns (writer-local timezone), this flag would
* incorrectly treat their statistics as UTC too — at that point we'd need per-column
* evaluation by bypassing SearchArgument and reading stripe statistics directly (the Trino
* approach).
*/
private static OrcFile.ReaderOptions orcReaderOptions(OrcStorageObjectAdapter fs) {
return OrcFile.readerOptions(new Configuration(false)).filesystem(fs).useUTCTimestamp(true);
Expand Down Expand Up @@ -331,8 +333,8 @@ private static DataType convertOrcTypeToEsql(TypeDescription orcType) {
case FLOAT, DOUBLE -> DataType.DOUBLE;
case STRING -> DataType.TEXT;
case VARCHAR, CHAR -> DataType.KEYWORD;
case BINARY -> DataType.KEYWORD;
case TIMESTAMP, TIMESTAMP_INSTANT, DATE -> DataType.DATETIME;
case TIMESTAMP, TIMESTAMP_INSTANT -> DataType.DATETIME;
case DATE -> DataType.DATETIME;
case DECIMAL -> DataType.DOUBLE;
case LIST -> convertOrcTypeToEsql(orcType.getChildren().get(0));
default -> DataType.UNSUPPORTED;
Expand Down Expand Up @@ -552,7 +554,8 @@ private Block createListLongBlock(ListColumnVector listCol, int rowCount) {
}

private Block createListDoubleBlock(ListColumnVector listCol, int rowCount) {
DoubleColumnVector child = (DoubleColumnVector) listCol.child;
ColumnVector child = listCol.child;
double d64ScaleFactor = child instanceof Decimal64ColumnVector d64 ? Math.pow(10, d64.scale) : 0;
try (var builder = blockFactory.newDoubleBlockBuilder(rowCount)) {
for (int i = 0; i < rowCount; i++) {
if (listCol.noNulls == false && listCol.isNull[i]) {
Expand All @@ -566,7 +569,7 @@ private Block createListDoubleBlock(ListColumnVector listCol, int rowCount) {
if (child.noNulls == false && child.isNull[idx]) {
builder.appendDouble(0.0);
} else {
builder.appendDouble(child.vector[idx]);
builder.appendDouble(readDoubleFrom(child, idx, d64ScaleFactor));
}
}
builder.endPositionEntry();
Expand All @@ -576,6 +579,17 @@ private Block createListDoubleBlock(ListColumnVector listCol, int rowCount) {
}
}

/**
 * Extracts the element at {@code idx} from a numeric ORC column vector as a double.
 * Supports plain doubles, arbitrary-precision decimals ({@code HiveDecimalWritable}),
 * and Decimal64 columns whose unscaled long values are divided by {@code d64ScaleFactor}
 * (10^scale, precomputed by the caller; only consulted on the Decimal64 path).
 *
 * @throws QlIllegalArgumentException if the vector is not one of the supported types
 */
private static double readDoubleFrom(ColumnVector vector, int idx, double d64ScaleFactor) {
    if (vector instanceof DoubleColumnVector doubles) {
        return doubles.vector[idx];
    }
    if (vector instanceof DecimalColumnVector decimals) {
        return decimals.vector[idx].doubleValue();
    }
    if (vector instanceof Decimal64ColumnVector unscaled) {
        return unscaled.vector[idx] / d64ScaleFactor;
    }
    throw new QlIllegalArgumentException("Unsupported list element type: " + vector.getClass().getSimpleName());
}

private Block createListBooleanBlock(ListColumnVector listCol, int rowCount) {
LongColumnVector child = (LongColumnVector) listCol.child;
try (var builder = blockFactory.newBooleanBlockBuilder(rowCount)) {
Expand Down Expand Up @@ -627,7 +641,9 @@ private Block createListDatetimeBlock(ListColumnVector listCol, int rowCount) {
millis = lv.vector[idx] * MILLIS_PER_DAY;
}
} else {
millis = 0L;
throw new QlIllegalArgumentException(
"Unsupported list child type for DATETIME: " + child.getClass().getSimpleName()
);
}
builder.appendLong(millis);
}
Expand All @@ -648,6 +664,11 @@ private Block createDoubleBlock(ColumnVector vector, int rowCount) {
doubleVector.isRepeating,
doubleVector.isNull
);
} else if (vector instanceof DecimalColumnVector decVector) {
return createDecimalDoubleBlock(decVector, rowCount);
} else if (vector instanceof Decimal64ColumnVector dec64Vector) {
// Decimal64ColumnVector extends LongColumnVector — must check before LongColumnVector
return createDecimal64DoubleBlock(dec64Vector, rowCount);
} else if (vector instanceof LongColumnVector longVector) {
return ColumnBlockConversions.doubleColumnFromLongs(
blockFactory,
Expand All @@ -661,6 +682,54 @@ private Block createDoubleBlock(ColumnVector vector, int rowCount) {
throw new QlIllegalArgumentException("Unsupported column type: " + vector.getClass().getSimpleName());
}

/**
 * Builds a double block from a {@link DecimalColumnVector} (arbitrary-precision decimals).
 * Each slot holds a {@code HiveDecimalWritable} whose {@code doubleValue()} yields the
 * already-scaled value; precision beyond what an IEEE double can carry (~15 significant
 * digits) is necessarily lost.
 */
private Block createDecimalDoubleBlock(DecimalColumnVector decVector, int rowCount) {
    // A repeating vector encodes one value (or one null) standing in for every row.
    if (decVector.isRepeating) {
        boolean repeatedNull = decVector.noNulls == false && decVector.isNull[0];
        return repeatedNull
            ? blockFactory.newConstantNullBlock(rowCount)
            : blockFactory.newConstantDoubleBlockWith(decVector.vector[0].doubleValue(), rowCount);
    }
    try (var builder = blockFactory.newDoubleBlockBuilder(rowCount)) {
        for (int row = 0; row < rowCount; row++) {
            boolean nullAtRow = decVector.noNulls == false && decVector.isNull[row];
            if (nullAtRow) {
                builder.appendNull();
            } else {
                builder.appendDouble(decVector.vector[row].doubleValue());
            }
        }
        return builder.build();
    }
}

/**
 * Builds a double block from a {@link Decimal64ColumnVector} (decimals with precision
 * &le; 18). The vector stores unscaled longs, so each value is recovered by dividing
 * by 10^scale (computed once up front; scale 0 yields a factor of 1).
 */
private Block createDecimal64DoubleBlock(Decimal64ColumnVector dec64Vector, int rowCount) {
    final double divisor = Math.pow(10, dec64Vector.scale);
    // A repeating vector encodes one value (or one null) standing in for every row.
    if (dec64Vector.isRepeating) {
        boolean repeatedNull = dec64Vector.noNulls == false && dec64Vector.isNull[0];
        return repeatedNull
            ? blockFactory.newConstantNullBlock(rowCount)
            : blockFactory.newConstantDoubleBlockWith(dec64Vector.vector[0] / divisor, rowCount);
    }
    try (var builder = blockFactory.newDoubleBlockBuilder(rowCount)) {
        for (int row = 0; row < rowCount; row++) {
            boolean nullAtRow = dec64Vector.noNulls == false && dec64Vector.isNull[row];
            if (nullAtRow) {
                builder.appendNull();
            } else {
                builder.appendDouble(dec64Vector.vector[row] / divisor);
            }
        }
        return builder.build();
    }
}

private Block createBytesRefBlock(ColumnVector vector, int rowCount) {
Check.isTrue(vector instanceof BytesColumnVector, "Unsupported column type: " + vector.getClass().getSimpleName());
BytesColumnVector bytesVector = (BytesColumnVector) vector;
Expand Down
Loading
Loading