+ * We don't actually use Arrow's memory manager, since we stream dataframe buffers directly from ES|QL blocks.
+ * But Arrow won't initialize properly unless it has one (and requires either the arrow-memory-netty or the
+ * arrow-memory-unsafe library). It also does some fancy classpath scanning and calls {@code setAccessible},
+ * which will be rejected by the security manager.
+ *
+ * So we configure an allocation manager that fails on any attempt to allocate memory.
+ *
+ * @see DefaultAllocationManagerOption
+ */
+public class AllocationManagerShim implements AllocationManager.Factory {
+
+ private static final Logger logger = LogManager.getLogger(AllocationManagerShim.class);
+
+ /**
+ * Initialize the Arrow memory allocation manager shim.
+ */
+ @SuppressForbidden(reason = "Inject the default Arrow memory allocation manager")
+ public static void init() {
+ try {
+ Class.forName("org.elasticsearch.test.ESTestCase");
+ logger.info("We're in tests, not disabling Arrow memory manager so we can use a real runtime for testing");
+ } catch (ClassNotFoundException notfound) {
+ logger.debug("Disabling Arrow's allocation manager");
+ AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
+ try {
+ Field field = DefaultAllocationManagerOption.class.getDeclaredField("DEFAULT_ALLOCATION_MANAGER_FACTORY");
+ field.setAccessible(true);
+ field.set(null, new AllocationManagerShim());
+ } catch (Exception e) {
+ throw new AssertionError("Can't init Arrow", e);
+ }
+ return null;
+ });
+ }
+ }
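+ // Illustrative usage (an assumption, not part of this class): call init() once before any
+ // Arrow class tries to allocate memory, e.g. from a static initializer, as
+ // ArrowResponse.ResponseSegment does:
+ //
+ // static {
+ //     AllocationManagerShim.init();
+ // }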
+
+ @Override
+ public AllocationManager create(BufferAllocator accountingAllocator, long size) {
+ throw new UnsupportedOperationException("Arrow memory manager is disabled");
+ }
+
+ @Override
+ public ArrowBuf empty() {
+ throw new UnsupportedOperationException("Arrow memory manager is disabled");
+ }
+}
diff --git a/x-pack/plugin/esql/arrow/src/main/java/org/elasticsearch/xpack/esql/arrow/ArrowFormat.java b/x-pack/plugin/esql/arrow/src/main/java/org/elasticsearch/xpack/esql/arrow/ArrowFormat.java
new file mode 100644
index 0000000000000..762c95cdce3e7
--- /dev/null
+++ b/x-pack/plugin/esql/arrow/src/main/java/org/elasticsearch/xpack/esql/arrow/ArrowFormat.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.arrow;
+
+import org.elasticsearch.xcontent.MediaType;
+
+import java.util.Map;
+import java.util.Set;
+
+public class ArrowFormat implements MediaType {
+ public static final ArrowFormat INSTANCE = new ArrowFormat();
+
+ private static final String FORMAT = "arrow";
+ // See https://www.iana.org/assignments/media-types/application/vnd.apache.arrow.stream
+ public static final String CONTENT_TYPE = "application/vnd.apache.arrow.stream";
+ private static final String VENDOR_CONTENT_TYPE = "application/vnd.elasticsearch+arrow+stream";
+
+ @Override
+ public String queryParameter() {
+ return FORMAT;
+ }
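+
+ // Illustrative request (an assumption — the REST endpoint itself is outside this diff): clients
+ // can select this format with the "format" query parameter or via content negotiation, e.g.
+ //
+ // curl -H "Accept: application/vnd.apache.arrow.stream" ...
+ // curl '...?format=arrow' ...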
+
+ @Override
+ public Set<HeaderValue> headerValues() {
+ return Set.of(
+ new HeaderValue(CONTENT_TYPE, Map.of("header", "present|absent")),
+ new HeaderValue(VENDOR_CONTENT_TYPE, Map.of("header", "present|absent", COMPATIBLE_WITH_PARAMETER_NAME, VERSION_PATTERN))
+ );
+ }
+}
diff --git a/x-pack/plugin/esql/arrow/src/main/java/org/elasticsearch/xpack/esql/arrow/ArrowResponse.java b/x-pack/plugin/esql/arrow/src/main/java/org/elasticsearch/xpack/esql/arrow/ArrowResponse.java
new file mode 100644
index 0000000000000..8c2243284a538
--- /dev/null
+++ b/x-pack/plugin/esql/arrow/src/main/java/org/elasticsearch/xpack/esql/arrow/ArrowResponse.java
@@ -0,0 +1,379 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.arrow;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.vector.compression.NoCompressionCodec;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.arrow.vector.ipc.WriteChannel;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.IpcOption;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
+import org.elasticsearch.common.io.stream.BytesStream;
+import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
+import org.elasticsearch.common.recycler.Recycler;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
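+/**
+ * Encodes an ES|QL response (columns plus data pages) as an Arrow IPC stream.
+ * A minimal usage sketch, using only names introduced in this file (note the Column
+ * argument order: ES|QL type first, then field name):
+ *
+ * <pre>{@code
+ * var columns = List.of(new ArrowResponse.Column("long", "count"));
+ * var response = new ArrowResponse(columns, pages);
+ * // The REST layer then calls encodeChunk(...) until isPartComplete() returns true.
+ * }</pre>
+ */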
+public class ArrowResponse implements ChunkedRestResponseBodyPart, Releasable {
+
+ public static class Column {
+ private final BlockConverter converter;
+ private final String name;
+
+ public Column(String esqlType, String name) {
+ this.converter = ESQL_CONVERTERS.get(esqlType);
+ if (converter == null) {
+ throw new IllegalArgumentException("ES|QL type [" + esqlType + "] is not supported by the Arrow format");
+ }
+ this.name = name;
+ }
+ }
+
+ private final List<Column> columns;
+ private Iterator<ResponseSegment> segments;
+ private ResponseSegment currentSegment;
+
+ public ArrowResponse(List<Column> columns, List<Page> pages) {
+ this.columns = columns;
+
+ currentSegment = new SchemaResponse(this);
+ List<ResponseSegment> rest = new ArrayList<>(pages.size());
+ for (int p = 0; p < pages.size(); p++) {
+ var page = pages.get(p);
+ rest.add(new PageResponse(this, page));
+ // Multivalued fields are not supported yet.
+ for (int b = 0; b < page.getBlockCount(); b++) {
+ if (page.getBlock(b).mayHaveMultivaluedFields()) {
+ throw new IllegalArgumentException(
+ "ES|QL response field [" + columns.get(b).name + "] is multi-valued. This isn't supported yet by the Arrow format"
+ );
+ }
+ }
+ }
+ rest.add(new EndResponse(this));
+ segments = rest.iterator();
+ }
+
+ @Override
+ public boolean isPartComplete() {
+ return currentSegment == null;
+ }
+
+ @Override
+ public boolean isLastPart() {
+ // Even if sent in chunks, the entirety of the ES|QL data is available, so this is a single (chunked) part
+ return true;
+ }
+
+ @Override
+ public void getNextPart(ActionListener<ChunkedRestResponseBodyPart> listener) {
+ listener.onFailure(new IllegalStateException("no continuations available"));
+ }
+
+ @Override
+ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> recycler) throws IOException {
+ try {
+ return currentSegment.encodeChunk(sizeHint, recycler);
+ } finally {
+ if (currentSegment.isDone()) {
+ currentSegment = segments.hasNext() ? segments.next() : null;
+ }
+ }
+ }
+
+ @Override
+ public String getResponseContentTypeString() {
+ return ArrowFormat.CONTENT_TYPE;
+ }
+
+ @Override
+ public void close() {
+ currentSegment = null;
+ segments = null;
+ }
+
+ /**
+ * An Arrow response is composed of different segments, each being a set of chunks:
+ * the schema header, the data buffers, and the trailer.
+ */
+ protected abstract static class ResponseSegment {
+ static {
+ // Init the Arrow memory manager shim
+ AllocationManagerShim.init();
+ }
+
+ protected final ArrowResponse response;
+
+ ResponseSegment(ArrowResponse response) {
+ this.response = response;
+ }
+
+ public final ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> recycler) throws IOException {
+ RecyclerBytesStreamOutput output = new RecyclerBytesStreamOutput(recycler);
+ try {
+ encodeChunk(sizeHint, output);
+ BytesReference ref = output.bytes();
+ RecyclerBytesStreamOutput closeRef = output;
+ output = null;
+ ReleasableBytesReference result = new ReleasableBytesReference(ref, () -> Releasables.closeExpectNoException(closeRef));
+ return result;
+ } finally {
+ Releasables.closeExpectNoException(output);
+ }
+ }
+
+ protected abstract void encodeChunk(int sizeHint, RecyclerBytesStreamOutput out) throws IOException;
+
+ protected abstract boolean isDone();
+
+ /**
+ * Adapts a {@link BytesStream} so that Arrow can write to it.
+ */
+ protected static WritableByteChannel arrowOut(BytesStream output) {
+ return new WritableByteChannel() {
+ @Override
+ public int write(ByteBuffer byteBuffer) throws IOException {
+ if (byteBuffer.hasArray() == false) {
+ throw new AssertionError("only implemented for array backed buffers");
+ }
+ int length = byteBuffer.remaining();
+ output.write(byteBuffer.array(), byteBuffer.arrayOffset() + byteBuffer.position(), length);
+ byteBuffer.position(byteBuffer.position() + length);
+ assert byteBuffer.hasRemaining() == false;
+ return length;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return true;
+ }
+
+ @Override
+ public void close() {}
+ };
+ }
+ }
+
+ /**
+ * Header part of the Arrow response containing the dataframe schema.
+ *
+ * @see <a href="https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format">IPC Streaming Format</a>
+ */
+ private static class SchemaResponse extends ResponseSegment {
+ private boolean done = false;
+
+ SchemaResponse(ArrowResponse response) {
+ super(response);
+ }
+
+ @Override
+ public boolean isDone() {
+ return done;
+ }
+
+ @Override
+ protected void encodeChunk(int sizeHint, RecyclerBytesStreamOutput out) throws IOException {
+ WriteChannel arrowOut = new WriteChannel(arrowOut(out));
+ MessageSerializer.serialize(arrowOut, arrowSchema());
+ done = true;
+ }
+
+ private Schema arrowSchema() {
+ return new Schema(response.columns.stream().map(c -> new Field(c.name, c.converter.arrowFieldType(), List.of())).toList());
+ }
+ }
+
+ /**
+ * Page response segment: write an ES|QL page as an Arrow RecordBatch
+ */
+ private static class PageResponse extends ResponseSegment {
+ private final Page page;
+ private boolean done = false;
+
+ PageResponse(ArrowResponse response, Page page) {
+ super(response);
+ this.page = page;
+ }
+
+ @Override
+ public boolean isDone() {
+ return done;
+ }
+
+ // Writes some data and returns the number of bytes written.
+ interface BufWriter {
+ long write(RecyclerBytesStreamOutput out) throws IOException;
+ }
+
+ @Override
+ protected void encodeChunk(int sizeHint, RecyclerBytesStreamOutput out) throws IOException {
+ // An Arrow record batch consists of:
+ // - fields metadata, giving the number of items and the number of null values for each field
+ // - data buffers for each field. The number of buffers for a field depends on its type, e.g.:
+ // - for primitive types, there's a validity buffer (for nulls) and a value buffer.
+ // - for strings, there's a validity buffer, an offsets buffer and a data buffer
+ // See https://arrow.apache.org/docs/format/Columnar.html#recordbatch-message
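+ // For example (illustrative), a VarChar field with values ["a", null, "bc"] produces three
+ // buffers: validity 0b101 (positions 0 and 2 are non-null), offsets [0, 1, 1, 3], and data "abc".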
+
+ // Field metadata
+ List<ArrowFieldNode> nodes = new ArrayList<>(page.getBlockCount());
+
+ // Buffers added to the record batch. They're used to track data size so that Arrow can compute offsets
+ // but contain no data. Actual writing will be done by the bufWriters. This avoids having to deal with
+ // Arrow's memory management, and in the future will allow direct write from ESQL block vectors.
+ List<ArrowBuf> bufs = new ArrayList<>(page.getBlockCount() * 2);
+
+ // Closures that will actually write a Block's data. Maps 1:1 to `bufs`.
+ List<BlockConverter.BufWriter> bufWriters = new ArrayList<>(page.getBlockCount() * 2);
+
+ // Give Arrow a WriteChannel that will iterate on `bufWriters` when requested to write a buffer.
+ WriteChannel arrowOut = new WriteChannel(arrowOut(out)) {
+ int bufIdx = 0;
+ long extraPosition = 0;
+
+ @Override
+ public void write(ArrowBuf buffer) throws IOException {
+ extraPosition += bufWriters.get(bufIdx++).write(out);
+ }
+
+ @Override
+ public long getCurrentPosition() {
+ return super.getCurrentPosition() + extraPosition;
+ }
+
+ @Override
+ public long align() throws IOException {
+ int trailingByteSize = (int) (getCurrentPosition() % 8);
+ if (trailingByteSize != 0) { // align on 8 byte boundaries
+ return writeZeros(8 - trailingByteSize);
+ }
+ return 0;
+ }
+ };
+
+ // Create Arrow buffers for each of the blocks in this page
+ for (int b = 0; b < page.getBlockCount(); b++) {
+ var converter = response.columns.get(b).converter;
+
+ Block block = page.getBlock(b);
+ nodes.add(new ArrowFieldNode(block.getPositionCount(), converter.nullValuesCount(block)));
+ converter.convert(block, bufs, bufWriters);
+ }
+
+ // Create the batch and serialize it
+ ArrowRecordBatch batch = new ArrowRecordBatch(
+ page.getPositionCount(),
+ nodes,
+ bufs,
+ NoCompressionCodec.DEFAULT_BODY_COMPRESSION,
+ true, // align buffers
+ false // retain buffers
+ );
+ MessageSerializer.serialize(arrowOut, batch);
+
+ done = true; // one day we should respect sizeHint here. kindness.
+ }
+ }
+
+ /**
+ * Trailer segment: write the Arrow end of stream marker
+ */
+ private static class EndResponse extends ResponseSegment {
+ private boolean done = false;
+
+ private EndResponse(ArrowResponse response) {
+ super(response);
+ }
+
+ @Override
+ public boolean isDone() {
+ return done;
+ }
+
+ @Override
+ protected void encodeChunk(int sizeHint, RecyclerBytesStreamOutput out) throws IOException {
+ ArrowStreamWriter.writeEndOfStream(new WriteChannel(arrowOut(out)), IpcOption.DEFAULT);
+ done = true;
+ }
+ }
+
+ /**
+ * Converters for every ES|QL type
+ */
+ static final Map<String, BlockConverter> ESQL_CONVERTERS = Map.ofEntries(
+ // For reference:
+ // - EsqlDataTypes: list of ESQL data types (not all are present in outputs)
+ // - PositionToXContent: conversions for ESQL JSON output
+ // - EsqlDataTypeConverter: conversions to ESQL datatypes
+ // Missing: multi-valued values
+
+ buildEntry(new BlockConverter.AsNull("null")),
+ buildEntry(new BlockConverter.AsNull("unsupported")),
+
+ buildEntry(new BlockConverter.AsBoolean("boolean")),
+
+ buildEntry(new BlockConverter.AsInt32("integer")),
+ buildEntry(new BlockConverter.AsInt32("counter_integer")),
+
+ buildEntry(new BlockConverter.AsInt64("long")),
+ // FIXME: counters: are they signed?
+ buildEntry(new BlockConverter.AsInt64("counter_long")),
+ buildEntry(new BlockConverter.AsInt64("unsigned_long", MinorType.UINT8)),
+
+ buildEntry(new BlockConverter.AsFloat64("double")),
+ buildEntry(new BlockConverter.AsFloat64("counter_double")),
+
+ buildEntry(new BlockConverter.AsVarChar("keyword")),
+ buildEntry(new BlockConverter.AsVarChar("text")),
+
+ // date: array of int64 milliseconds since epoch
+ // FIXME: is it signed?
+ buildEntry(new BlockConverter.AsInt64("date", MinorType.TIMESTAMPMILLI)),
+
+ // IP addresses are represented as 16-byte IPv6 addresses. We shorten IPv6-mapped IPv4 addresses to 4 bytes.
+ // Another option would be a fixed-size binary to avoid the offsets array, but with mostly
+ // IPv4 addresses it would still be twice as big.
+ buildEntry(new BlockConverter.TransformedBytesRef("ip", MinorType.VARBINARY, ValueConversions::shortenIpV4Addresses)),
+
+ // geo_point: Keep WKB format (JSON converts to WKT)
+ buildEntry(new BlockConverter.AsVarBinary("geo_point")),
+ buildEntry(new BlockConverter.AsVarBinary("geo_shape")),
+ buildEntry(new BlockConverter.AsVarBinary("cartesian_point")),
+ buildEntry(new BlockConverter.AsVarBinary("cartesian_shape")),
+
+ // version: convert to string
+ buildEntry(new BlockConverter.TransformedBytesRef("version", MinorType.VARCHAR, ValueConversions::versionToString)),
+
+ // _source: json
+ // TODO: support also CBOR and SMILE with an additional formatting parameter
+ buildEntry(new BlockConverter.TransformedBytesRef("_source", MinorType.VARCHAR, ValueConversions::sourceToJson))
+ );
+
+ private static Map.Entry<String, BlockConverter> buildEntry(BlockConverter converter) {
+ return Map.entry(converter.esqlType(), converter);
+ }
+}
diff --git a/x-pack/plugin/esql/arrow/src/main/java/org/elasticsearch/xpack/esql/arrow/BlockConverter.java b/x-pack/plugin/esql/arrow/src/main/java/org/elasticsearch/xpack/esql/arrow/BlockConverter.java
new file mode 100644
index 0000000000000..0a65792ab8e13
--- /dev/null
+++ b/x-pack/plugin/esql/arrow/src/main/java/org/elasticsearch/xpack/esql/arrow/BlockConverter.java
@@ -0,0 +1,452 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.arrow;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.LongBlock;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
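+/**
+ * Converts an ES|QL {@link Block} into the buffers of the corresponding Arrow vector.
+ * Implementations don't copy data into Arrow-managed memory: they register size-only dummy
+ * buffers in {@code bufs} and matching {@link BufWriter} closures in {@code bufWriters} that
+ * later write the block's data directly to the output stream.
+ */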
+public abstract class BlockConverter {
+
+ private final FieldType fieldType;
+ private final String esqlType;
+
+ protected BlockConverter(String esqlType, Types.MinorType minorType) {
+ // Add the exact ESQL type as field metadata
+ var meta = Map.of("elastic:type", esqlType);
+ this.fieldType = new FieldType(true, minorType.getType(), null, meta);
+ this.esqlType = esqlType;
+ }
+
+ public final String esqlType() {
+ return this.esqlType;
+ }
+
+ public final FieldType arrowFieldType() {
+ return this.fieldType;
+ }
+
+ // Block.nullValuesCount was more efficient but was removed in https://github.com/elastic/elasticsearch/pull/108916
+ protected int nullValuesCount(Block block) {
+ if (block.mayHaveNulls() == false) {
+ return 0;
+ }
+
+ if (block.areAllValuesNull()) {
+ return block.getPositionCount();
+ }
+
+ int count = 0;
+ for (int i = 0; i < block.getPositionCount(); i++) {
+ if (block.isNull(i)) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ public interface BufWriter {
+ long write(RecyclerBytesStreamOutput out) throws IOException;
+ }
+
+ /**
+ * Convert a block into Arrow buffers.
+ * @param block the ESQL block
+ * @param bufs arrow buffers, used to track sizes
+ * @param bufWriters buffer writers that will do the actual work of writing the data
+ */
+ public abstract void convert(Block block, List<ArrowBuf> bufs, List<BufWriter> bufWriters);
+
+ /**
+ * Conversion of Double blocks
+ */
+ public static class AsFloat64 extends BlockConverter {
+
+ public AsFloat64(String esqlType) {
+ super(esqlType, Types.MinorType.FLOAT8);
+ }
+
+ @Override
+ public void convert(Block b, List<ArrowBuf> bufs, List<BufWriter> bufWriters) {
+ DoubleBlock block = (DoubleBlock) b;
+
+ accumulateVectorValidity(bufs, bufWriters, block);
+
+ bufs.add(dummyArrowBuf(vectorLength(block)));
+ bufWriters.add(out -> {
+ if (block.areAllValuesNull()) {
+ return BlockConverter.writeZeroes(out, vectorLength(block));
+ }
+
+ // TODO could we "just" get the memory of the array and dump it?
+ int count = block.getPositionCount();
+ for (int i = 0; i < count; i++) {
+ out.writeDoubleLE(block.getDouble(i));
+ }
+ return vectorLength(block);
+ });
+ }
+
+ private static int vectorLength(DoubleBlock b) {
+ return Double.BYTES * b.getPositionCount();
+ }
+ }
+
+ /**
+ * Conversion of Int blocks
+ */
+ public static class AsInt32 extends BlockConverter {
+
+ public AsInt32(String esqlType) {
+ super(esqlType, Types.MinorType.INT);
+ }
+
+ @Override
+ public void convert(Block b, List<ArrowBuf> bufs, List<BufWriter> bufWriters) {
+ IntBlock block = (IntBlock) b;
+
+ accumulateVectorValidity(bufs, bufWriters, block);
+
+ bufs.add(dummyArrowBuf(vectorLength(block)));
+ bufWriters.add(out -> {
+ if (block.areAllValuesNull()) {
+ return BlockConverter.writeZeroes(out, vectorLength(block));
+ }
+
+ // TODO could we "just" get the memory of the array and dump it?
+ int count = block.getPositionCount();
+ for (int i = 0; i < count; i++) {
+ out.writeIntLE(block.getInt(i));
+ }
+ return vectorLength(block);
+ });
+ }
+
+ private static int vectorLength(IntBlock b) {
+ return Integer.BYTES * b.getPositionCount();
+ }
+ }
+
+ /**
+ * Conversion of Long blocks
+ */
+ public static class AsInt64 extends BlockConverter {
+ public AsInt64(String esqlType) {
+ this(esqlType, Types.MinorType.BIGINT);
+ }
+
+ protected AsInt64(String esqlType, Types.MinorType minorType) {
+ super(esqlType, minorType);
+ }
+
+ @Override
+ public void convert(Block b, List<ArrowBuf> bufs, List<BufWriter> bufWriters) {
+ LongBlock block = (LongBlock) b;
+ accumulateVectorValidity(bufs, bufWriters, block);
+
+ bufs.add(dummyArrowBuf(vectorLength(block)));
+ bufWriters.add(out -> {
+ if (block.areAllValuesNull()) {
+ return BlockConverter.writeZeroes(out, vectorLength(block));
+ }
+
+ // TODO could we "just" get the memory of the array and dump it?
+ int count = block.getPositionCount();
+ for (int i = 0; i < count; i++) {
+ out.writeLongLE(block.getLong(i));
+ }
+ return vectorLength(block);
+ });
+ }
+
+ private static int vectorLength(LongBlock b) {
+ return Long.BYTES * b.getPositionCount();
+ }
+ }
+
+ /**
+ * Conversion of Boolean blocks
+ */
+ public static class AsBoolean extends BlockConverter {
+ public AsBoolean(String esqlType) {
+ super(esqlType, Types.MinorType.BIT);
+ }
+
+ @Override
+ public void convert(Block b, List<ArrowBuf> bufs, List<BufWriter> bufWriters) {
+ BooleanBlock block = (BooleanBlock) b;
+ accumulateVectorValidity(bufs, bufWriters, block);
+
+ bufs.add(dummyArrowBuf(vectorLength(block)));
+ bufWriters.add(out -> {
+ int count = block.getPositionCount();
+ BitSet bits = new BitSet();
+
+ // Only set the bits that are true, writeBitSet will take
+ // care of adding zero bytes if needed.
+ if (block.areAllValuesNull() == false) {
+ for (int i = 0; i < count; i++) {
+ if (block.getBoolean(i)) {
+ bits.set(i);
+ }
+ }
+ }
+
+ return BlockConverter.writeBitSet(out, bits, count);
+ });
+ }
+
+ private static int vectorLength(BooleanBlock b) {
+ return BlockConverter.bitSetLength(b.getPositionCount());
+ }
+ }
+
+ /**
+ * Conversion of BytesRef blocks
+ */
+ public static class BytesRefConverter extends BlockConverter {
+
+ public BytesRefConverter(String esqlType, Types.MinorType minorType) {
+ super(esqlType, minorType);
+ }
+
+ @Override
+ public void convert(Block b, List<ArrowBuf> bufs, List<BufWriter> bufWriters) {
+ BytesRefBlock block = (BytesRefBlock) b;
+
+ BlockConverter.accumulateVectorValidity(bufs, bufWriters, block);
+
+ // Offsets vector
+ bufs.add(dummyArrowBuf(offsetVectorLength(block)));
+
+ bufWriters.add(out -> {
+ if (block.areAllValuesNull()) {
+ var count = block.getPositionCount() + 1;
+ for (int i = 0; i < count; i++) {
+ out.writeIntLE(0);
+ }
+ return offsetVectorLength(block);
+ }
+
+ // TODO could we "just" get the memory of the array and dump it?
+ BytesRef scratch = new BytesRef();
+ int offset = 0;
+ for (int i = 0; i < block.getPositionCount(); i++) {
+ out.writeIntLE(offset);
+ // FIXME: add a ByteRefsVector.getLength(position): there are some cases
+ // where getBytesRef will allocate, which isn't needed here.
+ BytesRef v = block.getBytesRef(i, scratch);
+
+ offset += v.length;
+ }
+ out.writeIntLE(offset);
+ return offsetVectorLength(block);
+ });
+
+ // Data vector
+ bufs.add(BlockConverter.dummyArrowBuf(dataVectorLength(block)));
+
+ bufWriters.add(out -> {
+ if (block.areAllValuesNull()) {
+ return 0;
+ }
+
+ // TODO could we "just" get the memory of the array and dump it?
+ BytesRef scratch = new BytesRef();
+ long length = 0;
+ for (int i = 0; i < block.getPositionCount(); i++) {
+ BytesRef v = block.getBytesRef(i, scratch);
+
+ out.write(v.bytes, v.offset, v.length);
+ length += v.length;
+ }
+ return length;
+ });
+ }
+
+ private static int offsetVectorLength(BytesRefBlock block) {
+ return Integer.BYTES * (block.getPositionCount() + 1);
+ }
+
+ private int dataVectorLength(BytesRefBlock block) {
+ if (block.areAllValuesNull()) {
+ return 0;
+ }
+
+ // TODO we can probably get the length from the vector without all this sum
+
+ int length = 0;
+ BytesRef scratch = new BytesRef();
+ for (int i = 0; i < block.getPositionCount(); i++) {
+ BytesRef v = block.getBytesRef(i, scratch);
+ length += v.length;
+ }
+ return length;
+ }
+ }
+
+ /**
+ * Conversion of BytesRefs where each value is itself converted to a different format.
+ */
+ public static class TransformedBytesRef extends BytesRefConverter {
+
+ private final BiFunction<BytesRef, BytesRef, BytesRef> valueConverter;
+
+ /**
+ *
+ * @param esqlType ESQL type name
+ * @param minorType Arrow type
+ * @param valueConverter a function that takes (value, scratch) input parameters and returns the transformed value
+ */
+ public TransformedBytesRef(String esqlType, Types.MinorType minorType, BiFunction<BytesRef, BytesRef, BytesRef> valueConverter) {
+ super(esqlType, minorType);
+ this.valueConverter = valueConverter;
+ }
+
+ @Override
+ public void convert(Block b, List<ArrowBuf> bufs, List<BufWriter> bufWriters) {
+ BytesRefBlock block = (BytesRefBlock) b;
+ try (BytesRefBlock transformed = transformValues(block)) {
+ super.convert(transformed, bufs, bufWriters);
+ }
+ }
+
+ /**
+ * Creates a new BytesRefBlock by applying the value converter to each non-null and non-empty value
+ */
+ private BytesRefBlock transformValues(BytesRefBlock block) {
+ try (BytesRefBlock.Builder builder = block.blockFactory().newBytesRefBlockBuilder(block.getPositionCount())) {
+ BytesRef scratch = new BytesRef();
+ for (int i = 0; i < block.getPositionCount(); i++) {
+ if (block.isNull(i)) {
+ builder.appendNull();
+ } else {
+ BytesRef bytes = block.getBytesRef(i, scratch);
+ if (bytes.length != 0) {
+ bytes = valueConverter.apply(bytes, scratch);
+ }
+ builder.appendBytesRef(bytes);
+ }
+ }
+ return builder.build();
+ }
+ }
+ }
+
+ public static class AsVarChar extends BytesRefConverter {
+ public AsVarChar(String esqlType) {
+ super(esqlType, Types.MinorType.VARCHAR);
+ }
+ }
+
+ public static class AsVarBinary extends BytesRefConverter {
+ public AsVarBinary(String esqlType) {
+ super(esqlType, Types.MinorType.VARBINARY);
+ }
+ }
+
+ public static class AsNull extends BlockConverter {
+ public AsNull(String esqlType) {
+ super(esqlType, Types.MinorType.NULL);
+ }
+
+ @Override
+ public void convert(Block block, List<ArrowBuf> bufs, List<BufWriter> bufWriters) {
+ // Null vector in arrow has no associated buffers
+ // See https://arrow.apache.org/docs/format/Columnar.html#null-layout
+ }
+ }
+
+ // Create a dummy ArrowBuf used for size accounting purposes.
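+ // writerIndex(size) makes the buffer report "size" bytes written, which is all the IPC
+ // serializer needs to compute offsets; no backing memory is involved.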
+ private static ArrowBuf dummyArrowBuf(long size) {
+ return new ArrowBuf(null, null, 0, 0).writerIndex(size);
+ }
+
+ // Length in bytes of a validity buffer
+ private static int bitSetLength(int totalValues) {
+ return (totalValues + 7) / 8;
+ }
+
+ private static void accumulateVectorValidity(List<ArrowBuf> bufs, List<BufWriter> bufWriters, Block b) {
+ bufs.add(dummyArrowBuf(bitSetLength(b.getPositionCount())));
+ bufWriters.add(out -> {
+ if (b.mayHaveNulls() == false) {
+ return writeAllTrueValidity(out, b.getPositionCount());
+ } else if (b.areAllValuesNull()) {
+ return writeAllFalseValidity(out, b.getPositionCount());
+ } else {
+ return writeValidities(out, b);
+ }
+ });
+ }
+
+ private static long writeAllTrueValidity(RecyclerBytesStreamOutput out, int valueCount) {
+ int allOnesCount = valueCount / 8;
+ for (int i = 0; i < allOnesCount; i++) {
+ out.writeByte((byte) 0xff);
+ }
+ int remaining = valueCount % 8;
+ if (remaining == 0) {
+ return allOnesCount;
+ }
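+ // Set the low "remaining" bits: e.g. remaining = 3 gives (1 << 3) - 1 = 0b0000_0111.
+ // Arrow validity bitmaps are LSB-first, so these are the first positions of this final byte.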
+ out.writeByte((byte) ((1 << remaining) - 1));
+ return allOnesCount + 1;
+ }
+
+ private static long writeAllFalseValidity(RecyclerBytesStreamOutput out, int valueCount) {
+ int count = bitSetLength(valueCount);
+ for (int i = 0; i < count; i++) {
+ out.writeByte((byte) 0x00);
+ }
+ return count;
+ }
+
+ private static long writeValidities(RecyclerBytesStreamOutput out, Block block) {
+ int valueCount = block.getPositionCount();
+ BitSet bits = new BitSet(valueCount);
+ for (int i = 0; i < block.getPositionCount(); i++) {
+ if (block.isNull(i) == false) {
+ bits.set(i);
+ }
+ }
+ return writeBitSet(out, bits, valueCount);
+ }
+
+ private static long writeBitSet(RecyclerBytesStreamOutput out, BitSet bits, int bitCount) {
+ byte[] bytes = bits.toByteArray();
+ out.writeBytes(bytes, 0, bytes.length);
+
+ // toByteArray will return bytes up to the last bit set. It may therefore
+ // have a length lower than what is needed to actually store bitCount bits.
+ int expectedLength = bitSetLength(bitCount);
+ writeZeroes(out, expectedLength - bytes.length);
+
+ return expectedLength;
+ }
+
+ private static long writeZeroes(RecyclerBytesStreamOutput out, int byteCount) {
+ for (int i = 0; i < byteCount; i++) {
+ out.writeByte((byte) 0);
+ }
+ return byteCount;
+ }
+}
diff --git a/x-pack/plugin/esql/arrow/src/main/java/org/elasticsearch/xpack/esql/arrow/ValueConversions.java b/x-pack/plugin/esql/arrow/src/main/java/org/elasticsearch/xpack/esql/arrow/ValueConversions.java
new file mode 100644
index 0000000000000..8139380aef1c8
--- /dev/null
+++ b/x-pack/plugin/esql/arrow/src/main/java/org/elasticsearch/xpack/esql/arrow/ValueConversions.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.arrow;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.xcontent.XContentType;
+import org.elasticsearch.xpack.versionfield.Version;
+
+import java.io.IOException;
+
+/**
+ * Utilities to convert some byte-encoded ES|QL values into a format more suitable
+ * for Arrow output.
+ */
+public class ValueConversions {
+
+ /**
+ * Shorten IPv6-mapped IPv4 addresses to 4 bytes
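+ * (e.g. the 16-byte mapped form {@code ::ffff:192.0.2.1} becomes the 4-byte {@code 192.0.2.1}).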
+ */
+ public static BytesRef shortenIpV4Addresses(BytesRef value, BytesRef scratch) {
+ // Same logic as sun.net.util.IPAddressUtil#isIPv4MappedAddress
+ // See https://datatracker.ietf.org/doc/html/rfc4291#section-2.5.5.2
+ if (value.length == 16) {
+ int pos = value.offset;
+ byte[] bytes = value.bytes;
+ boolean isIpV4 = bytes[pos++] == 0
+ && bytes[pos++] == 0
+ && bytes[pos++] == 0
+ && bytes[pos++] == 0
+ && bytes[pos++] == 0
+ && bytes[pos++] == 0
+ && bytes[pos++] == 0
+ && bytes[pos++] == 0
+ && bytes[pos++] == 0
+ && bytes[pos++] == 0
+ && bytes[pos++] == (byte) 0xFF
+ && bytes[pos] == (byte) 0xFF;
+
+ if (isIpV4) {
+ scratch.bytes = value.bytes;
+ scratch.offset = value.offset + 12;
+ scratch.length = 4;
+ return scratch;
+ }
+ }
+ return value;
+ }
+
+ /**
+ * Convert binary-encoded versions to strings
+ */
+ public static BytesRef versionToString(BytesRef value, BytesRef scratch) {
+ return new BytesRef(new Version(value).toString());
+ }
+
+ /**
+ * Convert any xcontent source to json
+ */
+ public static BytesRef sourceToJson(BytesRef value, BytesRef scratch) {
+ try {
+ var valueArray = new BytesArray(value);
+ XContentType xContentType = XContentHelper.xContentType(valueArray);
+ if (xContentType == XContentType.JSON) {
+ return value;
+ } else {
+ String json = XContentHelper.convertToJson(valueArray, false, xContentType);
+ return new BytesRef(json);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/x-pack/plugin/esql/arrow/src/test/java/org/elasticsearch/xpack/esql/arrow/ArrowResponseTests.java b/x-pack/plugin/esql/arrow/src/test/java/org/elasticsearch/xpack/esql/arrow/ArrowResponseTests.java
new file mode 100644
index 0000000000000..cf49b37db2805
--- /dev/null
+++ b/x-pack/plugin/esql/arrow/src/test/java/org/elasticsearch/xpack/esql/arrow/ArrowResponseTests.java
@@ -0,0 +1,600 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.arrow;
+
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.UInt8Vector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.util.VectorSchemaRootAppender;
+import org.apache.lucene.document.InetAddressPoint;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.TriFunction;
+import org.elasticsearch.common.breaker.NoopCircuitBreaker;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.CompositeBytesReference;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.IntVectorBlock;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.transport.BytesRefRecycler;
+import org.elasticsearch.xpack.versionfield.Version;
+import org.junit.AfterClass;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+public class ArrowResponseTests extends ESTestCase {
+
+ private static final BlockFactory BLOCK_FACTORY = BlockFactory.getInstance(
+ new NoopCircuitBreaker("test-noop"),
+ BigArrays.NON_RECYCLING_INSTANCE
+ );
+
+ private static final RootAllocator ALLOCATOR = new RootAllocator();
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ ALLOCATOR.close();
+ }
+
+ // ---------------------------------------------------------------------------------------------
+ // Value creation, getters for ESQL and Arrow
+
+ static final ValueType INTEGER_VALUES = new ValueTypeImpl(
+ factory -> factory.newIntBlockBuilder(0),
+ block -> block.appendInt(randomInt()),
+ (block, i, scratch) -> block.getInt(i),
+ IntVector::get
+ );
+
+ static final ValueType LONG_VALUES = new ValueTypeImpl(
+ factory -> factory.newLongBlockBuilder(0),
+ block -> block.appendLong(randomLong()),
+ (block, i, scratch) -> block.getLong(i),
+ BigIntVector::get
+ );
+
+ static final ValueType ULONG_VALUES = new ValueTypeImpl(
+ factory -> factory.newLongBlockBuilder(0),
+ block -> block.appendLong(randomLong()),
+ (block, i, scratch) -> block.getLong(i),
+ UInt8Vector::get
+ );
+
+ static final ValueType DATE_VALUES = new ValueTypeImpl(
+ factory -> factory.newLongBlockBuilder(0),
+ block -> block.appendLong(randomLong()),
+ (block, i, scratch) -> block.getLong(i),
+ TimeStampMilliVector::get
+ );
+
+ static final ValueType DOUBLE_VALUES = new ValueTypeImpl(
+ factory -> factory.newDoubleBlockBuilder(0),
+ block -> block.appendDouble(randomDouble()),
+ (block, i, scratch) -> block.getDouble(i),
+ Float8Vector::get
+ );
+
+ static final ValueType BOOLEAN_VALUES = new ValueTypeImpl(
+ factory -> factory.newBooleanBlockBuilder(0),
+ block -> block.appendBoolean(randomBoolean()),
+ (b, i, s) -> b.getBoolean(i),
+ (v, i) -> v.get(i) != 0 // Arrow's BitVector returns 0 or 1
+ );
+
+ static final ValueType TEXT_VALUES = new ValueTypeImpl(
+ factory -> factory.newBytesRefBlockBuilder(0),
+ block -> block.appendBytesRef(new BytesRef("🚀" + randomAlphaOfLengthBetween(1, 20))),
+ (b, i, s) -> b.getBytesRef(i, s).utf8ToString(),
+ (v, i) -> new String(v.get(i), StandardCharsets.UTF_8)
+ );
+
+ static final ValueType SOURCE_VALUES = new ValueTypeImpl(
+ factory -> factory.newBytesRefBlockBuilder(0),
+ // Use a constant value; conversion is tested separately
+ block -> block.appendBytesRef(new BytesRef("{\"foo\": 42}")),
+ (b, i, s) -> b.getBytesRef(i, s).utf8ToString(),
+ (v, i) -> new String(v.get(i), StandardCharsets.UTF_8)
+ );
+
+ static final ValueType IP_VALUES = new ValueTypeImpl(
+ factory -> factory.newBytesRefBlockBuilder(0),
+ block -> {
+ byte[] addr = InetAddressPoint.encode(randomIp(randomBoolean()));
+ assertEquals(16, addr.length); // Make sure addresses are always in the 16-byte (IPv6 or IPv6-mapped) form
+ block.appendBytesRef(new BytesRef(addr));
+ },
+ (b, i, s) -> ValueConversions.shortenIpV4Addresses(b.getBytesRef(i, s), new BytesRef()),
+ (v, i) -> new BytesRef(v.get(i))
+ );
+
+ static final ValueType BINARY_VALUES = new ValueTypeImpl(
+ factory -> factory.newBytesRefBlockBuilder(0),
+ block -> block.appendBytesRef(new BytesRef(randomByteArrayOfLength(randomIntBetween(1, 100)))),
+ BytesRefBlock::getBytesRef,
+ (v, i) -> new BytesRef(v.get(i))
+ );
+
+ static final ValueType VERSION_VALUES = new ValueTypeImpl(
+ factory -> factory.newBytesRefBlockBuilder(0),
+ block -> block.appendBytesRef(new Version(between(0, 100) + "." + between(0, 100) + "." + between(0, 100)).toBytesRef()),
+ (b, i, s) -> new Version(b.getBytesRef(i, s)).toString(),
+ (v, i) -> new String(v.get(i), StandardCharsets.UTF_8)
+ );
+
+ static final ValueType NULL_VALUES = new ValueTypeImpl(
+ factory -> factory.newBytesRefBlockBuilder(0),
+ Block.Builder::appendNull,
+ (b, i, s) -> b.isNull(i) ? null : "non-null in block",
+ (v, i) -> v.isNull(i) ? null : "non-null in vector"
+ );
+
+ static final Map<String, ValueType> VALUE_TYPES = Map.ofEntries(
+ Map.entry("integer", INTEGER_VALUES),
+ Map.entry("counter_integer", INTEGER_VALUES),
+ Map.entry("long", LONG_VALUES),
+ Map.entry("counter_long", LONG_VALUES),
+ Map.entry("unsigned_long", ULONG_VALUES),
+ Map.entry("double", DOUBLE_VALUES),
+ Map.entry("counter_double", DOUBLE_VALUES),
+
+ Map.entry("text", TEXT_VALUES),
+ Map.entry("keyword", TEXT_VALUES),
+
+ Map.entry("boolean", BOOLEAN_VALUES),
+ Map.entry("date", DATE_VALUES),
+ Map.entry("ip", IP_VALUES),
+ Map.entry("version", VERSION_VALUES),
+ Map.entry("_source", SOURCE_VALUES),
+
+ Map.entry("null", NULL_VALUES),
+ Map.entry("unsupported", NULL_VALUES),
+
+ // All geo types just pass-through WKB, use random binary data
+ Map.entry("geo_point", BINARY_VALUES),
+ Map.entry("geo_shape", BINARY_VALUES),
+ Map.entry("cartesian_point", BINARY_VALUES),
+ Map.entry("cartesian_shape", BINARY_VALUES)
+ );
+
+ // ---------------------------------------------------------------------------------------------
+ // Tests
+
+ public void testTestHarness() {
+ TestColumn testColumn = TestColumn.create("foo", "integer");
+ TestBlock denseBlock = TestBlock.create(BLOCK_FACTORY, testColumn, Density.Dense, 3);
+ TestBlock sparseBlock = TestBlock.create(BLOCK_FACTORY, testColumn, Density.Sparse, 5);
+ TestBlock emptyBlock = TestBlock.create(BLOCK_FACTORY, testColumn, Density.Empty, 7);
+
+ // Test that density works as expected
+ assertTrue(denseBlock.block instanceof IntVectorBlock);
+ assertEquals("IntArrayBlock", sparseBlock.block.getClass().getSimpleName()); // non-public class
+ assertEquals("ConstantNullBlock", emptyBlock.block.getClass().getSimpleName());
+
+ // Test that values iterator scans all pages
+ List<TestPage> pages = Stream.of(denseBlock, sparseBlock, emptyBlock).map(b -> new TestPage(List.of(b))).toList();
+ TestCase tc = new TestCase(List.of(testColumn), pages);
+ EsqlValuesIterator valuesIterator = new EsqlValuesIterator(tc, 0);
+ int count = 0;
+ while (valuesIterator.hasNext()) {
+ valuesIterator.next();
+ count++;
+ }
+ assertEquals(3 + 5 + 7, count);
+
+ // Test that we have value types for all types
+ List<String> converters = new ArrayList<>(ArrowResponse.ESQL_CONVERTERS.keySet());
+ Collections.sort(converters);
+ List<String> valueTypes = new ArrayList<>(VALUE_TYPES.keySet());
+ Collections.sort(valueTypes);
+ assertEquals("Missing test value types", converters, valueTypes);
+ }
+
+ /**
+ * Test single-column for all types with a mix of dense/sparse/empty pages
+ */
+ public void testSingleColumn() throws IOException {
+ for (var type : VALUE_TYPES.keySet()) {
+ TestColumn testColumn = new TestColumn("foo", type, VALUE_TYPES.get(type));
+ List<TestPage> pages = new ArrayList<>();
+
+ for (var density : Density.values()) {
+ TestBlock testBlock = TestBlock.create(BLOCK_FACTORY, testColumn, density, 10);
+ TestPage testPage = new TestPage(List.of(testBlock));
+ pages.add(testPage);
+ }
+ TestCase testCase = new TestCase(List.of(testColumn), pages);
+
+ compareEsqlAndArrow(testCase);
+ }
+ }
+
+ public void testSingleBlock() throws IOException {
+ // Simple test to easily focus on a specific type & density
+ String type = "text";
+ Density density = Density.Dense;
+
+ TestColumn testColumn = new TestColumn("foo", type, VALUE_TYPES.get(type));
+ List<TestPage> pages = new ArrayList<>();
+
+ TestBlock testBlock = TestBlock.create(BLOCK_FACTORY, testColumn, density, 10);
+ TestPage testPage = new TestPage(List.of(testBlock));
+ pages.add(testPage);
+
+ TestCase testCase = new TestCase(List.of(testColumn), pages);
+
+ compareEsqlAndArrow(testCase);
+ }
+
+ /**
+ * Test that multivalued arrays are rejected
+ */
+ public void testMultivaluedField() throws IOException {
+ IntBlock.Builder builder = BLOCK_FACTORY.newIntBlockBuilder(0);
+ builder.appendInt(42);
+ builder.appendNull();
+ builder.beginPositionEntry();
+ builder.appendInt(44);
+ builder.appendInt(45);
+ builder.endPositionEntry();
+ builder.appendInt(46);
+ IntBlock block = builder.build();
+
+ // Consistency check
+ assertTrue(block.mayHaveMultivaluedFields());
+ assertEquals(0, block.getFirstValueIndex(0));
+ assertEquals(1, block.getValueCount(0));
+
+ // null values still use one position in the array
+ assertEquals(0, block.getValueCount(1));
+ assertEquals(1, block.getFirstValueIndex(1));
+ assertTrue(block.isNull(1));
+ assertEquals(0, block.getInt(1));
+
+ assertEquals(2, block.getFirstValueIndex(2));
+ assertEquals(2, block.getValueCount(2));
+ assertEquals(2, block.getFirstValueIndex(2));
+ assertEquals(45, block.getInt(block.getFirstValueIndex(2) + 1));
+
+ assertEquals(4, block.getFirstValueIndex(3));
+
+ var column = TestColumn.create("some-field", "integer");
+ TestCase testCase = new TestCase(List.of(column), List.of(new TestPage(List.of(new TestBlock(column, block, Density.Dense)))));
+
+ IllegalArgumentException exc = assertThrows(IllegalArgumentException.class, () -> compareEsqlAndArrow(testCase));
+
+ assertEquals("ES|QL response field [some-field] is multi-valued. This isn't supported yet by the Arrow format", exc.getMessage());
+
+ }
+
+ /**
+ * Test a random set of types/columns/pages/densities
+ */
+ public void testRandomTypesAndSize() throws IOException {
+
+ // Shuffle types to randomize their succession in the Arrow stream
+ List<String> types = new ArrayList<>(VALUE_TYPES.keySet());
+ Collections.shuffle(types, random());
+
+ List<TestColumn> columns = types.stream().map(type -> TestColumn.create("col-" + type, type)).toList();
+
+ List<TestPage> pages = IntStream
+ // 1 to 100 pages of random density, each with 1 to 1000 values
+ .range(0, randomIntBetween(1, 100))
+ .mapToObj(i -> TestPage.create(BLOCK_FACTORY, columns))
+ .toList();
+
+ TestCase testCase = new TestCase(columns, pages);
+ // System.out.println(testCase);
+ // for (TestPage page: pages) {
+ // System.out.println(page);
+ // }
+
+ compareEsqlAndArrow(testCase);
+ }
+
+ // ---------------------------------------------------------------------------------------------
+ // Test harness
+
+ private void compareEsqlAndArrow(TestCase testCase) throws IOException {
+ try (VectorSchemaRoot arrowVectors = toArrowVectors(testCase)) {
+ compareEsqlAndArrow(testCase, arrowVectors);
+ }
+ }
+
+ private void compareEsqlAndArrow(TestCase testCase, VectorSchemaRoot root) {
+ for (int i = 0; i < testCase.columns.size(); i++) {
+
+ // Check esql type in the metadata
+ var metadata = root.getSchema().getFields().get(i).getMetadata();
+ assertEquals(testCase.columns.get(i).type, metadata.get("elastic:type"));
+
+ // Check values
+ var esqlValuesIterator = new EsqlValuesIterator(testCase, i);
+ var arrowValuesIterator = new ArrowValuesIterator(testCase, root, i);
+
+ while (esqlValuesIterator.hasNext() && arrowValuesIterator.hasNext()) {
+ assertEquals(esqlValuesIterator.next(), arrowValuesIterator.next());
+ }
+
+ // Make sure we entirely consumed both sides.
+ assertFalse(esqlValuesIterator.hasNext());
+ assertFalse(arrowValuesIterator.hasNext());
+ }
+ }
+
+ private VectorSchemaRoot toArrowVectors(TestCase testCase) throws IOException {
+ ArrowResponse response = new ArrowResponse(
+ testCase.columns.stream().map(c -> new ArrowResponse.Column(c.type, c.name)).toList(),
+ testCase.pages.stream().map(p -> new Page(p.blocks.stream().map(b -> b.block).toArray(Block[]::new))).toList()
+ );
+
+ assertEquals("application/vnd.apache.arrow.stream", response.getResponseContentTypeString());
+
+ BytesReference bytes = serializeBlocksDirectly(response);
+ try (
+ ArrowStreamReader reader = new ArrowStreamReader(bytes.streamInput(), ALLOCATOR);
+ VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
+ ) {
+ VectorSchemaRoot root = VectorSchemaRoot.create(readerRoot.getSchema(), ALLOCATOR);
+ root.allocateNew();
+
+ while (reader.loadNextBatch()) {
+ VectorSchemaRootAppender.append(root, readerRoot);
+ }
+
+ return root;
+ }
+ }
+
+ /**
+ * An iterator over values of a column across all pages.
+ */
+ static class EsqlValuesIterator implements Iterator<Object> {