From 7055657fe48aeab1577db7687a9f6acfa820acd8 Mon Sep 17 00:00:00 2001 From: Christoph Herzog Date: Thu, 25 Jan 2024 20:55:18 +0100 Subject: [PATCH] fix(cli): publish: Fix panick + Make waiting for build results more modular * Fixes a panic due to nested tokio runtimes * Exposes a --wait flag, which only waits for the container, and a --wait-all flag, which also waits for bindings and native executables. --- lib/cli/src/commands/publish.rs | 23 ++++-- lib/cli/src/utils/mod.rs | 4 +- lib/registry/src/package/builder.rs | 6 +- lib/registry/src/publish.rs | 104 +++++++++++++++++++++++++--- 4 files changed, 118 insertions(+), 19 deletions(-) diff --git a/lib/cli/src/commands/publish.rs b/lib/cli/src/commands/publish.rs index 9f2c65b3e6c..040af89bd0c 100644 --- a/lib/cli/src/commands/publish.rs +++ b/lib/cli/src/commands/publish.rs @@ -1,6 +1,6 @@ use anyhow::Context as _; use clap::Parser; -use wasmer_registry::wasmer_env::WasmerEnv; +use wasmer_registry::{publish::PublishWait, wasmer_env::WasmerEnv}; /// Publish a package to the package registry. #[derive(Debug, Parser)] @@ -30,6 +30,12 @@ pub struct Publish { /// Wait for package to be available on the registry before exiting. #[clap(long)] pub wait: bool, + /// Wait for the package and all dependencies to be available on the registry + /// before exiting. + /// + /// This includes the container, native executables and bindings. + #[clap(long)] + pub wait_all: bool, /// Timeout (in seconds) for the publish query to the registry. /// /// Note that this is not the timeout for the entire publish process, but @@ -40,13 +46,20 @@ pub struct Publish { impl Publish { /// Executes `wasmer publish` - #[tokio::main] - pub async fn execute(&self) -> Result<(), anyhow::Error> { + pub fn execute(&self) -> Result<(), anyhow::Error> { let token = self .env .token() .context("could not determine auth token for registry - run 'wasmer login'")?; + let wait = if self.wait_all { + PublishWait::new_all() + } else if self.wait { + PublishWait::new_container() + } else { + PublishWait::new_none() + }; + let publish = wasmer_registry::package::builder::Publish { registry: self.env.registry_endpoint().map(|u| u.to_string()).ok(), dry_run: self.dry_run, @@ -56,10 +69,10 @@ impl Publish { token, no_validate: self.no_validate, package_path: self.package_path.clone(), - wait: self.wait, + wait, timeout: self.timeout.into(), }; - publish.execute().await.map_err(on_error)?; + publish.execute().map_err(on_error)?; if let Err(e) = invalidate_graphql_query_cache(&self.env) { tracing::warn!( diff --git a/lib/cli/src/utils/mod.rs b/lib/cli/src/utils/mod.rs index 9908b658fd7..fbf448ebd68 100644 --- a/lib/cli/src/utils/mod.rs +++ b/lib/cli/src/utils/mod.rs @@ -238,7 +238,7 @@ pub async fn republish_package_with_bumped_version( quiet: false, package_name: None, version: None, - wait: false, + wait: wasmer_registry::publish::PublishWait::new_none(), token, no_validate: true, package_path: Some(dir.to_str().unwrap().to_string()), @@ -246,7 +246,7 @@ pub async fn republish_package_with_bumped_version( // large packages. timeout: std::time::Duration::from_secs(60 * 60 * 12), }; - publish.execute().await?; + publish.execute()?; Ok(manifest) } diff --git a/lib/registry/src/package/builder.rs b/lib/registry/src/package/builder.rs index 70134ae6633..326db4d28ae 100644 --- a/lib/registry/src/package/builder.rs +++ b/lib/registry/src/package/builder.rs @@ -10,6 +10,7 @@ use tar::Builder; use thiserror::Error; use time::{self, OffsetDateTime}; +use crate::publish::PublishWait; use crate::{package::builder::validate::ValidationPolicy, publish::SignArchiveResult}; use crate::{WasmerConfig, PACKAGE_TOML_FALLBACK_NAME}; @@ -40,7 +41,7 @@ pub struct Publish { /// Directory containing the `wasmer.toml` (defaults to current root dir) pub package_path: Option, /// Wait for package to be available on the registry before exiting - pub wait: bool, + pub wait: PublishWait, /// Timeout (in seconds) for the publish query to the registry pub timeout: Duration, } @@ -63,7 +64,7 @@ enum PackageBuildError { impl Publish { /// Executes `wasmer publish` - pub async fn execute(&self) -> Result<(), anyhow::Error> { + pub fn execute(&self) -> Result<(), anyhow::Error> { let input_path = match self.package_path.as_ref() { Some(s) => std::env::current_dir()?.join(s), None => std::env::current_dir()?, @@ -194,7 +195,6 @@ impl Publish { self.wait, self.timeout, ) - .await } fn validation_policy(&self) -> Box { diff --git a/lib/registry/src/publish.rs b/lib/registry/src/publish.rs index 03b3f9c12b4..433e8559c70 100644 --- a/lib/registry/src/publish.rs +++ b/lib/registry/src/publish.rs @@ -23,6 +23,49 @@ use std::time::Duration; static UPLOAD: Emoji<'_, '_> = Emoji("⬆️ ", ""); static PACKAGE: Emoji<'_, '_> = Emoji("📦", ""); +/// Different conditions that can be "awaited" when publishing a package. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct PublishWait { + pub container: bool, + pub native_executables: bool, + pub bindings: bool, + + pub timeout: Option, +} + +impl PublishWait { + pub fn is_any(self) -> bool { + self.container || self.native_executables || self.bindings + } + + pub fn new_none() -> Self { + Self { + container: false, + native_executables: false, + bindings: false, + timeout: None, + } + } + + pub fn new_all() -> Self { + Self { + container: true, + native_executables: true, + bindings: true, + timeout: None, + } + } + + pub fn new_container() -> Self { + Self { + container: true, + native_executables: false, + bindings: false, + timeout: None, + } + } +} + #[derive(Debug, Clone)] pub enum SignArchiveResult { Ok { @@ -33,7 +76,7 @@ pub enum SignArchiveResult { } #[allow(clippy::too_many_arguments)] -pub async fn try_chunked_uploading( +pub fn try_chunked_uploading( registry: Option, token: Option, package: &wasmer_toml::Package, @@ -45,7 +88,7 @@ pub async fn try_chunked_uploading( maybe_signature_data: &SignArchiveResult, archived_data_size: u64, quiet: bool, - wait: bool, + wait: PublishWait, timeout: Duration, ) -> Result<(), anyhow::Error> { let (registry, token) = initialize_registry_and_token(registry, token)?; @@ -81,7 +124,7 @@ pub async fn try_chunked_uploading( signature: maybe_signature_data, signed_url: Some(signed_url.url), private: Some(package.private), - wait: Some(wait), + wait: Some(wait.is_any()), }); let response: publish_package_mutation_chunked::ResponseData = @@ -91,14 +134,20 @@ pub async fn try_chunked_uploading( if !pkg.success { return Err(anyhow::anyhow!("Could not publish package")); } - if wait { - wait_for_package_version_to_become_ready( + if wait.is_any() { + let f = wait_for_package_version_to_become_ready( ®istry, &token, pkg.package_version.id, quiet, - ) - .await?; + wait, + ); + + if let Ok(handle) = tokio::runtime::Handle::try_current() { + handle.block_on(f)? + } else { + tokio::runtime::Runtime::new().unwrap().block_on(f)?; + } } } @@ -381,6 +430,7 @@ async fn wait_for_package_version_to_become_ready( token: &str, package_version_id: impl AsRef, quiet: bool, + mut conditions: PublishWait, ) -> Result<()> { let (mut stream, _client) = subscribe_package_version_ready(registry, token, package_version_id.as_ref()).await?; @@ -391,20 +441,56 @@ async fn wait_for_package_version_to_become_ready( show_spinners_while_waiting(&state); } - while let Some(data) = stream.next().await { + if !conditions.is_any() { + return Ok(()); + } + + let deadline = conditions + .timeout + .map(|x| std::time::Instant::now() + x) + .unwrap_or_else(|| std::time::Instant::now() + std::time::Duration::from_secs(60 * 10)); + + loop { + if !conditions.is_any() { + break; + } + if std::time::Instant::now() > deadline { + return Err(anyhow::anyhow!( + "Timed out waiting for package version to become ready" + )); + } + + let data = match tokio::time::timeout_at(deadline.into(), stream.next()).await { + Err(_) => { + return Err(anyhow::anyhow!( + "Timed out waiting for package version to become ready" + )) + } + Ok(None) => { + break; + } + Ok(Some(data)) => data, + }; + if let Some(res_data) = data.unwrap().data { match res_data.package_version_ready.state { PackageVersionState::BINDINGS_GENERATED => { let mut st = state.bindings_generated.lock().unwrap(); - *st = Some(res_data.package_version_ready.success); + let is_ready = res_data.package_version_ready.success; + *st = Some(is_ready); + conditions.bindings = false; } PackageVersionState::NATIVE_EXES_GENERATED => { let mut st = state.native_exes_generated.lock().unwrap(); *st = Some(res_data.package_version_ready.success); + + conditions.native_executables = false; } PackageVersionState::WEBC_GENERATED => { let mut st = state.webc_generated.lock().unwrap(); *st = Some(res_data.package_version_ready.success); + + conditions.container = false; } PackageVersionState::Other(_) => {} }