Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cli): publish: Fix panic + Make waiting for build results more modular #4410

Merged
merged 1 commit into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions lib/cli/src/commands/publish.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Context as _;
use clap::Parser;
use wasmer_registry::wasmer_env::WasmerEnv;
use wasmer_registry::{publish::PublishWait, wasmer_env::WasmerEnv};

/// Publish a package to the package registry.
#[derive(Debug, Parser)]
Expand Down Expand Up @@ -30,6 +30,12 @@ pub struct Publish {
/// Wait for package to be available on the registry before exiting.
#[clap(long)]
pub wait: bool,
/// Wait for the package and all dependencies to be available on the registry
/// before exiting.
///
/// This includes the container, native executables and bindings.
#[clap(long)]
pub wait_all: bool,
/// Timeout (in seconds) for the publish query to the registry.
///
/// Note that this is not the timeout for the entire publish process, but
Expand All @@ -40,13 +46,20 @@ pub struct Publish {

impl Publish {
/// Executes `wasmer publish`
#[tokio::main]
pub async fn execute(&self) -> Result<(), anyhow::Error> {
pub fn execute(&self) -> Result<(), anyhow::Error> {
let token = self
.env
.token()
.context("could not determine auth token for registry - run 'wasmer login'")?;

let wait = if self.wait_all {
PublishWait::new_all()
} else if self.wait {
PublishWait::new_container()
} else {
PublishWait::new_none()
};

let publish = wasmer_registry::package::builder::Publish {
registry: self.env.registry_endpoint().map(|u| u.to_string()).ok(),
dry_run: self.dry_run,
Expand All @@ -56,10 +69,10 @@ impl Publish {
token,
no_validate: self.no_validate,
package_path: self.package_path.clone(),
wait: self.wait,
wait,
timeout: self.timeout.into(),
};
publish.execute().await.map_err(on_error)?;
publish.execute().map_err(on_error)?;

if let Err(e) = invalidate_graphql_query_cache(&self.env) {
tracing::warn!(
Expand Down
4 changes: 2 additions & 2 deletions lib/cli/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,15 @@ pub async fn republish_package_with_bumped_version(
quiet: false,
package_name: None,
version: None,
wait: false,
wait: wasmer_registry::publish::PublishWait::new_none(),
token,
no_validate: true,
package_path: Some(dir.to_str().unwrap().to_string()),
// Use a high timeout to prevent interrupting uploads of
// large packages.
timeout: std::time::Duration::from_secs(60 * 60 * 12),
};
publish.execute().await?;
publish.execute()?;

Ok(manifest)
}
Expand Down
6 changes: 3 additions & 3 deletions lib/registry/src/package/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tar::Builder;
use thiserror::Error;
use time::{self, OffsetDateTime};

use crate::publish::PublishWait;
use crate::{package::builder::validate::ValidationPolicy, publish::SignArchiveResult};
use crate::{WasmerConfig, PACKAGE_TOML_FALLBACK_NAME};

Expand Down Expand Up @@ -40,7 +41,7 @@ pub struct Publish {
/// Directory containing the `wasmer.toml` (defaults to current root dir)
pub package_path: Option<String>,
/// Wait for package to be available on the registry before exiting
pub wait: bool,
pub wait: PublishWait,
/// Timeout (in seconds) for the publish query to the registry
pub timeout: Duration,
}
Expand All @@ -63,7 +64,7 @@ enum PackageBuildError {

impl Publish {
/// Executes `wasmer publish`
pub async fn execute(&self) -> Result<(), anyhow::Error> {
pub fn execute(&self) -> Result<(), anyhow::Error> {
let input_path = match self.package_path.as_ref() {
Some(s) => std::env::current_dir()?.join(s),
None => std::env::current_dir()?,
Expand Down Expand Up @@ -194,7 +195,6 @@ impl Publish {
self.wait,
self.timeout,
)
.await
}

fn validation_policy(&self) -> Box<dyn ValidationPolicy> {
Expand Down
104 changes: 95 additions & 9 deletions lib/registry/src/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,49 @@ use std::time::Duration;
static UPLOAD: Emoji<'_, '_> = Emoji("⬆️ ", "");
static PACKAGE: Emoji<'_, '_> = Emoji("📦", "");

/// Different conditions that can be "awaited" when publishing a package.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PublishWait {
pub container: bool,
pub native_executables: bool,
pub bindings: bool,

pub timeout: Option<Duration>,
}

impl PublishWait {
pub fn is_any(self) -> bool {
self.container || self.native_executables || self.bindings
}

pub fn new_none() -> Self {
Self {
container: false,
native_executables: false,
bindings: false,
timeout: None,
}
}

pub fn new_all() -> Self {
Self {
container: true,
native_executables: true,
bindings: true,
timeout: None,
}
}

pub fn new_container() -> Self {
Self {
container: true,
native_executables: false,
bindings: false,
timeout: None,
}
}
}

#[derive(Debug, Clone)]
pub enum SignArchiveResult {
Ok {
Expand All @@ -33,7 +76,7 @@ pub enum SignArchiveResult {
}

#[allow(clippy::too_many_arguments)]
pub async fn try_chunked_uploading(
pub fn try_chunked_uploading(
registry: Option<String>,
token: Option<String>,
package: &wasmer_toml::Package,
Expand All @@ -45,7 +88,7 @@ pub async fn try_chunked_uploading(
maybe_signature_data: &SignArchiveResult,
archived_data_size: u64,
quiet: bool,
wait: bool,
wait: PublishWait,
timeout: Duration,
) -> Result<(), anyhow::Error> {
let (registry, token) = initialize_registry_and_token(registry, token)?;
Expand Down Expand Up @@ -81,7 +124,7 @@ pub async fn try_chunked_uploading(
signature: maybe_signature_data,
signed_url: Some(signed_url.url),
private: Some(package.private),
wait: Some(wait),
wait: Some(wait.is_any()),
});

let response: publish_package_mutation_chunked::ResponseData =
Expand All @@ -91,14 +134,20 @@ pub async fn try_chunked_uploading(
if !pkg.success {
return Err(anyhow::anyhow!("Could not publish package"));
}
if wait {
wait_for_package_version_to_become_ready(
if wait.is_any() {
let f = wait_for_package_version_to_become_ready(
&registry,
&token,
pkg.package_version.id,
quiet,
)
.await?;
wait,
);

if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle.block_on(f)?
} else {
tokio::runtime::Runtime::new().unwrap().block_on(f)?;
}
}
}

Expand Down Expand Up @@ -381,6 +430,7 @@ async fn wait_for_package_version_to_become_ready(
token: &str,
package_version_id: impl AsRef<str>,
quiet: bool,
mut conditions: PublishWait,
) -> Result<()> {
let (mut stream, _client) =
subscribe_package_version_ready(registry, token, package_version_id.as_ref()).await?;
Expand All @@ -391,20 +441,56 @@ async fn wait_for_package_version_to_become_ready(
show_spinners_while_waiting(&state);
}

while let Some(data) = stream.next().await {
if !conditions.is_any() {
return Ok(());
}

let deadline = conditions
.timeout
.map(|x| std::time::Instant::now() + x)
.unwrap_or_else(|| std::time::Instant::now() + std::time::Duration::from_secs(60 * 10));

loop {
if !conditions.is_any() {
break;
}
if std::time::Instant::now() > deadline {
return Err(anyhow::anyhow!(
"Timed out waiting for package version to become ready"
));
}

let data = match tokio::time::timeout_at(deadline.into(), stream.next()).await {
Err(_) => {
return Err(anyhow::anyhow!(
"Timed out waiting for package version to become ready"
))
}
Ok(None) => {
break;
}
Ok(Some(data)) => data,
};

if let Some(res_data) = data.unwrap().data {
match res_data.package_version_ready.state {
PackageVersionState::BINDINGS_GENERATED => {
let mut st = state.bindings_generated.lock().unwrap();
*st = Some(res_data.package_version_ready.success);
let is_ready = res_data.package_version_ready.success;
*st = Some(is_ready);
conditions.bindings = false;
}
PackageVersionState::NATIVE_EXES_GENERATED => {
let mut st = state.native_exes_generated.lock().unwrap();
*st = Some(res_data.package_version_ready.success);

conditions.native_executables = false;
}
PackageVersionState::WEBC_GENERATED => {
let mut st = state.webc_generated.lock().unwrap();
*st = Some(res_data.package_version_ready.success);

conditions.container = false;
}
PackageVersionState::Other(_) => {}
}
Expand Down
Loading