Skip to content

Conversation

@hendrikmuhs
Copy link

@hendrikmuhs hendrikmuhs commented Apr 4, 2019

refactor onStart and onFinish to take action listeners and execute them when indexer is in indexing state.

Motivation of this change: DataFrameTransforms requires a hooks to prepare inner state before the 1st search request and to change inner state before the indexer finishes.

Rollup used the hooks mostly for logging purposes ans should be unaffected by this change.

@hendrikmuhs hendrikmuhs added >non-issue :StorageEngine/Rollup Turn fine-grained time-based data into coarser-grained data v8.0.0 v7.2.0 :ml/Transform Transform labels Apr 4, 2019
@elasticmachine
Copy link
Collaborator

Pinging @elastic/es-analytics-geo

@elasticmachine
Copy link
Collaborator

Pinging @elastic/ml-core

@hendrikmuhs hendrikmuhs changed the title refactor to call beforeFinish before the indexer changes state refactor onStart and onFinish to take runnables and executed them guarded by state Apr 5, 2019
Copy link
Contributor

@jimczi jimczi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @hendrikmuhs , I left some comments

* @param next Runnable for the next phase
*/
protected abstract void onStartJob(long now);
protected abstract void onStart(long now, Runnable next);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're expecting async calls to be fired in onStart we should provide an ActionListener<Void> rather than a Runnable ? This way the implementation can fail the job nicely if an error occurs ?

stats.markStartSearch();
doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure));
});
} catch (Exception e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should catch the exception here, if an error occurs in onStart we should provide an ActionListener to let the caller deals with the error (e.g. call listener.onFailure(e)):

onStart(now,  ActionListener.wrap((o) -> {
                            stats.markStartSearch();
                            doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure));
                        }, (e) -> doSaveState(finishAndSetState(), position.get(), () -> onFailure(e));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also wonder if we should call onFailure before we save the state to be consistent with onFinish : onFailure(Exception e, Runnable finishAndSaveState) ?

// being persisted as STARTED but then stop the job
doSaveState(finishAndSetState(), position.get(), () -> {});
});
} catch (Exception e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should rather document the fact that the Runnable provided with onFinish (and maybe onFailure) should always run (even on errors). If we don't expect async calls in onFinish we can also keep the current code and just switch the execution order of onFinish and doSaveState.

Copy link
Contributor

@jimczi jimczi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left one comment regarding the call to onFailure if onFinish throws an error but other than that the refactor looks good to me.

// execute finishing tasks
onFinish(ActionListener.wrap(
r -> doSaveState(finishAndSetState(), position.get(), () -> {}),
e -> doSaveState(finishAndSetState(), position.get(), () -> onFailure(e))));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we already called onFinish I don't think we should call onFailure if saving the state throws an error. It's not ideal but I'd prefer that we ignore the error and just finish the job.

Copy link
Contributor

@jimczi jimczi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks for iterating on this !

@hendrikmuhs hendrikmuhs merged commit e10f779 into elastic:master Apr 7, 2019
hendrikmuhs pushed a commit that referenced this pull request Apr 7, 2019
…rded by state (#40855)

refactor onStart and onFinish to take action listeners and execute them when indexer is in indexing state.
gurkankaymak pushed a commit to gurkankaymak/elasticsearch that referenced this pull request May 27, 2019
…rded by state (elastic#40855)

refactor onStart and onFinish to take action listeners and execute them when indexer is in indexing state.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

:ml/Transform Transform >non-issue :StorageEngine/Rollup Turn fine-grained time-based data into coarser-grained data v7.2.0 v8.0.0-alpha1

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants