diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java index 46793793622d4..5f4b941b46da9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import java.io.IOException; @@ -469,7 +470,7 @@ public Job mergeWithJob(Job source, ByteSizeValue maxModelMemoryLimit) { builder.setCustomSettings(customSettings); } if (modelSnapshotId != null) { - builder.setModelSnapshotId(modelSnapshotId); + builder.setModelSnapshotId(ModelSnapshot.isTheEmptySnapshot(modelSnapshotId) ? null : modelSnapshotId); } if (modelSnapshotMinVersion != null) { builder.setModelSnapshotMinVersion(modelSnapshotMinVersion); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshot.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshot.java index 80b4eab384e41..7a978aae6d68d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshot.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshot.java @@ -80,6 +80,7 @@ private static ObjectParser createParser(boolean ignoreUnknownFie return parser; } + private static String EMPTY_SNAPSHOT_ID = "empty"; private final String jobId; @@ -285,6 +286,14 @@ public List stateDocumentIds() { return stateDocumentIds; } + public boolean isTheEmptySnapshot() { + return isTheEmptySnapshot(snapshotId); + } + + public static boolean isTheEmptySnapshot(String snapshotId) { + return EMPTY_SNAPSHOT_ID.equals(snapshotId); + } + public static String documentIdPrefix(String jobId) { return jobId + "_" + TYPE + "_"; } @@ -435,4 +444,9 @@ public ModelSnapshot build() { latestRecordTimeStamp, latestResultTimeStamp, quantiles, retain); } } + + public static ModelSnapshot emptySnapshot(String jobId) { + return new ModelSnapshot(jobId, Version.CURRENT, new Date(), "empty snapshot", EMPTY_SNAPSHOT_ID, 0, + new ModelSizeStats.Builder(jobId).build(), null, null, null, false); + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java index 105f068f38d48..3c787529bbfb3 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractSerializingTestCase; import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import java.util.ArrayList; import java.util.Arrays; @@ -26,7 +27,9 @@ import java.util.Set; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; public class JobUpdateTests extends AbstractSerializingTestCase { @@ -369,4 +372,23 @@ public void testUpdate_withAnalysisLimitsPreviouslyUndefined() { updateAboveMaxLimit.mergeWithJob(jobBuilder.build(), new ByteSizeValue(10000L, ByteSizeUnit.MB)); } + + public void testUpdate_givenEmptySnapshot() { + Job.Builder jobBuilder = new Job.Builder("my_job"); + Detector.Builder d1 = new Detector.Builder("count", null); + AnalysisConfig.Builder ac = new AnalysisConfig.Builder(Collections.singletonList(d1.build())); + jobBuilder.setAnalysisConfig(ac); + jobBuilder.setDataDescription(new DataDescription.Builder()); + jobBuilder.setCreateTime(new Date()); + jobBuilder.setModelSnapshotId("some_snapshot_id"); + Job job = jobBuilder.build(); + assertThat(job.getModelSnapshotId(), equalTo("some_snapshot_id")); + + JobUpdate update = new JobUpdate.Builder(job.getId()) + .setModelSnapshotId(ModelSnapshot.emptySnapshot(job.getId()).getSnapshotId()) + .build(); + + Job updatedJob = update.mergeWithJob(job, ByteSizeValue.ofMb(100)); + assertThat(updatedJob.getModelSnapshotId(), is(nullValue())); + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshotTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshotTests.java index 175e1bf2d4661..7832e598d3e92 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshotTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshotTests.java @@ -18,6 +18,8 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; public class ModelSnapshotTests extends AbstractSerializingTestCase { private static final Date DEFAULT_TIMESTAMP = new Date(); @@ -155,7 +157,7 @@ public static ModelSnapshot createRandomized() { modelSnapshot.setMinVersion(Version.CURRENT); modelSnapshot.setTimestamp(new Date(TimeValue.parseTimeValue(randomTimeValue(), "test").millis())); modelSnapshot.setDescription(randomAlphaOfLengthBetween(1, 20)); - modelSnapshot.setSnapshotId(randomAlphaOfLengthBetween(1, 20)); + modelSnapshot.setSnapshotId(randomAlphaOfLength(10)); modelSnapshot.setSnapshotDocCount(randomInt()); modelSnapshot.setModelSizeStats(ModelSizeStatsTests.createRandomized()); modelSnapshot.setLatestResultTimeStamp( @@ -214,4 +216,18 @@ public void testLenientParser() throws IOException { ModelSnapshot.LENIENT_PARSER.apply(parser, null); } } + + public void testEmptySnapshot() { + ModelSnapshot modelSnapshot = ModelSnapshot.emptySnapshot("my_job"); + assertThat(modelSnapshot.getSnapshotId(), equalTo("empty")); + assertThat(modelSnapshot.isTheEmptySnapshot(), is(true)); + assertThat(modelSnapshot.getMinVersion(), equalTo(Version.CURRENT)); + assertThat(modelSnapshot.getLatestRecordTimeStamp(), is(nullValue())); + assertThat(modelSnapshot.getLatestResultTimeStamp(), is(nullValue())); + } + + public void testIsEmpty_GivenNonEmptySnapshot() { + ModelSnapshot modelSnapshot = createRandomized(); + assertThat(modelSnapshot.isTheEmptySnapshot(), is(false)); + } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java index ce84789db534b..af7672a8d14d5 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java @@ -18,6 +18,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHits; +import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.annotations.Annotation; import org.elasticsearch.xpack.core.ml.annotations.Annotation.Event; import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; @@ -25,9 +26,11 @@ import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; +import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.security.user.XPackUser; import org.junit.After; @@ -49,6 +52,8 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; /** * This test pushes data through a job in 2 runs creating @@ -58,19 +63,60 @@ public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase { @After - public void tearDownData() throws Exception { + public void tearDownData() { cleanUp(); } public void testRevertModelSnapshot() throws Exception { - test("revert-model-snapshot-it-job", false); + testRunJobInTwoPartsAndRevertSnapshotAndRunToCompletion("revert-model-snapshot-it-job", false); } public void testRevertModelSnapshot_DeleteInterveningResults() throws Exception { - test("revert-model-snapshot-it-job-delete-intervening-results", true); + testRunJobInTwoPartsAndRevertSnapshotAndRunToCompletion("revert-model-snapshot-it-job-delete-intervening-results", true); } - private void test(String jobId, boolean deleteInterveningResults) throws Exception { + public void testRevertToEmptySnapshot() throws Exception { + String jobId = "revert-to-empty-snapshot-test"; + + TimeValue bucketSpan = TimeValue.timeValueHours(1); + long startTime = 1491004800000L; + + String data = generateData(startTime, bucketSpan, 20, Arrays.asList("foo"), + (bucketIndex, series) -> bucketIndex == 19 ? 100.0 : 10.0).stream().collect(Collectors.joining()); + + Job.Builder job = buildAndRegisterJob(jobId, bucketSpan); + openJob(job.getId()); + postData(job.getId(), data); + flushJob(job.getId(), true); + closeJob(job.getId()); + + assertThat(getJob(jobId).get(0).getModelSnapshotId(), is(notNullValue())); + List expectedBuckets = getBuckets(jobId); + assertThat(expectedBuckets.size(), equalTo(20)); + List expectedRecords = getRecords(jobId); + assertThat(expectedBuckets.isEmpty(), is(false)); + assertThat(expectedRecords.isEmpty(), is(false)); + + RevertModelSnapshotAction.Response revertResponse = revertModelSnapshot(jobId, "empty", true); + assertThat(revertResponse.getModel().getSnapshotId(), equalTo("empty")); + + assertThat(getJob(jobId).get(0).getModelSnapshotId(), is(nullValue())); + assertThat(getBuckets(jobId).isEmpty(), is(true)); + assertThat(getRecords(jobId).isEmpty(), is(true)); + assertThat(getJobStats(jobId).get(0).getDataCounts().getLatestRecordTimeStamp(), is(nullValue())); + + // Now run again and see we get same results + openJob(job.getId()); + DataCounts dataCounts = postData(job.getId(), data); + assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L)); + flushJob(job.getId(), true); + closeJob(job.getId()); + + assertThat(getBuckets(jobId).size(), equalTo(expectedBuckets.size())); + assertThat(getRecords(jobId), equalTo(expectedRecords)); + } + + private void testRunJobInTwoPartsAndRevertSnapshotAndRunToCompletion(String jobId, boolean deleteInterveningResults) throws Exception { TimeValue bucketSpan = TimeValue.timeValueHours(1); long startTime = 1491004800000L; diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index 8ec231b87a2cd..8c09eccaa5e56 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -758,7 +758,7 @@ private Optional getQuantiles() throws Exception { AtomicReference errorHolder = new AtomicReference<>(); AtomicReference> resultHolder = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); - jobResultsProvider.getAutodetectParams(JobTests.buildJobBuilder(JOB_ID).build(), params -> { + jobResultsProvider.getAutodetectParams(JobTests.buildJobBuilder(JOB_ID).setModelSnapshotId("test_snapshot").build(), params -> { resultHolder.set(Optional.ofNullable(params.quantiles())); latch.countDown(); }, e -> { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java index b2d7585421f7f..c896ded6a0090 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java @@ -127,22 +127,31 @@ private void getModelSnapshot(RevertModelSnapshotAction.Request request, JobResu Consumer errorHandler) { logger.info("Reverting to snapshot '" + request.getSnapshotId() + "'"); + if (ModelSnapshot.isTheEmptySnapshot(request.getSnapshotId())) { + handler.accept(ModelSnapshot.emptySnapshot(request.getJobId())); + return; + } + provider.getModelSnapshot(request.getJobId(), request.getSnapshotId(), modelSnapshot -> { if (modelSnapshot == null) { - throw new ResourceNotFoundException(Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, request.getSnapshotId(), - request.getJobId())); + throw missingSnapshotException(request); } handler.accept(modelSnapshot.result); }, errorHandler); } + private static ResourceNotFoundException missingSnapshotException(RevertModelSnapshotAction.Request request) { + return new ResourceNotFoundException(Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, request.getSnapshotId(), + request.getJobId())); + } + private ActionListener wrapDeleteOldAnnotationsListener( ActionListener listener, ModelSnapshot modelSnapshot, String jobId) { return ActionListener.wrap(response -> { - Date deleteAfter = modelSnapshot.getLatestResultTimeStamp(); + Date deleteAfter = modelSnapshot.getLatestResultTimeStamp() == null ? new Date(0) : modelSnapshot.getLatestResultTimeStamp(); logger.info("[{}] Removing intervening annotations after reverting model: deleting annotations after [{}]", jobId, deleteAfter); JobDataDeleter dataDeleter = new JobDataDeleter(client, jobId); @@ -176,7 +185,7 @@ private ActionListener wrapDeleteOldDataList // wrap the listener with one that invokes the OldDataRemover on // acknowledged responses return ActionListener.wrap(response -> { - Date deleteAfter = modelSnapshot.getLatestResultTimeStamp(); + Date deleteAfter = modelSnapshot.getLatestResultTimeStamp() == null ? new Date(0) : modelSnapshot.getLatestResultTimeStamp(); logger.info("[{}] Removing intervening records after reverting model: deleting results after [{}]", jobId, deleteAfter); JobDataDeleter dataDeleter = new JobDataDeleter(client, jobId); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index a894c7cb9307f..88c0e8f85134a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -428,7 +428,7 @@ private void validate(Job job, JobUpdate jobUpdate, ActionListener handler } private void validateModelSnapshotIdUpdate(Job job, String modelSnapshotId, VoidChainTaskExecutor voidChainTaskExecutor) { - if (modelSnapshotId != null) { + if (modelSnapshotId != null && ModelSnapshot.isTheEmptySnapshot(modelSnapshotId) == false) { voidChainTaskExecutor.add(listener -> { jobResultsProvider.getModelSnapshot(job.getId(), modelSnapshotId, newModelSnapshot -> { if (newModelSnapshot == null) { @@ -599,6 +599,11 @@ public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionList // Step 3. After the model size stats is persisted, also persist the snapshot's quantiles and respond // ------- CheckedConsumer modelSizeStatsResponseHandler = response -> { + // In case we are reverting to the empty snapshot the quantiles will be null + if (modelSnapshot.getQuantiles() == null) { + actionListener.onResponse(new RevertModelSnapshotAction.Response(modelSnapshot)); + return; + } jobResultsPersister.persistQuantiles(modelSnapshot.getQuantiles(), WriteRequest.RefreshPolicy.IMMEDIATE, ActionListener.wrap(quantilesResponse -> { // The quantiles can be large, and totally dominate the output - diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java index 6e7a81c67ce60..51aab3c86ab67 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java @@ -585,11 +585,12 @@ public void getAutodetectParams(Job job, String snapshotId, Consumer