Skip to content

Conversation

@benwtrent
Copy link
Member

@benwtrent benwtrent commented Apr 16, 2019

This admittedly large PR adds progress reporting to data frame transforms. The majority of the size is due to refactoring cause by yak-shaving[0] :(.

Design decisions

  • I opted to put the progress reporting into its own object (so we can add more fields as we desire in the future), and put it directly under the State object. I kept it separated from the checkpoint information just for simplicity's sake as the two pieces of information (checkpoint status and progress) are two separate pieces of information.
  • Also, due to yak-shaving, much refactoring was done in this PR. All the refactoring done in this PR would have to be done eventually when we cancel the task on _stop and I needed part of it for gathering progress information when the ES Node executes the task.
  • I am now having the task automatically start when the node executor kicks it off. This is part of the yak-shaving refactoring. It makes sense that if _start creates the task (and the executor sees that it is a new task) that it should automatically start without having to call start() on the allocated task on the node.
  • Progress information is now stored in the state, gathering the "remaining docs" via a query could require a very costly query. Specifically range queries against terms are very expensive.
  • Total number of docs is a simple enough query.

Considerations

  • This is a "good enough" progress reporting. No guarantees are made as the index could be updated so that the cursor actually hits more or fewer docs than initially gathered.

Future work

  • Have the total docs query take checkpointing into account. Right now, it only utilizes the dataframe source query. As new checkpoints are executed, the query will have to change to give an accurate count of the total docs expected to be processed in that checkpoint.

[0] https://en.wiktionary.org/wiki/yak_shaving

@elasticmachine
Copy link
Collaborator

Pinging @elastic/ml-core

testCompile project(path: xpackModule('data-frame'), configuration: 'runtime')
}

integTestRunner {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know very little in how our build system works. I added fields from what is in the ml native integration tests until the tests started executing. If anybody wants to give me a run down, and show me how most of these changes are unnecessary for my native client tests, please do :).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have since paired down to only what is necessary. Next commit will have fewer changes to the build file

persistentTaskActionListener.onResponse(existingTask);
// If the task already exists but is not assigned to a node, something is weird
// return a failure that includes the current assignment explanation (if one exists)
if (existingTask.isAssigned() == false) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a check we should have been making to begin with and not doing it causes a bug. If the task exists, is not started, and is NOT assigned, that is an issue. We should not have tried to wait for its allocation and then cancel it.

Instead, I am opting for trying to start it directly if it is allocated, if not, return an error saying that the allocation failed.

// If it is not able to be assigned to a node all together, we should just close the task completely
private boolean isNotStopped(PersistentTasksCustomMetaData.PersistentTask<?> task) {
DataFrameTransformState state = (DataFrameTransformState)task.getState();
return state != null && state.getTaskState().equals(DataFrameTransformTaskState.STOPPED) == false;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since I am now starting the indexer in the node executor class, a new task should move away from the state of STOPPED eventually, after being allocated. Of course, if it is not allocated, that will never happen (hence keeping the allocation checks).

As for only verifying that the task is NOT stopped, setting it to a failed state is a real possibility and since the executor has no direct means of notifying us, setting the task to failed with a reason is good enough.

indexer.set(indexerBuilder.build(this));
}

static class ClientDataFrameIndexerBuilder {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class was necessary as the executor gathers the necessary dependencies in individual async calls. I needed a place to store them. Initially I kept them in local state in the executor, but that conflated ownership.

}
}

static class ClientDataFrameIndexer extends DataFrameIndexer {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this to a static class as it seems to me that it should have been one from the get go. Conflating where the task's responsibilities ends and the indexer's starts has been a continuous problem with this design. This is just a first step in trying to make sure that there is at least some separation of concerns between the two.

ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> updateClusterStateListener = ActionListener.wrap(
task -> {
// Make a copy of the previousStats so that they are not constantly updated when `merge` is called
DataFrameIndexerTransformStats tempStats = new DataFrameIndexerTransformStats(previousStats).merge(getStats());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no longer a need to keep track of previous stats and merge them as the stats dependency is given to the indexer constructor now and increments can continue as normal.

Copy link

@hendrikmuhs hendrikmuhs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, I like the separation of the task and client indexer.

Mostly nits, just 1 thing I think needs to be addressed.

logger.info("Updating persistent state of transform [" + transform.getId() + "] to [" + state.toString() + "]");
transformTask.currentCheckpoint.get(),
transformTask. stateReason.get());
logger.info("Updating persistent state of transform [" + transformConfig.getId() + "] to [" + state.toString() + "]");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know, not changed, but would be good to reduce from info to debug.

@benwtrent benwtrent requested a review from davidkyle April 18, 2019 19:12
Copy link

@hendrikmuhs hendrikmuhs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@benwtrent
Copy link
Member Author

run elasticsearch-ci/1

Copy link
Member

@davidkyle davidkyle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@hendrikmuhs
Copy link

run elasticsearch-ci/1

@benwtrent benwtrent merged commit 9bf8b5a into elastic:master Apr 25, 2019
@benwtrent benwtrent deleted the feature/ml-df-calculate-docs-left branch April 25, 2019 12:54
benwtrent added a commit to benwtrent/elasticsearch that referenced this pull request Apr 25, 2019
* [ML] Adds progress reporting for transforms

* fixing after master merge

* Addressing PR comments

* removing unused imports

* Adjusting afterKey handling and percentage to be 100*

* Making sure it is a linked hashmap for serialization

* removing unused import

* addressing PR comments

* removing unused import

* simplifying code, only storing total docs and decrementing

* adjusting for rewrite

* removing initial progress gathering from executor
benwtrent added a commit that referenced this pull request Apr 25, 2019
* [ML] Adds progress reporting for transforms

* fixing after master merge

* Addressing PR comments

* removing unused imports

* Adjusting afterKey handling and percentage to be 100*

* Making sure it is a linked hashmap for serialization

* removing unused import

* addressing PR comments

* removing unused import

* simplifying code, only storing total docs and decrementing

* adjusting for rewrite

* removing initial progress gathering from executor
akhil10x5 pushed a commit to akhil10x5/elasticsearch that referenced this pull request May 2, 2019
* [ML] Adds progress reporting for transforms

* fixing after master merge

* Addressing PR comments

* removing unused imports

* Adjusting afterKey handling and percentage to be 100*

* Making sure it is a linked hashmap for serialization

* removing unused import

* addressing PR comments

* removing unused import

* simplifying code, only storing total docs and decrementing

* adjusting for rewrite

* removing initial progress gathering from executor
gurkankaymak pushed a commit to gurkankaymak/elasticsearch that referenced this pull request May 27, 2019
* [ML] Adds progress reporting for transforms

* fixing after master merge

* Addressing PR comments

* removing unused imports

* Adjusting afterKey handling and percentage to be 100*

* Making sure it is a linked hashmap for serialization

* removing unused import

* addressing PR comments

* removing unused import

* simplifying code, only storing total docs and decrementing

* adjusting for rewrite

* removing initial progress gathering from executor
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants