Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/reference/migration/migrate_8_0.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ coming[8.0.0]
* <<breaking_80_discovery_changes>>
* <<breaking_80_mappings_changes>>
* <<breaking_80_packaging_changes>>
* <<breaking_80_rollup_changes>>
* <<breaking_80_snapshots_changes>>
* <<breaking_80_security_changes>>
* <<breaking_80_ilm_changes>>
Expand Down Expand Up @@ -49,6 +50,7 @@ include::migrate_8_0/analysis.asciidoc[]
include::migrate_8_0/discovery.asciidoc[]
include::migrate_8_0/mappings.asciidoc[]
include::migrate_8_0/packaging.asciidoc[]
include::migrate_8_0/rollup.asciidoc[]
include::migrate_8_0/snapshots.asciidoc[]
include::migrate_8_0/security.asciidoc[]
include::migrate_8_0/ilm.asciidoc[]
Expand Down
20 changes: 20 additions & 0 deletions docs/reference/migration/migrate_8_0/rollup.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[float]
[[breaking_80_rollup_changes]]
=== Rollup changes

//NOTE: The notable-breaking-changes tagged regions are re-used in the
//Installation and Upgrade Guide

//tag::notable-breaking-changes[]

// end::notable-breaking-changes[]

[float]
==== StartRollupJob endpoint returns success if job already started

Previously, attempting to start an already-started rollup job would
result in a `500 InternalServerError: Cannot start task for Rollup Job
[job] because state was [STARTED]` exception.

Now, attempting to start a job that is already started will just
return a successful `200 OK: started` response.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.core.rollup.action;

import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
Expand Down Expand Up @@ -70,20 +69,16 @@ public Request() {}
public Request(StreamInput in) throws IOException {
super(in);
id = in.readString();
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
waitForCompletion = in.readBoolean();
timeout = in.readTimeValue();
}
waitForCompletion = in.readBoolean();
timeout = in.readTimeValue();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
if (out.getVersion().onOrAfter(Version.V_6_6_0)) {
out.writeBoolean(waitForCompletion);
out.writeTimeValue(timeout);
}
out.writeBoolean(waitForCompletion);
out.writeTimeValue(timeout);
}

public String getId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,10 @@ public RollupJobConfig getConfig() {
}

/**
* Attempt to start the indexer. If the state is anything other than STOPPED, this will fail.
* Otherwise, the persistent task's status will be updated to reflect the change.
* Attempt to start the indexer.
* - If the indexer is started/indexing, returns OK
* - If the indexer is stopped, starts task, updates persistent task's status, returns ok
* - Anything else returns error
*
* Note that while the job is started, the indexer will not necessarily run immediately. That
* will only occur when the scheduler triggers it based on the cron
Expand All @@ -220,8 +222,14 @@ public RollupJobConfig getConfig() {
*/
public synchronized void start(ActionListener<StartRollupJobAction.Response> listener) {
final IndexerState prevState = indexer.getState();
if (prevState != IndexerState.STOPPED) {
// fails if the task is not STOPPED

if (prevState == IndexerState.STARTED || prevState == IndexerState.INDEXING) {
// We're already running so just return acknowledgement
logger.debug("Indexer already running (State: [" + prevState + "]), acknowledging start without change.");
listener.onResponse(new StartRollupJobAction.Response(true));
return;
} else if (prevState != IndexerState.STOPPED) {
// if we're not already started/indexing, we must be STOPPED to get started
listener.onFailure(new ElasticsearchException("Cannot start task for Rollup Job [" + job.getConfig().getId() + "] because"
+ " state was [" + prevState + "]"));
return;
Expand All @@ -230,11 +238,10 @@ public synchronized void start(ActionListener<StartRollupJobAction.Response> lis
final IndexerState newState = indexer.start();
if (newState != IndexerState.STARTED) {
listener.onFailure(new ElasticsearchException("Cannot start task for Rollup Job [" + job.getConfig().getId() + "] because"
+ " state was [" + newState + "]"));
+ " new state was [" + newState + "]"));
return;
}


final RollupJobStatus state = new RollupJobStatus(IndexerState.STARTED, indexer.getPosition());
logger.debug("Updating state for rollup job [" + job.getConfig().getId() + "] to [" + state.getIndexerState() + "][" +
state.getPosition() + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,17 +163,16 @@ public void testStartWhenStarted() throws InterruptedException {
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));

CountDownLatch latch = new CountDownLatch(1);
task.start(new ActionListener<StartRollupJobAction.Response>() {
task.start(new ActionListener<>() {
@Override
public void onResponse(StartRollupJobAction.Response response) {
fail("Should not have entered onResponse.");
assertTrue(response.isStarted());
latch.countDown();
}

@Override
public void onFailure(Exception e) {
assertThat(e.getMessage(), equalTo("Cannot start task for Rollup Job ["
+ job.getConfig().getId() + "] because state was [STARTED]"));
latch.countDown();
fail("Should not have throw exception: " + e.getMessage());
}
});
latch.await(3, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ setup:
- is_true: started

- do:
catch: /Cannot start task for Rollup Job \[foo\] because state was/
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
rollup.start_job:
id: foo
- is_true: started