-
Notifications
You must be signed in to change notification settings - Fork 5.5k
[WIP] ajay opec #21135
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[WIP] ajay opec #21135
Conversation
Add metric for retry and unit test for graceful shutdown
…fer from the coordinator
| public void handleShutdown(TaskId taskId) | ||
| { | ||
| String errorMessage = String.format("killing pending task %s due to host being shutting down", taskId); | ||
| taskStateMachine.graceful_failed(new HostShuttingDownException(errorMessage, System.nanoTime())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
System.nanoTime is to measure elapsed time and not absolute time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit : graceful_failed method name
| taskStateMachine.cancel(); | ||
| } | ||
|
|
||
| public void graceful_failed() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no one calls this
| void start(); | ||
|
|
||
| void addSplits(Multimap<PlanNodeId, Split> splits); | ||
| boolean addSplits(Multimap<PlanNodeId, Split> splits); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit : add javadoc to say what boolean return value means
| return pendingSourceSplitCount; | ||
| } | ||
|
|
||
| public synchronized void setIsRetried() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why synchronized. ?
|
|
||
| if (failedTasks.isEmpty()) { | ||
| checkState(finishedTasks.isEmpty()); | ||
| List<RemoteTask> idleRunningHttpRemoteTasks = getAllTasks().stream() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AGPQN idleRunningHttpRemoteTasks sounds like an oxymoron
| .filter(task -> task.getTaskStatus().getState() == TaskState.RUNNING) | ||
| .filter(task -> task.isTaskIdling()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
combine both filters into one
| private final AccessControlContext context; | ||
| private final Optional<Tracer> tracer; | ||
| private final WarningCollector warningCollector; | ||
| private final ConcurrentHashMap<PlanFragmentId, Pair<NodePoolType, String>> fragmentToPoolTypeMapping = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit. code to interfaces. ConcurrentMap
| ConcurrentHashMap<PlanFragmentId, Session.Pair<NodePoolType, String>> fragmentToPoolTypeMapping = session.getFragmentToPoolTypeMapping(); | ||
| Session.Pair<NodePoolType, String> nodePoolTypeStringPair = fragmentToPoolTypeMapping.get(plan.getFragment().getId()); | ||
| if (nodePoolTypeStringPair != null && nodePoolTypeStringPair.getKey() != workerPoolType) { | ||
| log.error("Error in pool type evaluation, plan =%s,hasLocalExchangeAtRoot = %s, isLeaf=%s, plan in map =%s", plan.getFragment().getJsonRepresentation().orElse(null), hasLocalExchangeAtRoot, plan.getFragment().isLeaf(), nodePoolTypeStringPair.getValue()); | ||
| throw new RuntimeException("Error in pool type evaluation"); | ||
| } | ||
| else { | ||
| fragmentToPoolTypeMapping.put(plan.getFragment().getId(), new Session.Pair<>(workerPoolType, plan.getFragment().getJsonRepresentation().orElse(""))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AGPQN when do we enter into the erorr situation.
Also why do we need the entire plan fragment json stuffed into the session
| if (taskSplit.getScheduledSplit() != null) { | ||
| log.warn("Adding split %s to pending split tracker for task %s", taskSplit.getScheduledSplit().getSequenceId(), taskHandle.getTaskId()); | ||
| gracefulShutdownSplitTracker.getPendingSplits().computeIfAbsent(taskHandle.getTaskId(), k -> ConcurrentHashMap.newKeySet()).add(taskSplit.getScheduledSplit().getSequenceId()); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is gracefulShutdownSplitTracker needed for non shutdown cases?
| QUERY_PLANNING_TIMEOUT(0x0001_0027, INTERNAL_ERROR), | ||
| NATIVE_EXECUTION_TASK_ERROR(0x0001_0028, INTERNAL_ERROR), | ||
| UNSUPPORTED_ANALYZER_TYPE(0x0001_0029, INTERNAL_ERROR), | ||
| REMOTE_HOST_GONE_INTERMEDIATE(0x0001_0029, INTERNAL_ERROR, true), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not used anywhere
| return tasks.getUnchecked(taskId).abort(); | ||
| } | ||
|
|
||
| public void failTask(TaskId taskId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
only called for testing
| { | ||
| for (ClientBuffer partition : partitions) { | ||
| if (!partition.isEmptyPages()) { | ||
| return false; | ||
| } | ||
| } | ||
| return true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit this logic is duplicated in multiple output buffer implementations.
| .withFilter(GenerateTraceTokenRequestFilter.class) | ||
| .withConfigDefaults(config -> { | ||
| config.setRequestTimeout(new Duration(10, SECONDS)); | ||
| config.setRequestTimeout(new Duration(30, SECONDS)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
random change?
| AtomicReference<SqlQuerySchedulerInterface> queryScheduler = this.queryScheduler; | ||
| stateMachine.addStateChangeListener(state -> { | ||
| if (!state.isDone()) { | ||
| if (!state.isDone() || state == QueryState.FINISHED) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AGPQN not Done or finished seems interesting.
| public void gracefulShutdown() | ||
| { | ||
| isShuttingDown.set(true); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit calling it graceful shutdown implies there is some action to be done.
maybe just call it as markTaskAsShutdown
| private final TimeStat outputBufferEmptyWaitTime = new TimeStat(NANOSECONDS); | ||
| private final TimeStat waitForRunningSplitTime = new TimeStat(NANOSECONDS); | ||
| private volatile boolean closed; | ||
| private final ExecutorService taskShutdownExecutor = newCachedThreadPool(daemonThreadsNamed("task-shutdown-%s")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of a cached thread pool which starts from no threads , it might be better to go with a thread pool of min size = p90 number of tasks on a dying node since thread creation takes time.
| builderWithOutputBufferInfo("init", shuttingdownNode, outputBuffer) | ||
| .build()); | ||
|
|
||
| while (taskHandle.getRunningLeafSplits() > 0 || taskHandle.getRunningIntermediateSplits() > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AGPQN : why are we checking for taskHandle.getRunningIntermediateSplits()
| public static final String OB_WAIT_OVER = "obWait-over"; | ||
| public static final String OB_WAIT = "obWait"; | ||
| public static final String SPLIT_WAIT = "splitWait"; | ||
| public static final String SPLIT_WAIT_OVER = "splitWait-over"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we are planning to keep it in prod, might be good to add ordinals prefixed so that it is easier to understand the sequence of events
| catch (Exception ex) { | ||
| log.error(ex, "GracefulShutdown got interrupted while waiting for split completion for task %s", taskId); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the intend is to catch Interrupted Exception, we need to specifically catch that and set the interrupt flag back.
| { | ||
| isGracefulShutdownStarted.set(true); | ||
| long shutdownStartTime = System.nanoTime(); | ||
| //TODO throw error for new task creation instead of looping here. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AGPQN : Are we creating tasks here?
| Duration splitConcurrencyAdjustFrequency, | ||
| OptionalInt maxDriversPerTask) | ||
| { | ||
| return addTask(taskId, utilizationSupplier, initialSplitConcurrency, splitConcurrencyAdjustFrequency, maxDriversPerTask, Optional.empty(), Optional.empty()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: get rid of this wrapper since the above method is only called in 1 place
| if (!hostShutDownListener.isPresent()) { | ||
| return; | ||
| } | ||
| hostShutDownListener.get().handleShutdown(taskId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit : rewrite in functional style as
hostShutDownListener.ifPresent(taskShutDownListener -> taskShutDownListener.handleShutdown(taskId));
| if (!hostShutDownListener.isPresent()) { | ||
| return; | ||
| } | ||
| hostShutDownListener.get().addStats(taskShutdownStats); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above
hostShutDownListener.ifPresent(taskShutDownListener -> taskShutDownListener.addStats(taskShutdownStats));
| builder.addAll(runningLeafSplits); | ||
| builder.addAll(queuedLeafSplits); | ||
| //To avoid queued split marked as completed splits to pollute the retryable splits | ||
| if (isShuttingDown.get()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AGPQN : This check is a bit confusing. Can we call this method destroy say if we want to do speculative execution and cancel/destroy the work which is independent of the notion of a shutdown.
| queuedLeafSplits.add(split); | ||
| } | ||
| else { | ||
| checkState(!split.isSplitAlreadyStarted(), "Split we are avoiding to queue was already started!"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit avoiding -> adding
Description
Motivation and Context
Impact
Test Plan
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.
If release note is NOT required, use: