Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.ml.action.GetRecordsAction;
import org.elasticsearch.xpack.core.ml.action.UpdateFilterAction;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
Expand All @@ -20,7 +21,10 @@
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.config.Operator;
import org.elasticsearch.xpack.core.ml.job.config.RuleAction;
import org.elasticsearch.xpack.core.ml.job.config.RuleCondition;
import org.elasticsearch.xpack.core.ml.job.config.RuleParams;
import org.elasticsearch.xpack.core.ml.job.config.RuleParamsForForceTimeShift;
import org.elasticsearch.xpack.core.ml.job.config.RuleScope;
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex;
Expand All @@ -39,7 +43,9 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.oneOf;

/**
Expand Down Expand Up @@ -299,6 +305,67 @@ public void testScopeAndCondition() throws IOException {
assertThat(records.get(0).getOverFieldValue(), equalTo("222.222.222.222"));
}

public void testForceTimeShiftAction() throws Exception {
// The test ensures that the force time shift action works as expected.

long timeShiftAmount = 3600L;
long timestampStartMillis = 1491004800000L;
long bucketSpanMillis = 3600000L;
long timeShiftTimestamp = (timestampStartMillis + bucketSpanMillis) / 1000;

int totalBuckets = 2 * 24;

DetectionRule rule = new DetectionRule.Builder(
Arrays.asList(new RuleCondition(RuleCondition.AppliesTo.TIME, Operator.GTE, timeShiftTimestamp))
).setActions(RuleAction.FORCE_TIME_SHIFT).setParams(new RuleParams(new RuleParamsForForceTimeShift(timeShiftAmount))).build();

Detector.Builder detector = new Detector.Builder("mean", "value");
detector.setRules(Arrays.asList(rule));
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build()));
analysisConfig.setBucketSpan(TimeValue.timeValueMillis(bucketSpanMillis));
DataDescription.Builder dataDescription = new DataDescription.Builder();
Job.Builder job = new Job.Builder("detection-rules-it-test-force-time-shift");
job.setAnalysisConfig(analysisConfig);
job.setDataDescription(dataDescription);

putJob(job);
openJob(job.getId());

// post some data
int normalValue = 400;
List<String> data = new ArrayList<>();
long timestamp = timestampStartMillis;
for (int bucket = 0; bucket < totalBuckets; bucket++) {
Map<String, Object> record = new HashMap<>();
record.put("time", timestamp);
record.put("value", normalValue);
data.add(createJsonRecord(record));
timestamp += bucketSpanMillis;
}

postData(job.getId(), joinBetween(0, data.size(), data));
closeJob(job.getId());

List<Annotation> annotations = getAnnotations();
assertThat(annotations.size(), greaterThanOrEqualTo(1));
assertThat(annotations.size(), lessThanOrEqualTo(3));

// Check that annotation contain the expected time shift
boolean countingModelAnnotationFound = false;
boolean individualModelAnnotationFound = false;
for (Annotation annotation : annotations) {
if (annotation.getAnnotation().contains("Counting model shifted time by")) {
countingModelAnnotationFound = true;
assertThat(annotation.getAnnotation(), containsString(timeShiftAmount + " seconds"));
} else if (annotation.getAnnotation().contains("Model shifted time by")) {
individualModelAnnotationFound = true;
assertThat(annotation.getAnnotation(), containsString(timeShiftAmount + " seconds"));
}
}
assertThat("Counting model annotation with time shift not found", countingModelAnnotationFound, equalTo(true));
assertThat("Individual model annotation with time shift not found", individualModelAnnotationFound, equalTo(true));
}

private String createIpRecord(long timestamp, String ip) throws IOException {
Map<String, Object> record = new HashMap<>();
record.put("time", timestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,27 @@ protected void assertThatNumberOfAnnotationsIsEqualTo(int expectedNumberOfAnnota
});
}

protected List<Annotation> getAnnotations() throws Exception {
List<Annotation> annotations = new ArrayList<>();
// Refresh the annotations index so that recently indexed annotation docs are visible.
indicesAdmin().prepareRefresh(AnnotationIndex.LATEST_INDEX_NAME)
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN)
.get();

SearchRequest searchRequest = new SearchRequest(AnnotationIndex.READ_ALIAS_NAME).indicesOptions(
IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN
);
assertCheckedResponse(client().search(searchRequest), searchResponse -> {

for (SearchHit hit : searchResponse.getHits().getHits()) {
try (XContentParser parser = createParser(jsonXContent, hit.getSourceRef())) {
annotations.add(Annotation.fromXContent(parser, null));
}
}
});
return annotations;
}

protected ForecastRequestStats getForecastStats(String jobId, String forecastId) throws Exception {
SetOnce<ForecastRequestStats> forecastRequestStats = new SetOnce<>();
assertCheckedResponse(
Expand Down