Conversation
| // notify that task state changed (apart from initial RUNNING state notification) | ||
| if (newState != RUNNING) { | ||
| // notify that task state changed (apart from initial INITIALIZING state notification) | ||
| if (newState != INITIALIZING) { |
There was a problem hiding this comment.
do we want to notify about RUNNING now?
There was a problem hiding this comment.
good question, if notifying useful for terminating states only then probably not?
There was a problem hiding this comment.
if we don't care about notification from changing state from INITIALIZING -> RUNNING, then why an extra state is needed?
There was a problem hiding this comment.
Extra state is needed to block execution of the query before all catalogs are loaded on the worker.
There was a problem hiding this comment.
The reasoning for not notifying on certain changes is not something I fully understand
| { | ||
| requireNonNull(catalogs, "catalogs is null"); | ||
| return this.catalogs.compareAndSet(null, requireNonNull(catalogs, "catalogs is null")); | ||
| return this.catalogs.compareAndSet(null, catalogs); |
There was a problem hiding this comment.
can it be called twice? Should you verify that compareAndSet returns true
There was a problem hiding this comment.
This is allowed to be called more than once. In case of coordinator -> worker connection drop, or just timeout, or polling while waiting for task state == RUNNING
There was a problem hiding this comment.
I think split assignments also come through here?
| ReentrantReadWriteLock.ReadLock catalogInitLock = catalogsLock.readLock(); | ||
| catalogInitLock.lock(); | ||
| try { | ||
| connectorServicesProvider.ensureCatalogsLoaded(session, activeCatalogs); |
There was a problem hiding this comment.
looks like ensureCatalogsLoaded could return ListenableFuture
There was a problem hiding this comment.
Wasn't sure how to ensure that the lock is acquired for the execution of the Future only when its needed
There was a problem hiding this comment.
Or it cannot because it has to be complete its work with the lock boundary? what are those locks gaurding. Should't those be internal detail of ensureCatalogsLoaded
There was a problem hiding this comment.
There is a race condition with pruneCatalog, where a catalog can be pruned immediately after creation, but before use. This lock guards against that
| try { | ||
| catalogLoading.get(5, SECONDS); | ||
| sqlTask.setCatalogsLoaded(immediateVoidFuture()); | ||
| } |
There was a problem hiding this comment.
why special casing here with active wait. Why not always use the listener of future.
There was a problem hiding this comment.
To avoid adding a round trip to most task creates mostly.
There was a problem hiding this comment.
Most cases catalogs will be "loaded" in well under 5 seconds, because they are already loaded.
98549d3 to
ed5fc1d
Compare
There was a problem hiding this comment.
Update comment what this state means
There was a problem hiding this comment.
nit: use proper variable name in error message
There was a problem hiding this comment.
I think this will be triggered on failure on future too. You do not want that.
Replace those two lines with:
addCallback(catalogsLoadedFuture, new FutureCallback<>()
{
@Override
public void onSuccess(Void result)
{
taskStateMachine.transitionToRunning();
}
@Override
public void onFailure(Throwable t)
{
taskStateMachine.failed(t);
}
}, directExecutor());| try { | ||
| catalogLoading.get(5, SECONDS); | ||
| sqlTask.setCatalogsLoaded(immediateVoidFuture()); | ||
| } |
| ReentrantReadWriteLock.ReadLock catalogInitLock = catalogsLock.readLock(); | ||
| catalogInitLock.lock(); | ||
| try { | ||
| connectorServicesProvider.ensureCatalogsLoaded(session, activeCatalogs); |
There was a problem hiding this comment.
+ 1 is not obvious here. Explain in code comment what is going on here.
|
@pettyjamesm you can take a look too |
| // notify that task state changed (apart from initial RUNNING state notification) | ||
| if (newState != RUNNING) { | ||
| // notify that task state changed (apart from initial INITIALIZING state notification) | ||
| if (newState != INITIALIZING) { |
There was a problem hiding this comment.
if we don't care about notification from changing state from INITIALIZING -> RUNNING, then why an extra state is needed?
There was a problem hiding this comment.
throw an error if compareAndSet failed. Can it be called multiple times?
There was a problem hiding this comment.
yes. Update task can be called multiple times. Believe split assignments also come through this endpoint
There was a problem hiding this comment.
When using this factory without SqlTaskManager
Is it only for testing? add a comment
There was a problem hiding this comment.
you shouldn't update when pendingSourceSplitCount == 0, should you?
BTW: why if (taskStatusFetcher.getTaskStatus().getState() == INITIALIZING) is needed? It seems it would work without it. Also, ignoring splitAssignments looks wrong. Assertion (that it's empty) would be useful at very least
There was a problem hiding this comment.
the task isn't running, and coordinator shouldn't treat it as running. So an update loop until it is running was my plan
I felt continuing to assign splits to a task that is still in init stage didn't make sense, because they won't get processes.
There was a problem hiding this comment.
Add a comment why you return here. Is it needed?
Additionally, you need to schedule update when task transitions from INITIALIZE to RUNNING.
You should add a test that query won't deadlock, e.g:
- Task is scheduled in
INITIALIZEstate - All splits (single split) is scheduled in
HttpRemoteTask - Task transitions to
RUNNINGstate - Expected:
HttpRemoteTaskwill schedule task update on transition fromINITIALIZE -> RUNNING
There was a problem hiding this comment.
I believe incrementing pendingRequestsCounter.incrementAndGet(); causes a reschedule of the update and will spin the update until the state is transitioned away. (The call site for this function has that logic).
There was a problem hiding this comment.
pendingRequestsCounter.incrementAndGet();
It won't cause reschedule on it's own. You need to call triggerUpdate.
and will spin the update until the state is transitioned away
We shouldn't spin TaskUpdateRequest actively until stage goes away, because TaskUpdateRequest (and TaskInfo responses) are expensive. The proper way to do it is to listen for state changes from taskStatusFetcher and act upon these (e.g. call triggerUpdate)
BTW: is it OK to send splits while task is initializing? If so, maybe you don't need special handling in HttpRemoteTask
There was a problem hiding this comment.
Is the taskStatusFetcher always polling? And can used to block query execution until state ~= Initializing?
There was a problem hiding this comment.
I think I found a way to block stage execution until initialized is complete. I can probably remove this. But our latency would be limited by the latency of ContinuousTaskStatusFetcher?
There was a problem hiding this comment.
Is it a problem if a task fails to initialize after accepting splits?
There was a problem hiding this comment.
Is the taskStatusFetcher always polling? And can used to block query execution until state ~= Initializing?
Yes. It's using long pulling.
Is it a problem if a task fails to initialize after accepting splits?
Maybe not, but I don't know exactly how dynamic catalogs work
There was a problem hiding this comment.
why lock is needed? Can catalogs be updated concurrently? If so, how do you guarantee that they are updated in correct order?
There was a problem hiding this comment.
The lock is needed for a race condition with catalog pruning. The lock ensures proper ordering
There was a problem hiding this comment.
The lock ensures proper ordering
What does prevent execution of:
init of catalogs
prune catalogs
or
prune catalogs
init of catalogs
lock on it's own doesn't enforce order. Do I miss something?
There was a problem hiding this comment.
This is the PR for it: was able to replicate in unit test. The general order was that while prune was collecting the catalogs for a task, it was possible for a task to be assigned catalogs, and load them, only for the pruning to run right after before the query executed.
There was a problem hiding this comment.
#19683 was able to replicate in unit test.
How does that PR relates to change here?
There was a problem hiding this comment.
That PR introduced the lock. This PR just move the location of the lock
There was a problem hiding this comment.
this will introduce additional query latency (task won't be updated with splits for 5s as only one request at a time is allowed). We need to figure another way to do this.
There was a problem hiding this comment.
Let's chat with @dain. This comes directly from conversations with him. My understanding is that this will only wait 5 seconds if the catalogs are still loading ( and the query cannot execute anyway)
There was a problem hiding this comment.
My understanding is that this will only wait 5 seconds if the catalogs are still loading ( and the query cannot execute anyway)
Do I understand correctly that catalogs can be different per query? What if catalogs load in 10 milliseconds? In that case split assignments will be blocked for remainder of 5s.
Lastly, is 5s always sufficient?
There was a problem hiding this comment.
Is that how the future timeout works? It always blocks 5s?
5s is not always sufficient for loading all catalogs, which is why I need to block execution until the task has initialized all catalogs.
There was a problem hiding this comment.
Just tested. I believe this early terminates
There was a problem hiding this comment.
5s is not always sufficient for loading all catalogs, which is why I need to block execution until the task has initialized all catalogs.
I think this means it should not be time based, but event based (e.g. some future)
|
@dain can you please take a look a this one? |
|
I don't have opinion about the code changes yet, but would be nice to fill PR Description to explain why it's a good change to have. |
|
Will try and get some testing started to prove this out as a POC and see what's missing. |
There was a problem hiding this comment.
These execute but I can't tell if this is sufficient to test
e17ba4c to
d1492fa
Compare
d1492fa to
c18746d
Compare
c18746d to
1c9b52d
Compare
|
scrapping |
Description
Additional context and related issues
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text: