Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public abstract class AbstractAuditMessage implements ToXContentObject {
public static final ParseField LEVEL = new ParseField("level");
public static final ParseField TIMESTAMP = new ParseField("timestamp");
public static final ParseField NODE_NAME = new ParseField("node_name");
public static final ParseField JOB_TYPE = new ParseField("job_type");

protected static final <T extends AbstractAuditMessage> ConstructingObjectParser<T, Void> createParser(
String name, AbstractAuditMessageFactory<T> messageFactory, ParseField resourceField) {
Expand Down Expand Up @@ -99,13 +100,17 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
if (nodeName != null) {
builder.field(NODE_NAME.getPreferredName(), nodeName);
}
String jobType = getJobType();
if (jobType != null) {
builder.field(JOB_TYPE.getPreferredName(), jobType);
}
builder.endObject();
return builder;
}

@Override
public int hashCode() {
return Objects.hash(resourceId, message, level, timestamp, nodeName);
return Objects.hash(resourceId, message, level, timestamp, nodeName, getJobType());
}

@Override
Expand All @@ -122,8 +127,17 @@ public boolean equals(Object obj) {
Objects.equals(message, other.message) &&
Objects.equals(level, other.level) &&
Objects.equals(timestamp, other.timestamp) &&
Objects.equals(nodeName, other.nodeName);
Objects.equals(nodeName, other.nodeName) &&
Objects.equals(getJobType(), other.getJobType());
}

/**
* @return job type string used to tell apart jobs of different types stored in the same index
*/
public abstract String getJobType();

/**
* @return resource id field name used when storing a new message
*/
protected abstract String getResourceField();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ public DataFrameAuditMessage(String resourceId, String message, Level level, Dat
super(resourceId, message, level, timestamp, nodeName);
}

@Override
public final String getJobType() {
return null;
}

@Override
protected String getResourceField() {
return TRANSFORM_ID.getPreferredName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1121,12 +1121,13 @@ public static XContentBuilder auditMessageMapping() throws IOException {
XContentBuilder builder = jsonBuilder().startObject();
builder.startObject(SINGLE_MAPPING_NAME);
addMetaInformation(builder);
builder.field(DYNAMIC, "false");
builder.startObject(PROPERTIES)
.startObject(Job.ID.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyDetectionAuditMessage.LEVEL.getPreferredName())
.field(TYPE, KEYWORD)
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyDetectionAuditMessage.MESSAGE.getPreferredName())
.field(TYPE, TEXT)
Expand All @@ -1142,6 +1143,9 @@ public static XContentBuilder auditMessageMapping() throws IOException {
.startObject(AnomalyDetectionAuditMessage.NODE_NAME.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyDetectionAuditMessage.JOB_TYPE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.endObject()
.endObject()
.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ public AnomalyDetectionAuditMessage(String resourceId, String message, Level lev
super(resourceId, message, level, timestamp, nodeName);
}

@Override
public final String getJobType() {
return Job.ANOMALY_DETECTOR_JOB_TYPE;
}

@Override
protected String getResourceField() {
return JOB_ID.getPreferredName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
package org.elasticsearch.xpack.core.ml.notifications;

public final class AuditorField {
public static final String NOTIFICATIONS_INDEX = ".ml-notifications";

private AuditorField() {}
public static final String NOTIFICATIONS_INDEX = ".ml-notifications-000001";

private AuditorField() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.notifications;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
import org.elasticsearch.xpack.core.common.notifications.Level;
import org.elasticsearch.xpack.core.ml.job.config.Job;

import java.util.Date;

public class DataFrameAnalyticsAuditMessage extends AbstractAuditMessage {

private static final ParseField JOB_ID = Job.ID;
public static final ConstructingObjectParser<DataFrameAnalyticsAuditMessage, Void> PARSER =
createParser("ml_analytics_audit_message", DataFrameAnalyticsAuditMessage::new, JOB_ID);

public DataFrameAnalyticsAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) {
super(resourceId, message, level, timestamp, nodeName);
}

@Override
public final String getJobType() {
return "data_frame_analytics";
}

@Override
protected String getResourceField() {
return JOB_ID.getPreferredName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ static class TestAuditMessage extends AbstractAuditMessage {
super(resourceId, message, level, timestamp, nodeName);
}

@Override
public String getJobType() {
return "test_type";
}

@Override
protected String getResourceField() {
return TEST_ID.getPreferredName();
Expand All @@ -42,6 +47,11 @@ public void testGetResourceField() {
assertThat(message.getResourceField(), equalTo(TestAuditMessage.TEST_ID.getPreferredName()));
}

public void testGetJobType() {
TestAuditMessage message = createTestInstance();
assertThat(message.getJobType(), equalTo("test_type"));
}

public void testNewInfo() {
TestAuditMessage message = new TestAuditMessage(RESOURCE_ID, MESSAGE, Level.INFO, TIMESTAMP, NODE_NAME);
assertThat(message.getResourceId(), equalTo(RESOURCE_ID));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,15 @@

import java.util.Date;

import static org.hamcrest.Matchers.nullValue;

public class DataFrameAuditMessageTests extends AbstractXContentTestCase<DataFrameAuditMessage> {

public void testGetJobType() {
DataFrameAuditMessage message = createTestInstance();
assertThat(message.getJobType(), nullValue());
}

@Override
protected DataFrameAuditMessage doParseInstance(XContentParser parser) {
return DataFrameAuditMessage.PARSER.apply(parser, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,19 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import org.elasticsearch.xpack.core.common.notifications.Level;
import org.elasticsearch.xpack.core.ml.job.config.Job;

import java.util.Date;

import static org.hamcrest.Matchers.equalTo;

public class AnomalyDetectionAuditMessageTests extends AbstractXContentTestCase<AnomalyDetectionAuditMessage> {

public void testGetJobType() {
AnomalyDetectionAuditMessage message = createTestInstance();
assertThat(message.getJobType(), equalTo(Job.ANOMALY_DETECTOR_JOB_TYPE));
}

@Override
protected AnomalyDetectionAuditMessage doParseInstance(XContentParser parser) {
return AnomalyDetectionAuditMessage.PARSER.apply(parser, null);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.notifications;

import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import org.elasticsearch.xpack.core.common.notifications.Level;

import java.util.Date;

import static org.hamcrest.Matchers.equalTo;

public class DataFrameAnalyticsAuditMessageTests extends AbstractXContentTestCase<DataFrameAnalyticsAuditMessage> {

public void testGetJobType() {
DataFrameAnalyticsAuditMessage message = createTestInstance();
assertThat(message.getJobType(), equalTo("data_frame_analytics"));
}

@Override
protected DataFrameAnalyticsAuditMessage doParseInstance(XContentParser parser) {
return DataFrameAnalyticsAuditMessage.PARSER.apply(parser, null);
}

@Override
protected boolean supportsUnknownFields() {
return true;
}

@Override
protected DataFrameAnalyticsAuditMessage createTestInstance() {
return new DataFrameAnalyticsAuditMessage(
randomBoolean() ? null : randomAlphaOfLength(10),
randomAlphaOfLengthBetween(1, 20),
randomFrom(Level.values()),
new Date(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
import org.junit.After;
import org.junit.Before;

Expand Down Expand Up @@ -183,7 +184,8 @@ public void testDeleteExpiredData() throws Exception {
long totalModelSizeStatsBeforeDelete = client().prepareSearch("*")
.setQuery(QueryBuilders.termQuery("result_type", "model_size_stats"))
.get().getHits().getTotalHits().value;
long totalNotificationsCountBeforeDelete = client().prepareSearch(".ml-notifications").get().getHits().getTotalHits().value;
long totalNotificationsCountBeforeDelete =
client().prepareSearch(AuditorField.NOTIFICATIONS_INDEX).get().getHits().getTotalHits().value;
assertThat(totalModelSizeStatsBeforeDelete, greaterThan(0L));
assertThat(totalNotificationsCountBeforeDelete, greaterThan(0L));

Expand Down Expand Up @@ -233,7 +235,8 @@ public void testDeleteExpiredData() throws Exception {
long totalModelSizeStatsAfterDelete = client().prepareSearch("*")
.setQuery(QueryBuilders.termQuery("result_type", "model_size_stats"))
.get().getHits().getTotalHits().value;
long totalNotificationsCountAfterDelete = client().prepareSearch(".ml-notifications").get().getHits().getTotalHits().value;
long totalNotificationsCountAfterDelete =
client().prepareSearch(AuditorField.NOTIFICATIONS_INDEX).get().getHits().getTotalHits().value;
assertThat(totalModelSizeStatsAfterDelete, equalTo(totalModelSizeStatsBeforeDelete));
assertThat(totalNotificationsCountAfterDelete, greaterThanOrEqualTo(totalNotificationsCountBeforeDelete));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.xpack.core.ml.job.config.RuleCondition;
import org.elasticsearch.xpack.core.ml.job.config.RuleScope;
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
import org.junit.After;

import java.io.IOException;
Expand Down Expand Up @@ -186,7 +187,8 @@ public void testScope() throws Exception {

// Wait until the notification that the filter was updated is indexed
assertBusy(() -> {
SearchResponse searchResponse = client().prepareSearch(".ml-notifications")
SearchResponse searchResponse =
client().prepareSearch(AuditorField.NOTIFICATIONS_INDEX)
.setSize(1)
.addSort("timestamp", SortOrder.DESC)
.setQuery(QueryBuilders.boolQuery()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
import org.junit.After;

import java.io.IOException;
Expand Down Expand Up @@ -223,7 +224,8 @@ public void testAddEventsToOpenJob() throws Exception {

// Wait until the notification that the process was updated is indexed
assertBusy(() -> {
SearchResponse searchResponse = client().prepareSearch(".ml-notifications")
SearchResponse searchResponse =
client().prepareSearch(AuditorField.NOTIFICATIONS_INDEX)
.setSize(1)
.addSort("timestamp", SortOrder.DESC)
.setQuery(QueryBuilders.boolQuery()
Expand Down Expand Up @@ -298,7 +300,8 @@ public void testAddOpenedJobToGroupWithCalendar() throws Exception {

// Wait until the notification that the job was updated is indexed
assertBusy(() -> {
SearchResponse searchResponse = client().prepareSearch(".ml-notifications")
SearchResponse searchResponse =
client().prepareSearch(AuditorField.NOTIFICATIONS_INDEX)
.setSize(1)
.addSort("timestamp", SortOrder.DESC)
.setQuery(QueryBuilders.boolQuery()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import org.elasticsearch.xpack.ml.process.DummyController;
import org.elasticsearch.xpack.ml.process.MlController;
import org.elasticsearch.xpack.ml.process.MlControllerHolder;
Expand Down Expand Up @@ -471,6 +472,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
}

AnomalyDetectionAuditor anomalyDetectionAuditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor = new DataFrameAnalyticsAuditor(client, clusterService.getNodeName());
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings);
JobResultsPersister jobResultsPersister = new JobResultsPersister(client);
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client);
Expand Down Expand Up @@ -593,6 +595,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
jobDataCountsPersister,
datafeedManager,
anomalyDetectionAuditor,
dataFrameAnalyticsAuditor,
new MlAssignmentNotifier(settings, anomalyDetectionAuditor, threadPool, client, clusterService),
memoryTracker,
analyticsProcessManager,
Expand Down Expand Up @@ -898,8 +901,12 @@ public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDat

public static boolean allTemplatesInstalled(ClusterState clusterState) {
boolean allPresent = true;
List<String> templateNames = Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME,
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, AnomalyDetectorsIndex.jobResultsIndexPrefix());
List<String> templateNames =
Arrays.asList(
AuditorField.NOTIFICATIONS_INDEX,
MlMetaIndex.INDEX_NAME,
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX,
AnomalyDetectorsIndex.jobResultsIndexPrefix());
for (String templateName : templateNames) {
allPresent = allPresent && TemplateUtils.checkTemplateExistsAndVersionIsGTECurrentVersion(templateName, clusterState);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.notifications;

import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
import org.elasticsearch.xpack.core.ml.notifications.DataFrameAnalyticsAuditMessage;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;

public class DataFrameAnalyticsAuditor extends AbstractAuditor<DataFrameAnalyticsAuditMessage> {

public DataFrameAnalyticsAuditor(Client client, String nodeName) {
super(client, nodeName, AuditorField.NOTIFICATIONS_INDEX, ML_ORIGIN, DataFrameAnalyticsAuditMessage::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void testCreatedWhenAfterOtherMlIndex() throws Exception {
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client(), "node_1");
auditor.info("whatever", "blah");

// Creating a document in the .ml-notifications index should cause .ml-annotations
// Creating a document in the .ml-notifications-000001 index should cause .ml-annotations
// to be created, as it should get created as soon as any other ML index exists

assertBusy(() -> {
Expand Down
Loading