diff --git a/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/DataLoader.java b/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/DataLoader.java index ab111c2243d9f..bab624aca724b 100644 --- a/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/DataLoader.java +++ b/x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/DataLoader.java @@ -30,6 +30,9 @@ import java.util.List; import java.util.Map; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; + public class DataLoader { private static final String TEST_DATA = "/test_data.json"; @@ -63,7 +66,10 @@ protected static void loadDatasetIntoEs(RestHighLevelClient client, try (XContentParser parser = p.apply(JsonXContent.jsonXContent, DataLoader.class.getResourceAsStream(TEST_DATA))) { List list = parser.list(); for (Object item : list) { - bulk.add(new IndexRequest(testIndexName).source((Map) item, XContentType.JSON)); + assertThat(item, instanceOf(Map.class)); + Map entry = (Map) item; + transformDataset(entry); + bulk.add(new IndexRequest(testIndexName).source(entry, XContentType.JSON)); } } @@ -77,6 +83,23 @@ protected static void loadDatasetIntoEs(RestHighLevelClient client, } } + private static void transformDataset(Map entry) { + Object object = entry.get("timestamp"); + assertThat(object, instanceOf(Long.class)); + Long ts = (Long) object; + // currently this is windows filetime + entry.put("@timestamp", winFileTimeToUnix(ts)); + } + + + private static final long FILETIME_EPOCH_DIFF = 11644473600000L; + private static final long FILETIME_ONE_MILLISECOND = 10 * 1000; + + public static long winFileTimeToUnix(final long filetime) { + long ts = (filetime / FILETIME_ONE_MILLISECOND); + return ts - FILETIME_EPOCH_DIFF; + } + private static XContentParser createParser(XContent xContent, InputStream data) throws IOException { NamedXContentRegistry contentRegistry = new NamedXContentRegistry(ClusterModule.getNamedXWriteables()); return xContent.createParser(contentRegistry, LoggingDeprecationHandler.INSTANCE, data); diff --git a/x-pack/plugin/eql/qa/common/src/main/resources/mapping-default.json b/x-pack/plugin/eql/qa/common/src/main/resources/mapping-default.json index b9deea3e1086e..b8e0e4840e346 100644 --- a/x-pack/plugin/eql/qa/common/src/main/resources/mapping-default.json +++ b/x-pack/plugin/eql/qa/common/src/main/resources/mapping-default.json @@ -46,8 +46,7 @@ "type" : "date" }, "@timestamp" : { - "type" : "alias", - "path" : "timestamp" + "type" : "date" }, "user" : { "type" : "keyword" diff --git a/x-pack/plugin/eql/qa/common/src/main/resources/test_queries_unsupported.toml b/x-pack/plugin/eql/qa/common/src/main/resources/test_queries_unsupported.toml index f7514cd9a30a8..d34d4ff4561e2 100644 --- a/x-pack/plugin/eql/qa/common/src/main/resources/test_queries_unsupported.toml +++ b/x-pack/plugin/eql/qa/common/src/main/resources/test_queries_unsupported.toml @@ -336,51 +336,6 @@ until ''' expected_event_ids = [54, 55, 61, 67] -[[queries]] -query = ''' -sequence - [process where opcode == 1] by unique_pid, process_path - [file where opcode == 0] by unique_pid, process_path - [file where opcode == 0] by unique_pid, process_path - [file where opcode == 0] by unique_pid, process_path -until - [file where opcode == 200] by unique_pid, process_path -''' - - -[[queries]] -note = "Sequence: non-field based join." -query = ''' -sequence - [process where serial_event_id<3] by unique_pid * 2 - [process where true] by unique_ppid * 2 -''' -expected_event_ids = [1, 2, - 2, 3] - - -[[queries]] -note = "Sequence: multiple non-field based joins." -query = ''' -sequence - [process where serial_event_id<3] by unique_pid * 2, length(unique_pid), string(unique_pid) - [process where true] by unique_ppid * 2, length(unique_ppid), string(unique_ppid) -''' -expected_event_ids = [1, 2, - 2, 3] - -[[queries]] -query = ''' -sequence with maxspan=500ms - [file where event_subtype_full == "file_create_event"] by file_path - [process where opcode == 1] by process_path - [process where opcode == 2] by process_path - [file where event_subtype_full == "file_delete_event"] by file_path -| head 4 -| tail 2 -''' -expected_event_ids = [] - [[queries]] query = ''' sequence @@ -1026,6 +981,28 @@ query = ''' registry where arrayContains(bytes_written_string_list, "missing", "en-US") ''' + +[[queries]] +note = "Sequence: non-field based join." +query = ''' +sequence + [process where serial_event_id<3] by unique_pid * 2 + [process where true] by unique_ppid * 2 +''' +expected_event_ids = [1, 2, + 2, 3] + + +[[queries]] +note = "Sequence: multiple non-field based joins." +query = ''' +sequence + [process where serial_event_id<3] by unique_pid * 2, length(unique_pid), string(unique_pid) + [process where true] by unique_ppid * 2, length(unique_ppid), string(unique_ppid) +''' +expected_event_ids = [1, 2, + 2, 3] + # TODO: update toggles for this function [[queries]] case_sensitive = true diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java index 18294541dac56..3f91f6c766137 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java @@ -10,6 +10,8 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.eql.EqlIllegalArgumentException; import org.elasticsearch.xpack.eql.execution.search.QueryRequest; +import org.elasticsearch.xpack.eql.execution.sequence.Ordinal; +import org.elasticsearch.xpack.eql.util.ReversedIterator; import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor; import java.util.List; @@ -22,12 +24,17 @@ public class Criterion implements QueryRequest { private final HitExtractor tiebreakerExtractor; // search after markers - private Object[] startMarker; - private Object[] stopMarker; + private Ordinal startMarker; + private Ordinal stopMarker; + + private boolean reverse; //TODO: should accept QueryRequest instead of another SearchSourceBuilder - public Criterion(SearchSourceBuilder searchSource, List searchAfterExractors, HitExtractor timestampExtractor, - HitExtractor tiebreakerExtractor) { + public Criterion(SearchSourceBuilder searchSource, + List searchAfterExractors, + HitExtractor timestampExtractor, + HitExtractor tiebreakerExtractor, + boolean reverse) { this.searchSource = searchSource; this.keyExtractors = searchAfterExractors; this.timestampExtractor = timestampExtractor; @@ -35,6 +42,7 @@ public Criterion(SearchSourceBuilder searchSource, List searchAfte this.startMarker = null; this.stopMarker = null; + this.reverse = reverse; } @Override @@ -54,54 +62,45 @@ public HitExtractor tiebreakerExtractor() { return tiebreakerExtractor; } - public long timestamp(SearchHit hit) { + @SuppressWarnings({ "unchecked" }) + public Ordinal ordinal(SearchHit hit) { + Object ts = timestampExtractor.extract(hit); - if (ts instanceof Number) { - return ((Number) ts).longValue(); + if (ts instanceof Number == false) { + throw new EqlIllegalArgumentException("Expected timestamp as long but got {}", ts); } - throw new EqlIllegalArgumentException("Expected timestamp as long but got {}", ts); - } - @SuppressWarnings({ "unchecked" }) - public Comparable tiebreaker(SearchHit hit) { - if (tiebreakerExtractor == null) { - return null; - } - Object tb = tiebreakerExtractor.extract(hit); - if (tb instanceof Comparable) { - return (Comparable) tb; - } - throw new EqlIllegalArgumentException("Expected tiebreaker to be Comparable but got {}", tb); - } + long timestamp = ((Number) ts).longValue(); + Comparable tiebreaker = null; - public Object[] startMarker() { - return startMarker; + if (tiebreakerExtractor != null) { + Object tb = tiebreakerExtractor.extract(hit); + if (tb instanceof Comparable == false) { + throw new EqlIllegalArgumentException("Expected tiebreaker to be Comparable but got {}", tb); + } + tiebreaker = (Comparable) tb; + } + return new Ordinal(timestamp, tiebreaker); } - public Object[] stopMarker() { - return stopMarker; + public void startMarker(Ordinal ordinal) { + startMarker = ordinal; } - private Object[] marker(SearchHit hit) { - long timestamp = timestamp(hit); - Object tiebreaker = null; - if (tiebreakerExtractor() != null) { - tiebreaker = tiebreaker(hit); - } - - return tiebreaker != null ? new Object[] { timestamp, tiebreaker } : new Object[] { timestamp }; + public void stopMarker(Ordinal ordinal) { + stopMarker = ordinal; } - public void startMarker(SearchHit hit) { - startMarker = marker(hit); + public Ordinal nextMarker() { + return startMarker.compareTo(stopMarker) < 1 ? startMarker : stopMarker; } - public void stopMarker(SearchHit hit) { - stopMarker = marker(hit); + public Criterion useMarker(Ordinal marker) { + searchSource.searchAfter(marker.toArray()); + return this; } - public Criterion useMarker(Object[] marker) { - searchSource.searchAfter(marker); - return this; + public Iterable iterable(List hits) { + return () -> reverse ? new ReversedIterator<>(hits) : hits.iterator(); } } \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java index 9a95ead3e55d7..795a96b7b619e 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.eql.execution.assembler; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.eql.EqlIllegalArgumentException; import org.elasticsearch.xpack.eql.execution.search.BasicQueryClient; import org.elasticsearch.xpack.eql.execution.search.Limit; @@ -43,11 +44,14 @@ public Executable assemble(List> listOfKeys, Attribute timestamp, Attribute tiebreaker, OrderDirection direction, + TimeValue maxSpan, Limit limit) { FieldExtractorRegistry extractorRegistry = new FieldExtractorRegistry(); List criteria = new ArrayList<>(plans.size() - 1); + boolean descending = direction == OrderDirection.DESC; + // build a criterion for each query for (int i = 0; i < plans.size() - 1; i++) { List keys = listOfKeys.get(i); @@ -61,9 +65,10 @@ public Executable assemble(List> listOfKeys, // TODO: this could be generalized into an exec only query Check.isTrue(query instanceof EsQueryExec, "Expected a query but got [{}]", query.getClass()); QueryRequest request = ((EsQueryExec) query).queryRequest(session); - criteria.add(new Criterion(request.searchSource(), keyExtractors, tsExtractor, tbExtractor)); + // base query remains descending, the rest need to flip + criteria.add(new Criterion(request.searchSource(), keyExtractors, tsExtractor, tbExtractor, i > 0 && descending)); } - return new SequenceRuntime(criteria, new BasicQueryClient(session), direction == OrderDirection.DESC, limit); + return new SequenceRuntime(criteria, new BasicQueryClient(session), maxSpan, limit); } private HitExtractor timestampExtractor(HitExtractor hitExtractor) { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/KeyAndOrdinal.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/KeyAndOrdinal.java index 9dbdd12ae7125..8b769b90d5c9f 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/KeyAndOrdinal.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/KeyAndOrdinal.java @@ -6,24 +6,23 @@ package org.elasticsearch.xpack.eql.execution.assembler; +import org.elasticsearch.xpack.eql.execution.sequence.Ordinal; import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey; import java.util.Objects; class KeyAndOrdinal { final SequenceKey key; - final long timestamp; - final Comparable tiebreaker; + final Ordinal ordinal; - KeyAndOrdinal(SequenceKey key, long timestamp, Comparable tiebreaker) { + KeyAndOrdinal(SequenceKey key, Ordinal ordinal) { this.key = key; - this.timestamp = timestamp; - this.tiebreaker = tiebreaker; + this.ordinal = ordinal; } @Override public int hashCode() { - return Objects.hash(key, timestamp, tiebreaker); + return Objects.hash(key, ordinal); } @Override @@ -38,12 +37,11 @@ public boolean equals(Object obj) { KeyAndOrdinal other = (KeyAndOrdinal) obj; return Objects.equals(key, other.key) - && Objects.equals(timestamp, other.timestamp) - && Objects.equals(tiebreaker, other.tiebreaker); + && Objects.equals(ordinal, other.ordinal); } @Override public String toString() { - return key + "[" + timestamp + "][" + (tiebreaker != null ? Objects.toString(tiebreaker) : "") + "]"; + return key.toString() + ordinal.toString(); } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequencePayload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequencePayload.java index e14860a280fff..362f912b61386 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequencePayload.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequencePayload.java @@ -10,18 +10,23 @@ import org.elasticsearch.xpack.eql.execution.payload.AbstractPayload; import org.elasticsearch.xpack.eql.execution.sequence.Sequence; import org.elasticsearch.xpack.eql.session.Results.Type; +import org.elasticsearch.xpack.eql.util.ReversedIterator; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; class SequencePayload extends AbstractPayload { private final List sequences; - SequencePayload(List seq, boolean timedOut, TimeValue timeTook, Object[] nextKeys) { - super(timedOut, timeTook, nextKeys); + SequencePayload(List seq, boolean timedOut, TimeValue timeTook) { + super(timedOut, timeTook); sequences = new ArrayList<>(seq.size()); - for (Sequence s : seq) { + boolean needsReversal = seq.size() > 1 && (seq.get(0).ordinal().compareTo(seq.get(1).ordinal()) > 0); + + for (Iterator it = needsReversal ? new ReversedIterator<>(seq) : seq.iterator(); it.hasNext();) { + Sequence s = it.next(); sequences.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence(s.key().asStringList(), s.hits())); } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java index e459f59c6d7e8..09284e2bfd5b8 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntime.java @@ -11,17 +11,15 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; -import org.elasticsearch.xpack.eql.execution.payload.ReversePayload; import org.elasticsearch.xpack.eql.execution.search.Limit; import org.elasticsearch.xpack.eql.execution.search.QueryClient; +import org.elasticsearch.xpack.eql.execution.sequence.Ordinal; import org.elasticsearch.xpack.eql.execution.sequence.Sequence; import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey; import org.elasticsearch.xpack.eql.execution.sequence.SequenceStateMachine; import org.elasticsearch.xpack.eql.session.Payload; -import org.elasticsearch.xpack.eql.util.ReversedIterator; import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor; -import java.util.Iterator; import java.util.List; import static org.elasticsearch.action.ActionListener.wrap; @@ -38,18 +36,14 @@ class SequenceRuntime implements Executable { private final int numberOfStages; private final SequenceStateMachine stateMachine; private final QueryClient queryClient; - private final boolean descending; private long startTime; - SequenceRuntime(List criteria, QueryClient queryClient, boolean descending, Limit limit) { + SequenceRuntime(List criteria, QueryClient queryClient, TimeValue maxSpan, Limit limit) { this.criteria = criteria; this.numberOfStages = criteria.size(); this.queryClient = queryClient; - boolean hasTiebreaker = criteria.get(0).tiebreakerExtractor() != null; - this.stateMachine = new SequenceStateMachine(numberOfStages, hasTiebreaker, limit); - - this.descending = descending; + this.stateMachine = new SequenceStateMachine(numberOfStages, maxSpan, limit); } @Override @@ -73,9 +67,8 @@ private void queryStage(int stage, ActionListener listener) { // narrow by the previous stage timestamp marker Criterion previous = criteria.get(stage - 1); - // if DESC, flip the markers (the stop becomes the start due to the reverse order), otherwise keep it accordingly - Object[] marker = descending && stage == 1 ? previous.stopMarker() : previous.startMarker(); - currentCriterion.useMarker(marker); + // pass the next marker along + currentCriterion.useMarker(previous.nextMarker()); } log.info("Querying stage {}", stage); @@ -95,34 +88,39 @@ private void queryStage(int stage, ActionListener listener) { } // hits are guaranteed to be non-empty - private void findMatches(int currentStage, List hits) { + private void findMatches(int stage, List hits) { // update criterion - Criterion criterion = criteria.get(currentStage); - criterion.startMarker(hits.get(0)); - criterion.stopMarker(hits.get(hits.size() - 1)); + Criterion criterion = criteria.get(stage); // break the results per key // when dealing with descending order, queries outside the base are ASC (search_before) // so look at the data in reverse (that is DESC) - for (Iterator it = descending ? new ReversedIterator<>(hits) : hits.iterator(); it.hasNext();) { - SearchHit hit = it.next(); - + Ordinal firstOrdinal = null, ordinal = null; + for (SearchHit hit : criterion.iterable(hits)) { KeyAndOrdinal ko = key(hit, criterion); - if (currentStage == 0) { - Sequence seq = new Sequence(ko.key, numberOfStages, ko.timestamp, ko.tiebreaker, hit); - long tStart = (long) criterion.startMarker()[0]; - long tStop = (long) criterion.stopMarker()[0]; - stateMachine.trackSequence(seq, tStart, tStop); + + ordinal = ko.ordinal; + + if (firstOrdinal == null) { + firstOrdinal = ordinal; + } + + if (stage == 0) { + Sequence seq = new Sequence(ko.key, numberOfStages, ordinal, hit); + stateMachine.trackSequence(seq); } else { - stateMachine.match(currentStage, ko.key, ko.timestamp, ko.tiebreaker, hit); + stateMachine.match(stage, ko.key, ordinal, hit); // early skip in case of reaching the limit // check the last stage to avoid calling the state machine in other stages if (stateMachine.reachedLimit()) { - return; + break; } } } + + criterion.startMarker(firstOrdinal); + criterion.stopMarker(ordinal); } private KeyAndOrdinal key(SearchHit hit, Criterion criterion) { @@ -139,14 +137,13 @@ private KeyAndOrdinal key(SearchHit hit, Criterion criterion) { key = new SequenceKey(docKeys); } - return new KeyAndOrdinal(key, criterion.timestamp(hit), criterion.tiebreaker(hit)); + return new KeyAndOrdinal(key, criterion.ordinal(hit)); } private Payload sequencePayload() { List completed = stateMachine.completeSequences(); TimeValue tookTime = new TimeValue(System.currentTimeMillis() - startTime); - SequencePayload payload = new SequencePayload(completed, false, tookTime, null); - return descending ? new ReversePayload(payload) : payload; + return new SequencePayload(completed, false, tookTime); } private boolean hasFinished(int stage) { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/AbstractPayload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/AbstractPayload.java index 9cde6a102a6b2..6631c6093312c 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/AbstractPayload.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/AbstractPayload.java @@ -13,12 +13,10 @@ public abstract class AbstractPayload implements Payload { private final boolean timedOut; private final TimeValue timeTook; - private final Object[] nextKeys; - protected AbstractPayload(boolean timedOut, TimeValue timeTook, Object[] nextKeys) { + protected AbstractPayload(boolean timedOut, TimeValue timeTook) { this.timedOut = timedOut; this.timeTook = timeTook; - this.nextKeys = nextKeys; } @Override @@ -30,9 +28,4 @@ public boolean timedOut() { public TimeValue timeTook() { return timeTook; } - - @Override - public Object[] nextKeys() { - return nextKeys; - } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/ReversePayload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/ReversePayload.java index 2f7a18f44edb7..8b40d1783513a 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/ReversePayload.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/ReversePayload.java @@ -37,11 +37,6 @@ public TimeValue timeTook() { return delegate.timeTook(); } - @Override - public Object[] nextKeys() { - return delegate.nextKeys(); - } - @Override public List values() { return delegate.values(); diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/SearchResponsePayload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/SearchResponsePayload.java index 284285de332d1..6d70c087e35ab 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/SearchResponsePayload.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/SearchResponsePayload.java @@ -18,7 +18,7 @@ public class SearchResponsePayload extends AbstractPayload { private final List hits; public SearchResponsePayload(SearchResponse response) { - super(response.isTimedOut(), response.getTook(), null); + super(response.isTimedOut(), response.getTook()); hits = Arrays.asList(response.getHits().getHits()); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Match.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Match.java index c81d263ac3c23..40b1e3d7d5d2a 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Match.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Match.java @@ -15,22 +15,16 @@ */ class Match { - private final long timestamp; - private final Comparable tiebreaker; + private final Ordinal ordinal; private final SearchHit hit; - Match(long timestamp, Comparable tiebreaker, SearchHit hit) { - this.timestamp = timestamp; - this.tiebreaker = tiebreaker; + Match(Ordinal ordinal, SearchHit hit) { + this.ordinal = ordinal; this.hit = hit; } - long timestamp() { - return timestamp; - } - - Comparable tiebreaker() { - return tiebreaker; + Ordinal ordinal() { + return ordinal; } SearchHit hit() { @@ -39,7 +33,7 @@ SearchHit hit() { @Override public int hashCode() { - return Objects.hash(timestamp, tiebreaker, hit); + return Objects.hash(ordinal, hit); } @Override @@ -53,13 +47,12 @@ public boolean equals(Object obj) { } Match other = (Match) obj; - return Objects.equals(timestamp, other.timestamp) - && Objects.equals(tiebreaker, other.tiebreaker) + return Objects.equals(ordinal, other.ordinal) && Objects.equals(hit, other.hit); } @Override public String toString() { - return timestamp + "[" + (tiebreaker != null ? tiebreaker : "") + "]->" + hit.getId(); + return ordinal.toString() + "->" + hit.getId(); } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Ordinal.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Ordinal.java new file mode 100644 index 0000000000000..70f71e002394a --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Ordinal.java @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.execution.sequence; + +import java.util.Objects; + +public class Ordinal implements Comparable { + + final long timestamp; + final Comparable tiebreaker; + + public Ordinal(long timestamp, Comparable tiebreaker) { + this.timestamp = timestamp; + this.tiebreaker = tiebreaker; + } + + @Override + public int hashCode() { + return Objects.hash(timestamp, tiebreaker); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + Ordinal other = (Ordinal) obj; + return Objects.equals(timestamp, other.timestamp) + && Objects.equals(tiebreaker, other.tiebreaker); + } + + @Override + public String toString() { + return "[" + timestamp + "][" + (tiebreaker != null ? tiebreaker.toString() : "") + "]"; + } + + @Override + public int compareTo(Ordinal o) { + if (timestamp < o.timestamp) { + return -1; + } + if (timestamp == o.timestamp) { + if (tiebreaker != null) { + if (o.tiebreaker != null) { + return tiebreaker.compareTo(o.tiebreaker); + } + // nulls are first - lower than any other value + // other tiebreaker is null this one isn't, fall through 1 + } + // null tiebreaker + else { + if (o.tiebreaker != null) { + return -1; + } else { + return 0; + } + } + } + // if none of the branches above matched, this ordinal is greater than o + return 1; + } + + public Object[] toArray() { + return tiebreaker != null ? new Object[] { timestamp, tiebreaker } : new Object[] { timestamp }; + } +} \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java index d2f452842a662..80f6dba71fc87 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Sequence.java @@ -31,19 +31,19 @@ public class Sequence { private int currentStage = 0; - public Sequence(SequenceKey key, int stages, long timestamp, Comparable tiebreaker, SearchHit firstHit) { + public Sequence(SequenceKey key, int stages, Ordinal ordinal, SearchHit firstHit) { Check.isTrue(stages >= 2, "A sequence requires at least 2 criteria, given [{}]", stages); this.key = key; this.stages = stages; this.matches = new Match[stages]; - this.matches[0] = new Match(timestamp, tiebreaker, firstHit); + this.matches[0] = new Match(ordinal, firstHit); } - public int putMatch(int stage, SearchHit hit, long timestamp, Comparable tiebreaker) { + public int putMatch(int stage, SearchHit hit, Ordinal ordinal) { if (stage == currentStage + 1) { int previousStage = currentStage; currentStage = stage; - matches[currentStage] = new Match(timestamp, tiebreaker, hit); + matches[currentStage] = new Match(ordinal, hit); return previousStage; } throw new EqlIllegalArgumentException("Incorrect stage [{}] specified for Sequence[key={}, stage=]", stage, key, currentStage); @@ -53,24 +53,12 @@ public SequenceKey key() { return key; } - public int currentStage() { - return currentStage; + public Ordinal ordinal() { + return matches[currentStage].ordinal(); } - public long currentTimestamp() { - return matches[currentStage].timestamp(); - } - - public Comparable currentTiebreaker() { - return matches[currentStage].tiebreaker(); - } - - public long timestamp(int stage) { - // stages not initialized yet return an out-of-band value to have no impact on the interval range - if (stage > currentStage) { - return Long.MAX_VALUE; - } - return matches[stage].timestamp(); + public long startTimestamp() { + return matches[0].ordinal().timestamp; } public List hits() { @@ -110,7 +98,7 @@ public String toString() { StringBuilder sb = new StringBuilder(); sb.append(format(null, "[Seq<{}>[{}/{}]]", key, - nf.format(currentStage()), + nf.format(currentStage), nf.format(stages - 1))); for (int i = 0; i < matches.length; i++) { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceFrame.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceFrame.java index cb4ba211eab70..90e63189d8fb3 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceFrame.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceFrame.java @@ -10,6 +10,7 @@ import java.util.LinkedList; import java.util.List; +import java.util.function.Predicate; import static org.elasticsearch.common.logging.LoggerMessageFormat.format; @@ -21,27 +22,21 @@ public class SequenceFrame { // timestamp compression (whose range is known for the current frame). private final List sequences = new LinkedList<>(); - // time frame being/end - private long tBegin = Long.MAX_VALUE, tEnd = Long.MIN_VALUE; - private long min = tBegin, max = tEnd; + private Ordinal start, stop; public void add(Sequence sequence) { sequences.add(sequence); - long ts = sequence.currentTimestamp(); - if (min > ts) { - min = ts; - } - if (max < ts) { - max = ts; - } - } - - public void setTimeFrame(long begin, long end) { - if (tBegin > begin) { - tBegin = begin; - } - if (tEnd < end) { - tEnd = end; + Ordinal ordinal = sequence.ordinal(); + if (start == null) { + start = ordinal; + stop = ordinal; + } else { + if (start.compareTo(ordinal) > 0) { + start = ordinal; + } + if (stop.compareTo(ordinal) < 0) { + stop = ordinal; + } } } @@ -49,53 +44,27 @@ public void setTimeFrame(long begin, long end) { * Returns the latest Sequence from the group that has its timestamp * less than the given argument alongside its position in the list. */ - public Tuple before(long timestamp, Comparable tiebreaker) { - Sequence matchSeq = null; - int matchPos = -1; - int position = -1; - for (Sequence sequence : sequences) { - position++; - // ts only comparison - if (sequence.currentTimestamp() < timestamp) { - matchSeq = sequence; - matchPos = position; - } - // apply tiebreaker (null first, that is null is less than any value) - else if (tiebreaker != null && sequence.currentTimestamp() == timestamp) { - Comparable tb = sequence.currentTiebreaker(); - if (tb == null || tb.compareTo(tiebreaker) < 0) { - matchSeq = sequence; - matchPos = position; - } - } else { - break; - } - } - return matchSeq != null ? new Tuple<>(matchSeq, matchPos) : null; + public Tuple before(Ordinal ordinal) { + return find(o -> o.compareTo(ordinal) < 0); } /** * Returns the first Sequence from the group that has its timestamp * greater than the given argument alongside its position in the list. */ - public Tuple after(long timestamp, Comparable tiebreaker) { + public Tuple after(Ordinal ordinal) { + return find(o -> o.compareTo(ordinal) > 0); + } + + private Tuple find(Predicate predicate) { Sequence matchSeq = null; int matchPos = -1; int position = -1; for (Sequence sequence : sequences) { position++; - // ts only comparison - if (sequence.currentTimestamp() > timestamp) { + if (predicate.test(sequence.ordinal())) { matchSeq = sequence; matchPos = position; - } - // apply tiebreaker (null first, that is null is less than any value) - else if (tiebreaker != null && sequence.currentTimestamp() == timestamp) { - Comparable tb = sequence.currentTiebreaker(); - if (tb == null || tb.compareTo(tiebreaker) > 0) { - matchSeq = sequence; - matchPos = position; - } } else { break; } @@ -112,9 +81,9 @@ public void trim(int position) { // update min time if (sequences.isEmpty() == false) { - min = sequences.get(0).currentTimestamp(); + start = sequences.get(0).ordinal(); } else { - min = Long.MAX_VALUE; + stop = null; } } @@ -124,6 +93,6 @@ public List sequences() { @Override public String toString() { - return format(null, "[{}-{}]({} seqs)", tBegin, tEnd, sequences.size()); + return format(null, "[{}-{}]({} seqs)", start, stop, sequences.size()); } } \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java index df4ff63576ac1..df97f24f4f5fe 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceStateMachine.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.eql.execution.sequence; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.eql.execution.search.Limit; @@ -24,33 +25,25 @@ public class SequenceStateMachine { /** Current keys on each stage */ private final StageToKeys stageToKeys; - /** minimum timestamp per stage */ - /** this ignores the key */ - private final long[] timestampMarkers; - - private final Comparable[] tiebreakerMarkers; - private final boolean hasTieBreaker; - private final int completionStage; /** list of completed sequences - separate to avoid polluting the other stages */ private final List completed; - + private final long maxSpanInMillis; + private int offset = 0; private int limit = -1; private boolean limitReached = false; @SuppressWarnings("rawtypes") - public SequenceStateMachine(int stages, boolean hasTiebreaker, Limit limit) { + public SequenceStateMachine(int stages, TimeValue maxSpan, Limit limit) { this.completionStage = stages - 1; this.stageToKeys = new StageToKeys(completionStage); this.keyToSequences = new KeyToSequences(completionStage); - this.timestampMarkers = new long[completionStage]; - this.tiebreakerMarkers = new Comparable[completionStage]; this.completed = new LinkedList<>(); - this.hasTieBreaker = hasTiebreaker; + this.maxSpanInMillis = maxSpan.millis(); // limit && offset if (limit != null) { @@ -63,34 +56,11 @@ public List completeSequences() { return completed; } - public long getTimestampMarker(int stage) { - return timestampMarkers[stage]; - } - - public Comparable getTiebreakerMarker(int stage) { - return tiebreakerMarkers[stage]; - } - - public void setTimestampMarker(int stage, long timestamp) { - timestampMarkers[stage] = timestamp; - } - - public void setTiebreakerMarker(int stage, Comparable tiebreaker) { - tiebreakerMarkers[stage] = tiebreaker; - } - - public Object[] getMarkers(int stage) { - long ts = timestampMarkers[stage]; - Comparable tb = tiebreakerMarkers[stage]; - return hasTieBreaker ? new Object[] { ts, tb } : new Object[] { ts }; - } - - public void trackSequence(Sequence sequence, long tStart, long tStop) { + public void trackSequence(Sequence sequence) { SequenceKey key = sequence.key(); stageToKeys.keys(0).add(key); SequenceFrame frame = keyToSequences.frame(0, key); - frame.setTimeFrame(tStart, tStop); frame.add(sequence); } @@ -98,23 +68,27 @@ public void trackSequence(Sequence sequence, long tStart, long tStop) { * Match the given hit (based on key and timestamp and potential tiebreaker) with any potential sequence from the previous * given stage. If that's the case, update the sequence and the rest of the references. */ - public boolean match(int stage, SequenceKey key, long timestamp, Comparable tiebreaker, SearchHit hit) { + public boolean match(int stage, SequenceKey key, Ordinal ordinal, SearchHit hit) { int previousStage = stage - 1; // check key presence to avoid creating a collection SequenceFrame frame = keyToSequences.frameIfPresent(previousStage, key); if (frame == null || frame.isEmpty()) { return false; } - // pick the sequence with the highest (for ASC) / lowest (for DESC) timestamp lower than current match timestamp - Tuple neighbour = frame.before(timestamp, tiebreaker); - if (neighbour == null) { + Tuple before = frame.before(ordinal); + if (before == null) { return false; } - Sequence sequence = neighbour.v1(); + Sequence sequence = before.v1(); // eliminate the match and all previous values from the frame - frame.trim(neighbour.v2() + 1); - // update sequence - sequence.putMatch(stage, hit, timestamp, tiebreaker); + frame.trim(before.v2() + 1); + + // check maxspan before continuing the sequence + if (maxSpanInMillis > 0 && (ordinal.timestamp - sequence.startTimestamp() >= maxSpanInMillis)) { + return false; + } + + sequence.putMatch(stage, hit, ordinal); // remove the frame and keys early (as the key space is large) if (frame.isEmpty()) { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/SequenceExec.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/SequenceExec.java index 8c3dc31193fd4..8130e36312df4 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/SequenceExec.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/SequenceExec.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.eql.plan.physical; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.eql.EqlIllegalArgumentException; import org.elasticsearch.xpack.eql.execution.assembler.ExecutionManager; import org.elasticsearch.xpack.eql.execution.search.Limit; @@ -33,6 +34,7 @@ public class SequenceExec extends PhysicalPlan { private final Attribute tiebreaker; private final Limit limit; private final OrderDirection direction; + private final TimeValue maxSpan; public SequenceExec(Source source, List> keys, @@ -41,8 +43,9 @@ public SequenceExec(Source source, PhysicalPlan until, Attribute timestamp, Attribute tiebreaker, - OrderDirection direction) { - this(source, combine(matches, until), combine(keys, singletonList(untilKeys)), timestamp, tiebreaker, null, direction); + OrderDirection direction, + TimeValue maxSpan) { + this(source, combine(matches, until), combine(keys, singletonList(untilKeys)), timestamp, tiebreaker, null, direction, maxSpan); } private SequenceExec(Source source, @@ -51,18 +54,20 @@ private SequenceExec(Source source, Attribute ts, Attribute tb, Limit limit, - OrderDirection direction) { + OrderDirection direction, + TimeValue maxSpan) { super(source, children); this.keys = keys; this.timestamp = ts; this.tiebreaker = tb; this.limit = limit; this.direction = direction; + this.maxSpan = maxSpan; } @Override protected NodeInfo info() { - return NodeInfo.create(this, SequenceExec::new, children(), keys, timestamp, tiebreaker, limit, direction); + return NodeInfo.create(this, SequenceExec::new, children(), keys, timestamp, tiebreaker, limit, direction, maxSpan); } @Override @@ -72,7 +77,7 @@ public PhysicalPlan replaceChildren(List newChildren) { children().size(), newChildren.size()); } - return new SequenceExec(source(), newChildren, keys, timestamp, tiebreaker, limit, direction); + return new SequenceExec(source(), newChildren, keys, timestamp, tiebreaker, limit, direction, maxSpan); } @Override @@ -109,12 +114,13 @@ public OrderDirection direction() { } public SequenceExec with(Limit limit) { - return new SequenceExec(source(), children(), keys(), timestamp(), tiebreaker(), limit, direction); + return new SequenceExec(source(), children(), keys(), timestamp(), tiebreaker(), limit, direction, maxSpan); } @Override public void execute(EqlSession session, ActionListener listener) { - new ExecutionManager(session).assemble(keys(), children(), timestamp(), tiebreaker(), direction, limit()).execute(listener); + new ExecutionManager(session).assemble(keys(), children(), timestamp(), tiebreaker(), direction, maxSpan, limit()).execute( + listener); } @Override diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Mapper.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Mapper.java index 1b236bebe30ab..5cfae03bd0d53 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Mapper.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Mapper.java @@ -78,7 +78,7 @@ protected PhysicalPlan map(LogicalPlan p) { map(s.until().child()), s.timestamp(), s.tiebreaker(), - s.direction()); + s.direction(), s.maxSpan()); } if (p instanceof LocalRelation) { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EmptyPayload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EmptyPayload.java index 3f4238accc759..22e8405e5f9ef 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EmptyPayload.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EmptyPayload.java @@ -36,11 +36,6 @@ public TimeValue timeTook() { return TimeValue.ZERO; } - @Override - public Object[] nextKeys() { - return null; - } - @Override public List values() { return emptyList(); diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Payload.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Payload.java index 2791e582974e3..920b11465259a 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Payload.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Payload.java @@ -22,7 +22,5 @@ public interface Payload { TimeValue timeTook(); - Object[] nextKeys(); - List values(); } diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntimeTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntimeTests.java index 99fa6c8bfdcd7..b48b0f0853294 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntimeTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/assembler/SequenceRuntimeTests.java @@ -80,7 +80,7 @@ class TestCriterion extends Criterion { private final int ordinal; TestCriterion(int ordinal) { - super(SearchSourceBuilder.searchSource().size(ordinal), keyExtractors, tsExtractor, tbExtractor); + super(SearchSourceBuilder.searchSource().size(ordinal), keyExtractors, tsExtractor, tbExtractor, false); this.ordinal = ordinal; } @@ -144,11 +144,6 @@ public TimeValue timeTook() { return TimeValue.ZERO; } - @Override - public Object[] nextKeys() { - return new Object[0]; - } - @SuppressWarnings("unchecked") @Override public List values() { @@ -191,7 +186,7 @@ public void test() { SequenceRuntime runtime = new SequenceRuntime(criteria, (r, l) -> { Map> evs = events.get(r.searchSource().size()); l.onResponse(new TestPayload(evs)); - }, false, null); + }, TimeValue.MINUS_ONE, null); // finally make the assertion at the end of the listener runtime.execute(wrap(this::checkResults, ex -> { diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/parser/LogicalPlanTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/parser/LogicalPlanTests.java index 9e35ed27f4771..cb081954414dc 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/parser/LogicalPlanTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/parser/LogicalPlanTests.java @@ -145,6 +145,5 @@ public void testSequencePlan() { TimeValue maxSpan = seq.maxSpan(); assertEquals(new TimeValue(2, TimeUnit.SECONDS), maxSpan); - } -} +} \ No newline at end of file