diff --git a/docs/changelog/145074.yaml b/docs/changelog/145074.yaml new file mode 100644 index 0000000000000..ac4a22324276e --- /dev/null +++ b/docs/changelog/145074.yaml @@ -0,0 +1,5 @@ +area: ES|QL +issues: [] +pr: 145074 +summary: Fix ORC type support gaps +type: enhancement diff --git a/x-pack/plugin/esql-datasource-orc/src/main/java/org/elasticsearch/xpack/esql/datasource/orc/OrcFormatReader.java b/x-pack/plugin/esql-datasource-orc/src/main/java/org/elasticsearch/xpack/esql/datasource/orc/OrcFormatReader.java index ee0d992860f83..1691e1eb3ccb6 100644 --- a/x-pack/plugin/esql-datasource-orc/src/main/java/org/elasticsearch/xpack/esql/datasource/orc/OrcFormatReader.java +++ b/x-pack/plugin/esql-datasource-orc/src/main/java/org/elasticsearch/xpack/esql/datasource/orc/OrcFormatReader.java @@ -11,6 +11,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; @@ -118,11 +120,11 @@ public SourceMetadata metadata(StorageObject object) throws IOException { * {@code false} uses getMinimum/getMaximum which applies a local-timezone shift. Without * it, predicates against TIMESTAMP_INSTANT columns cause false stripe exclusions. *

- * This is safe because ESQL maps all date types to DATETIME using UTC epoch millis, - * and the ORC files use TIMESTAMP_INSTANT (UTC-anchored) columns. If we ever support - * files with plain TIMESTAMP columns (writer-local timezone), this flag would incorrectly - * treat their statistics as UTC too — at that point we'd need per-column evaluation by - * bypassing SearchArgument and reading stripe statistics directly (the Trino approach). + * This is safe because the ORC files use TIMESTAMP_INSTANT (UTC-anchored) columns. If we + * ever support files with plain TIMESTAMP columns (writer-local timezone), this flag would + * incorrectly treat their statistics as UTC too — at that point we'd need per-column + * evaluation by bypassing SearchArgument and reading stripe statistics directly (the Trino + * approach). */ private static OrcFile.ReaderOptions orcReaderOptions(OrcStorageObjectAdapter fs) { return OrcFile.readerOptions(new Configuration(false)).filesystem(fs).useUTCTimestamp(true); @@ -331,8 +333,8 @@ private static DataType convertOrcTypeToEsql(TypeDescription orcType) { case FLOAT, DOUBLE -> DataType.DOUBLE; case STRING -> DataType.TEXT; case VARCHAR, CHAR -> DataType.KEYWORD; - case BINARY -> DataType.KEYWORD; - case TIMESTAMP, TIMESTAMP_INSTANT, DATE -> DataType.DATETIME; + case TIMESTAMP, TIMESTAMP_INSTANT -> DataType.DATETIME; + case DATE -> DataType.DATETIME; case DECIMAL -> DataType.DOUBLE; case LIST -> convertOrcTypeToEsql(orcType.getChildren().get(0)); default -> DataType.UNSUPPORTED; @@ -552,7 +554,8 @@ private Block createListLongBlock(ListColumnVector listCol, int rowCount) { } private Block createListDoubleBlock(ListColumnVector listCol, int rowCount) { - DoubleColumnVector child = (DoubleColumnVector) listCol.child; + ColumnVector child = listCol.child; + double d64ScaleFactor = child instanceof Decimal64ColumnVector d64 ? Math.pow(10, d64.scale) : 0; try (var builder = blockFactory.newDoubleBlockBuilder(rowCount)) { for (int i = 0; i < rowCount; i++) { if (listCol.noNulls == false && listCol.isNull[i]) { @@ -566,7 +569,7 @@ private Block createListDoubleBlock(ListColumnVector listCol, int rowCount) { if (child.noNulls == false && child.isNull[idx]) { builder.appendDouble(0.0); } else { - builder.appendDouble(child.vector[idx]); + builder.appendDouble(readDoubleFrom(child, idx, d64ScaleFactor)); } } builder.endPositionEntry(); @@ -576,6 +579,17 @@ private Block createListDoubleBlock(ListColumnVector listCol, int rowCount) { } } + private static double readDoubleFrom(ColumnVector vector, int idx, double d64ScaleFactor) { + if (vector instanceof DoubleColumnVector dv) { + return dv.vector[idx]; + } else if (vector instanceof DecimalColumnVector decV) { + return decV.vector[idx].doubleValue(); + } else if (vector instanceof Decimal64ColumnVector d64) { + return d64.vector[idx] / d64ScaleFactor; + } + throw new QlIllegalArgumentException("Unsupported list element type: " + vector.getClass().getSimpleName()); + } + private Block createListBooleanBlock(ListColumnVector listCol, int rowCount) { LongColumnVector child = (LongColumnVector) listCol.child; try (var builder = blockFactory.newBooleanBlockBuilder(rowCount)) { @@ -627,7 +641,9 @@ private Block createListDatetimeBlock(ListColumnVector listCol, int rowCount) { millis = lv.vector[idx] * MILLIS_PER_DAY; } } else { - millis = 0L; + throw new QlIllegalArgumentException( + "Unsupported list child type for DATETIME: " + child.getClass().getSimpleName() + ); } builder.appendLong(millis); } @@ -648,6 +664,11 @@ private Block createDoubleBlock(ColumnVector vector, int rowCount) { doubleVector.isRepeating, doubleVector.isNull ); + } else if (vector instanceof DecimalColumnVector decVector) { + return createDecimalDoubleBlock(decVector, rowCount); + } else if (vector instanceof Decimal64ColumnVector dec64Vector) { + // Decimal64ColumnVector extends LongColumnVector — must check before LongColumnVector + return createDecimal64DoubleBlock(dec64Vector, rowCount); } else if (vector instanceof LongColumnVector longVector) { return ColumnBlockConversions.doubleColumnFromLongs( blockFactory, @@ -661,6 +682,54 @@ private Block createDoubleBlock(ColumnVector vector, int rowCount) { throw new QlIllegalArgumentException("Unsupported column type: " + vector.getClass().getSimpleName()); } + /** + * Converts a {@link DecimalColumnVector} (arbitrary precision) to a double block. + * Each element is a {@code HiveDecimalWritable} whose {@code doubleValue()} returns the + * properly scaled value. Precision loss beyond ~15 significant digits is inherent to double. + */ + private Block createDecimalDoubleBlock(DecimalColumnVector decVector, int rowCount) { + if (decVector.isRepeating) { + if (decVector.noNulls == false && decVector.isNull[0]) { + return blockFactory.newConstantNullBlock(rowCount); + } + return blockFactory.newConstantDoubleBlockWith(decVector.vector[0].doubleValue(), rowCount); + } + try (var builder = blockFactory.newDoubleBlockBuilder(rowCount)) { + for (int i = 0; i < rowCount; i++) { + if (decVector.noNulls == false && decVector.isNull[i]) { + builder.appendNull(); + } else { + builder.appendDouble(decVector.vector[i].doubleValue()); + } + } + return builder.build(); + } + } + + /** + * Converts a {@link Decimal64ColumnVector} (precision ≤ 18) to a double block. + * Values are stored as unscaled longs; dividing by 10^scale recovers the decimal value. + */ + private Block createDecimal64DoubleBlock(Decimal64ColumnVector dec64Vector, int rowCount) { + double scaleFactor = Math.pow(10, dec64Vector.scale); + if (dec64Vector.isRepeating) { + if (dec64Vector.noNulls == false && dec64Vector.isNull[0]) { + return blockFactory.newConstantNullBlock(rowCount); + } + return blockFactory.newConstantDoubleBlockWith(dec64Vector.vector[0] / scaleFactor, rowCount); + } + try (var builder = blockFactory.newDoubleBlockBuilder(rowCount)) { + for (int i = 0; i < rowCount; i++) { + if (dec64Vector.noNulls == false && dec64Vector.isNull[i]) { + builder.appendNull(); + } else { + builder.appendDouble(dec64Vector.vector[i] / scaleFactor); + } + } + return builder.build(); + } + } + private Block createBytesRefBlock(ColumnVector vector, int rowCount) { Check.isTrue(vector instanceof BytesColumnVector, "Unsupported column type: " + vector.getClass().getSimpleName()); BytesColumnVector bytesVector = (BytesColumnVector) vector; diff --git a/x-pack/plugin/esql-datasource-orc/src/test/java/org/elasticsearch/xpack/esql/datasource/orc/OrcFormatReaderTests.java b/x-pack/plugin/esql-datasource-orc/src/test/java/org/elasticsearch/xpack/esql/datasource/orc/OrcFormatReaderTests.java index b84ca6176ef53..a4e7747c439c2 100644 --- a/x-pack/plugin/esql-datasource-orc/src/test/java/org/elasticsearch/xpack/esql/datasource/orc/OrcFormatReaderTests.java +++ b/x-pack/plugin/esql-datasource-orc/src/test/java/org/elasticsearch/xpack/esql/datasource/orc/OrcFormatReaderTests.java @@ -12,6 +12,7 @@ import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; @@ -20,6 +21,7 @@ import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.lucene.util.BytesRef; import org.apache.orc.CompressionKind; import org.apache.orc.OrcFile; @@ -490,6 +492,9 @@ public void testReadTimestampColumn() throws Exception { StorageObject storageObject = createStorageObject(orcData); OrcFormatReader reader = new OrcFormatReader(blockFactory); + SourceMetadata metadata = reader.metadata(storageObject); + assertEquals(DataType.DATETIME, metadata.schema().get(1).dataType()); + try (CloseableIterator iterator = reader.read(storageObject, null, 1024)) { assertTrue(iterator.hasNext()); Page page = iterator.next(); @@ -760,6 +765,302 @@ public void testReadWithPushedFilterAndColumnProjection() throws Exception { } } + public void testTimestampTruncatesToMillisPrecision() throws Exception { + TypeDescription schema = TypeDescription.createStruct().addField("event_time", TypeDescription.createTimestampInstant()); + + long epochMillis = Instant.parse("2024-01-15T10:30:00.123Z").toEpochMilli(); + + byte[] orcData = createOrcFile(schema, batch -> { + batch.size = 2; + TimestampColumnVector tsCol = (TimestampColumnVector) batch.cols[0]; + + tsCol.time[0] = epochMillis; + tsCol.nanos[0] = 123_456_789; + + tsCol.time[1] = epochMillis; + tsCol.nanos[1] = 123_000_000; + }); + + StorageObject storageObject = createStorageObject(orcData); + OrcFormatReader reader = new OrcFormatReader(blockFactory); + + try (CloseableIterator iterator = reader.read(storageObject, null, 1024)) { + assertTrue(iterator.hasNext()); + Page page = iterator.next(); + + assertEquals(2, page.getPositionCount()); + LongBlock tsBlock = (LongBlock) page.getBlock(0); + + assertEquals(epochMillis, tsBlock.getLong(0)); + assertEquals(epochMillis, tsBlock.getLong(1)); + } + } + + public void testPreEpochTimestamp() throws Exception { + TypeDescription schema = TypeDescription.createStruct() + .addField("id", TypeDescription.createLong()) + .addField("birth_date", TypeDescription.createTimestampInstant()); + + long preEpochMillis = Instant.parse("1953-09-02T00:00:00Z").toEpochMilli(); + long postEpochMillis = Instant.parse("1986-06-26T00:00:00Z").toEpochMilli(); + + byte[] orcData = createOrcFile(schema, batch -> { + batch.size = 2; + LongColumnVector idCol = (LongColumnVector) batch.cols[0]; + TimestampColumnVector tsCol = (TimestampColumnVector) batch.cols[1]; + + idCol.vector[0] = 1L; + tsCol.time[0] = preEpochMillis; + tsCol.nanos[0] = 0; + + idCol.vector[1] = 2L; + tsCol.time[1] = postEpochMillis; + tsCol.nanos[1] = 0; + }); + + StorageObject storageObject = createStorageObject(orcData); + OrcFormatReader reader = new OrcFormatReader(blockFactory); + + SourceMetadata metadata = reader.metadata(storageObject); + assertEquals(DataType.DATETIME, metadata.schema().get(1).dataType()); + + try (CloseableIterator iterator = reader.read(storageObject, null, 1024)) { + assertTrue(iterator.hasNext()); + Page page = iterator.next(); + + assertEquals(2, page.getPositionCount()); + + LongBlock tsBlock = (LongBlock) page.getBlock(1); + assertTrue("pre-epoch millis should be negative", tsBlock.getLong(0) < 0); + assertEquals(preEpochMillis, tsBlock.getLong(0)); + assertEquals(postEpochMillis, tsBlock.getLong(1)); + } + } + + public void testReadListTimestampColumn() throws Exception { + TypeDescription schema = TypeDescription.createStruct() + .addField("id", TypeDescription.createLong()) + .addField("events", TypeDescription.createList(TypeDescription.createTimestampInstant())); + + long ts1 = Instant.parse("2024-01-15T10:00:00Z").toEpochMilli(); + long ts2 = Instant.parse("2024-01-15T11:00:00Z").toEpochMilli(); + long ts3 = Instant.parse("1965-03-20T08:30:00Z").toEpochMilli(); + + byte[] orcData = createOrcFile(schema, batch -> { + batch.size = 2; + LongColumnVector idCol = (LongColumnVector) batch.cols[0]; + ListColumnVector eventsCol = (ListColumnVector) batch.cols[1]; + TimestampColumnVector eventsChild = (TimestampColumnVector) eventsCol.child; + + eventsChild.ensureSize(3, false); + idCol.vector[0] = 1L; + eventsCol.offsets[0] = 0; + eventsCol.lengths[0] = 2; + eventsChild.time[0] = ts1; + eventsChild.nanos[0] = 0; + eventsChild.time[1] = ts2; + eventsChild.nanos[1] = 0; + + idCol.vector[1] = 2L; + eventsCol.offsets[1] = 2; + eventsCol.lengths[1] = 1; + eventsChild.time[2] = ts3; + eventsChild.nanos[2] = 0; + }); + + StorageObject storageObject = createStorageObject(orcData); + OrcFormatReader reader = new OrcFormatReader(blockFactory); + + SourceMetadata metadata = reader.metadata(storageObject); + assertEquals(DataType.DATETIME, metadata.schema().get(1).dataType()); + + try (CloseableIterator iterator = reader.read(storageObject, null, 1024)) { + assertTrue(iterator.hasNext()); + Page page = iterator.next(); + + assertEquals(2, page.getPositionCount()); + + LongBlock eventsBlock = (LongBlock) page.getBlock(1); + assertEquals(2, eventsBlock.getValueCount(0)); + assertEquals(ts1, eventsBlock.getLong(0)); + assertEquals(ts2, eventsBlock.getLong(1)); + assertEquals(1, eventsBlock.getValueCount(1)); + assertTrue("pre-epoch list element should be negative", eventsBlock.getLong(2) < 0); + assertEquals(ts3, eventsBlock.getLong(2)); + } + } + + public void testBinaryMapsToUnsupported() throws Exception { + TypeDescription schema = TypeDescription.createStruct() + .addField("id", TypeDescription.createLong()) + .addField("payload", TypeDescription.createBinary()); + + byte[] rawBytes = new byte[] { 0x00, 0x01, (byte) 0xFF, (byte) 0xFE, 0x42 }; + byte[] orcData = createOrcFile(schema, batch -> { + batch.size = 1; + ((LongColumnVector) batch.cols[0]).vector[0] = 1L; + ((BytesColumnVector) batch.cols[1]).setVal(0, rawBytes); + }); + + StorageObject storageObject = createStorageObject(orcData); + OrcFormatReader reader = new OrcFormatReader(blockFactory); + + SourceMetadata metadata = reader.metadata(storageObject); + assertEquals(DataType.UNSUPPORTED, metadata.schema().get(1).dataType()); + + try (CloseableIterator iterator = reader.read(storageObject, null, 1024)) { + assertTrue(iterator.hasNext()); + Page page = iterator.next(); + + assertEquals(1, page.getPositionCount()); + assertEquals(1L, ((LongBlock) page.getBlock(0)).getLong(0)); + assertTrue(page.getBlock(1).isNull(0)); + } + } + + public void testReadDecimalColumn() throws Exception { + TypeDescription schema = TypeDescription.createStruct() + .addField("id", TypeDescription.createLong()) + .addField("price", TypeDescription.createDecimal().withPrecision(10).withScale(2)); + + byte[] orcData = createOrcFile(schema, batch -> { + batch.size = 3; + LongColumnVector idCol = (LongColumnVector) batch.cols[0]; + DecimalColumnVector priceCol = (DecimalColumnVector) batch.cols[1]; + + idCol.vector[0] = 1L; + priceCol.set(0, new HiveDecimalWritable("123.45")); + + idCol.vector[1] = 2L; + priceCol.set(1, new HiveDecimalWritable("0.01")); + + idCol.vector[2] = 3L; + priceCol.set(2, new HiveDecimalWritable("99999.99")); + }); + + StorageObject storageObject = createStorageObject(orcData); + OrcFormatReader reader = new OrcFormatReader(blockFactory); + + SourceMetadata metadata = reader.metadata(storageObject); + assertEquals(DataType.DOUBLE, metadata.schema().get(1).dataType()); + + try (CloseableIterator iterator = reader.read(storageObject, null, 1024)) { + assertTrue(iterator.hasNext()); + Page page = iterator.next(); + + assertEquals(3, page.getPositionCount()); + + DoubleBlock priceBlock = (DoubleBlock) page.getBlock(1); + assertEquals(123.45, priceBlock.getDouble(0), 0.001); + assertEquals(0.01, priceBlock.getDouble(1), 0.001); + assertEquals(99999.99, priceBlock.getDouble(2), 0.001); + } + } + + public void testReadDecimalColumnWithNulls() throws Exception { + TypeDescription schema = TypeDescription.createStruct() + .addField("amount", TypeDescription.createDecimal().withPrecision(10).withScale(2)); + + byte[] orcData = createOrcFile(schema, batch -> { + batch.size = 3; + DecimalColumnVector amountCol = (DecimalColumnVector) batch.cols[0]; + + amountCol.set(0, new HiveDecimalWritable("42.50")); + + amountCol.noNulls = false; + amountCol.isNull[1] = true; + + amountCol.set(2, new HiveDecimalWritable("100.00")); + }); + + StorageObject storageObject = createStorageObject(orcData); + OrcFormatReader reader = new OrcFormatReader(blockFactory); + + try (CloseableIterator iterator = reader.read(storageObject, null, 1024)) { + assertTrue(iterator.hasNext()); + Page page = iterator.next(); + + assertEquals(3, page.getPositionCount()); + + DoubleBlock block = (DoubleBlock) page.getBlock(0); + assertEquals(42.50, block.getDouble(0), 0.001); + assertTrue(block.isNull(1)); + assertEquals(100.00, block.getDouble(2), 0.001); + } + } + + public void testReadDecimalHighPrecision() throws Exception { + TypeDescription schema = TypeDescription.createStruct() + .addField("value", TypeDescription.createDecimal().withPrecision(38).withScale(10)); + + byte[] orcData = createOrcFile(schema, batch -> { + batch.size = 2; + DecimalColumnVector valCol = (DecimalColumnVector) batch.cols[0]; + + valCol.set(0, new HiveDecimalWritable("1234567890.1234567890")); + valCol.set(1, new HiveDecimalWritable("-9876543210.0000000001")); + }); + + StorageObject storageObject = createStorageObject(orcData); + OrcFormatReader reader = new OrcFormatReader(blockFactory); + + try (CloseableIterator iterator = reader.read(storageObject, null, 1024)) { + assertTrue(iterator.hasNext()); + Page page = iterator.next(); + + assertEquals(2, page.getPositionCount()); + + DoubleBlock block = (DoubleBlock) page.getBlock(0); + assertEquals(1234567890.1234567890, block.getDouble(0), 0.01); + assertEquals(-9876543210.0000000001, block.getDouble(1), 0.01); + } + } + + public void testReadListDecimalColumn() throws Exception { + TypeDescription schema = TypeDescription.createStruct() + .addField("id", TypeDescription.createLong()) + .addField("prices", TypeDescription.createList(TypeDescription.createDecimal().withPrecision(10).withScale(2))); + + byte[] orcData = createOrcFile(schema, batch -> { + batch.size = 2; + LongColumnVector idCol = (LongColumnVector) batch.cols[0]; + ListColumnVector pricesCol = (ListColumnVector) batch.cols[1]; + DecimalColumnVector pricesChild = (DecimalColumnVector) pricesCol.child; + + pricesChild.ensureSize(3, false); + idCol.vector[0] = 1L; + pricesCol.offsets[0] = 0; + pricesCol.lengths[0] = 2; + pricesChild.set(0, new HiveDecimalWritable("10.50")); + pricesChild.set(1, new HiveDecimalWritable("20.99")); + + idCol.vector[1] = 2L; + pricesCol.offsets[1] = 2; + pricesCol.lengths[1] = 1; + pricesChild.set(2, new HiveDecimalWritable("99.00")); + }); + + StorageObject storageObject = createStorageObject(orcData); + OrcFormatReader reader = new OrcFormatReader(blockFactory); + + SourceMetadata metadata = reader.metadata(storageObject); + assertEquals(DataType.DOUBLE, metadata.schema().get(1).dataType()); + + try (CloseableIterator iterator = reader.read(storageObject, null, 1024)) { + assertTrue(iterator.hasNext()); + Page page = iterator.next(); + + assertEquals(2, page.getPositionCount()); + + DoubleBlock pricesBlock = (DoubleBlock) page.getBlock(1); + assertEquals(2, pricesBlock.getValueCount(0)); + assertEquals(10.50, pricesBlock.getDouble(0), 0.001); + assertEquals(20.99, pricesBlock.getDouble(1), 0.001); + assertEquals(1, pricesBlock.getValueCount(1)); + assertEquals(99.00, pricesBlock.getDouble(2), 0.001); + } + } + @FunctionalInterface private interface BatchPopulator { void populate(VectorizedRowBatch batch); diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/external-basic.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/external-basic.csv-spec index 1c749392f645e..612e95f705a69 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/external-basic.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/external-basic.csv-spec @@ -17,6 +17,19 @@ emp_no:integer | first_name:keyword | last_name:keyword | birth_date:date 10005 | "Kyoichi" | "Maliniak" | 1955-01-21T00:00:00.000Z | "M" | 1989-09-12T00:00:00.000Z | 1 | 2.05 | 63528 | true ; +filterByPreEpochBirthDate +required_capability: external_command + +EXTERNAL "{{employees}}" +| KEEP emp_no, first_name, last_name, birth_date, gender, hire_date, languages, height, salary, still_hired +| WHERE birth_date == TO_DATETIME("1953-09-02T00:00:00.000Z") +| SORT emp_no +| LIMIT 5; + +emp_no:integer | first_name:keyword | last_name:keyword | birth_date:date | gender:keyword | hire_date:date | languages:integer | height:double | salary:integer | still_hired:boolean +10001 | "Georgi" | "Facello" | 1953-09-02T00:00:00.000Z | "M" | 1986-06-26T00:00:00.000Z | 2 | 2.03 | 57305 | true +; + selectSpecificColumns required_capability: external_command