kvserver: add onProcessResult and onEnqueueResult to processCallback #152792
Conversation
maybeBackpressureBatch registers a callback with the split queue for replicas that are too large relative to their split size. This backpressures the range to stop it from growing and prevent new writes from blocking a pending split. The callback is invoked when the split queue finishes processing the replica. Previously, the error channel used in the callback had a size of 1 and performed blocking sends. This was safe because the base queue only sent a single error, and by the time maybeBackpressureBatch returned, the callback was guaranteed to have been consumed, and no additional sends would occur. Future commits will allow the callback to be invoked multiple times (although it should fire at most twice). To be safe and avoid potential deadlocks from multiple sends after maybeBackpressureBatch has already returned, this commit makes the error send non-blocking. If the channel is already full, the error is dropped, which is acceptable since we only care about observing the completion of the replica processing at least once.
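To make the pattern concrete, here is a minimal, self-contained sketch of the non-blocking send described above (the function and channel names are illustrative, not the actual CockroachDB code):

```go
package main

import (
	"errors"
	"fmt"
)

// notifyOnce reports a processing result on a size-1 channel without ever
// blocking: the first result is delivered, and any later invocations drop
// their error, since we only need to observe completion at least once.
func notifyOnce(splitC chan error, err error) {
	select {
	case splitC <- err:
		// First completion observed; the waiter receives this value.
	default:
		// Channel already holds a result; drop the error.
	}
}

func main() {
	splitC := make(chan error, 1)
	notifyOnce(splitC, nil)                         // first callback invocation
	notifyOnce(splitC, errors.New("purgatory err")) // second invocation is safely dropped
	fmt.Println("received:", <-splitC)
}
```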
baseQueue.Async may return immediately as a noop if the semaphore does not have available capacity and the wait parameter is false. Previously, this case returned no error, leaving the caller unaware that the request was dropped. This commit changes the behavior to return a baseQueueAsyncRateLimited error, allowing callers to detect and handle the condition.
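A rough sketch of the described behavior, using an assumed channel-based semaphore and error name rather than the real baseQueue internals:

```go
package main

import (
	"errors"
	"fmt"
)

var errAsyncRateLimited = errors.New("async task rate limited")

// tryAsync runs task on a goroutine if the semaphore has capacity. With
// wait=false and a full semaphore it now surfaces an error instead of
// silently dropping the request.
func tryAsync(sem chan struct{}, wait bool, task func()) error {
	if wait {
		sem <- struct{}{} // block until capacity frees up
	} else {
		select {
		case sem <- struct{}{}:
		default:
			return errAsyncRateLimited // previously: silent no-op
		}
	}
	go func() {
		defer func() { <-sem }()
		task()
	}()
	return nil
}

func main() {
	sem := make(chan struct{}, 1)
	sem <- struct{}{} // simulate a semaphore with no spare capacity
	if err := tryAsync(sem, false, func() {}); err != nil {
		fmt.Println(err) // async task rate limited
	}
}
```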
The base queue already supports registering callbacks that are invoked with the processing result of replicas once they are processed. However, replicas may fail before reaching that stage (e.g., failing to enqueue or being dropped early). This commit extends the mechanism to also report enqueue results, allowing callers to detect failures earlier. Currently, only decommissioningNudger.maybeEnqueueProblemRange uses this. Note that one behavior change is introduced: previously, a registered callback would fire only once with the processing result and not again if the replica was later processed by the purgatory queue. With this change, the callback may now be invoked twice.
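As a simplified illustration of the extended callback contract (field names follow the commit message; the real signatures in queue.go may differ), a callback now observes both stages and may see onProcessResult fire twice:

```go
package main

import (
	"errors"
	"fmt"
)

// processCallback sketches the two-stage reporting described above.
type processCallback struct {
	// onEnqueueResult reports whether the replica made it into the queue;
	// indexOnHeap is -1 when enqueueing failed.
	onEnqueueResult func(indexOnHeap int, err error)
	// onProcessResult reports the processing outcome. It may fire twice: once
	// with an error when the replica lands in purgatory, and again when
	// purgatory reprocesses it.
	onProcessResult func(err error)
}

func main() {
	cb := processCallback{
		onEnqueueResult: func(idx int, err error) { fmt.Println("enqueued at", idx, "err:", err) },
		onProcessResult: func(err error) { fmt.Println("processed, err:", err) },
	}
	cb.onEnqueueResult(3, nil)                     // enqueued successfully at heap index 3
	cb.onProcessResult(errors.New("to purgatory")) // first processing attempt fails
	cb.onProcessResult(nil)                        // purgatory later succeeds
}
```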
This commit adds TestBaseQueueCallbackOnEnqueueResult and TestBaseQueueCallbackOnProcessResult to verify that callbacks are correctly invoked with both enqueue and process results.
tbg
left a comment
Nice, looking good! Some comments about de-risking the logging while keeping it useful (and misc other comments, none of them of particular substance).
@tbg reviewed 1 of 1 files at r1, 3 of 3 files at r2, 5 of 5 files at r3, 2 of 2 files at r4, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @wenyihu6)
pkg/kv/kvserver/replica_backpressure.go line 214 at r1 (raw file):
case splitC <- err:
default:
// TODO(wenyihu6): should we add ctx timeout when invoking callbacks
this TODO seems out of place, and along with the unrelated comment below it tripped me up a little. Consider removing or moving the TODO.
pkg/kv/kvserver/queue.go line 132 at r3 (raw file):
// before processing due to exceeding max queue size.
// 2. onProcessResult may be called with error first and sent to the purgatory
// queue and called again when the puragtory processes the replica.
puragtory
pkg/kv/kvserver/queue.go line 150 at r3 (raw file):
onProcessResult func(err error)
// onEnqueueResult is called with the result of the enqueue attempt. It is
nit: for temporal consistency, move onEnqueue up above onProcessResult.
pkg/kv/kvserver/queue.go line 163 at r3 (raw file):
// callback is -1. Note: indexOnHeap does not represent the item's exact rank
// by priority. It only reflects the item's position in the heap array, which
// gives a rough idea of where it sits in the priority hierarchy.
Here too could you reflect on whether the callback is necessarily invoked at all? I imagine there are cases where it isn't, and it'd be good to call this out.
pkg/kv/kvserver/queue.go line 645 at r3 (raw file):
type queueHelper interface {
	MaybeAdd(ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp)
	Add(ctx context.Context, repl replicaInQueue, prio float64, processCallback processCallback)
the new variable name shadows the type, which isn't ideal. Maybe just processCB processCallback.
pkg/kv/kvserver/queue.go line 703 at r3 (raw file):
// enqueued or processed.
func (bq *baseQueue) AddAsyncWithCallback(
	ctx context.Context, repl replicaInQueue, prio float64, processCallback processCallback,
ditto (maybe just search for processCallback processCallback to get all the offenders).
pkg/kv/kvserver/queue.go line 800 at r3 (raw file):
// We checked this above in MaybeAdd(), but we need to check it
// again for Add().
processCallback.onEnqueueResult(-1 /*indexOnHeap*/, errReplicaNotInitialized)
It would be more robust to do named returns and then add a defer:
func (bq *baseQueue) addInternal(/* ... */) (_ bool, err error) {
defer func() {
if err != nil {
processCB.onEnqueueResult(-1 /* indexOnHeap */, err)
}
}()
}
pkg/kv/kvserver/queue.go line 849 at r3 (raw file):
log.Dev.Infof(ctx, "updating priority: %0.3f -> %0.3f", item.priority, priority)
}
// TODO(wenyihu6): will this introduce a lot of new memory allocation?
Would it be better to treat this as a success case? After all, the caller will most likely care about whether the replica is now in the queue, and it is.
pkg/kv/kvserver/queue.go line 874 at r3 (raw file):
priority, replicaItemToDrop.replicaID)
// TODO(wenyihu6): when we introduce base queue max size cluster setting,
// remember to invoke this callback when shrinking the size
Adding an extra anchor comment here as a reminder to do this, since I reviewed the shrinking size code already. Likely best to stack this PR on top.
There is also the case in SetQueueSize that would need to call callbacks, so most likely a good idea to factor out a single method that is called from both places to do the callback handling.
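A possible shape for that shared helper, sketched with made-up names and a simplified replicaItem (not the actual queue.go types): every path that drops an item without processing it funnels through one place that fires the registered callbacks.

```go
package main

import (
	"errors"
	"fmt"
)

var errDroppedFromQueue = errors.New("replica dropped to respect queue size")

// replicaItem is a stand-in for the real queue item type.
type replicaItem struct {
	rangeID   int
	callbacks []func(indexOnHeap int, err error)
}

// notifyDropped is the single shared helper: both the max-size eviction path
// and a SetQueueSize shrink would call it for each evicted item.
func notifyDropped(item *replicaItem) {
	for _, cb := range item.callbacks {
		cb(-1 /* indexOnHeap */, errDroppedFromQueue)
	}
}

func main() {
	item := &replicaItem{rangeID: 42}
	item.callbacks = append(item.callbacks, func(idx int, err error) {
		fmt.Printf("r%d: idx=%d err=%v\n", item.rangeID, idx, err)
	})
	notifyDropped(item)
}
```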
pkg/kv/kvserver/replica.go line 2968 at r3 (raw file):
// this function to another file so that we can avoid the spam on
// other logs.
log.KvDistribution.Infof(ctx,
Some rate limiting would be good here:
var logBudgetEnqueueResult atomic.Int64
logBudgetEnqueueResult.Store(15)
// in the callback:
if logBudgetEnqueueResult.Add(-1) > 0 {
// log.Infof
} else {
// vmodule
}
So basically each decommission nudger run will produce at most 15 messages for enqueue failures (I didn't name the var very well: it was meant to be for the failure case, we should have one atomic per message type).
Alternatively, and simpler, we could gate all logging behind verbosity if the number of replicas we're nudging is (say) >15. But I like getting some information even in the case in which there are more than that number of replicas, which this atomic-based thing would give us.
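For concreteness, a runnable sketch of the budgeted-logging idea (names, the per-run reset, and the stand-in for the real logging call are all assumptions, not necessarily what the follow-up implements):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// One budget per message type, reset at the start of each nudger run.
var logBudgetEnqueueFailure atomic.Int64

func logEnqueueFailure(rangeID int64, err error) {
	if logBudgetEnqueueFailure.Add(-1) >= 0 {
		// Stands in for an unconditional log.Infof-style call.
		fmt.Printf("r%d: enqueue failed: %v\n", rangeID, err)
		return
	}
	// Budget exhausted: fall back to verbosity-gated (vmodule) logging.
}

func main() {
	logBudgetEnqueueFailure.Store(2) // per-run budget (15 in the suggestion above)
	for i := int64(1); i <= 4; i++ {
		logEnqueueFailure(i, fmt.Errorf("queue full"))
	}
	// Only the first two failures are printed unconditionally.
}
```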
wenyihu6
left a comment
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @tbg)
pkg/kv/kvserver/queue.go line 800 at r3 (raw file):
Previously, tbg (Tobias Grieger) wrote…
It would be more robust to do named returns and then add a defer:
func (bq *baseQueue) addInternal(/* ... */) (_ bool, err error) {
	defer func() {
		if err != nil {
			processCB.onEnqueueResult(-1 /* indexOnHeap */, err)
		}
	}()
}
I’m calling onEnqueueResult with an error even when the returned error is nil in certain cases, such as errReplicaAlreadyInPurgatory or errReplicaAlreadyProcessing. I want to discuss the desired behavior here before making the changes, especially in light of your point below about whether a replica already in the queue should be treated as a success.
I still want visibility into these cases - at least knowing if a replica is somehow stuck in purgatory or processing. While the replicas are technically in the queue and things might seem fine, I’m concerned about subtle bugs like #114365 and whether our current visibility would allow us to reason about them.
Previously, tbg (Tobias Grieger) wrote…
Agree that it seems better to treat this as a success case and invoke
tbg
left a comment
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @wenyihu6)
pkg/kv/kvserver/queue.go line 800 at r3 (raw file):
Previously, wenyihu6 (Wenyi Hu) wrote…
I’m calling onEnqueueResult with an error even when the returned error is nil in certain cases, such as errReplicaAlreadyInPurgatory or errReplicaAlreadyProcessing. I want to discuss the desired behavior here before making the changes, especially in light of your point below about whether a replica already in the queue should be treated as a success. I still want visibility into these cases - at least knowing if a replica is somehow stuck in purgatory or processing. While the replicas are technically in the queue and things might seem fine, I’m concerned about subtle bugs like #114365 and whether our current visibility would allow us to reason about them.
I see - hadn't noticed the return nil with an associated error callback. This makes sense to me. I still think you could take the suggestion, though - in the purgatory and the requeue case, you invoke the callback manually (with a comment). For the other "vanilla" cases that return an error and also want it to be passed to the callback, you use the defer.
This commit updates the comments to better clarify the semantics and the contract of processCallback.
Previously, bq.addInternal invoked processCallback.onEnqueueResult with a newly constructed error whenever a replica was already present and re-enqueued with a higher priority, since the priority had to be passed dynamically. This commit instead treats the case as a success and passes the updated heap index to onEnqueueResult, as the caller mainly cares about whether the replica is already in the queue and its new position.
Previously, the variable name processCallback shadowed its type name, which was not ideal. This commit renames the variable to cb.
Previously, cb.onEnqueueResult was invoked inline before returning errors, which was less robust and required explicit calls. This commit refactors the code to invoke onEnqueueResult in a defer statement when returning a non-nil error. Note that the function may still call cb.onEnqueueResult with non-nil errors even when no error is returned, since we want visibility into those cases as well.
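A minimal sketch of the resulting control flow, with simplified signatures (the real addInternal takes more parameters and distinguishes more cases):

```go
package main

import (
	"errors"
	"fmt"
)

var errReplicaNotInitialized = errors.New("replica not initialized")

type enqueueCallback func(indexOnHeap int, err error)

// addInternalSketch shows the defer pattern: any early return with a non-nil
// error reaches the callback automatically, while "success-with-an-error"
// cases (purgatory, requeue) would still invoke the callback manually.
func addInternalSketch(initialized bool, cb enqueueCallback) (_ bool, err error) {
	defer func() {
		if err != nil {
			cb(-1 /* indexOnHeap */, err)
		}
	}()
	if !initialized {
		return false, errReplicaNotInitialized // callback fires via the defer
	}
	cb(0 /* indexOnHeap */, nil) // enqueued at an illustrative heap position
	return true, nil
}

func main() {
	cb := func(idx int, err error) { fmt.Println("enqueue result:", idx, err) }
	_, _ = addInternalSketch(false, cb)
	_, _ = addInternalSketch(true, cb)
}
```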
wenyihu6
left a comment
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @tbg)
pkg/kv/kvserver/queue.go line 132 at r3 (raw file):
Previously, tbg (Tobias Grieger) wrote…
puragtory
Fixed.
pkg/kv/kvserver/queue.go line 150 at r3 (raw file):
Previously, tbg (Tobias Grieger) wrote…
nit: for temporal consistency, move onEnqueue up above onProcessResult.
Done.
pkg/kv/kvserver/queue.go line 163 at r3 (raw file):
Previously, tbg (Tobias Grieger) wrote…
Here too could you reflect on whether the callback is necessarily invoked at all? I imagine there are cases where it isn't, and it'd be good to call this out.
Done. It’s a bit more nuanced because addInternal and MaybeAddCallback behave a bit differently when registering callbacks. Added comments on them to clarify the behavior.
pkg/kv/kvserver/queue.go line 645 at r3 (raw file):
Previously, tbg (Tobias Grieger) wrote…
the new variable name shadows the type, which isn't ideal. Maybe just
processCB processCallback.
Good call - renamed to cb processCallback to match the existing convention used with (replicaItem).registerCallback.
pkg/kv/kvserver/queue.go line 703 at r3 (raw file):
Previously, tbg (Tobias Grieger) wrote…
ditto (maybe just search for processCallback processCallback to get all the offenders).
Done.
pkg/kv/kvserver/queue.go line 800 at r3 (raw file):
Previously, tbg (Tobias Grieger) wrote…
I see - hadn't noticed the return nil with an associated error callback. This makes sense to me. I still think you could take the suggestion, though - in the purgatory and the requeue case, you invoke the callback manually (with a comment). For the other "vanilla" cases that return an error and also want it to be passed to the callback, you use the defer.
Makes sense, fixed in a follow up commit.
pkg/kv/kvserver/queue.go line 874 at r3 (raw file):
Previously, tbg (Tobias Grieger) wrote…
Adding an extra anchor comment here as a reminder to do this, since I reviewed the shrinking size code already. Likely best to stack this PR on top.
There is also the case in SetQueueSize that would need to call callbacks, so most likely a good idea to factor out a single method that is called from both places to do the callback handling.
I haven’t added the helper function yet, as I’m still considering how to structure it. Invoking the callback inside bq.removeFromReplicaSetLocked seems like the natural approach. This would also resolve the semantic issues, but it introduces a behavior change by calling the callback while holding a lock. The comment also seems to suggest that there may be things I'm overlooking:
cockroach/pkg/kv/kvserver/queue.go, lines 825 to 827 in 84697f3:
// is okay with these semantics. Adding new uses is discouraged without cleaning
// up the contract of this method, but this code doesn't lend itself readily to
// upholding invariants so there may need to be some cleanup first.
pkg/kv/kvserver/replica.go line 2968 at r3 (raw file):
Previously, tbg (Tobias Grieger) wrote…
Some rate limiting would be good here:
var logBudgetEnqueueResult atomic.Int64
logBudgetEnqueueResult.Store(15)
// in the callback:
if logBudgetEnqueueResult.Add(-1) > 0 {
// log.Infof
} else {
// vmodule
}
So basically each decommission nudger run will produce at most 15 messages for enqueue failures (I didn't name the var very well: it was meant to be for the failure case, we should have one atomic per message type).
Alternatively, and simpler, we could gate all logging behind verbosity if the number of replicas we're nudging is (say) >15. But I like getting some information even in the case in which there are more than that number of replicas, which this atomic-based thing would give us.
Good idea. I’m opening a new PR #152885 for this, as there are some design decisions I made that are still open for discussion.
pkg/kv/kvserver/replica_backpressure.go line 214 at r1 (raw file):
Previously, tbg (Tobias Grieger) wrote…
this TODO seems out of place, and along with the unrelated comment below it tripped me up a little. Consider removing or moving the TODO.
Removed.
tbg
left a comment
@tbg reviewed 2 of 2 files at r5, 1 of 1 files at r6, 2 of 2 files at r7, 1 of 1 files at r8, 1 of 1 files at r9, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @wenyihu6)
pkg/kv/kvserver/queue.go line 874 at r3 (raw file):
Previously, wenyihu6 (Wenyi Hu) wrote…
I haven’t added the helper function yet, as I’m still considering how to structure it. Invoking the callback inside bq.removeFromReplicaSetLocked seems like the natural approach. This would also resolve the semantic issues, but it introduces a behavior change by calling the callback while holding a lock. The comment also seems to suggest that there may be things I'm overlooking:
cockroach/pkg/kv/kvserver/queue.go, lines 825 to 827 in 84697f3:
// is okay with these semantics. Adding new uses is discouraged without cleaning
// up the contract of this method, but this code doesn't lend itself readily to
// upholding invariants so there may need to be some cleanup first.
Yep, we can revisit separately. I think that comment is mostly FUD, btw. At this point you have at least as good an understanding as Dan (who wrote the comment at the time). You can stack your attempt as a new PR, then we can focus on not messing anything up.
TFTR! bors r=tbg
152787: kvserver: improve observability with decommission nudger r=tbg a=wenyihu6

Stacked on top of #152792
Resolves: #151847
Epic: none

---

**kvserver: improve observability with decommission nudger**

Previously, we added the decommissioning nudger, which nudges the leaseholder replica of decommissioning ranges to enqueue itself into the replicate queue for decommissioning. However, we are still observing extended decommission stalls with the nudger enabled. Observability was limited, and we could not easily tell whether replicas were successfully enqueued or processed. This commit improves observability by adding four metrics to track the enqueue and processing results of the decommissioning nudger: ranges.decommissioning.nudger.{enqueue,process}.{success,failure}.

---

**kvserver: add enqueue metrics to base queue**

Previously, observability into base queue enqueuing was limited to pending queue length and process results. This commit adds enqueue-specific metrics for the replicate queue:
- queue.replicate.enqueue.add: counts replicas successfully added to the queue
- queue.replicate.enqueue.failedprecondition: counts replicas that failed the replicaCanBeProcessed precondition check
- queue.replicate.enqueue.noaction: counts replicas skipped because ShouldQueue determined no action was needed
- queue.replicate.enqueue.unexpectederror: counts replicas that were expected to be enqueued (ShouldQueue returned true or the caller attempted a direct enqueue) but failed due to unexpected errors

---

**kvserver: move bq.enqueueAdd update to be outside of defer**

Previously, we updated bq.enqueueAdd inside the defer statement of addInternal. This was incorrect because we may return queued = true for a replica that is already processing and marked for requeue. That replica would later be requeued in finishProcessingReplica, incrementing the metric again and leading to double counting.

---

**kvserver: test metrics in TestBaseQueueCallback* and TestReplicateQueueDecommissionScannerDisabled**

This commit extends TestBaseQueueCallback* and TestReplicateQueueDecommissionScannerDisabled to also verify metric updates.

152984: sql/inspect: convert internal errors to inspect issues r=spilchen a=spilchen

Previously, internal errors during index consistency checks would fail the entire job. Now these errors are converted to structured inspect issues with detailed context.

Closes #148299
Release Notes: None
Epic: None

Co-authored-by: wenyihu6 <[email protected]>
Co-authored-by: Matt Spilchen <[email protected]>
Part of: #151847
Epic: none
kvserver: use non-blocking send on errors for maybeBackpressureBatch
maybeBackpressureBatch registers a callback with the split queue for replicas
that are too large relative to their split size. This backpressures the range to
stop it from growing and prevent new writes from blocking a pending split. The
callback is invoked when the split queue finishes processing the replica.
Previously, the error channel used in the callback had a size of 1 and performed
blocking sends. This was safe because the base queue only sent a single error,
and by the time maybeBackpressureBatch returned, the callback was guaranteed to
have been consumed, and no additional sends would occur.
Future commits will allow the callback to be invoked multiple times (although it
should fire at most twice). To be safe and avoid potential deadlocks from
multiple sends after maybeBackpressureBatch already returns, this commit makes
the error send non-blocking. If the channel is already full, the error is
dropped, which is acceptable since we only care about observing the completion
of the replica processing at least once.
kvserver: return baseQueueAsyncRateLimited from bq.Async
baseQueue.Async may return immediately as a noop if the semaphore does not
have available capacity and the wait parameter is false. Previously, this case
returned no error, leaving the caller unaware that the request was dropped. This
commit changes the behavior to return a baseQueueAsyncRateLimited error,
allowing callers to detect and handle the condition.
kvserver: add onProcessResult and onEnqueueResult to processCallback
The base queue already supports registering callbacks that are invoked with the
processing result of replicas once they are processed. However, replicas may
fail before reaching that stage (e.g., failing to enqueue or being dropped early).
This commit extends the mechanism to also report enqueue results, allowing
callers to detect failures earlier. Currently, only
decommissioningNudger.maybeEnqueueProblemRange uses this.
Note that one behavior change is introduced: previously, a registered callback
would fire only once with the processing result and not again if the replica was
later processed by the purgatory queue. With this change, the callback may now
be invoked twice.
kvserver: add TestBaseQueueCallback
This commit adds TestBaseQueueCallbackOnEnqueueResult and
TestBaseQueueCallbackOnProcessResult to verify that callbacks are correctly
invoked with both enqueue and process results.