diff --git a/gradle/verification-metadata.xml b/gradle/verification-metadata.xml
index b0a8fdd303246..ea74c6aed0f8c 100644
--- a/gradle/verification-metadata.xml
+++ b/gradle/verification-metadata.xml
@@ -551,6 +551,11 @@
+
+
+
+
+
@@ -1765,6 +1770,26 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/server/src/main/java/org/elasticsearch/bootstrap/BootstrapChecks.java b/server/src/main/java/org/elasticsearch/bootstrap/BootstrapChecks.java
index a99ed225b244b..34c45e432cd5e 100644
--- a/server/src/main/java/org/elasticsearch/bootstrap/BootstrapChecks.java
+++ b/server/src/main/java/org/elasticsearch/bootstrap/BootstrapChecks.java
@@ -29,7 +29,6 @@
import java.nio.ByteOrder;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.security.AllPermission;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -728,13 +727,14 @@ public final BootstrapCheckResult check(BootstrapContext context) {
}
boolean isAllPermissionGranted() {
- final SecurityManager sm = System.getSecurityManager();
- assert sm != null;
- try {
- sm.checkPermission(new AllPermission());
- } catch (final SecurityException e) {
- return false;
- }
+ // final SecurityManager sm = System.getSecurityManager();
+ // assert sm != null;
+ // try {
+ // sm.checkPermission(new AllPermission());
+ // } catch (final SecurityException e) {
+ // return false;
+ // }
+ // NOCOMMIT disabled security manager
return true;
}
diff --git a/server/src/main/java/org/elasticsearch/bootstrap/Security.java b/server/src/main/java/org/elasticsearch/bootstrap/Security.java
index eef7228bb4812..c2dda0a5e9d00 100644
--- a/server/src/main/java/org/elasticsearch/bootstrap/Security.java
+++ b/server/src/main/java/org/elasticsearch/bootstrap/Security.java
@@ -106,7 +106,7 @@ final class Security {
private Security() {}
static void setSecurityManager(@SuppressWarnings("removal") SecurityManager sm) {
- System.setSecurityManager(sm);
+ // System.setSecurityManager(sm); NOCOMMIT why can't I give myself the reflection permission I need?
}
/**
diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutput.java b/server/src/main/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutput.java
index 80917b530202b..91423df1e9467 100644
--- a/server/src/main/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutput.java
+++ b/server/src/main/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutput.java
@@ -32,7 +32,9 @@
public class RecyclerBytesStreamOutput extends BytesStream implements Releasable {
static final VarHandle VH_BE_INT = MethodHandles.byteArrayViewVarHandle(int[].class, ByteOrder.BIG_ENDIAN);
+ static final VarHandle VH_LE_INT = MethodHandles.byteArrayViewVarHandle(int[].class, ByteOrder.LITTLE_ENDIAN);
static final VarHandle VH_BE_LONG = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.BIG_ENDIAN);
+ static final VarHandle VH_LE_LONG = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.LITTLE_ENDIAN);
private final ArrayList> pages = new ArrayList<>();
private final Recycler recycler;
@@ -108,6 +110,17 @@ public void writeInt(int i) throws IOException {
}
}
+ @Override
+ public void writeIntLE(int i) throws IOException {
+ if (4 > (pageSize - currentPageOffset)) {
+ super.writeIntLE(i);
+ } else {
+ BytesRef currentPage = pages.get(pageIndex).v();
+ VH_LE_INT.set(currentPage.bytes, currentPage.offset + currentPageOffset, i);
+ currentPageOffset += 4;
+ }
+ }
+
@Override
public void writeLong(long i) throws IOException {
if (8 > (pageSize - currentPageOffset)) {
@@ -119,6 +132,17 @@ public void writeLong(long i) throws IOException {
}
}
+ @Override
+ public void writeLongLE(long i) throws IOException {
+ if (8 > (pageSize - currentPageOffset)) {
+ super.writeLongLE(i);
+ } else {
+ BytesRef currentPage = pages.get(pageIndex).v();
+ VH_LE_LONG.set(currentPage.bytes, currentPage.offset + currentPageOffset, i);
+ currentPageOffset += 8;
+ }
+ }
+
@Override
public void writeWithSizePrefix(Writeable writeable) throws IOException {
// TODO: do this without copying the bytes from tmp by calling writeBytes and just use the pages in tmp directly through
diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
index 693d8efb18347..980b1866a6647 100644
--- a/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
+++ b/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
@@ -190,6 +190,15 @@ public void writeInt(int i) throws IOException {
writeBytes(buffer, 0, 4);
}
+ /**
+ * Writes an int as four bytes, least significant bytes first.
+ */
+ public void writeIntLE(int i) throws IOException {
+ final byte[] buffer = scratch.get();
+ ByteUtils.writeIntLE(i, buffer, 0);
+ writeBytes(buffer, 0, 4);
+ }
+
/**
* Writes an int in a variable-length format. Writes between one and
* five bytes. Smaller values take fewer bytes. Negative numbers
@@ -243,6 +252,15 @@ public void writeLong(long i) throws IOException {
writeBytes(buffer, 0, 8);
}
+ /**
+ * Writes a long as eight bytes.
+ */
+ public void writeLongLE(long i) throws IOException {
+ final byte[] buffer = scratch.get();
+ ByteUtils.writeLongLE(i, buffer, 0);
+ writeBytes(buffer, 0, 8);
+ }
+
/**
* Writes a non-negative long in a variable-length format. Writes between one and ten bytes. Smaller values take fewer bytes. Negative
* numbers use ten bytes and trip assertions (if running in tests) so prefer {@link #writeLong(long)} or {@link #writeZLong(long)} for
@@ -441,6 +459,10 @@ public void writeDouble(double v) throws IOException {
writeLong(Double.doubleToLongBits(v));
}
+ public void writeDoubleLE(double v) throws IOException {
+ writeLongLE(Double.doubleToLongBits(v));
+ }
+
public void writeOptionalDouble(@Nullable Double v) throws IOException {
if (v == null) {
writeBoolean(false);
diff --git a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java
index ae267573b4cab..a4f81fea52f6e 100644
--- a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java
+++ b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java
@@ -9,6 +9,7 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
+import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.BytesStream;
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
import org.elasticsearch.common.recycler.Recycler;
@@ -242,4 +243,53 @@ public void close() {
}
};
}
+
+ static ChunkedRestResponseBody fromMany(ChunkedRestResponseBody first, Iterator extends ChunkedRestResponseBody> rest) {
+ return new ChunkedRestResponseBody() {
+ private final String contentType = first.getResponseContentTypeString();
+ private ChunkedRestResponseBody current = first;
+
+ @Override
+ public boolean isDone() {
+ return current == null;
+ }
+
+ @Override
+ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) throws IOException {
+ try {
+ return current.encodeChunk(sizeHint, recycler);
+ } finally {
+ if (current.isDone()) {
+ current.close();
+ if (false == rest.hasNext()) {
+ current = null;
+ } else {
+ current = rest.next();
+ if (false == contentType.equals(current.getResponseContentTypeString())) {
+ throw new IllegalArgumentException(
+ "content types much match but were ["
+ + contentType
+ + "] and ["
+ + current.getResponseContentTypeString()
+ + "]"
+ );
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public String getResponseContentTypeString() {
+ return contentType;
+ }
+
+ @Override
+ public void close() {
+ // Close all remaining portions
+ // NOCOMMIT why I need Iterators.map here? silly compiler, give me compile
+ Releasables.closeExpectNoException(current, Releasables.wrap(() -> Iterators.map(rest, r -> r)));
+ }
+ };
+ }
}
diff --git a/x-pack/plugin/esql/arrow/build.gradle b/x-pack/plugin/esql/arrow/build.gradle
new file mode 100644
index 0000000000000..09894078e1a11
--- /dev/null
+++ b/x-pack/plugin/esql/arrow/build.gradle
@@ -0,0 +1,23 @@
+apply plugin: 'elasticsearch.build'
+
+dependencies {
+ implementation project('shim')
+ compileOnly project(':server')
+ compileOnly project(':x-pack:plugin:esql:compute')
+ implementation('org.apache.arrow:arrow-vector:15.0.0')
+ implementation('org.apache.arrow:arrow-format:15.0.0')
+ implementation('org.apache.arrow:arrow-memory-core:15.0.0')
+ implementation('com.google.flatbuffers:flatbuffers-java:23.5.26')
+ implementation("com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}")
+ implementation("com.fasterxml.jackson.core:jackson-core:${versions.jackson}")
+ implementation("com.fasterxml.jackson.core:jackson-databind:${versions.jackson}") // This isn't really used - but it is loaded!
+ implementation("org.slf4j:slf4j-api:${versions.slf4j}")
+ runtimeOnly "org.slf4j:slf4j-nop:${versions.slf4j}"
+
+ testImplementation project(':test:framework')
+ testImplementation('org.apache.arrow:arrow-memory-unsafe:15.0.0')
+}
+
+test {
+ jvmArgs('--add-opens=java.base/java.nio=ALL-UNNAMED')
+}
diff --git a/x-pack/plugin/esql/arrow/shim/build.gradle b/x-pack/plugin/esql/arrow/shim/build.gradle
new file mode 100644
index 0000000000000..da42aaa6b8eff
--- /dev/null
+++ b/x-pack/plugin/esql/arrow/shim/build.gradle
@@ -0,0 +1,14 @@
+apply plugin: 'elasticsearch.build'
+
+dependencies {
+ implementation project(':libs:elasticsearch-logging')
+ implementation('org.apache.arrow:arrow-vector:15.0.0')
+ implementation('org.apache.arrow:arrow-format:15.0.0')
+ implementation('org.apache.arrow:arrow-memory-core:15.0.0')
+ implementation('com.google.flatbuffers:flatbuffers-java:23.5.26')
+ implementation("com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}")
+ implementation("com.fasterxml.jackson.core:jackson-core:${versions.jackson}")
+ implementation("com.fasterxml.jackson.core:jackson-databind:${versions.jackson}") // This isn't really used - but it is loaded!
+ implementation("org.slf4j:slf4j-api:${versions.slf4j}")
+ runtimeOnly "org.slf4j:slf4j-nop:${versions.slf4j}"
+}
diff --git a/x-pack/plugin/esql/arrow/shim/src/main/java/org/elasticsearch/xpack/esql/arrow/shim/Shim.java b/x-pack/plugin/esql/arrow/shim/src/main/java/org/elasticsearch/xpack/esql/arrow/shim/Shim.java
new file mode 100644
index 0000000000000..eaf1e6cbae775
--- /dev/null
+++ b/x-pack/plugin/esql/arrow/shim/src/main/java/org/elasticsearch/xpack/esql/arrow/shim/Shim.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.arrow.shim;
+
+import org.apache.arrow.memory.AllocationManager;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.DefaultAllocationManagerOption;
+import org.elasticsearch.logging.LogManager;
+
+import java.lang.reflect.Field;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+/**
+ * We don't actually use Arrow's memory manager, but , arrow
+ * won't initialize properly unless we configure one. We configure an "empty"
+ * one here.
+ */
+public class Shim implements AllocationManager.Factory {
+ /**
+ * Initialize the Arrow shim. Arrow does some interesting reflection stuff on
+ * initialization. We can avoid it if we
+ */
+ public static void init() {
+ try {
+ Class.forName("org.elasticsearch.test.ESTestCase");
+ LogManager.getLogger(Shim.class)
+ .info("we're in tests, disabling the arrow shim so we can use a real apache arrow runtime for testing");
+ } catch (ClassNotFoundException notfound) {
+ LogManager.getLogger(Shim.class).debug("shimming arrow's allocation manager");
+ AccessController.doPrivileged((PrivilegedAction) () -> {
+ try {
+ Field field = DefaultAllocationManagerOption.class.getDeclaredField("DEFAULT_ALLOCATION_MANAGER_FACTORY");
+ field.setAccessible(true);
+ field.set(null, new Shim());
+ } catch (Exception e) {
+ throw new AssertionError("can't init arrow", e);
+ }
+ return null;
+ });
+ }
+ }
+
+ @Override
+ public AllocationManager create(BufferAllocator accountingAllocator, long size) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ArrowBuf empty() {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/x-pack/plugin/esql/arrow/src/main/java/org/elasticsearch/xpack/esql/arrow/ArrowFormat.java b/x-pack/plugin/esql/arrow/src/main/java/org/elasticsearch/xpack/esql/arrow/ArrowFormat.java
new file mode 100644
index 0000000000000..1baed055b50f4
--- /dev/null
+++ b/x-pack/plugin/esql/arrow/src/main/java/org/elasticsearch/xpack/esql/arrow/ArrowFormat.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.arrow;
+
+import org.elasticsearch.xcontent.MediaType;
+
+import java.util.Map;
+import java.util.Set;
+
+public class ArrowFormat implements MediaType {
+ public static final ArrowFormat INSTANCE = new ArrowFormat();
+
+ private static final String FORMAT = "arrow";
+ public static final String CONTENT_TYPE = "application/arrow";
+ private static final String VENDOR_CONTENT_TYPE = "application/vnd.elasticsearch+arrow";
+
+ @Override
+ public String queryParameter() {
+ return FORMAT;
+ }
+
+ @Override
+ public Set headerValues() {
+ return Set.of(
+ new HeaderValue(CONTENT_TYPE, Map.of("header", "present|absent")),
+ new HeaderValue(VENDOR_CONTENT_TYPE, Map.of("header", "present|absent", COMPATIBLE_WITH_PARAMETER_NAME, VERSION_PATTERN))
+ );
+ }
+}
diff --git a/x-pack/plugin/esql/arrow/src/main/java/org/elasticsearch/xpack/esql/arrow/ArrowResponse.java b/x-pack/plugin/esql/arrow/src/main/java/org/elasticsearch/xpack/esql/arrow/ArrowResponse.java
new file mode 100644
index 0000000000000..4b80b0cf0facb
--- /dev/null
+++ b/x-pack/plugin/esql/arrow/src/main/java/org/elasticsearch/xpack/esql/arrow/ArrowResponse.java
@@ -0,0 +1,457 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.arrow;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.vector.compression.NoCompressionCodec;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.arrow.vector.ipc.WriteChannel;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.IpcOption;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
+import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
+import org.elasticsearch.common.recycler.Recycler;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.BytesRefVector;
+import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.DoubleVector;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.data.LongVector;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.data.Vector;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.rest.ChunkedRestResponseBody;
+import org.elasticsearch.xpack.esql.arrow.shim.Shim;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ArrowResponse implements Releasable {
+ public static class Column {
+ private final String esqlType;
+ private final FieldType arrowType;
+ private final String name;
+
+ public Column(String esqlType, String name) {
+ this.esqlType = esqlType;
+ this.arrowType = arrowFieldType(esqlType);
+ this.name = name;
+ }
+
+ String esqlType() {
+ return esqlType;
+ }
+
+ Field arrowField() {
+ return new Field(name, arrowType, List.of());
+ }
+ }
+
+ private final List columns;
+ private final List pages;
+ private final Runnable closeMe;
+
+ public ArrowResponse(List columns, List pages, Runnable closeMe) {
+ this.columns = columns;
+ this.pages = pages;
+ this.closeMe = closeMe;
+ }
+
+ List columns() {
+ return columns;
+ }
+
+ List pages() {
+ return pages;
+ }
+
+ @Override
+ public void close() {
+ closeMe.run();
+ }
+
+ public ChunkedRestResponseBody chunkedResponse() {
+ // TODO dictionaries
+
+ SchemaResponse schemaResponse = new SchemaResponse(this);
+ List rest = new ArrayList<>(pages.size());
+ for (int p = 0; p < pages.size(); p++) {
+ rest.add(new PageResponse(this, pages.get(p)));
+ }
+ rest.add(new EndResponse(this));
+
+ return ChunkedRestResponseBody.fromMany(schemaResponse, rest.iterator());
+ }
+
+ protected abstract static class AbstractArrowChunkedResponse implements ChunkedRestResponseBody {
+ static {
+ // Init the arrow shim
+ Shim.init();
+ }
+
+ protected final ArrowResponse response;
+
+ AbstractArrowChunkedResponse(ArrowResponse response) {
+ this.response = response;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public final ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) throws IOException {
+ RecyclerBytesStreamOutput output = new RecyclerBytesStreamOutput(recycler);
+ try {
+ encodeChunk(sizeHint, output);
+ BytesReference ref = output.bytes();
+ RecyclerBytesStreamOutput closeRef = output;
+ output = null;
+ ReleasableBytesReference result = new ReleasableBytesReference(ref, () -> Releasables.closeExpectNoException(closeRef));
+ return result;
+ } catch (Exception e) {
+ logger.error("failed to write arrow chunk", e);
+ throw e;
+ } finally {
+ if (output != null) {
+ // assert false : "failed to write arrow chunk";
+ Releasables.closeExpectNoException(output);
+ }
+ }
+ }
+
+ protected abstract void encodeChunk(int sizeHint, RecyclerBytesStreamOutput out) throws IOException;
+
+ /**
+ * Adapts our {@link RecyclerBytesStreamOutput} to the format that Arrow
+ * likes to write to.
+ */
+ protected static WritableByteChannel arrowOut(RecyclerBytesStreamOutput output) {
+ return new WritableByteChannel() {
+ @Override
+ public int write(ByteBuffer byteBuffer) throws IOException {
+ if (byteBuffer.hasArray() == false) {
+ throw new AssertionError("only implemented for array backed buffers");
+ }
+ int length = byteBuffer.remaining();
+ output.write(byteBuffer.array(), byteBuffer.arrayOffset() + byteBuffer.position(), length);
+ byteBuffer.position(byteBuffer.position() + length);
+ assert byteBuffer.hasRemaining() == false;
+ return length;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return true;
+ }
+
+ @Override
+ public void close() {}
+ };
+ }
+
+ @Override
+ public final String getResponseContentTypeString() {
+ return ArrowFormat.CONTENT_TYPE;
+ }
+ }
+
+ private static class SchemaResponse extends AbstractArrowChunkedResponse {
+ private boolean done = false;
+
+ SchemaResponse(ArrowResponse response) {
+ super(response);
+ }
+
+ @Override
+ public boolean isDone() {
+ return done;
+ }
+
+ @Override
+ protected void encodeChunk(int sizeHint, RecyclerBytesStreamOutput out) throws IOException {
+ WriteChannel arrowOut = new WriteChannel(arrowOut(out));
+ MessageSerializer.serialize(arrowOut, arrowSchema());
+ done = true;
+ }
+
+ private Schema arrowSchema() {
+ return new Schema(response.columns.stream().map(ArrowResponse.Column::arrowField).toList());
+ }
+ }
+
+ private static class PageResponse extends AbstractArrowChunkedResponse {
+ private final Page page;
+ private boolean done = false;
+
+ PageResponse(ArrowResponse response, Page page) {
+ super(response);
+ this.page = page;
+ }
+
+ @Override
+ public boolean isDone() {
+ return done;
+ }
+
+ interface WriteBuf {
+ long write() throws IOException;
+ }
+
+ @Override
+ protected void encodeChunk(int sizeHint, RecyclerBytesStreamOutput out) throws IOException {
+ List nodes = new ArrayList<>(page.getBlockCount());
+ List writeBufs = new ArrayList<>(page.getBlockCount() * 2);
+ List bufs = new ArrayList<>(page.getBlockCount() * 2);
+ WriteChannel arrowOut = new WriteChannel(arrowOut(out)) {
+ int bufIdx = 0;
+ long extraPosition = 0;
+
+ @Override
+ public void write(ArrowBuf buffer) throws IOException {
+ extraPosition += writeBufs.get(bufIdx++).write();
+ }
+
+ @Override
+ public long getCurrentPosition() {
+ return super.getCurrentPosition() + extraPosition;
+ }
+
+ @Override
+ public long align() throws IOException {
+ int trailingByteSize = (int) (getCurrentPosition() % 8);
+ if (trailingByteSize != 0) { // align on 8 byte boundaries
+ return writeZeros(8 - trailingByteSize);
+ }
+ return 0;
+ }
+ };
+ for (int b = 0; b < page.getBlockCount(); b++) {
+ accumulateBlock(out, nodes, bufs, writeBufs, page.getBlock(b));
+ }
+ ArrowRecordBatch batch = new ArrowRecordBatch(
+ page.getPositionCount(),
+ nodes,
+ bufs,
+ NoCompressionCodec.DEFAULT_BODY_COMPRESSION,
+ true,
+ false
+ );
+ MessageSerializer.serialize(arrowOut, batch);
+ done = true; // one day we should respect sizeHint here. kindness.
+ }
+
+ private static int validityCount(int totalValues) {
+ return (totalValues - 1) / Byte.SIZE + 1;
+ }
+
+ private void accumulateBlock(
+ RecyclerBytesStreamOutput out,
+ List nodes,
+ List bufs,
+ List writeBufs,
+ Block block
+ ) {
+ nodes.add(new ArrowFieldNode(block.getPositionCount(), block.nullValuesCount()));
+ switch (block.elementType()) {
+ case DOUBLE -> {
+ DoubleBlock b = (DoubleBlock) block;
+ DoubleVector v = b.asVector();
+ if (v != null) {
+ accumulateVectorValidity(out, bufs, writeBufs, v);
+ bufs.add(dummy().writerIndex(vectorLength(v)));
+ writeBufs.add(() -> writeVector(out, v));
+ return;
+ }
+ throw new UnsupportedOperationException();
+ }
+ case INT -> {
+ IntBlock b = (IntBlock) block;
+ IntVector v = b.asVector();
+ if (v != null) {
+ accumulateVectorValidity(out, bufs, writeBufs, v);
+ bufs.add(dummy().writerIndex(vectorLength(v)));
+ writeBufs.add(() -> writeVector(out, v));
+ return;
+ }
+ throw new UnsupportedOperationException();
+ }
+ case LONG -> {
+ LongBlock b = (LongBlock) block;
+ LongVector v = b.asVector();
+ if (v != null) {
+ accumulateVectorValidity(out, bufs, writeBufs, v);
+ bufs.add(dummy().writerIndex(vectorLength(v)));
+ writeBufs.add(() -> writeVector(out, v));
+ return;
+ }
+ throw new UnsupportedOperationException();
+ }
+ case BYTES_REF -> {
+ BytesRefBlock b = (BytesRefBlock) block;
+ BytesRefVector v = b.asVector();
+ if (v != null) {
+ accumulateVectorValidity(out, bufs, writeBufs, v);
+ bufs.add(dummy().writerIndex(vectorOffsetLength(v)));
+ writeBufs.add(() -> writeVectorOffset(out, v));
+ bufs.add(dummy().writerIndex(vectorLength(v)));
+ writeBufs.add(() -> writeVector(out, v));
+ return;
+ }
+ throw new UnsupportedOperationException();
+ }
+ default -> throw new UnsupportedOperationException();
+ }
+ }
+
+ private void accumulateVectorValidity(RecyclerBytesStreamOutput out, List bufs, List writeBufs, Vector v) {
+ bufs.add(dummy().writerIndex(validityCount(v.getPositionCount())));
+ writeBufs.add(() -> writeAllTrueValidity(out, v.getPositionCount()));
+ }
+
+ private long writeAllTrueValidity(RecyclerBytesStreamOutput out, int valueCount) {
+ int allOnesCount = valueCount / 8;
+ for (int i = 0; i < allOnesCount; i++) {
+ out.writeByte((byte) 0xff);
+ }
+ int remaining = valueCount % 8;
+ if (remaining == 0) {
+ return allOnesCount;
+ }
+ out.writeByte((byte) ((1 << remaining) - 1));
+ return allOnesCount + 1;
+ }
+
+ private long vectorLength(IntVector vector) {
+ return Integer.BYTES * vector.getPositionCount();
+ }
+
+ private long writeVector(RecyclerBytesStreamOutput out, IntVector vector) throws IOException {
+ // TODO could we "just" get the memory of the array and dump it?
+ for (int i = 0; i < vector.getPositionCount(); i++) {
+ out.writeIntLE(vector.getInt(i));
+ }
+ return vectorLength(vector);
+ }
+
+ private long vectorLength(LongVector vector) {
+ return Long.BYTES * vector.getPositionCount();
+ }
+
+ private long writeVector(RecyclerBytesStreamOutput out, LongVector vector) throws IOException {
+ // TODO could we "just" get the memory of the array and dump it?
+ for (int i = 0; i < vector.getPositionCount(); i++) {
+ out.writeLongLE(vector.getLong(i));
+ }
+ return vectorLength(vector);
+ }
+
+ private long vectorLength(DoubleVector vector) {
+ return Double.BYTES * vector.getPositionCount();
+ }
+
+ private long writeVector(RecyclerBytesStreamOutput out, DoubleVector vector) throws IOException {
+ // TODO could we "just" get the memory of the array and dump it?
+ for (int i = 0; i < vector.getPositionCount(); i++) {
+ out.writeDoubleLE(vector.getDouble(i));
+ }
+ return vectorLength(vector);
+ }
+
+ private long vectorOffsetLength(BytesRefVector vector) {
+ return Integer.BYTES * (vector.getPositionCount() + 1);
+ }
+
+ private long writeVectorOffset(RecyclerBytesStreamOutput out, BytesRefVector vector) throws IOException {
+ // TODO could we "just" get the memory of the array and dump it?
+ BytesRef scratch = new BytesRef();
+ int offset = 0;
+ for (int i = 0; i < vector.getPositionCount(); i++) {
+ out.writeIntLE(offset);
+ offset += vector.getBytesRef(i, scratch).length;
+ }
+ out.writeIntLE(offset);
+ return vectorOffsetLength(vector);
+ }
+
+ private long vectorLength(BytesRefVector vector) {
+ // TODO we can probably get the length from the vector without all this sum - it's in an array
+ long length = 0;
+ BytesRef scratch = new BytesRef();
+ for (int i = 0; i < vector.getPositionCount(); i++) {
+ length += vector.getBytesRef(i, scratch).length;
+ }
+ return length;
+ }
+
+ private long writeVector(RecyclerBytesStreamOutput out, BytesRefVector vector) throws IOException {
+ // TODO could we "just" get the memory of the array and dump it?
+ BytesRef scratch = new BytesRef();
+ long length = 0;
+ for (int i = 0; i < vector.getPositionCount(); i++) {
+ BytesRef v = vector.getBytesRef(i, scratch);
+ out.write(v.bytes, v.offset, v.length);
+ length += v.length;
+ }
+ return length;
+ }
+
+ private ArrowBuf dummy() {
+ return new ArrowBuf(null, null, 0, 0);
+ }
+ }
+
+ private class EndResponse extends AbstractArrowChunkedResponse {
+ private boolean done = false;
+
+ private EndResponse(ArrowResponse response) {
+ super(response);
+ }
+
+ @Override
+ public boolean isDone() {
+ return done;
+ }
+
+ @Override
+ protected void encodeChunk(int sizeHint, RecyclerBytesStreamOutput out) throws IOException {
+ ArrowStreamWriter.writeEndOfStream(new WriteChannel(arrowOut(out)), IpcOption.DEFAULT);
+ done = true;
+ }
+
+ @Override
+ public void close() {
+ Releasables.closeExpectNoException(response);
+ }
+ }
+
+ static FieldType arrowFieldType(String fieldType) {
+ return switch (fieldType) {
+ case "date" -> FieldType.nullable(Types.MinorType.DATEMILLI.getType());
+ case "double" -> FieldType.nullable(Types.MinorType.FLOAT8.getType());
+ case "integer" -> FieldType.nullable(Types.MinorType.INT.getType());
+ case "long" -> FieldType.nullable(Types.MinorType.BIGINT.getType());
+ case "keyword", "text" -> FieldType.nullable(Types.MinorType.VARCHAR.getType());
+ default -> throw new UnsupportedOperationException("NOCOMMIT " + fieldType);
+ };
+ }
+}
diff --git a/x-pack/plugin/esql/arrow/src/test/java/org/elasticsearch/xpack/esql/arrow/ArrowResponseTests.java b/x-pack/plugin/esql/arrow/src/test/java/org/elasticsearch/xpack/esql/arrow/ArrowResponseTests.java
new file mode 100644
index 0000000000000..55ce926a1ed0c
--- /dev/null
+++ b/x-pack/plugin/esql/arrow/src/test/java/org/elasticsearch/xpack/esql/arrow/ArrowResponseTests.java
@@ -0,0 +1,408 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.arrow;
+
+import com.carrotsearch.randomizedtesting.annotations.Name;
+import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
+
+import com.carrotsearch.randomizedtesting.annotations.Seed;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.DateMilliVector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.breaker.NoopCircuitBreaker;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.CompositeBytesReference;
+import org.elasticsearch.common.collect.Iterators;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.BytesRefVector;
+import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.DoubleVector;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.data.LongVector;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.rest.ChunkedRestResponseBody;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.transport.BytesRefRecycler;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.IntFunction;
+import java.util.function.IntToDoubleFunction;
+import java.util.function.IntToLongFunction;
+import java.util.function.IntUnaryOperator;
+import java.util.function.Supplier;
+
+public class ArrowResponseTests extends ESTestCase {
+ @ParametersFactory
+ public static Iterable