From d8ab429fe37c97e74a0df14d9bde73fbe272edc6 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Sat, 27 Jun 2020 16:55:43 +0300 Subject: [PATCH 1/7] Encapsulate timestamp and timebreaker into Ordinal --- .../eql/execution/assembler/Criterion.java | 60 +++++++-------- .../execution/assembler/KeyAndOrdinal.java | 16 ++-- .../execution/assembler/SequenceRuntime.java | 29 ++++--- .../xpack/eql/execution/sequence/Match.java | 23 ++---- .../xpack/eql/execution/sequence/Ordinal.java | 75 +++++++++++++++++++ .../eql/execution/sequence/Sequence.java | 30 ++------ .../eql/execution/sequence/SequenceFrame.java | 62 +++++---------- .../sequence/SequenceStateMachine.java | 42 +---------- .../xpack/eql/parser/LogicalPlanTests.java | 3 +- 9 files changed, 165 insertions(+), 175 deletions(-) create mode 100644 x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Ordinal.java diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java index 18294541dac56..d2a5f2e21ac27 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java @@ -10,6 +10,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.eql.EqlIllegalArgumentException; import org.elasticsearch.xpack.eql.execution.search.QueryRequest; +import org.elasticsearch.xpack.eql.execution.sequence.Ordinal; import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor; import java.util.List; @@ -22,8 +23,8 @@ public class Criterion implements QueryRequest { private final HitExtractor tiebreakerExtractor; // search after markers - private Object[] startMarker; - private Object[] stopMarker; + private Ordinal startMarker; + private Ordinal stopMarker; //TODO: should accept QueryRequest instead of another SearchSourceBuilder public Criterion(SearchSourceBuilder searchSource, List searchAfterExractors, HitExtractor timestampExtractor, @@ -54,54 +55,45 @@ public HitExtractor tiebreakerExtractor() { return tiebreakerExtractor; } - public long timestamp(SearchHit hit) { + @SuppressWarnings({ "unchecked" }) + public Ordinal ordinal(SearchHit hit) { + Object ts = timestampExtractor.extract(hit); - if (ts instanceof Number) { - return ((Number) ts).longValue(); + if (ts instanceof Number == false) { + throw new EqlIllegalArgumentException("Expected timestamp as long but got {}", ts); } - throw new EqlIllegalArgumentException("Expected timestamp as long but got {}", ts); - } - @SuppressWarnings({ "unchecked" }) - public Comparable tiebreaker(SearchHit hit) { - if (tiebreakerExtractor == null) { - return null; - } - Object tb = tiebreakerExtractor.extract(hit); - if (tb instanceof Comparable) { - return (Comparable) tb; + long timestamp = ((Number) ts).longValue(); + Comparable tiebreaker = null; + + if (tiebreakerExtractor != null) { + Object tb = tiebreakerExtractor.extract(hit); + if (tb instanceof Comparable == false) { + throw new EqlIllegalArgumentException("Expected tiebreaker to be Comparable but got {}", tb); + } + tiebreaker = (Comparable) tb; } - throw new EqlIllegalArgumentException("Expected tiebreaker to be Comparable but got {}", tb); + return new Ordinal(timestamp, tiebreaker); } - public Object[] startMarker() { + public Ordinal startMarker() { return startMarker; } - public Object[] stopMarker() { + public Ordinal stopMarker() { return stopMarker; } - private Object[] marker(SearchHit hit) { - long timestamp = timestamp(hit); - Object tiebreaker = null; - if (tiebreakerExtractor() != null) { - tiebreaker = tiebreaker(hit); - } - - return tiebreaker != null ? new Object[] { timestamp, tiebreaker } : new Object[] { timestamp }; - } - - public void startMarker(SearchHit hit) { - startMarker = marker(hit); + public void startMarker(Ordinal ordinal) { + startMarker = ordinal; } - public void stopMarker(SearchHit hit) { - stopMarker = marker(hit); + public void stopMarker(Ordinal ordinal) { + stopMarker = ordinal; } - public Criterion useMarker(Object[] marker) { - searchSource.searchAfter(marker); + public Criterion useMarker(Ordinal marker) { + searchSource.searchAfter(marker.toArray()); return this; } } \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/KeyAndOrdinal.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/KeyAndOrdinal.java index 9dbdd12ae7125..8b769b90d5c9f 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/KeyAndOrdinal.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/KeyAndOrdinal.java @@ -6,24 +6,23 @@ package org.elasticsearch.xpack.eql.execution.assembler; +import org.elasticsearch.xpack.eql.execution.sequence.Ordinal; import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey; import java.util.Objects; class KeyAndOrdinal { final SequenceKey key; - final long timestamp; - final Comparable tiebreaker; + final Ordinal ordinal; - KeyAndOrdinal(SequenceKey key, long timestamp, Comparable tiebreaker) { + KeyAndOrdinal(SequenceKey key, Ordinal ordinal) { this.key = key; - this.timestamp = timestamp; - this.tiebreaker = tiebreaker; + this.ordinal = ordinal; } @Override public int hashCode() { - return Objects.hash(key, timestamp, tiebreaker); + return Objects.hash(key, ordinal); } @Override @@ -38,12 +37,11 @@ public boolean equals(Object obj) { KeyAndOrdinal other = (KeyAndOrdinal) obj; return Objects.equals(key, other.key) - && Objects.equals(timestamp, other.timestamp) - && Objects.equals(tiebreaker, other.tiebreaker); + && Objects.equals(ordinal, other.ordinal); } @Override public String toString() { - return key + "[" + timestamp + "][" + (tiebreaker != null ? Objects.toString(tiebreaker) : "") + "]"; + return key.toString() + ordinal.toString(); } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java index e459f59c6d7e8..b4c9304eeb0aa 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java @@ -14,6 +14,7 @@ import org.elasticsearch.xpack.eql.execution.payload.ReversePayload; import org.elasticsearch.xpack.eql.execution.search.Limit; import org.elasticsearch.xpack.eql.execution.search.QueryClient; +import org.elasticsearch.xpack.eql.execution.sequence.Ordinal; import org.elasticsearch.xpack.eql.execution.sequence.Sequence; import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey; import org.elasticsearch.xpack.eql.execution.sequence.SequenceStateMachine; @@ -73,8 +74,7 @@ private void queryStage(int stage, ActionListener listener) { // narrow by the previous stage timestamp marker Criterion previous = criteria.get(stage - 1); - // if DESC, flip the markers (the stop becomes the start due to the reverse order), otherwise keep it accordingly - Object[] marker = descending && stage == 1 ? previous.stopMarker() : previous.startMarker(); + Ordinal marker = previous.startMarker(); currentCriterion.useMarker(marker); } @@ -98,23 +98,28 @@ private void queryStage(int stage, ActionListener listener) { private void findMatches(int currentStage, List hits) { // update criterion Criterion criterion = criteria.get(currentStage); - criterion.startMarker(hits.get(0)); - criterion.stopMarker(hits.get(hits.size() - 1)); + boolean start = true; // break the results per key // when dealing with descending order, queries outside the base are ASC (search_before) // so look at the data in reverse (that is DESC) + Ordinal last = null; for (Iterator it = descending ? new ReversedIterator<>(hits) : hits.iterator(); it.hasNext();) { SearchHit hit = it.next(); - KeyAndOrdinal ko = key(hit, criterion); + + last = ko.ordinal; + + if (start) { + start = false; + criterion.startMarker(ko.ordinal); + } + if (currentStage == 0) { - Sequence seq = new Sequence(ko.key, numberOfStages, ko.timestamp, ko.tiebreaker, hit); - long tStart = (long) criterion.startMarker()[0]; - long tStop = (long) criterion.stopMarker()[0]; - stateMachine.trackSequence(seq, tStart, tStop); + Sequence seq = new Sequence(ko.key, numberOfStages, ko.ordinal, hit); + stateMachine.trackSequence(seq); } else { - stateMachine.match(currentStage, ko.key, ko.timestamp, ko.tiebreaker, hit); + stateMachine.match(currentStage, ko.key, ko.ordinal, hit); // early skip in case of reaching the limit // check the last stage to avoid calling the state machine in other stages @@ -123,6 +128,8 @@ private void findMatches(int currentStage, List hits) { } } } + + criterion.stopMarker(last); } private KeyAndOrdinal key(SearchHit hit, Criterion criterion) { @@ -139,7 +146,7 @@ private KeyAndOrdinal key(SearchHit hit, Criterion criterion) { key = new SequenceKey(docKeys); } - return new KeyAndOrdinal(key, criterion.timestamp(hit), criterion.tiebreaker(hit)); + return new KeyAndOrdinal(key, criterion.ordinal(hit)); } private Payload sequencePayload() { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Match.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Match.java index c81d263ac3c23..40b1e3d7d5d2a 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Match.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Match.java @@ -15,22 +15,16 @@ */ class Match { - private final long timestamp; - private final Comparable tiebreaker; + private final Ordinal ordinal; private final SearchHit hit; - Match(long timestamp, Comparable tiebreaker, SearchHit hit) { - this.timestamp = timestamp; - this.tiebreaker = tiebreaker; + Match(Ordinal ordinal, SearchHit hit) { + this.ordinal = ordinal; this.hit = hit; } - long timestamp() { - return timestamp; - } - - Comparable tiebreaker() { - return tiebreaker; + Ordinal ordinal() { + return ordinal; } SearchHit hit() { @@ -39,7 +33,7 @@ SearchHit hit() { @Override public int hashCode() { - return Objects.hash(timestamp, tiebreaker, hit); + return Objects.hash(ordinal, hit); } @Override @@ -53,13 +47,12 @@ public boolean equals(Object obj) { } Match other = (Match) obj; - return Objects.equals(timestamp, other.timestamp) - && Objects.equals(tiebreaker, other.tiebreaker) + return Objects.equals(ordinal, other.ordinal) && Objects.equals(hit, other.hit); } @Override public String toString() { - return timestamp + "[" + (tiebreaker != null ? tiebreaker : "") + "]->" + hit.getId(); + return ordinal.toString() + "->" + hit.getId(); } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Ordinal.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Ordinal.java new file mode 100644 index 0000000000000..8a9717e3f779b --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Ordinal.java @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.execution.sequence; + +import java.util.Objects; + +public class Ordinal implements Comparable { + + private final long timestamp; + private final Comparable tiebreaker; + + public Ordinal(long timestamp, Comparable tiebreaker) { + this.timestamp = timestamp; + this.tiebreaker = tiebreaker; + } + + @Override + public int hashCode() { + return Objects.hash(timestamp, tiebreaker); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + Ordinal other = (Ordinal) obj; + return Objects.equals(timestamp, other.timestamp) + && Objects.equals(tiebreaker, other.tiebreaker); + } + + @Override + public String toString() { + return "[" + timestamp + "][" + (tiebreaker != null ? tiebreaker.toString() : "") + "]"; + } + + @Override + public int compareTo(Ordinal o) { + if (timestamp < o.timestamp) { + return -1; + } + if (timestamp == o.timestamp) { + if (tiebreaker != null) { + if (o.tiebreaker != null) { + return tiebreaker.compareTo(o.tiebreaker); + } + // nulls are first - lower than any other value + // other tiebreaker is null this one isn't, fall through 1 + } + // null tiebreaker + else { + if (o.tiebreaker != null) { + return -1; + } else { + return 0; + } + } + } + // if none of the branches above matched, this ordinal is greater than o + return 1; + } + + public Object[] toArray() { + return tiebreaker != null ? new Object[] { timestamp, tiebreaker } : new Object[] { timestamp }; + } +} \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java index d2f452842a662..e41b4435466f6 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java @@ -31,19 +31,19 @@ public class Sequence { private int currentStage = 0; - public Sequence(SequenceKey key, int stages, long timestamp, Comparable tiebreaker, SearchHit firstHit) { + public Sequence(SequenceKey key, int stages, Ordinal ordinal, SearchHit firstHit) { Check.isTrue(stages >= 2, "A sequence requires at least 2 criteria, given [{}]", stages); this.key = key; this.stages = stages; this.matches = new Match[stages]; - this.matches[0] = new Match(timestamp, tiebreaker, firstHit); + this.matches[0] = new Match(ordinal, firstHit); } - public int putMatch(int stage, SearchHit hit, long timestamp, Comparable tiebreaker) { + public int putMatch(int stage, SearchHit hit, Ordinal ordinal) { if (stage == currentStage + 1) { int previousStage = currentStage; currentStage = stage; - matches[currentStage] = new Match(timestamp, tiebreaker, hit); + matches[currentStage] = new Match(ordinal, hit); return previousStage; } throw new EqlIllegalArgumentException("Incorrect stage [{}] specified for Sequence[key={}, stage=]", stage, key, currentStage); @@ -53,24 +53,8 @@ public SequenceKey key() { return key; } - public int currentStage() { - return currentStage; - } - - public long currentTimestamp() { - return matches[currentStage].timestamp(); - } - - public Comparable currentTiebreaker() { - return matches[currentStage].tiebreaker(); - } - - public long timestamp(int stage) { - // stages not initialized yet return an out-of-band value to have no impact on the interval range - if (stage > currentStage) { - return Long.MAX_VALUE; - } - return matches[stage].timestamp(); + public Ordinal ordinal() { + return matches[currentStage].ordinal(); } public List hits() { @@ -110,7 +94,7 @@ public String toString() { StringBuilder sb = new StringBuilder(); sb.append(format(null, "[Seq<{}>[{}/{}]]", key, - nf.format(currentStage()), + nf.format(currentStage), nf.format(stages - 1))); for (int i = 0; i < matches.length; i++) { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceFrame.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceFrame.java index cb4ba211eab70..298070a730b7c 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceFrame.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceFrame.java @@ -21,27 +21,21 @@ public class SequenceFrame { // timestamp compression (whose range is known for the current frame). private final List sequences = new LinkedList<>(); - // time frame being/end - private long tBegin = Long.MAX_VALUE, tEnd = Long.MIN_VALUE; - private long min = tBegin, max = tEnd; + private Ordinal start, stop; public void add(Sequence sequence) { sequences.add(sequence); - long ts = sequence.currentTimestamp(); - if (min > ts) { - min = ts; - } - if (max < ts) { - max = ts; - } - } - - public void setTimeFrame(long begin, long end) { - if (tBegin > begin) { - tBegin = begin; - } - if (tEnd < end) { - tEnd = end; + Ordinal ordinal = sequence.ordinal(); + if (start == null) { + start = ordinal; + stop = ordinal; + } else { + if (start.compareTo(ordinal) > 0) { + start = ordinal; + } + if (stop.compareTo(ordinal) < 0) { + stop = ordinal; + } } } @@ -49,24 +43,15 @@ public void setTimeFrame(long begin, long end) { * Returns the latest Sequence from the group that has its timestamp * less than the given argument alongside its position in the list. */ - public Tuple before(long timestamp, Comparable tiebreaker) { + public Tuple before(Ordinal ordinal) { Sequence matchSeq = null; int matchPos = -1; int position = -1; for (Sequence sequence : sequences) { position++; - // ts only comparison - if (sequence.currentTimestamp() < timestamp) { + if (sequence.ordinal().compareTo(ordinal) < 0) { matchSeq = sequence; matchPos = position; - } - // apply tiebreaker (null first, that is null is less than any value) - else if (tiebreaker != null && sequence.currentTimestamp() == timestamp) { - Comparable tb = sequence.currentTiebreaker(); - if (tb == null || tb.compareTo(tiebreaker) < 0) { - matchSeq = sequence; - matchPos = position; - } } else { break; } @@ -78,24 +63,15 @@ else if (tiebreaker != null && sequence.currentTimestamp() == timestamp) { * Returns the first Sequence from the group that has its timestamp * greater than the given argument alongside its position in the list. */ - public Tuple after(long timestamp, Comparable tiebreaker) { + public Tuple after(Ordinal ordinal) { Sequence matchSeq = null; int matchPos = -1; int position = -1; for (Sequence sequence : sequences) { position++; - // ts only comparison - if (sequence.currentTimestamp() > timestamp) { + if (sequence.ordinal().compareTo(ordinal) > 0) { matchSeq = sequence; matchPos = position; - } - // apply tiebreaker (null first, that is null is less than any value) - else if (tiebreaker != null && sequence.currentTimestamp() == timestamp) { - Comparable tb = sequence.currentTiebreaker(); - if (tb == null || tb.compareTo(tiebreaker) > 0) { - matchSeq = sequence; - matchPos = position; - } } else { break; } @@ -112,9 +88,9 @@ public void trim(int position) { // update min time if (sequences.isEmpty() == false) { - min = sequences.get(0).currentTimestamp(); + start = sequences.get(0).ordinal(); } else { - min = Long.MAX_VALUE; + stop = null; } } @@ -124,6 +100,6 @@ public List sequences() { @Override public String toString() { - return format(null, "[{}-{}]({} seqs)", tBegin, tEnd, sequences.size()); + return format(null, "[{}-{}]({} seqs)", start, stop, sequences.size()); } } \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java index df4ff63576ac1..1f52b73719e2f 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java @@ -24,13 +24,6 @@ public class SequenceStateMachine { /** Current keys on each stage */ private final StageToKeys stageToKeys; - /** minimum timestamp per stage */ - /** this ignores the key */ - private final long[] timestampMarkers; - - private final Comparable[] tiebreakerMarkers; - private final boolean hasTieBreaker; - private final int completionStage; /** list of completed sequences - separate to avoid polluting the other stages */ @@ -46,12 +39,8 @@ public SequenceStateMachine(int stages, boolean hasTiebreaker, Limit limit) { this.stageToKeys = new StageToKeys(completionStage); this.keyToSequences = new KeyToSequences(completionStage); - this.timestampMarkers = new long[completionStage]; - this.tiebreakerMarkers = new Comparable[completionStage]; this.completed = new LinkedList<>(); - this.hasTieBreaker = hasTiebreaker; - // limit && offset if (limit != null) { this.offset = limit.offset; @@ -63,34 +52,11 @@ public List completeSequences() { return completed; } - public long getTimestampMarker(int stage) { - return timestampMarkers[stage]; - } - - public Comparable getTiebreakerMarker(int stage) { - return tiebreakerMarkers[stage]; - } - - public void setTimestampMarker(int stage, long timestamp) { - timestampMarkers[stage] = timestamp; - } - - public void setTiebreakerMarker(int stage, Comparable tiebreaker) { - tiebreakerMarkers[stage] = tiebreaker; - } - - public Object[] getMarkers(int stage) { - long ts = timestampMarkers[stage]; - Comparable tb = tiebreakerMarkers[stage]; - return hasTieBreaker ? new Object[] { ts, tb } : new Object[] { ts }; - } - - public void trackSequence(Sequence sequence, long tStart, long tStop) { + public void trackSequence(Sequence sequence) { SequenceKey key = sequence.key(); stageToKeys.keys(0).add(key); SequenceFrame frame = keyToSequences.frame(0, key); - frame.setTimeFrame(tStart, tStop); frame.add(sequence); } @@ -98,7 +64,7 @@ public void trackSequence(Sequence sequence, long tStart, long tStop) { * Match the given hit (based on key and timestamp and potential tiebreaker) with any potential sequence from the previous * given stage. If that's the case, update the sequence and the rest of the references. */ - public boolean match(int stage, SequenceKey key, long timestamp, Comparable tiebreaker, SearchHit hit) { + public boolean match(int stage, SequenceKey key, Ordinal ordinal, SearchHit hit) { int previousStage = stage - 1; // check key presence to avoid creating a collection SequenceFrame frame = keyToSequences.frameIfPresent(previousStage, key); @@ -106,7 +72,7 @@ public boolean match(int stage, SequenceKey key, long timestamp, Comparable neighbour = frame.before(timestamp, tiebreaker); + Tuple neighbour = frame.before(ordinal); if (neighbour == null) { return false; } @@ -114,7 +80,7 @@ public boolean match(int stage, SequenceKey key, long timestamp, Comparable Date: Sat, 27 Jun 2020 19:27:12 +0300 Subject: [PATCH 2/7] Stop descending from leaking inside the runtime --- .../eql/execution/assembler/Criterion.java | 27 +++++++----- .../execution/assembler/ExecutionManager.java | 7 ++- .../execution/assembler/SequencePayload.java | 11 +++-- .../execution/assembler/SequenceRuntime.java | 43 ++++++++----------- .../execution/payload/AbstractPayload.java | 9 +--- .../eql/execution/payload/ReversePayload.java | 5 --- .../payload/SearchResponsePayload.java | 2 +- .../xpack/eql/session/EmptyPayload.java | 5 --- .../xpack/eql/session/Payload.java | 2 - .../assembler/SequenceRuntimeTests.java | 9 +--- 10 files changed, 51 insertions(+), 69 deletions(-) diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java index d2a5f2e21ac27..d4a65b623eb8c 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java @@ -11,6 +11,7 @@ import org.elasticsearch.xpack.eql.EqlIllegalArgumentException; import org.elasticsearch.xpack.eql.execution.search.QueryRequest; import org.elasticsearch.xpack.eql.execution.sequence.Ordinal; +import org.elasticsearch.xpack.eql.util.ReversedIterator; import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor; import java.util.List; @@ -25,10 +26,15 @@ public class Criterion implements QueryRequest { // search after markers private Ordinal startMarker; private Ordinal stopMarker; + + private boolean reverse; //TODO: should accept QueryRequest instead of another SearchSourceBuilder - public Criterion(SearchSourceBuilder searchSource, List searchAfterExractors, HitExtractor timestampExtractor, - HitExtractor tiebreakerExtractor) { + public Criterion(SearchSourceBuilder searchSource, + List searchAfterExractors, + HitExtractor timestampExtractor, + HitExtractor tiebreakerExtractor, + boolean reverse) { this.searchSource = searchSource; this.keyExtractors = searchAfterExractors; this.timestampExtractor = timestampExtractor; @@ -36,6 +42,7 @@ public Criterion(SearchSourceBuilder searchSource, List searchAfte this.startMarker = null; this.stopMarker = null; + this.reverse = reverse; } @Override @@ -76,14 +83,6 @@ public Ordinal ordinal(SearchHit hit) { return new Ordinal(timestamp, tiebreaker); } - public Ordinal startMarker() { - return startMarker; - } - - public Ordinal stopMarker() { - return stopMarker; - } - public void startMarker(Ordinal ordinal) { startMarker = ordinal; } @@ -92,8 +91,16 @@ public void stopMarker(Ordinal ordinal) { stopMarker = ordinal; } + public Ordinal nextMarker() { + return startMarker.compareTo(stopMarker) < 1 ? startMarker : stopMarker; + } + public Criterion useMarker(Ordinal marker) { searchSource.searchAfter(marker.toArray()); return this; } + + public Iterable iterateable(List hits) { + return () -> reverse ? new ReversedIterator<>(hits) : hits.iterator(); + } } \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java index 9a95ead3e55d7..4d139b7b9bc65 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java @@ -48,6 +48,8 @@ public Executable assemble(List> listOfKeys, List criteria = new ArrayList<>(plans.size() - 1); + boolean descending = direction == OrderDirection.DESC; + // build a criterion for each query for (int i = 0; i < plans.size() - 1; i++) { List keys = listOfKeys.get(i); @@ -61,9 +63,10 @@ public Executable assemble(List> listOfKeys, // TODO: this could be generalized into an exec only query Check.isTrue(query instanceof EsQueryExec, "Expected a query but got [{}]", query.getClass()); QueryRequest request = ((EsQueryExec) query).queryRequest(session); - criteria.add(new Criterion(request.searchSource(), keyExtractors, tsExtractor, tbExtractor)); + // base query remains descending, the rest need to flip + criteria.add(new Criterion(request.searchSource(), keyExtractors, tsExtractor, tbExtractor, i > 0 && descending)); } - return new SequenceRuntime(criteria, new BasicQueryClient(session), direction == OrderDirection.DESC, limit); + return new SequenceRuntime(criteria, new BasicQueryClient(session), limit); } private HitExtractor timestampExtractor(HitExtractor hitExtractor) { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequencePayload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequencePayload.java index e14860a280fff..362f912b61386 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequencePayload.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequencePayload.java @@ -10,18 +10,23 @@ import org.elasticsearch.xpack.eql.execution.payload.AbstractPayload; import org.elasticsearch.xpack.eql.execution.sequence.Sequence; import org.elasticsearch.xpack.eql.session.Results.Type; +import org.elasticsearch.xpack.eql.util.ReversedIterator; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; class SequencePayload extends AbstractPayload { private final List sequences; - SequencePayload(List seq, boolean timedOut, TimeValue timeTook, Object[] nextKeys) { - super(timedOut, timeTook, nextKeys); + SequencePayload(List seq, boolean timedOut, TimeValue timeTook) { + super(timedOut, timeTook); sequences = new ArrayList<>(seq.size()); - for (Sequence s : seq) { + boolean needsReversal = seq.size() > 1 && (seq.get(0).ordinal().compareTo(seq.get(1).ordinal()) > 0); + + for (Iterator it = needsReversal ? new ReversedIterator<>(seq) : seq.iterator(); it.hasNext();) { + Sequence s = it.next(); sequences.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence(s.key().asStringList(), s.hits())); } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java index b4c9304eeb0aa..c64a84e8312ec 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java @@ -11,7 +11,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; -import org.elasticsearch.xpack.eql.execution.payload.ReversePayload; import org.elasticsearch.xpack.eql.execution.search.Limit; import org.elasticsearch.xpack.eql.execution.search.QueryClient; import org.elasticsearch.xpack.eql.execution.sequence.Ordinal; @@ -19,10 +18,8 @@ import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey; import org.elasticsearch.xpack.eql.execution.sequence.SequenceStateMachine; import org.elasticsearch.xpack.eql.session.Payload; -import org.elasticsearch.xpack.eql.util.ReversedIterator; import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor; -import java.util.Iterator; import java.util.List; import static org.elasticsearch.action.ActionListener.wrap; @@ -39,18 +36,15 @@ class SequenceRuntime implements Executable { private final int numberOfStages; private final SequenceStateMachine stateMachine; private final QueryClient queryClient; - private final boolean descending; private long startTime; - SequenceRuntime(List criteria, QueryClient queryClient, boolean descending, Limit limit) { + SequenceRuntime(List criteria, QueryClient queryClient, Limit limit) { this.criteria = criteria; this.numberOfStages = criteria.size(); this.queryClient = queryClient; boolean hasTiebreaker = criteria.get(0).tiebreakerExtractor() != null; this.stateMachine = new SequenceStateMachine(numberOfStages, hasTiebreaker, limit); - - this.descending = descending; } @Override @@ -74,8 +68,8 @@ private void queryStage(int stage, ActionListener listener) { // narrow by the previous stage timestamp marker Criterion previous = criteria.get(stage - 1); - Ordinal marker = previous.startMarker(); - currentCriterion.useMarker(marker); + // pass the next marker along + currentCriterion.useMarker(previous.nextMarker()); } log.info("Querying stage {}", stage); @@ -95,41 +89,39 @@ private void queryStage(int stage, ActionListener listener) { } // hits are guaranteed to be non-empty - private void findMatches(int currentStage, List hits) { + private void findMatches(int stage, List hits) { // update criterion - Criterion criterion = criteria.get(currentStage); + Criterion criterion = criteria.get(stage); - boolean start = true; // break the results per key // when dealing with descending order, queries outside the base are ASC (search_before) // so look at the data in reverse (that is DESC) - Ordinal last = null; - for (Iterator it = descending ? new ReversedIterator<>(hits) : hits.iterator(); it.hasNext();) { - SearchHit hit = it.next(); + Ordinal firstOrdinal = null, ordinal = null; + for (SearchHit hit : criterion.iterateable(hits)) { KeyAndOrdinal ko = key(hit, criterion); - last = ko.ordinal; + ordinal = ko.ordinal; - if (start) { - start = false; - criterion.startMarker(ko.ordinal); + if (firstOrdinal == null) { + firstOrdinal = ordinal; } - if (currentStage == 0) { - Sequence seq = new Sequence(ko.key, numberOfStages, ko.ordinal, hit); + if (stage == 0) { + Sequence seq = new Sequence(ko.key, numberOfStages, ordinal, hit); stateMachine.trackSequence(seq); } else { - stateMachine.match(currentStage, ko.key, ko.ordinal, hit); + stateMachine.match(stage, ko.key, ordinal, hit); // early skip in case of reaching the limit // check the last stage to avoid calling the state machine in other stages if (stateMachine.reachedLimit()) { - return; + break; } } } - criterion.stopMarker(last); + criterion.startMarker(firstOrdinal); + criterion.stopMarker(ordinal); } private KeyAndOrdinal key(SearchHit hit, Criterion criterion) { @@ -152,8 +144,7 @@ private KeyAndOrdinal key(SearchHit hit, Criterion criterion) { private Payload sequencePayload() { List completed = stateMachine.completeSequences(); TimeValue tookTime = new TimeValue(System.currentTimeMillis() - startTime); - SequencePayload payload = new SequencePayload(completed, false, tookTime, null); - return descending ? new ReversePayload(payload) : payload; + return new SequencePayload(completed, false, tookTime); } private boolean hasFinished(int stage) { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/AbstractPayload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/AbstractPayload.java index 9cde6a102a6b2..6631c6093312c 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/AbstractPayload.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/AbstractPayload.java @@ -13,12 +13,10 @@ public abstract class AbstractPayload implements Payload { private final boolean timedOut; private final TimeValue timeTook; - private final Object[] nextKeys; - protected AbstractPayload(boolean timedOut, TimeValue timeTook, Object[] nextKeys) { + protected AbstractPayload(boolean timedOut, TimeValue timeTook) { this.timedOut = timedOut; this.timeTook = timeTook; - this.nextKeys = nextKeys; } @Override @@ -30,9 +28,4 @@ public boolean timedOut() { public TimeValue timeTook() { return timeTook; } - - @Override - public Object[] nextKeys() { - return nextKeys; - } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/ReversePayload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/ReversePayload.java index 2f7a18f44edb7..8b40d1783513a 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/ReversePayload.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/ReversePayload.java @@ -37,11 +37,6 @@ public TimeValue timeTook() { return delegate.timeTook(); } - @Override - public Object[] nextKeys() { - return delegate.nextKeys(); - } - @Override public List values() { return delegate.values(); diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/SearchResponsePayload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/SearchResponsePayload.java index 284285de332d1..6d70c087e35ab 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/SearchResponsePayload.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/SearchResponsePayload.java @@ -18,7 +18,7 @@ public class SearchResponsePayload extends AbstractPayload { private final List hits; public SearchResponsePayload(SearchResponse response) { - super(response.isTimedOut(), response.getTook(), null); + super(response.isTimedOut(), response.getTook()); hits = Arrays.asList(response.getHits().getHits()); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EmptyPayload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EmptyPayload.java index 3f4238accc759..22e8405e5f9ef 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EmptyPayload.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EmptyPayload.java @@ -36,11 +36,6 @@ public TimeValue timeTook() { return TimeValue.ZERO; } - @Override - public Object[] nextKeys() { - return null; - } - @Override public List values() { return emptyList(); diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Payload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Payload.java index 2791e582974e3..920b11465259a 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Payload.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Payload.java @@ -22,7 +22,5 @@ public interface Payload { TimeValue timeTook(); - Object[] nextKeys(); - List values(); } diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntimeTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntimeTests.java index 99fa6c8bfdcd7..d1691204ff166 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntimeTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntimeTests.java @@ -80,7 +80,7 @@ class TestCriterion extends Criterion { private final int ordinal; TestCriterion(int ordinal) { - super(SearchSourceBuilder.searchSource().size(ordinal), keyExtractors, tsExtractor, tbExtractor); + super(SearchSourceBuilder.searchSource().size(ordinal), keyExtractors, tsExtractor, tbExtractor, false); this.ordinal = ordinal; } @@ -144,11 +144,6 @@ public TimeValue timeTook() { return TimeValue.ZERO; } - @Override - public Object[] nextKeys() { - return new Object[0]; - } - @SuppressWarnings("unchecked") @Override public List values() { @@ -191,7 +186,7 @@ public void test() { SequenceRuntime runtime = new SequenceRuntime(criteria, (r, l) -> { Map> evs = events.get(r.searchSource().size()); l.onResponse(new TestPayload(evs)); - }, false, null); + }, null); // finally make the assertion at the end of the listener runtime.execute(wrap(this::checkResults, ex -> { From f1c310a2f1e9f02302225336c67455ebe5ab63eb Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Sat, 27 Jun 2020 20:06:56 +0300 Subject: [PATCH 3/7] Add maxspan check --- .../execution/assembler/ExecutionManager.java | 4 +++- .../execution/assembler/SequenceRuntime.java | 5 ++-- .../xpack/eql/execution/sequence/Ordinal.java | 4 ++-- .../eql/execution/sequence/Sequence.java | 4 ++++ .../sequence/SequenceStateMachine.java | 24 ++++++++++++------- .../xpack/eql/plan/physical/SequenceExec.java | 20 ++++++++++------ .../xpack/eql/planner/Mapper.java | 2 +- 7 files changed, 41 insertions(+), 22 deletions(-) diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java index 4d139b7b9bc65..795a96b7b619e 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.eql.execution.assembler; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.eql.EqlIllegalArgumentException; import org.elasticsearch.xpack.eql.execution.search.BasicQueryClient; import org.elasticsearch.xpack.eql.execution.search.Limit; @@ -43,6 +44,7 @@ public Executable assemble(List> listOfKeys, Attribute timestamp, Attribute tiebreaker, OrderDirection direction, + TimeValue maxSpan, Limit limit) { FieldExtractorRegistry extractorRegistry = new FieldExtractorRegistry(); @@ -66,7 +68,7 @@ public Executable assemble(List> listOfKeys, // base query remains descending, the rest need to flip criteria.add(new Criterion(request.searchSource(), keyExtractors, tsExtractor, tbExtractor, i > 0 && descending)); } - return new SequenceRuntime(criteria, new BasicQueryClient(session), limit); + return new SequenceRuntime(criteria, new BasicQueryClient(session), maxSpan, limit); } private HitExtractor timestampExtractor(HitExtractor hitExtractor) { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java index c64a84e8312ec..e7a41c458195a 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java @@ -39,12 +39,11 @@ class SequenceRuntime implements Executable { private long startTime; - SequenceRuntime(List criteria, QueryClient queryClient, Limit limit) { + SequenceRuntime(List criteria, QueryClient queryClient, TimeValue maxSpan, Limit limit) { this.criteria = criteria; this.numberOfStages = criteria.size(); this.queryClient = queryClient; - boolean hasTiebreaker = criteria.get(0).tiebreakerExtractor() != null; - this.stateMachine = new SequenceStateMachine(numberOfStages, hasTiebreaker, limit); + this.stateMachine = new SequenceStateMachine(numberOfStages, maxSpan, limit); } @Override diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Ordinal.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Ordinal.java index 8a9717e3f779b..70f71e002394a 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Ordinal.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Ordinal.java @@ -10,8 +10,8 @@ public class Ordinal implements Comparable { - private final long timestamp; - private final Comparable tiebreaker; + final long timestamp; + final Comparable tiebreaker; public Ordinal(long timestamp, Comparable tiebreaker) { this.timestamp = timestamp; diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java index e41b4435466f6..80f6dba71fc87 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java @@ -57,6 +57,10 @@ public Ordinal ordinal() { return matches[currentStage].ordinal(); } + public long startTimestamp() { + return matches[0].ordinal().timestamp; + } + public List hits() { List hits = new ArrayList<>(matches.length); for (Match m : matches) { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java index 1f52b73719e2f..df97f24f4f5fe 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.eql.execution.sequence; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.eql.execution.search.Limit; @@ -28,19 +29,22 @@ public class SequenceStateMachine { /** list of completed sequences - separate to avoid polluting the other stages */ private final List completed; - + private final long maxSpanInMillis; + private int offset = 0; private int limit = -1; private boolean limitReached = false; @SuppressWarnings("rawtypes") - public SequenceStateMachine(int stages, boolean hasTiebreaker, Limit limit) { + public SequenceStateMachine(int stages, TimeValue maxSpan, Limit limit) { this.completionStage = stages - 1; this.stageToKeys = new StageToKeys(completionStage); this.keyToSequences = new KeyToSequences(completionStage); this.completed = new LinkedList<>(); + this.maxSpanInMillis = maxSpan.millis(); + // limit && offset if (limit != null) { this.offset = limit.offset; @@ -71,15 +75,19 @@ public boolean match(int stage, SequenceKey key, Ordinal ordinal, SearchHit hit) if (frame == null || frame.isEmpty()) { return false; } - // pick the sequence with the highest (for ASC) / lowest (for DESC) timestamp lower than current match timestamp - Tuple neighbour = frame.before(ordinal); - if (neighbour == null) { + Tuple before = frame.before(ordinal); + if (before == null) { return false; } - Sequence sequence = neighbour.v1(); + Sequence sequence = before.v1(); // eliminate the match and all previous values from the frame - frame.trim(neighbour.v2() + 1); - // update sequence + frame.trim(before.v2() + 1); + + // check maxspan before continuing the sequence + if (maxSpanInMillis > 0 && (ordinal.timestamp - sequence.startTimestamp() >= maxSpanInMillis)) { + return false; + } + sequence.putMatch(stage, hit, ordinal); // remove the frame and keys early (as the key space is large) diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/SequenceExec.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/SequenceExec.java index 8c3dc31193fd4..8130e36312df4 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/SequenceExec.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/SequenceExec.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.eql.plan.physical; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.eql.EqlIllegalArgumentException; import org.elasticsearch.xpack.eql.execution.assembler.ExecutionManager; import org.elasticsearch.xpack.eql.execution.search.Limit; @@ -33,6 +34,7 @@ public class SequenceExec extends PhysicalPlan { private final Attribute tiebreaker; private final Limit limit; private final OrderDirection direction; + private final TimeValue maxSpan; public SequenceExec(Source source, List> keys, @@ -41,8 +43,9 @@ public SequenceExec(Source source, PhysicalPlan until, Attribute timestamp, Attribute tiebreaker, - OrderDirection direction) { - this(source, combine(matches, until), combine(keys, singletonList(untilKeys)), timestamp, tiebreaker, null, direction); + OrderDirection direction, + TimeValue maxSpan) { + this(source, combine(matches, until), combine(keys, singletonList(untilKeys)), timestamp, tiebreaker, null, direction, maxSpan); } private SequenceExec(Source source, @@ -51,18 +54,20 @@ private SequenceExec(Source source, Attribute ts, Attribute tb, Limit limit, - OrderDirection direction) { + OrderDirection direction, + TimeValue maxSpan) { super(source, children); this.keys = keys; this.timestamp = ts; this.tiebreaker = tb; this.limit = limit; this.direction = direction; + this.maxSpan = maxSpan; } @Override protected NodeInfo info() { - return NodeInfo.create(this, SequenceExec::new, children(), keys, timestamp, tiebreaker, limit, direction); + return NodeInfo.create(this, SequenceExec::new, children(), keys, timestamp, tiebreaker, limit, direction, maxSpan); } @Override @@ -72,7 +77,7 @@ public PhysicalPlan replaceChildren(List newChildren) { children().size(), newChildren.size()); } - return new SequenceExec(source(), newChildren, keys, timestamp, tiebreaker, limit, direction); + return new SequenceExec(source(), newChildren, keys, timestamp, tiebreaker, limit, direction, maxSpan); } @Override @@ -109,12 +114,13 @@ public OrderDirection direction() { } public SequenceExec with(Limit limit) { - return new SequenceExec(source(), children(), keys(), timestamp(), tiebreaker(), limit, direction); + return new SequenceExec(source(), children(), keys(), timestamp(), tiebreaker(), limit, direction, maxSpan); } @Override public void execute(EqlSession session, ActionListener listener) { - new ExecutionManager(session).assemble(keys(), children(), timestamp(), tiebreaker(), direction, limit()).execute(listener); + new ExecutionManager(session).assemble(keys(), children(), timestamp(), tiebreaker(), direction, maxSpan, limit()).execute( + listener); } @Override diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Mapper.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Mapper.java index 1b236bebe30ab..5cfae03bd0d53 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Mapper.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Mapper.java @@ -78,7 +78,7 @@ protected PhysicalPlan map(LogicalPlan p) { map(s.until().child()), s.timestamp(), s.tiebreaker(), - s.direction()); + s.direction(), s.maxSpan()); } if (p instanceof LocalRelation) { From a98bcba2ec62767465577b937980f68e1978265c Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Sat, 27 Jun 2020 23:05:13 +0300 Subject: [PATCH 4/7] Enable integration tests with maxspan --- .../src/main/resources/test_queries.toml | 2 +- .../resources/test_queries_unsupported.toml | 23 ------------------- .../assembler/SequenceRuntimeTests.java | 2 +- 3 files changed, 2 insertions(+), 25 deletions(-) diff --git a/x-pack/plugin/eql/qa/common/src/main/resources/test_queries.toml b/x-pack/plugin/eql/qa/common/src/main/resources/test_queries.toml index f69f8c8953e3c..5777351898445 100644 --- a/x-pack/plugin/eql/qa/common/src/main/resources/test_queries.toml +++ b/x-pack/plugin/eql/qa/common/src/main/resources/test_queries.toml @@ -440,7 +440,7 @@ expected_event_ids = [67, 68, 69, 70, 72, 73, 74, 75] [[queries]] query = ''' -sequence with maxspan=0.5s +sequence with maxspan=500ms [file where event_subtype_full == "file_create_event"] by file_path [process where opcode == 1] by process_path [process where opcode == 2] by process_path diff --git a/x-pack/plugin/eql/qa/common/src/main/resources/test_queries_unsupported.toml b/x-pack/plugin/eql/qa/common/src/main/resources/test_queries_unsupported.toml index 6f53a93d9e6b4..629c2ba4738db 100644 --- a/x-pack/plugin/eql/qa/common/src/main/resources/test_queries_unsupported.toml +++ b/x-pack/plugin/eql/qa/common/src/main/resources/test_queries_unsupported.toml @@ -224,18 +224,6 @@ sequence ''' expected_event_ids = [1, 2, 2, 3] - -[[queries]] -query = ''' -sequence with maxspan=1d - [file where event_subtype_full == "file_create_event"] by file_path - [process where opcode == 1] by process_path - [process where opcode == 2] by process_path - [file where event_subtype_full == "file_delete_event"] by file_path -| head 4 -| tail 2''' -expected_event_ids = [67, 68, 69, 70, 72, 73, 74, 75] - [[queries]] query = ''' sequence with maxspan=1h @@ -269,17 +257,6 @@ sequence with maxspan=10s | tail 2''' expected_event_ids = [67, 68, 69, 70, 72, 73, 74, 75] -[[queries]] -query = ''' -sequence with maxspan=0.5s - [file where event_subtype_full == "file_create_event"] by file_path - [process where opcode == 1] by process_path - [process where opcode == 2] by process_path - [file where event_subtype_full == "file_delete_event"] by file_path -| head 4 -| tail 2''' -expected_event_ids = [] - [[queries]] query = ''' sequence diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntimeTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntimeTests.java index d1691204ff166..b48b0f0853294 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntimeTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntimeTests.java @@ -186,7 +186,7 @@ public void test() { SequenceRuntime runtime = new SequenceRuntime(criteria, (r, l) -> { Map> evs = events.get(r.searchSource().size()); l.onResponse(new TestPayload(evs)); - }, null); + }, TimeValue.MINUS_ONE, null); // finally make the assertion at the end of the listener runtime.execute(wrap(this::checkResults, ex -> { From 5b2a97e07697c147335efaa1b5481401674ddf29 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Sun, 28 Jun 2020 00:17:45 +0300 Subject: [PATCH 5/7] Convert windows filetime to unix time --- .../test/eql/CommonEqlActionTestCase.java | 11 +++++++ .../src/main/resources/mapping-default.json | 3 +- .../resources/test_queries_unsupported.toml | 33 ------------------- 3 files changed, 12 insertions(+), 35 deletions(-) diff --git a/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java b/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java index e80ff341af7fa..310afc41ffb10 100644 --- a/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java +++ b/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java @@ -79,6 +79,9 @@ private static void setupData(CommonEqlActionTestCase tc) throws Exception { for (Object item : list) { assertThat(item, instanceOf(HashMap.class)); Map entry = (Map) item; + Long ts = (Long) entry.get("timestamp"); + // currently this is windows filetime + entry.put("@timestamp", winFileTimeToUnix(ts)); bulk.add(new IndexRequest(testIndexName).source(entry, XContentType.JSON)); } } @@ -89,6 +92,14 @@ private static void setupData(CommonEqlActionTestCase tc) throws Exception { assertFalse(bulkResponse.hasFailures()); isSetUp = true; } + + } + private static final long FILETIME_EPOCH_DIFF = 11644473600000L; + private static final long FILETIME_ONE_MILLISECOND = 10 * 1000; + + public static long winFileTimeToUnix(final long filetime) { + long ts = (filetime / FILETIME_ONE_MILLISECOND); + return ts - FILETIME_EPOCH_DIFF; } private static void cleanupData(CommonEqlActionTestCase tc) throws Exception { diff --git a/x-pack/plugin/eql/qa/common/src/main/resources/mapping-default.json b/x-pack/plugin/eql/qa/common/src/main/resources/mapping-default.json index b9deea3e1086e..b8e0e4840e346 100644 --- a/x-pack/plugin/eql/qa/common/src/main/resources/mapping-default.json +++ b/x-pack/plugin/eql/qa/common/src/main/resources/mapping-default.json @@ -46,8 +46,7 @@ "type" : "date" }, "@timestamp" : { - "type" : "alias", - "path" : "timestamp" + "type" : "date" }, "user" : { "type" : "keyword" diff --git a/x-pack/plugin/eql/qa/common/src/main/resources/test_queries_unsupported.toml b/x-pack/plugin/eql/qa/common/src/main/resources/test_queries_unsupported.toml index 629c2ba4738db..224c9200e1cc3 100644 --- a/x-pack/plugin/eql/qa/common/src/main/resources/test_queries_unsupported.toml +++ b/x-pack/plugin/eql/qa/common/src/main/resources/test_queries_unsupported.toml @@ -224,39 +224,6 @@ sequence ''' expected_event_ids = [1, 2, 2, 3] -[[queries]] -query = ''' -sequence with maxspan=1h - [file where event_subtype_full == "file_create_event"] by file_path - [process where opcode == 1] by process_path - [process where opcode == 2] by process_path - [file where event_subtype_full == "file_delete_event"] by file_path -| head 4 -| tail 2''' -expected_event_ids = [67, 68, 69, 70, 72, 73, 74, 75] - -[[queries]] -query = ''' -sequence with maxspan=1m - [file where event_subtype_full == "file_create_event"] by file_path - [process where opcode == 1] by process_path - [process where opcode == 2] by process_path - [file where event_subtype_full == "file_delete_event"] by file_path -| head 4 -| tail 2''' -expected_event_ids = [67, 68, 69, 70, 72, 73, 74, 75] - -[[queries]] -query = ''' -sequence with maxspan=10s - [file where event_subtype_full == "file_create_event"] by file_path - [process where opcode == 1] by process_path - [process where opcode == 2] by process_path - [file where event_subtype_full == "file_delete_event"] by file_path -| head 4 -| tail 2''' -expected_event_ids = [67, 68, 69, 70, 72, 73, 74, 75] - [[queries]] query = ''' sequence From b4c19d528bffc0a3b7d327e0866560de220dea77 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Mon, 29 Jun 2020 18:34:10 +0300 Subject: [PATCH 6/7] Address feedback --- .../eql/execution/assembler/Criterion.java | 2 +- .../execution/assembler/SequenceRuntime.java | 2 +- .../eql/execution/sequence/SequenceFrame.java | 21 +++++++------------ 3 files changed, 9 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java index d4a65b623eb8c..3f91f6c766137 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java @@ -100,7 +100,7 @@ public Criterion useMarker(Ordinal marker) { return this; } - public Iterable iterateable(List hits) { + public Iterable iterable(List hits) { return () -> reverse ? new ReversedIterator<>(hits) : hits.iterator(); } } \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java index e7a41c458195a..09284e2bfd5b8 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java @@ -96,7 +96,7 @@ private void findMatches(int stage, List hits) { // when dealing with descending order, queries outside the base are ASC (search_before) // so look at the data in reverse (that is DESC) Ordinal firstOrdinal = null, ordinal = null; - for (SearchHit hit : criterion.iterateable(hits)) { + for (SearchHit hit : criterion.iterable(hits)) { KeyAndOrdinal ko = key(hit, criterion); ordinal = ko.ordinal; diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceFrame.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceFrame.java index 298070a730b7c..90e63189d8fb3 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceFrame.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceFrame.java @@ -10,6 +10,7 @@ import java.util.LinkedList; import java.util.List; +import java.util.function.Predicate; import static org.elasticsearch.common.logging.LoggerMessageFormat.format; @@ -44,19 +45,7 @@ public void add(Sequence sequence) { * less than the given argument alongside its position in the list. */ public Tuple before(Ordinal ordinal) { - Sequence matchSeq = null; - int matchPos = -1; - int position = -1; - for (Sequence sequence : sequences) { - position++; - if (sequence.ordinal().compareTo(ordinal) < 0) { - matchSeq = sequence; - matchPos = position; - } else { - break; - } - } - return matchSeq != null ? new Tuple<>(matchSeq, matchPos) : null; + return find(o -> o.compareTo(ordinal) < 0); } /** @@ -64,12 +53,16 @@ public Tuple before(Ordinal ordinal) { * greater than the given argument alongside its position in the list. */ public Tuple after(Ordinal ordinal) { + return find(o -> o.compareTo(ordinal) > 0); + } + + private Tuple find(Predicate predicate) { Sequence matchSeq = null; int matchPos = -1; int position = -1; for (Sequence sequence : sequences) { position++; - if (sequence.ordinal().compareTo(ordinal) > 0) { + if (predicate.test(sequence.ordinal())) { matchSeq = sequence; matchPos = position; } else { From 9d5ae3318cda8d8a2157303e97bc98c3c366c624 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Mon, 29 Jun 2020 19:38:54 +0300 Subject: [PATCH 7/7] Disable unsupported tests --- .../resources/test_queries_unsupported.toml | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/x-pack/plugin/eql/qa/common/src/main/resources/test_queries_unsupported.toml b/x-pack/plugin/eql/qa/common/src/main/resources/test_queries_unsupported.toml index f4aff6a46c5f1..d34d4ff4561e2 100644 --- a/x-pack/plugin/eql/qa/common/src/main/resources/test_queries_unsupported.toml +++ b/x-pack/plugin/eql/qa/common/src/main/resources/test_queries_unsupported.toml @@ -981,6 +981,28 @@ query = ''' registry where arrayContains(bytes_written_string_list, "missing", "en-US") ''' + +[[queries]] +note = "Sequence: non-field based join." +query = ''' +sequence + [process where serial_event_id<3] by unique_pid * 2 + [process where true] by unique_ppid * 2 +''' +expected_event_ids = [1, 2, + 2, 3] + + +[[queries]] +note = "Sequence: multiple non-field based joins." +query = ''' +sequence + [process where serial_event_id<3] by unique_pid * 2, length(unique_pid), string(unique_pid) + [process where true] by unique_ppid * 2, length(unique_ppid), string(unique_ppid) +''' +expected_event_ids = [1, 2, + 2, 3] + # TODO: update toggles for this function [[queries]] case_sensitive = true