ESQL: Fix AsyncOperator status values and add emitted rows#132738
ESQL: Fix AsyncOperator status values and add emitted rows#132738ivancea merged 25 commits intoelastic:mainfrom
Conversation
| assertThat(map, hasEntry(is("received_pages"), nonNegativeMatcher)); | ||
| assertThat(map, hasEntry(is("completed_pages"), nonNegativeMatcher)); | ||
| assertThat(map, hasEntry(is("process_nanos"), nonNegativeMatcher)); |
There was a problem hiding this comment.
The async operator status fields are serialized as "received_pages" instead of "pages_received" (Like the others operators).
Should I normalize it here? It's not even a transport version change, just some unit tests
There was a problem hiding this comment.
++ to make this consistent with the other operators.
|
Pinging @elastic/es-analytical-engine (Team:Analytics) |
|
Hi @ivancea, I've created a changelog YAML for you. |
| assertThat(map, hasEntry(is("received_pages"), nonNegativeMatcher)); | ||
| assertThat(map, hasEntry(is("completed_pages"), nonNegativeMatcher)); | ||
| assertThat(map, hasEntry(is("process_nanos"), nonNegativeMatcher)); |
There was a problem hiding this comment.
++ to make this consistent with the other operators.
| @Override | ||
| public final Operator.Status status() { | ||
| return status(Math.max(0L, checkpoint.getMaxSeqNo()), Math.max(0L, checkpoint.getProcessedCheckpoint()), processNanos.sum()); | ||
| return status(checkpoint.getMaxSeqNo() + 1, checkpoint.getProcessedCheckpoint() + 1, processNanos.sum()); |
) Fixes #128030 Fixes #130296 Fixes #130642 Fixes #131148 Fixes #132554 Fixes #132555 Fixes #132604 Fixes #131563 Fixes #132778 Extracted from #132738 An AsyncOperator listener misordering caused the warnings collection and status metrics updates to be executed after the `onSeqNoCompleted()`>`notifyIfBlocked()`>`future.onResponse(null)`, which ends the processing in some cases.
# Conflicts: # server/src/main/java/org/elasticsearch/TransportVersions.java # x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java # x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorStatusTests.java
…tic#132744) Fixes elastic#128030 Fixes elastic#130296 Fixes elastic#130642 Fixes elastic#131148 Fixes elastic#132554 Fixes elastic#132555 Fixes elastic#132604 Fixes elastic#131563 Fixes elastic#132778 Extracted from elastic#132738 An AsyncOperator listener misordering caused the warnings collection and status metrics updates to be executed after the `onSeqNoCompleted()`>`notifyIfBlocked()`>`future.onResponse(null)`, which ends the processing in some cases.
# Conflicts: # server/src/main/java/org/elasticsearch/TransportVersions.java # x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/AnyOperatorTestCase.java # x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java # x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/inference/InferenceOperatorTestCase.java
| /** | ||
| * EqualTo matcher that takes care of whole number types (Integers and longs). | ||
| */ | ||
| protected final Matcher<Object> matchNumberEqualTo(Number value) { | ||
| return wholeMatcher(comparesEqualTo(value.intValue()), comparesEqualTo(value.longValue())); | ||
| } | ||
|
|
||
| /** | ||
| * GreaterThanOrEqualTo matcher that takes care of whole number types (Integers and longs). | ||
| */ | ||
| protected final Matcher<Object> matchNumberGreaterThanOrEqualTo(Number value) { | ||
| return wholeMatcher(greaterThanOrEqualTo(value.intValue()), greaterThanOrEqualTo(value.longValue())); | ||
| } | ||
|
|
||
| /** | ||
| * Matcher that matches based on the number type (Integer or long). | ||
| */ | ||
| @SuppressWarnings("unchecked") | ||
| protected final Matcher<Object> wholeMatcher(Matcher<Integer> integerMatcher, Matcher<Long> longMatcher) { | ||
| return either(both(instanceOf(Integer.class)).and((Matcher<? super Object>) (Matcher<?>) integerMatcher)).or( | ||
| both(instanceOf(Long.class)).and((Matcher<? super Object>) (Matcher<?>) longMatcher) | ||
| ); | ||
| } |
There was a problem hiding this comment.
Without those, we can't safely make a "greater than 0" matcher, for example, as it could be either an int or a long. This takes care of that. I couldn't find a better option.
There was a problem hiding this comment.
either(greaterThan(0)).or(greaterThan(0L))?
I found that they always made 0 the Integer when deserializing unless the result is huge.
There was a problem hiding this comment.
Changed! It required some not-technically-correct casts, but worked fine
| /** | ||
| * EqualTo matcher that takes care of whole number types (Integers and longs). | ||
| */ | ||
| protected final Matcher<Object> matchNumberEqualTo(Number value) { | ||
| return wholeMatcher(comparesEqualTo(value.intValue()), comparesEqualTo(value.longValue())); | ||
| } | ||
|
|
||
| /** | ||
| * GreaterThanOrEqualTo matcher that takes care of whole number types (Integers and longs). | ||
| */ | ||
| protected final Matcher<Object> matchNumberGreaterThanOrEqualTo(Number value) { | ||
| return wholeMatcher(greaterThanOrEqualTo(value.intValue()), greaterThanOrEqualTo(value.longValue())); | ||
| } | ||
|
|
||
| /** | ||
| * Matcher that matches based on the number type (Integer or long). | ||
| */ | ||
| @SuppressWarnings("unchecked") | ||
| protected final Matcher<Object> wholeMatcher(Matcher<Integer> integerMatcher, Matcher<Long> longMatcher) { | ||
| return either(both(instanceOf(Integer.class)).and((Matcher<? super Object>) (Matcher<?>) integerMatcher)).or( | ||
| both(instanceOf(Long.class)).and((Matcher<? super Object>) (Matcher<?>) longMatcher) | ||
| ); | ||
| } |
There was a problem hiding this comment.
either(greaterThan(0)).or(greaterThan(0L))?
I found that they always made 0 the Integer when deserializing unless the result is huge.
# Conflicts: # server/src/main/java/org/elasticsearch/TransportVersions.java
Some fixes around
AsyncOperator.Status:emitted_rowsfield toLookupFromIndexOperator.Status, which was missing, and would add interesting data on join results