-
Notifications
You must be signed in to change notification settings - Fork 301
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement Async Pipeline Upload Flow #1927
Conversation
This requires functionality in the API to be deployed. So I am leaving it as draft until then |
795dc59
to
69a1fb8
Compare
fe8b0bf
to
81cbb6f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good, but thought this deserved some comments...
agent/pipeline_uploader.go
Outdated
// 422 responses will always fail no need to retry | ||
if apierr := new( | ||
api.ErrorResponse, | ||
); errors.As(err, &apierr) && apierr.Response.StatusCode == http.StatusUnprocessableEntity { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know I used new
with errors.As
like this previously, but that was really only because I wanted to do it on one line. If you're splitting it over multiple lines, then the suggested form in the errors package makes more sense to me:
var apierr *api.ErrorResponse
if errors.As(err, &apierr) && ... {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've changed this a bit. See my commit message for my reasoning.
agent/pipeline_uploader.go
Outdated
|
||
var ( | ||
locationRegex = regexp.MustCompile(`jobs/(?P<jobID>[^/]+)/pipelines/(?P<uploadUUID>[^/]+)`) | ||
ErrRegex = func(s string, r *regexp.Regexp) error { return fmt.Errorf("regex %s failed to match %s", r, s) } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Three questions:
- is it necessary for it to be a var and not simply a func? I don't see it being changed anywhere
- is it necessary for it to be exported? I don't see it being used outside of the package
- is "ErrRegex" a good name? That makes it sound like it is a
*regexp.Regexp
when it is actually a func
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apologies. I've made a better error now. I decided not to pass up the regex, just the location, as well.
46ba455
to
d946116
Compare
Type assertions will fail if error wrapping is used
This is a very common use case, but the error unwrapping pattern is pretty tedious to write each time, and the resulting line of code is usually very long. The pattern itself is also easy to get wrong as you need to pass a pointer to a pointer to the second argument of `errors.As`, but it accepts a value of `any` type. The combination of a very long line and code that needs to be checked carefully is precarious, so I think abstracting this pattern is worth it.
46369fb
to
dbbe0d7
Compare
Co-authored-by: Josh Deprez <[email protected]>
dbbe0d7
to
a967d3b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! 😎
I have a few more comments, but consider them all non-blocking.
|
||
var locationRegex = regexp.MustCompile(`jobs/(?P<jobID>[^/]+)/pipelines/(?P<uploadUUID>[^/]+)`) | ||
|
||
type PipelineUploader struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be unfair of me to ask Benno to add doc comments to all exported top-level identifiers, while failing to ask you to do the same...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added this (and everything else) in #1972
|
||
if jobIDFromResponse != u.JobID { | ||
return fmt.Errorf( | ||
"JobID from API: %s does not match request: %s", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like a job for %q instead of %s - if the two strings mismatch at this point, there's a bug, and so they could be anything, which %q is ideal for. The same goes for UUID below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, I'll use that.
}) | ||
} | ||
|
||
type ErrLocationParse struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the error type isn't used / referred to elsewhere, should it be unexported instead?
But I am keen for more error types in general 😄 (not necessarily in this PR)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should refer to it in tests, but we haven't tested this error so 🤷
ctx := context.Background() | ||
l := clicommand.CreateLogger(&clicommand.PipelineUploadConfig{LogLevel: "notice"}) | ||
|
||
type Test struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not wrong to start identifiers within a func with a capital, but it seems weird to see something that looks like an exported identifier that can't possibly be used like one...
It shouldn't be necessary to avoid conflicting with test
as the loop variable below - the loop variable would shadow the type (e.g. https://go.dev/play/p/o8cOdW5oh2q)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But it does conflict with the argument to the closure used to avoid loop variable shenanigans: https://go.dev/play/p/VmzVE01PLdl
The type identifier does not need to exist. It's only because of the loop variable shenanigans that we're even having this conversation. I guess just reassigning it avoids all this, so I'll do that.
} | ||
|
||
for _, test := range tests { | ||
func(test Test) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The loop variable semantics changes can't land quickly enough, IMO.
For the record, I'm not opposed to either of the two approaches to solving this problem (the other being test := test
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not a huge fan of statements that seem meaningless, like test := test
. But as in this case we can avoid a lot of boilerplate with it, I'll do it.
return c.doRequest(req, nil) | ||
} | ||
|
||
// UploadPipelineAsync uploads the pipeline to the Buildkite Agent API. It does not wait for the upload to complete. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Surely it has to complete the upload (sending bytes to the server), but the point is to avoid waiting on "processing"? (I think all I'm saying is that "upload" is ambiguous.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's called an StepUpload
in the API even after the HTTP upload finish. I agree that this is confusing. I've fixed the comment.
RetrySleepFunc func(time.Duration) | ||
} | ||
|
||
func (u *PipelineUploader) AsyncUploadFlow(ctx context.Context, l logger.Logger) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the perspective of the other package, this is the only way to upload a pipeline with the new PipelineUploader
(is that correct?) So I wonder if it should be named Upload
. The inner workings make use of an async approach, but the whole method isn't exactly "async".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, the name was a holdover from my first attempt, where it did not fall back.
Thanks @DrJosh9000 I'm going to make a followup PR with your suggestions. This one has been floating around for a while and I don't want to have to rebase again. |
See #1927 (comment) We can remove the reassignment after [this experiment](golang/go#57969) is concluded.
The asynchronicity is hidden from the consumer, and may not even happen, as observed in #1927 (comment)
See #1927 (comment) We can remove the reassignment after [this experiment](golang/go#57969) is concluded.
The asynchronicity is hidden from the consumer, and may not even happen, as observed in #1927 (comment)
This is a new method of uploading pipelines introduced in the agent API
which will reduce long held connections between client and server. It works
by first uploading the Pipeline with a client generated UUID to the same
route as before, but with the query parameter
async=true
set.Unlike waiting up to 15 seconds as before, this will return status 202 immediately
with a
Location
header set to where to check the status of the upload.The API will also set a
Retry-After
header to inform the client to sleep for this duration.The agent will then poll the route returned as the
Location
header until the status of the upload is "applied".If the API does not support the Async Upload Flow, it will return 200 for the initial pipeline upload instead of 202. Then the agent will revert to its previous behaviour and poll the pipeline upload route with a new pipeline upload each time (although with the same UUID so the API will not create new steps).