Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ private static void setupData(CommonEqlActionTestCase tc) throws Exception {
for (Object item : list) {
assertThat(item, instanceOf(HashMap.class));
Map<String, Object> entry = (Map<String, Object>) item;
Long ts = (Long) entry.get("timestamp");
// currently this is windows filetime
entry.put("@timestamp", winFileTimeToUnix(ts));
bulk.add(new IndexRequest(testIndexName).source(entry, XContentType.JSON));
}
}
Expand All @@ -89,6 +92,14 @@ private static void setupData(CommonEqlActionTestCase tc) throws Exception {
assertFalse(bulkResponse.hasFailures());
isSetUp = true;
}

}
private static final long FILETIME_EPOCH_DIFF = 11644473600000L;
private static final long FILETIME_ONE_MILLISECOND = 10 * 1000;

public static long winFileTimeToUnix(final long filetime) {
long ts = (filetime / FILETIME_ONE_MILLISECOND);
return ts - FILETIME_EPOCH_DIFF;
}

private static void cleanupData(CommonEqlActionTestCase tc) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@
"type" : "date"
},
"@timestamp" : {
"type" : "alias",
"path" : "timestamp"
"type" : "date"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you change this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because timestamp is not unix time but rather window filetime (see this comment and until the dataset gets updated, working on a separate field is the cleaner.

},
"user" : {
"type" : "keyword"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ expected_event_ids = [67, 68, 69, 70, 72, 73, 74, 75]

[[queries]]
query = '''
sequence with maxspan=0.5s
sequence with maxspan=500ms
[file where event_subtype_full == "file_create_event"] by file_path
[process where opcode == 1] by process_path
[process where opcode == 2] by process_path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,62 +224,6 @@ sequence
'''
expected_event_ids = [1, 2, 2, 3]


[[queries]]
query = '''
sequence with maxspan=1d
[file where event_subtype_full == "file_create_event"] by file_path
[process where opcode == 1] by process_path
[process where opcode == 2] by process_path
[file where event_subtype_full == "file_delete_event"] by file_path
| head 4
| tail 2'''
expected_event_ids = [67, 68, 69, 70, 72, 73, 74, 75]

[[queries]]
query = '''
sequence with maxspan=1h
[file where event_subtype_full == "file_create_event"] by file_path
[process where opcode == 1] by process_path
[process where opcode == 2] by process_path
[file where event_subtype_full == "file_delete_event"] by file_path
| head 4
| tail 2'''
expected_event_ids = [67, 68, 69, 70, 72, 73, 74, 75]

[[queries]]
query = '''
sequence with maxspan=1m
[file where event_subtype_full == "file_create_event"] by file_path
[process where opcode == 1] by process_path
[process where opcode == 2] by process_path
[file where event_subtype_full == "file_delete_event"] by file_path
| head 4
| tail 2'''
expected_event_ids = [67, 68, 69, 70, 72, 73, 74, 75]

[[queries]]
query = '''
sequence with maxspan=10s
[file where event_subtype_full == "file_create_event"] by file_path
[process where opcode == 1] by process_path
[process where opcode == 2] by process_path
[file where event_subtype_full == "file_delete_event"] by file_path
| head 4
| tail 2'''
expected_event_ids = [67, 68, 69, 70, 72, 73, 74, 75]

[[queries]]
query = '''
sequence with maxspan=0.5s
[file where event_subtype_full == "file_create_event"] by file_path
[process where opcode == 1] by process_path
[process where opcode == 2] by process_path
[file where event_subtype_full == "file_delete_event"] by file_path
| head 4
| tail 2'''
expected_event_ids = []

[[queries]]
query = '''
sequence
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.execution.search.QueryRequest;
import org.elasticsearch.xpack.eql.execution.sequence.Ordinal;
import org.elasticsearch.xpack.eql.util.ReversedIterator;
import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor;

import java.util.List;
Expand All @@ -22,19 +24,25 @@ public class Criterion implements QueryRequest {
private final HitExtractor tiebreakerExtractor;

// search after markers
private Object[] startMarker;
private Object[] stopMarker;
private Ordinal startMarker;
private Ordinal stopMarker;

private boolean reverse;

//TODO: should accept QueryRequest instead of another SearchSourceBuilder
public Criterion(SearchSourceBuilder searchSource, List<HitExtractor> searchAfterExractors, HitExtractor timestampExtractor,
HitExtractor tiebreakerExtractor) {
public Criterion(SearchSourceBuilder searchSource,
List<HitExtractor> searchAfterExractors,
HitExtractor timestampExtractor,
HitExtractor tiebreakerExtractor,
boolean reverse) {
this.searchSource = searchSource;
this.keyExtractors = searchAfterExractors;
this.timestampExtractor = timestampExtractor;
this.tiebreakerExtractor = tiebreakerExtractor;

this.startMarker = null;
this.stopMarker = null;
this.reverse = reverse;
}

@Override
Expand All @@ -54,54 +62,45 @@ public HitExtractor tiebreakerExtractor() {
return tiebreakerExtractor;
}

public long timestamp(SearchHit hit) {
@SuppressWarnings({ "unchecked" })
public Ordinal ordinal(SearchHit hit) {

Object ts = timestampExtractor.extract(hit);
if (ts instanceof Number) {
return ((Number) ts).longValue();
if (ts instanceof Number == false) {
throw new EqlIllegalArgumentException("Expected timestamp as long but got {}", ts);
}
throw new EqlIllegalArgumentException("Expected timestamp as long but got {}", ts);
}

@SuppressWarnings({ "unchecked" })
public Comparable<Object> tiebreaker(SearchHit hit) {
if (tiebreakerExtractor == null) {
return null;
}
Object tb = tiebreakerExtractor.extract(hit);
if (tb instanceof Comparable) {
return (Comparable<Object>) tb;
}
throw new EqlIllegalArgumentException("Expected tiebreaker to be Comparable but got {}", tb);
}
long timestamp = ((Number) ts).longValue();
Comparable<Object> tiebreaker = null;

public Object[] startMarker() {
return startMarker;
if (tiebreakerExtractor != null) {
Object tb = tiebreakerExtractor.extract(hit);
if (tb instanceof Comparable == false) {
throw new EqlIllegalArgumentException("Expected tiebreaker to be Comparable but got {}", tb);
}
tiebreaker = (Comparable<Object>) tb;
}
return new Ordinal(timestamp, tiebreaker);
}

public Object[] stopMarker() {
return stopMarker;
public void startMarker(Ordinal ordinal) {
startMarker = ordinal;
}

private Object[] marker(SearchHit hit) {
long timestamp = timestamp(hit);
Object tiebreaker = null;
if (tiebreakerExtractor() != null) {
tiebreaker = tiebreaker(hit);
}

return tiebreaker != null ? new Object[] { timestamp, tiebreaker } : new Object[] { timestamp };
public void stopMarker(Ordinal ordinal) {
stopMarker = ordinal;
}

public void startMarker(SearchHit hit) {
startMarker = marker(hit);
public Ordinal nextMarker() {
return startMarker.compareTo(stopMarker) < 1 ? startMarker : stopMarker;
}

public void stopMarker(SearchHit hit) {
stopMarker = marker(hit);
public Criterion useMarker(Ordinal marker) {
searchSource.searchAfter(marker.toArray());
return this;
}

public Criterion useMarker(Object[] marker) {
searchSource.searchAfter(marker);
return this;
public Iterable<SearchHit> iterateable(List<SearchHit> hits) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iterable sounds better.

return () -> reverse ? new ReversedIterator<>(hits) : hits.iterator();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package org.elasticsearch.xpack.eql.execution.assembler;

import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.execution.search.BasicQueryClient;
import org.elasticsearch.xpack.eql.execution.search.Limit;
Expand Down Expand Up @@ -43,11 +44,14 @@ public Executable assemble(List<List<Attribute>> listOfKeys,
Attribute timestamp,
Attribute tiebreaker,
OrderDirection direction,
TimeValue maxSpan,
Limit limit) {
FieldExtractorRegistry extractorRegistry = new FieldExtractorRegistry();

List<Criterion> criteria = new ArrayList<>(plans.size() - 1);

boolean descending = direction == OrderDirection.DESC;

// build a criterion for each query
for (int i = 0; i < plans.size() - 1; i++) {
List<Attribute> keys = listOfKeys.get(i);
Expand All @@ -61,9 +65,10 @@ public Executable assemble(List<List<Attribute>> listOfKeys,
// TODO: this could be generalized into an exec only query
Check.isTrue(query instanceof EsQueryExec, "Expected a query but got [{}]", query.getClass());
QueryRequest request = ((EsQueryExec) query).queryRequest(session);
criteria.add(new Criterion(request.searchSource(), keyExtractors, tsExtractor, tbExtractor));
// base query remains descending, the rest need to flip
criteria.add(new Criterion(request.searchSource(), keyExtractors, tsExtractor, tbExtractor, i > 0 && descending));
}
return new SequenceRuntime(criteria, new BasicQueryClient(session), direction == OrderDirection.DESC, limit);
return new SequenceRuntime(criteria, new BasicQueryClient(session), maxSpan, limit);
}

private HitExtractor timestampExtractor(HitExtractor hitExtractor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,23 @@

package org.elasticsearch.xpack.eql.execution.assembler;

import org.elasticsearch.xpack.eql.execution.sequence.Ordinal;
import org.elasticsearch.xpack.eql.execution.sequence.SequenceKey;

import java.util.Objects;

class KeyAndOrdinal {
final SequenceKey key;
final long timestamp;
final Comparable<Object> tiebreaker;
final Ordinal ordinal;

KeyAndOrdinal(SequenceKey key, long timestamp, Comparable<Object> tiebreaker) {
KeyAndOrdinal(SequenceKey key, Ordinal ordinal) {
this.key = key;
this.timestamp = timestamp;
this.tiebreaker = tiebreaker;
this.ordinal = ordinal;
}

@Override
public int hashCode() {
return Objects.hash(key, timestamp, tiebreaker);
return Objects.hash(key, ordinal);
}

@Override
Expand All @@ -38,12 +37,11 @@ public boolean equals(Object obj) {

KeyAndOrdinal other = (KeyAndOrdinal) obj;
return Objects.equals(key, other.key)
&& Objects.equals(timestamp, other.timestamp)
&& Objects.equals(tiebreaker, other.tiebreaker);
&& Objects.equals(ordinal, other.ordinal);
}

@Override
public String toString() {
return key + "[" + timestamp + "][" + (tiebreaker != null ? Objects.toString(tiebreaker) : "") + "]";
return key.toString() + ordinal.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,23 @@
import org.elasticsearch.xpack.eql.execution.payload.AbstractPayload;
import org.elasticsearch.xpack.eql.execution.sequence.Sequence;
import org.elasticsearch.xpack.eql.session.Results.Type;
import org.elasticsearch.xpack.eql.util.ReversedIterator;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class SequencePayload extends AbstractPayload {

private final List<org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence> sequences;

SequencePayload(List<Sequence> seq, boolean timedOut, TimeValue timeTook, Object[] nextKeys) {
super(timedOut, timeTook, nextKeys);
SequencePayload(List<Sequence> seq, boolean timedOut, TimeValue timeTook) {
super(timedOut, timeTook);
sequences = new ArrayList<>(seq.size());
for (Sequence s : seq) {
boolean needsReversal = seq.size() > 1 && (seq.get(0).ordinal().compareTo(seq.get(1).ordinal()) > 0);

for (Iterator<Sequence> it = needsReversal ? new ReversedIterator<>(seq) : seq.iterator(); it.hasNext();) {
Sequence s = it.next();
sequences.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence(s.key().asStringList(), s.hits()));
}
}
Expand Down
Loading