bug: big archives being cut off at 32 768 bytes (#423)
* refactor: set chunked header

* refactor: don't send stream across threads

Doing this was causing us to lose everything after 32 768 bytes. Don't
know the reason why 🤷; the buffer-before-handoff pattern used instead is
sketched below.

* refactor: fix health check

* refactor: remove unused use
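The crux of the fix is to stop handing the live body stream to the worker and instead drain it where the request is handled. A minimal sketch of that buffer-before-handoff pattern, matching the axum 0.5-era BodyStream API this diff uses (the helper name buffer_body is hypothetical, not part of this commit):

    use axum::extract::BodyStream;
    use bytes::BufMut;
    use futures::StreamExt;

    // Drain the body to completion on the request task, then hand the owned
    // Vec<u8> across the thread boundary instead of the live stream.
    async fn buffer_body(mut stream: BodyStream) -> Result<Vec<u8>, axum::Error> {
        let mut data = Vec::new();
        while let Some(chunk) = stream.next().await {
            data.put(chunk?); // BufMut::put appends each Bytes chunk
        }
        Ok(data)
    }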
chesedo authored Oct 21, 2022

1 parent c222354 commit b00671d
Showing 7 changed files with 20 additions and 32 deletions.
1 change: 1 addition & 0 deletions cargo-shuttle/src/client.rs
@@ -250,6 +250,7 @@ impl Client {

if let Some(body) = body {
builder = builder.body(body);
+ builder = builder.header("Transfer-Encoding", "chunked");
}

builder.send().await
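A hedged illustration of the client-side change above (client, url, and body are placeholders, not the crate's actual variables): when the body's total size is not communicated up front, advertising chunked transfer encoding tells the server to keep reading chunks until the terminating chunk rather than stopping at an assumed length.

    let response = client
        .post(url)
        .header("Transfer-Encoding", "chunked")
        .body(body)
        .send()
        .await?;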
5 changes: 2 additions & 3 deletions deployer/src/deployment/deploy_layer.rs
@@ -355,7 +355,6 @@ mod tests {
use axum::body::Bytes;
use ctor::ctor;
use flate2::{write::GzEncoder, Compression};
- use futures::FutureExt;
use shuttle_service::Logger;
use tokio::{select, sync::mpsc, time::sleep};
use tracing_subscriber::prelude::*;
@@ -944,7 +943,7 @@ mod tests {
id,
service_name: "nil_id".to_string(),
service_id: Uuid::new_v4(),
- data_stream: Box::pin(async { Ok(Bytes::from("violets are red")) }.into_stream()),
+ data: Bytes::from("violets are red").to_vec(),
will_run_tests: false,
})
.await;
@@ -987,7 +986,7 @@ mod tests {
id: Uuid::new_v4(),
service_name: format!("deploy-layer-{name}"),
service_id: Uuid::new_v4(),
- data_stream: Box::pin(async { Ok(Bytes::from(bytes)) }.into_stream()),
+ data: bytes,
will_run_tests: false,
}
}
25 changes: 2 additions & 23 deletions deployer/src/deployment/queue.rs
@@ -16,14 +16,11 @@ use std::fmt;
use std::fs::remove_file;
use std::io::Read;
use std::path::{Path, PathBuf};
- use std::pin::Pin;

- use bytes::{BufMut, Bytes};
use cargo::core::compiler::{CompileMode, MessageFormat};
use cargo::core::Workspace;
use cargo::ops::{CompileOptions, TestOptions};
use flate2::read::GzDecoder;
- use futures::{Stream, StreamExt};
use tar::Archive;
use tokio::fs;

@@ -91,7 +88,7 @@ pub struct Queued {
pub id: Uuid,
pub service_name: String,
pub service_id: Uuid,
- pub data_stream: Pin<Box<dyn Stream<Item = Result<Bytes>> + Send + Sync>>,
+ pub data: Vec<u8>,
pub will_run_tests: bool,
}

@@ -104,16 +101,12 @@ impl Queued {
log_recorder: impl LogRecorder,
secret_recorder: impl SecretRecorder,
) -> Result<Built> {
info!("Fetching POSTed data");

let vec = extract_stream(self.data_stream).await?;

info!("Extracting received data");

let project_path = builds_path.join(&self.service_name);
fs::create_dir_all(project_path.clone()).await?;

- extract_tar_gz_data(vec.as_slice(), &project_path)?;
+ extract_tar_gz_data(self.data.as_slice(), &project_path)?;

let secrets = get_secrets(&project_path).await?;
set_secrets(secrets, &self.service_id, secret_recorder).await?;
@@ -229,20 +222,6 @@ async fn set_secrets(
Ok(())
}

- #[instrument(skip(data_stream))]
- async fn extract_stream(
-     mut data_stream: Pin<Box<dyn Stream<Item = Result<Bytes>> + Send + Sync>>,
- ) -> Result<Vec<u8>> {
-     let mut vec = Vec::new();
-     while let Some(buf) = data_stream.next().await {
-         let buf = buf?;
-         debug!("Received {} bytes", buf.len());
-         vec.put(buf);
-     }
-
-     Ok(vec)
- }
-
/// Equivalent to the command: `tar -xzf --strip-components 1`
#[instrument(skip(data, dest))]
fn extract_tar_gz_data(data: impl Read, dest: impl AsRef<Path>) -> Result<()> {
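The surviving extract_tar_gz_data helper (last line above) now reads straight from the buffered bytes. For reference, a self-contained sketch of a `tar -xzf --strip-components 1` equivalent built on the flate2 and tar crates; the function name, std::io::Result error type, and iteration details are assumptions rather than this repo's verbatim code:

    use std::io::Read;
    use std::path::{Path, PathBuf};

    use flate2::read::GzDecoder;
    use tar::Archive;

    fn extract_tar_gz(data: impl Read, dest: impl AsRef<Path>) -> std::io::Result<()> {
        // Gunzip the byte stream, then walk the tar entries inside it.
        let mut archive = Archive::new(GzDecoder::new(data));
        for entry in archive.entries()? {
            let mut entry = entry?;
            // Drop the leading top-level directory, i.e. --strip-components 1.
            let path: PathBuf = entry.path()?.components().skip(1).collect();
            entry.unpack(dest.as_ref().join(path))?;
        }
        Ok(())
    }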
2 changes: 0 additions & 2 deletions deployer/src/error.rs
@@ -8,8 +8,6 @@ use cargo::util::errors::CargoTestError;

#[derive(Error, Debug)]
pub enum Error {
#[error("Streaming error: {0}")]
Streaming(#[source] axum::Error),
#[error("Internal I/O error: {0}")]
InputOutput(#[from] io::Error),
#[error("Build error: {0}")]
2 changes: 2 additions & 0 deletions deployer/src/handlers/error.rs
@@ -9,6 +9,8 @@ use shuttle_common::models::error::ApiError;

#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Streaming error: {0}")]
Streaming(#[from] axum::Error),
#[error("Persistence failure: {0}")]
Persistence(#[from] crate::persistence::PersistenceError),
#[error("Failed to convert {from} to {to}")]
15 changes: 12 additions & 3 deletions deployer/src/handlers/mod.rs
@@ -6,9 +6,10 @@ use axum::extract::{Extension, Path, Query};
use axum::http::{Request, Response};
use axum::routing::{get, Router};
use axum::{extract::BodyStream, Json};
+ use bytes::BufMut;
use chrono::{TimeZone, Utc};
use fqdn::FQDN;
- use futures::TryStreamExt;
+ use futures::StreamExt;
use shuttle_common::models::secret;
use shuttle_common::LogItem;
use tower_http::auth::RequireAuthorizationLayer;
@@ -155,7 +156,7 @@ async fn post_service(
Extension(deployment_manager): Extension<DeploymentManager>,
Path((_project_name, service_name)): Path<(String, String)>,
Query(params): Query<HashMap<String, String>>,
- stream: BodyStream,
+ mut stream: BodyStream,
) -> Result<Json<shuttle_common::models::deployment::Response>> {
let service = persistence.get_or_create_service(&service_name).await?;
let id = Uuid::new_v4();
@@ -168,13 +169,21 @@ async fn post_service(
address: None,
};

+ let mut data = Vec::new();
+ while let Some(buf) = stream.next().await {
+     let buf = buf?;
+     debug!("Received {} bytes", buf.len());
+     data.put(buf);
+ }
+ debug!("Received a total of {} bytes", data.len());
+
persistence.insert_deployment(deployment.clone()).await?;

let queued = Queued {
id,
service_name: service.name,
service_id: service.id,
- data_stream: Box::pin(stream.map_err(crate::error::Error::Streaming)),
+ data,
will_run_tests: !params.contains_key("no-test"),
};

2 changes: 1 addition & 1 deletion gateway/src/project.rs
@@ -368,7 +368,7 @@ impl ProjectCreating {
"Healthcheck": {
"Interval": 1_000_000_000i64, // Every second
"Timeout": 15_000_000_000i64, // 15 seconds. Should match the --max-time below
"Test": ["CMD", "curl", "--max-time=15", format!("localhost:8001/projects/{project_name}/status")],
"Test": ["CMD", "curl", "--max-time", "15", format!("localhost:8001/projects/{project_name}/status")],
},
});
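On the healthcheck fix: Docker exec-form Test arrays pass each element to the process verbatim, with no shell parsing. Splitting --max-time and 15 into separate entries is the argument form curl reliably accepts; presumably the combined --max-time=15 token was not being parsed as intended, so the 15-second timeout (meant to match the Timeout value above) never applied.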
