-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Enable uploads via pre-signed URLs #17349
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -88,6 +88,12 @@ pub enum PublishError { | |
| MissingHash(Box<DistFilename>), | ||
| #[error(transparent)] | ||
| RetryParsing(#[from] RetryParsingError), | ||
| #[error("Failed to reserve upload slot for `{}`", _0.user_display())] | ||
| Reserve(PathBuf, #[source] Box<PublishSendError>), | ||
| #[error("Failed to upload to S3 for `{}`", _0.user_display())] | ||
| S3Upload(PathBuf, #[source] Box<PublishSendError>), | ||
| #[error("Failed to finalize upload for `{}`", _0.user_display())] | ||
| Finalize(PathBuf, #[source] Box<PublishSendError>), | ||
| } | ||
|
|
||
| /// Failure to get the metadata for a specific file. | ||
|
|
@@ -625,7 +631,7 @@ pub async fn validate( | |
| .expect("URL must have path segments") | ||
| .push("validate"); | ||
|
|
||
| let request = build_validation_request( | ||
| let request = build_metadata_request( | ||
| raw_filename, | ||
| &validation_url, | ||
| client, | ||
|
|
@@ -653,6 +659,215 @@ pub async fn validate( | |
| Ok(()) | ||
| } | ||
|
|
||
| /// Upload a file using the two-phase upload protocol for pyx. | ||
| /// | ||
| /// This is a more efficient upload method that: | ||
| /// 1. Reserves an upload slot and gets a pre-signed S3 URL. | ||
| /// 2. Uploads the file directly to S3. | ||
| /// 3. Finalizes the upload with the registry. | ||
| /// | ||
| /// Returns `true` if the file was newly uploaded and `false` if it already existed. | ||
| pub async fn upload_two_phase( | ||
| group: &UploadDistribution, | ||
| form_metadata: &FormMetadata, | ||
| registry: &DisplaySafeUrl, | ||
| client: &BaseClient, | ||
| s3_client: &BaseClient, | ||
| retry_policy: ExponentialBackoff, | ||
| credentials: &Credentials, | ||
| reporter: Arc<impl Reporter>, | ||
| ) -> Result<bool, PublishError> { | ||
| #[derive(Debug, Deserialize)] | ||
| struct ReserveResponse { | ||
| upload_url: Option<String>, | ||
| } | ||
|
|
||
| // Step 1: Reserve an upload slot. | ||
| let mut reserve_url = registry.clone(); | ||
| reserve_url | ||
| .path_segments_mut() | ||
| .expect("URL must have path segments") | ||
| .push("reserve"); | ||
|
|
||
| debug!("Reserving upload slot at {reserve_url}"); | ||
|
|
||
| let reserve_request = build_metadata_request( | ||
| &group.raw_filename, | ||
| &reserve_url, | ||
| client, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. That's the client without retries; I think that should be the regular client, plus/minus auth settings? |
||
| credentials, | ||
| form_metadata, | ||
| ); | ||
|
|
||
| let response = reserve_request.send().await.map_err(|err| { | ||
| PublishError::Reserve( | ||
| group.file.clone(), | ||
| PublishSendError::ReqwestMiddleware(err).into(), | ||
| ) | ||
| })?; | ||
|
|
||
| let status = response.status(); | ||
|
|
||
| let reserve_response: ReserveResponse = match status { | ||
| StatusCode::OK => { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Q, minor: should we use HTTP 409 (Conflict) for this, given that we're not tied to PyPI's response code semantics for our own upload scheme?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Would 409 be for the case in which it already exists and isn't the same file? Or already exists and is the same file? Here I'm using 200 for "already exists and is the same" (and 201 for "created, now proceed to upload").
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Oh, good point. 409 would be for "exists but with different content", I believe; 200 is semantically correct for "exists with the same content already." |
||
| debug!("File already uploaded: {}", group.raw_filename); | ||
| return Ok(false); | ||
| } | ||
| StatusCode::CREATED => { | ||
| let body = response.text().await.map_err(|err| { | ||
| PublishError::Reserve( | ||
| group.file.clone(), | ||
| PublishSendError::StatusNoBody(status, err).into(), | ||
| ) | ||
| })?; | ||
| serde_json::from_str(&body).map_err(|_| { | ||
| PublishError::Reserve( | ||
| group.file.clone(), | ||
| PublishSendError::Status(status, format!("Invalid JSON response: {body}")) | ||
| .into(), | ||
| ) | ||
| })? | ||
| } | ||
| _ => { | ||
| let body = response.text().await.unwrap_or_default(); | ||
| return Err(PublishError::Reserve( | ||
| group.file.clone(), | ||
| PublishSendError::Status(status, body).into(), | ||
| )); | ||
| } | ||
| }; | ||
|
|
||
| // Step 2: Upload the file directly to S3 (if needed). | ||
| // When upload_url is None, the file already exists on S3 with the correct hash. | ||
| if let Some(upload_url) = reserve_response.upload_url { | ||
| let s3_url = DisplaySafeUrl::parse(&upload_url).map_err(|_| { | ||
| PublishError::S3Upload( | ||
| group.file.clone(), | ||
| PublishSendError::Status( | ||
| StatusCode::BAD_REQUEST, | ||
| "Invalid S3 URL in reserve response".to_string(), | ||
| ) | ||
| .into(), | ||
| ) | ||
| })?; | ||
|
|
||
| debug!("Got pre-signed URL for upload: {s3_url}"); | ||
|
|
||
| // Use a custom retry loop since streaming uploads can't be retried by the middleware. | ||
| let file_size = fs_err::tokio::metadata(&group.file) | ||
| .await | ||
| .map_err(|err| { | ||
| PublishError::PublishPrepare( | ||
| group.file.clone(), | ||
| Box::new(PublishPrepareError::Io(err)), | ||
| ) | ||
| })? | ||
| .len(); | ||
|
|
||
| let mut retry_state = RetryState::start(retry_policy, s3_url.clone()); | ||
| loop { | ||
| let file = File::open(&group.file).await.map_err(|err| { | ||
| PublishError::PublishPrepare( | ||
| group.file.clone(), | ||
| Box::new(PublishPrepareError::Io(err)), | ||
| ) | ||
| })?; | ||
|
|
||
| let idx = reporter.on_upload_start(&group.filename.to_string(), Some(file_size)); | ||
| let reporter_clone = reporter.clone(); | ||
| let reader = ProgressReader::new(file, move |read| { | ||
| reporter_clone.on_upload_progress(idx, read as u64); | ||
| }); | ||
| let file_reader = Body::wrap_stream(ReaderStream::new(reader)); | ||
|
|
||
| let result = s3_client | ||
| .for_host(&s3_url) | ||
| .raw_client() | ||
| .put(Url::from(s3_url.clone())) | ||
| .header(reqwest::header::CONTENT_TYPE, "application/octet-stream") | ||
| .header(reqwest::header::CONTENT_LENGTH, file_size) | ||
| .body(file_reader) | ||
| .send() | ||
| .await; | ||
|
|
||
| let response = match result { | ||
| Ok(response) => { | ||
| reporter.on_upload_complete(idx); | ||
| response | ||
| } | ||
| Err(err) => { | ||
| let middleware_retries = | ||
| if let Some(RetryError::WithRetries { retries, .. }) = | ||
| (&err as &dyn std::error::Error).downcast_ref::<RetryError>() | ||
| { | ||
| *retries | ||
| } else { | ||
| 0 | ||
| }; | ||
| if let Some(backoff) = retry_state.should_retry(&err, middleware_retries) { | ||
| retry_state.sleep_backoff(backoff).await; | ||
| continue; | ||
| } | ||
| return Err(PublishError::S3Upload( | ||
| group.file.clone(), | ||
| PublishSendError::ReqwestMiddleware(err).into(), | ||
| )); | ||
| } | ||
| }; | ||
|
|
||
| if response.status().is_success() { | ||
| break; | ||
| } | ||
|
|
||
| let status = response.status(); | ||
| let body = response.text().await.unwrap_or_default(); | ||
| return Err(PublishError::S3Upload( | ||
| group.file.clone(), | ||
| PublishSendError::Status(status, format!("S3 upload failed: {body}")).into(), | ||
| )); | ||
| } | ||
|
|
||
| debug!("S3 upload complete for {}", group.raw_filename); | ||
| } else { | ||
| debug!( | ||
| "File already exists on S3, skipping upload: {}", | ||
| group.raw_filename | ||
| ); | ||
| } | ||
|
|
||
| // Step 3: Finalize the upload. | ||
| let mut finalize_url = registry.clone(); | ||
| finalize_url | ||
| .path_segments_mut() | ||
| .expect("URL must have path segments") | ||
| .push("finalize"); | ||
|
|
||
| debug!("Finalizing upload at {finalize_url}"); | ||
|
|
||
| let finalize_request = build_metadata_request( | ||
| &group.raw_filename, | ||
| &finalize_url, | ||
| client, | ||
| credentials, | ||
| form_metadata, | ||
| ); | ||
|
|
||
| let response = finalize_request.send().await.map_err(|err| { | ||
| PublishError::Finalize( | ||
| group.file.clone(), | ||
| PublishSendError::ReqwestMiddleware(err).into(), | ||
| ) | ||
| })?; | ||
|
|
||
| handle_response(&finalize_url, response) | ||
| .await | ||
| .map_err(|err| PublishError::Finalize(group.file.clone(), err.into()))?; | ||
|
|
||
| debug!("Upload finalized for {}", group.raw_filename); | ||
|
|
||
| Ok(true) | ||
| } | ||
|
|
||
| /// Check whether we should skip the upload of a file because it already exists on the index. | ||
| pub async fn check_url( | ||
| check_url_client: &CheckUrlClient<'_>, | ||
|
|
@@ -1067,10 +1282,8 @@ async fn build_upload_request<'a>( | |
| Ok((request, idx)) | ||
| } | ||
|
|
||
| /// Build the validation request, to validate the upload without actually uploading the file. | ||
| /// | ||
| /// Returns the [`RequestBuilder`]. | ||
| fn build_validation_request<'a>( | ||
| /// Build a request with form metadata but without the file content. | ||
| fn build_metadata_request<'a>( | ||
| raw_filename: &str, | ||
| registry: &DisplaySafeUrl, | ||
| client: &'a BaseClient, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems reasonable to me, although noting that we could in principle do this without a flag by having uv send an `Accept: ...` header that indicates support, to which pyx would then respond with the pre-signed URL. (That might be nice for enabling this implicitly/by default in the future, but seems like not worth doing initially!)