Skip to content

Commit

Permalink
fix(cli): publish: Fix panick + Make waiting for build results more m…
Browse files Browse the repository at this point in the history
…odular

* Fixes a panic due to nested tokio runtimes

* Exposes a --wait flag, which only waits for the container, and a
--wait-all flag, which also waits for bindings and native executables.
  • Loading branch information
theduke committed Jan 26, 2024
1 parent 1a49312 commit feb4ee4
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 19 deletions.
23 changes: 18 additions & 5 deletions lib/cli/src/commands/publish.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Context as _;
use clap::Parser;
use wasmer_registry::wasmer_env::WasmerEnv;
use wasmer_registry::{publish::PublishWait, wasmer_env::WasmerEnv};

/// Publish a package to the package registry.
#[derive(Debug, Parser)]
Expand Down Expand Up @@ -30,6 +30,12 @@ pub struct Publish {
/// Wait for package to be available on the registry before exiting.
#[clap(long)]
pub wait: bool,
/// Wait for the package and all dependencies to be available on the registry
/// before exiting.
///
/// This includes the container, native executables and bindings.
#[clap(long)]
pub wait_all: bool,
/// Timeout (in seconds) for the publish query to the registry.
///
/// Note that this is not the timeout for the entire publish process, but
Expand All @@ -40,13 +46,20 @@ pub struct Publish {

impl Publish {
/// Executes `wasmer publish`
#[tokio::main]
pub async fn execute(&self) -> Result<(), anyhow::Error> {
pub fn execute(&self) -> Result<(), anyhow::Error> {
let token = self
.env
.token()
.context("could not determine auth token for registry - run 'wasmer login'")?;

let wait = if self.wait_all {
PublishWait::new_all()
} else if self.wait {
PublishWait::new_container()
} else {
PublishWait::new_none()
};

let publish = wasmer_registry::package::builder::Publish {
registry: self.env.registry_endpoint().map(|u| u.to_string()).ok(),
dry_run: self.dry_run,
Expand All @@ -56,10 +69,10 @@ impl Publish {
token,
no_validate: self.no_validate,
package_path: self.package_path.clone(),
wait: self.wait,
wait,
timeout: self.timeout.into(),
};
publish.execute().await.map_err(on_error)?;
publish.execute().map_err(on_error)?;

if let Err(e) = invalidate_graphql_query_cache(&self.env) {
tracing::warn!(
Expand Down
4 changes: 2 additions & 2 deletions lib/cli/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,15 @@ pub async fn republish_package_with_bumped_version(
quiet: false,
package_name: None,
version: None,
wait: false,
wait: wasmer_registry::publish::PublishWait::new_none(),
token,
no_validate: true,
package_path: Some(dir.to_str().unwrap().to_string()),
// Use a high timeout to prevent interrupting uploads of
// large packages.
timeout: std::time::Duration::from_secs(60 * 60 * 12),
};
publish.execute().await?;
publish.execute()?;

Ok(manifest)
}
Expand Down
6 changes: 3 additions & 3 deletions lib/registry/src/package/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tar::Builder;
use thiserror::Error;
use time::{self, OffsetDateTime};

use crate::publish::PublishWait;
use crate::{package::builder::validate::ValidationPolicy, publish::SignArchiveResult};
use crate::{WasmerConfig, PACKAGE_TOML_FALLBACK_NAME};

Expand Down Expand Up @@ -40,7 +41,7 @@ pub struct Publish {
/// Directory containing the `wasmer.toml` (defaults to current root dir)
pub package_path: Option<String>,
/// Wait for package to be available on the registry before exiting
pub wait: bool,
pub wait: PublishWait,
/// Timeout (in seconds) for the publish query to the registry
pub timeout: Duration,
}
Expand All @@ -63,7 +64,7 @@ enum PackageBuildError {

impl Publish {
/// Executes `wasmer publish`
pub async fn execute(&self) -> Result<(), anyhow::Error> {
pub fn execute(&self) -> Result<(), anyhow::Error> {
let input_path = match self.package_path.as_ref() {
Some(s) => std::env::current_dir()?.join(s),
None => std::env::current_dir()?,
Expand Down Expand Up @@ -194,7 +195,6 @@ impl Publish {
self.wait,
self.timeout,
)
.await
}

fn validation_policy(&self) -> Box<dyn ValidationPolicy> {
Expand Down
105 changes: 96 additions & 9 deletions lib/registry/src/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,49 @@ use std::time::Duration;
static UPLOAD: Emoji<'_, '_> = Emoji("⬆️ ", "");
static PACKAGE: Emoji<'_, '_> = Emoji("📦", "");

/// Different conditions that can be "awaited" when publishing a package.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PublishWait {
pub container: bool,
pub native_executables: bool,
pub bindings: bool,

pub timeout: Option<Duration>,
}

impl PublishWait {
pub fn is_any(self) -> bool {
self.container || self.native_executables || self.bindings
}

pub fn new_none() -> Self {
Self {
container: false,
native_executables: false,
bindings: false,
timeout: None,
}
}

pub fn new_all() -> Self {
Self {
container: true,
native_executables: true,
bindings: true,
timeout: None,
}
}

pub fn new_container() -> Self {
Self {
container: true,
native_executables: false,
bindings: false,
timeout: None,
}
}
}

#[derive(Debug, Clone)]
pub enum SignArchiveResult {
Ok {
Expand All @@ -33,7 +76,7 @@ pub enum SignArchiveResult {
}

#[allow(clippy::too_many_arguments)]
pub async fn try_chunked_uploading(
pub fn try_chunked_uploading(
registry: Option<String>,
token: Option<String>,
package: &wasmer_toml::Package,
Expand All @@ -45,7 +88,7 @@ pub async fn try_chunked_uploading(
maybe_signature_data: &SignArchiveResult,
archived_data_size: u64,
quiet: bool,
wait: bool,
wait: PublishWait,
timeout: Duration,
) -> Result<(), anyhow::Error> {
let (registry, token) = initialize_registry_and_token(registry, token)?;
Expand Down Expand Up @@ -81,7 +124,7 @@ pub async fn try_chunked_uploading(
signature: maybe_signature_data,
signed_url: Some(signed_url.url),
private: Some(package.private),
wait: Some(wait),
wait: Some(wait.is_any()),
});

let response: publish_package_mutation_chunked::ResponseData =
Expand All @@ -91,14 +134,20 @@ pub async fn try_chunked_uploading(
if !pkg.success {
return Err(anyhow::anyhow!("Could not publish package"));
}
if wait {
wait_for_package_version_to_become_ready(
if wait.is_any() {
let f = wait_for_package_version_to_become_ready(
&registry,
&token,
pkg.package_version.id,
quiet,
)
.await?;
wait,
);

if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle.block_on(f)?
} else {
tokio::runtime::Runtime::new().unwrap().block_on(f)?;
}
}
}

Expand Down Expand Up @@ -381,6 +430,7 @@ async fn wait_for_package_version_to_become_ready(
token: &str,
package_version_id: impl AsRef<str>,
quiet: bool,
mut conditions: PublishWait,
) -> Result<()> {
let (mut stream, _client) =
subscribe_package_version_ready(registry, token, package_version_id.as_ref()).await?;
Expand All @@ -391,20 +441,57 @@ async fn wait_for_package_version_to_become_ready(
show_spinners_while_waiting(&state);
}

while let Some(data) = stream.next().await {
if !conditions.is_any() {
return Ok(());
}

let deadline = conditions
.timeout
.clone()
.map(|x| std::time::Instant::now() + x)
.unwrap_or_else(|| std::time::Instant::now() + std::time::Duration::from_secs(60 * 10));

loop {
if !conditions.is_any() {
break;
}
if std::time::Instant::now() > deadline {
return Err(anyhow::anyhow!(
"Timed out waiting for package version to become ready"
));
}

let data = match tokio::time::timeout_at(deadline.into(), stream.next()).await {
Err(_) => {
return Err(anyhow::anyhow!(
"Timed out waiting for package version to become ready"
))
}
Ok(None) => {
break;
}
Ok(Some(data)) => data,
};

if let Some(res_data) = data.unwrap().data {
match res_data.package_version_ready.state {
PackageVersionState::BINDINGS_GENERATED => {
let mut st = state.bindings_generated.lock().unwrap();
*st = Some(res_data.package_version_ready.success);
let is_ready = res_data.package_version_ready.success;
*st = Some(is_ready);
conditions.bindings = false;
}
PackageVersionState::NATIVE_EXES_GENERATED => {
let mut st = state.native_exes_generated.lock().unwrap();
*st = Some(res_data.package_version_ready.success);

conditions.native_executables = false;
}
PackageVersionState::WEBC_GENERATED => {
let mut st = state.webc_generated.lock().unwrap();
*st = Some(res_data.package_version_ready.success);

conditions.container = false;
}
PackageVersionState::Other(_) => {}
}
Expand Down

0 comments on commit feb4ee4

Please sign in to comment.