-
Notifications
You must be signed in to change notification settings - Fork 40k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ListPager.EachListItem util #75849
Conversation
@smarterclayton if you get a minute would you glance at the |
/cc @jingyih |
@jpbetz: GitHub didn't allow me to request PR reviews from the following users: jingyih. Note that only kubernetes members and repo collaborators can review this PR, and authors cannot review their own PRs. In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
295fa6a
to
6708a75
Compare
/retest |
1 similar comment
/retest |
/retest |
/cc @smarterclayton @jingyih @wojtek-t Looking for reviewers for this, would any of you have time to give it a look? |
Friendly nudge for review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I generally understand the approach. Added a few comments. Thanks!
@@ -115,3 +127,86 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti | |||
options.Continue = m.GetContinue() | |||
} | |||
} | |||
|
|||
// EachListItem invokes fn on each runtime.Object in the list. Any error immediately terminates the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be helpful to add more details here.
Function description says it invokes fn on each object in the list. In function signature, there is no list. So it has to be generated (retrieved) internally in this function.
The listing inside this function is similar but not identical to (*ListPager) List()
. In the sense that it does not fall back to full list on resource expire error. But I feel user may assume inside this function, listpager is using the same mechanism to list? You already mentioned a "Expired" error may be returned. I just feel a little more clarification on the internal listing mechanism might be helpful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I've rewritten this with more detail.
return nil | ||
}) | ||
if err == stoppedErr { | ||
err = nil // stoppedErr is an internal signal |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand what this line does, returning due to been signaled to stop is not actually an error.
Do we need to define an internal error? Can we use context cancel function to signal eachListChunk()
to stop? Upon closing stopC
, fn
can return nil (line 202), but the loop inside eachListChunk()
will be stopped by ctx.Done()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right. A nested context works here. Since the context error should be return iff the caller cancels the context, reasoning through when/how cancel error get's handled is a bit subtle and I've added a comment explaining it. I've also added test coverage for cancelation, both for then a fn
error results in an early exit and when the caller calls cancel.
bgResultC <- err | ||
}() | ||
|
||
for o := range chunkC { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If listing chunk in the background fails (such as due to resource expired), we will still finish processing the existing chunks in the buffer before return?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think from an API contract perspective, when a error occurs listing chunks, i think it's valid to either (1) stop calling fn as soon as possible, or (2) call fn on as many items as have been successfully retrieved. I went with #1 since it was trivial to implement. I imagine in different situations a client might benefit more from one of these and in other situations the other. Thoughts?
|
||
stopC := make(chan struct{}) | ||
bgResultC := make(chan error, 1) | ||
go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
defer runtime.HandleCrash()
at the top of the goroutine?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, I should do something with panics. I've gone with runtime.RecoverFromPanic
here. Hopefully it's appropriate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
runtime.RecoverFromPanic
didn't work as I had expected (and from a quick grep, doesn't look like we use it in the k8s codebase), so I did a more direct recover.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why you can't use runtime.HandleCrash() - it's pretty widely used in the codebase and allows to use additional custom handler too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had initially misunderstood it to catch the panic and swallow it. But it does still crash. Updating the PR to use it now.
// PageBufferSize chunks of list items concurrently in the background. If the chunk attempt fails a "Expired" | ||
// error may be returned. | ||
func (p *ListPager) eachListChunkBuffered(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error { | ||
chunkC := make(chan runtime.Object, p.PageBufferSize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
error if p.PageBufferSize <= 0
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, set it to < since 0 size buffer is supported (tests demo this).
|
||
// eachListChunkBuffered invokes fn on each runtimeObject list chunk. It buffers up to | ||
// PageBufferSize chunks of list items concurrently in the background. If the chunk attempt fails a "Expired" | ||
// error may be returned. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need to document the return value ... I think if fn returns an error, processing stops and that error is returned. if fn does not return an error, any error encountered while fetching the list (including timeout or cancelled errors from the context) is returned
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, yes, I've rewritten this with full details on errors.
5079ed7
to
0c80995
Compare
0c80995
to
6a64ee6
Compare
/lgtm |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: jpbetz, liggitt The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
@@ -48,6 +50,9 @@ type ListPager struct { | |||
PageFn ListPageFunc | |||
|
|||
FullListIfExpired bool | |||
|
|||
// Number of pages to buffer | |||
PageBufferSize int32 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if the buffer should be defined by the memory consumption instead of number of pages.
We can evaluate the size of a few chunks to dynamically determine the number of chunks to buffer, based on desired buffer size (in terms of memory).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting. Maybe wait and see how this approach works out and then optimize as needed from there? We previously had quite a bit of code doing full lists, and so as we transition to paginated lists and this sort of incremental processing my expectation is we'll reduce memory usage, particularly for object kinds that have large counts. If we still hit scalability/performance limits once this is in use, that would seem like a good time to look into optimizing this further .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good.
Introduce a
ListPager.EachListItem
convenience utility for incrementally processing chunked List results. This makes it easy for a client to only request list chunks from the apiserver that it can actually keep up with processing.EachListItem
buffers up toPageBufferSize
(default: 10) pages in the background to minimize foreground wait time.Example usage:
There are some warts with this approach that I'd like to find a way of smoothing over, namely: (1) having to creating the pager (2) having to cast each item to the correct type. But that should be possible to add via code generators in the future.
The name and function signature were picked to be consistent with the existing
meta.EachListItem
function./kind feature
/priority important-longterm
/sig api-machinery