-
Notifications
You must be signed in to change notification settings - Fork 25.9k
Apache arrow support for ES|QL #104877
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Apache arrow support for ES|QL #104877
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,7 @@ | |
|
|
||
| import org.apache.lucene.util.BytesRef; | ||
| import org.elasticsearch.common.bytes.ReleasableBytesReference; | ||
| import org.elasticsearch.common.collect.Iterators; | ||
| import org.elasticsearch.common.io.stream.BytesStream; | ||
| import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; | ||
| import org.elasticsearch.common.recycler.Recycler; | ||
|
|
@@ -242,4 +243,53 @@ public void close() { | |
| } | ||
| }; | ||
| } | ||
|
|
||
| static ChunkedRestResponseBody fromMany(ChunkedRestResponseBody first, Iterator<? extends ChunkedRestResponseBody> rest) { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This'll want javadoc. But I think it's generally useful. |
||
| return new ChunkedRestResponseBody() { | ||
| private final String contentType = first.getResponseContentTypeString(); | ||
| private ChunkedRestResponseBody current = first; | ||
|
|
||
| @Override | ||
| public boolean isDone() { | ||
| return current == null; | ||
| } | ||
|
|
||
| @Override | ||
| public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> recycler) throws IOException { | ||
| try { | ||
| return current.encodeChunk(sizeHint, recycler); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I haven't double checked this, but I'm worried that this'll send a chunk over the wire no matter how big the reference is. If it's less than the sizeHint we should probably give the next one a chance. |
||
| } finally { | ||
| if (current.isDone()) { | ||
| current.close(); | ||
| if (false == rest.hasNext()) { | ||
| current = null; | ||
| } else { | ||
| current = rest.next(); | ||
| if (false == contentType.equals(current.getResponseContentTypeString())) { | ||
| throw new IllegalArgumentException( | ||
| "content types much match but were [" | ||
| + contentType | ||
| + "] and [" | ||
| + current.getResponseContentTypeString() | ||
| + "]" | ||
| ); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public String getResponseContentTypeString() { | ||
| return contentType; | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| // Close all remaining portions | ||
| // NOCOMMIT why I need Iterators.map here? silly compiler, give me compile | ||
| Releasables.closeExpectNoException(current, Releasables.wrap(() -> Iterators.map(rest, r -> r))); | ||
| } | ||
| }; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| apply plugin: 'elasticsearch.build' | ||
|
|
||
| dependencies { | ||
| implementation project('shim') | ||
| compileOnly project(':server') | ||
| compileOnly project(':x-pack:plugin:esql:compute') | ||
| implementation('org.apache.arrow:arrow-vector:15.0.0') | ||
| implementation('org.apache.arrow:arrow-format:15.0.0') | ||
| implementation('org.apache.arrow:arrow-memory-core:15.0.0') | ||
| implementation('com.google.flatbuffers:flatbuffers-java:23.5.26') | ||
| implementation("com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}") | ||
| implementation("com.fasterxml.jackson.core:jackson-core:${versions.jackson}") | ||
| implementation("com.fasterxml.jackson.core:jackson-databind:${versions.jackson}") // This isn't really used - but it is loaded! | ||
| implementation("org.slf4j:slf4j-api:${versions.slf4j}") | ||
| runtimeOnly "org.slf4j:slf4j-nop:${versions.slf4j}" | ||
|
|
||
| testImplementation project(':test:framework') | ||
| testImplementation('org.apache.arrow:arrow-memory-unsafe:15.0.0') | ||
| } | ||
|
|
||
| test { | ||
| jvmArgs('--add-opens=java.base/java.nio=ALL-UNNAMED') | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| apply plugin: 'elasticsearch.build' | ||
|
|
||
| dependencies { | ||
| implementation project(':libs:elasticsearch-logging') | ||
| implementation('org.apache.arrow:arrow-vector:15.0.0') | ||
| implementation('org.apache.arrow:arrow-format:15.0.0') | ||
| implementation('org.apache.arrow:arrow-memory-core:15.0.0') | ||
| implementation('com.google.flatbuffers:flatbuffers-java:23.5.26') | ||
| implementation("com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}") | ||
| implementation("com.fasterxml.jackson.core:jackson-core:${versions.jackson}") | ||
| implementation("com.fasterxml.jackson.core:jackson-databind:${versions.jackson}") // This isn't really used - but it is loaded! | ||
| implementation("org.slf4j:slf4j-api:${versions.slf4j}") | ||
| runtimeOnly "org.slf4j:slf4j-nop:${versions.slf4j}" | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|
|
||
| package org.elasticsearch.xpack.esql.arrow.shim; | ||
|
|
||
| import org.apache.arrow.memory.AllocationManager; | ||
| import org.apache.arrow.memory.ArrowBuf; | ||
| import org.apache.arrow.memory.BufferAllocator; | ||
| import org.apache.arrow.memory.DefaultAllocationManagerOption; | ||
| import org.elasticsearch.logging.LogManager; | ||
|
|
||
| import java.lang.reflect.Field; | ||
| import java.security.AccessController; | ||
| import java.security.PrivilegedAction; | ||
|
|
||
| /** | ||
| * We don't actually <strong>use</strong> Arrow's memory manager, but , arrow | ||
| * won't initialize properly unless we configure one. We configure an "empty" | ||
| * one here. | ||
| */ | ||
| public class Shim implements AllocationManager.Factory { | ||
| /** | ||
| * Initialize the Arrow shim. Arrow does some interesting reflection stuff on | ||
| * initialization. We can avoid it if we | ||
| */ | ||
| public static void init() { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Arrow has some "interesting" code in the initialization. In an effort to be magic it looks around the in classpath and calls We can avoid the whole classpath scanning and reflection magic with some contained magic of our own - this stuff. But we need security manager privileges. So I moved this to it's own tiny jar. Lastly, this replaces all the things arrow does with the Unsafe with a shim that does nothing. That's fine for how we're using it. |
||
| try { | ||
| Class.forName("org.elasticsearch.test.ESTestCase"); | ||
| LogManager.getLogger(Shim.class) | ||
| .info("we're in tests, disabling the arrow shim so we can use a real apache arrow runtime for testing"); | ||
| } catch (ClassNotFoundException notfound) { | ||
| LogManager.getLogger(Shim.class).debug("shimming arrow's allocation manager"); | ||
| AccessController.doPrivileged((PrivilegedAction<Void>) () -> { | ||
| try { | ||
| Field field = DefaultAllocationManagerOption.class.getDeclaredField("DEFAULT_ALLOCATION_MANAGER_FACTORY"); | ||
| field.setAccessible(true); | ||
| field.set(null, new Shim()); | ||
| } catch (Exception e) { | ||
| throw new AssertionError("can't init arrow", e); | ||
| } | ||
| return null; | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public AllocationManager create(BufferAllocator accountingAllocator, long size) { | ||
| throw new UnsupportedOperationException(); | ||
| } | ||
|
|
||
| @Override | ||
| public ArrowBuf empty() { | ||
| throw new UnsupportedOperationException(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|
|
||
| package org.elasticsearch.xpack.esql.arrow; | ||
|
|
||
| import org.elasticsearch.xcontent.MediaType; | ||
|
|
||
| import java.util.Map; | ||
| import java.util.Set; | ||
|
|
||
| public class ArrowFormat implements MediaType { | ||
| public static final ArrowFormat INSTANCE = new ArrowFormat(); | ||
|
|
||
| private static final String FORMAT = "arrow"; | ||
| public static final String CONTENT_TYPE = "application/arrow"; | ||
| private static final String VENDOR_CONTENT_TYPE = "application/vnd.elasticsearch+arrow"; | ||
|
|
||
| @Override | ||
| public String queryParameter() { | ||
| return FORMAT; | ||
| } | ||
|
|
||
| @Override | ||
| public Set<HeaderValue> headerValues() { | ||
| return Set.of( | ||
| new HeaderValue(CONTENT_TYPE, Map.of("header", "present|absent")), | ||
| new HeaderValue(VENDOR_CONTENT_TYPE, Map.of("header", "present|absent", COMPATIBLE_WITH_PARAMETER_NAME, VERSION_PATTERN)) | ||
| ); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't commit this. But I spent half a day messing with the security manager and gave up. We should be able to fix this.