Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug: transport error when trying to connect to provisioner #416

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions deployer/src/deployment/deploy_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,15 +452,17 @@ mod tests {

struct StubAbstractProvisionerFactory;

#[async_trait::async_trait]
impl provisioner_factory::AbstractFactory for StubAbstractProvisionerFactory {
type Output = StubProvisionerFactory;
type Error = std::io::Error;

fn get_factory(
async fn get_factory(
&self,
_project_name: shuttle_common::project::ProjectName,
_service_id: Uuid,
) -> Self::Output {
StubProvisionerFactory
) -> Result<Self::Output, Self::Error> {
Ok(StubProvisionerFactory)
}
}

Expand Down
48 changes: 34 additions & 14 deletions deployer/src/deployment/provisioner_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,56 +6,76 @@ use shuttle_proto::provisioner::{
database_request::DbType, provisioner_client::ProvisionerClient, DatabaseRequest,
};
use shuttle_service::{Factory, ServiceName};
use tonic::{transport::Channel, Request};
use thiserror::Error;
use tonic::{
transport::{Channel, Endpoint},
Request,
};
use tracing::{debug, info, trace};
use uuid::Uuid;

use crate::persistence::{Resource, ResourceRecorder, ResourceType, SecretGetter};

/// Trait to make it easy to get a factory (service locator) for each service being started
pub trait AbstractFactory: Send + 'static {
#[async_trait]
pub trait AbstractFactory: Send + Sync + 'static {
type Output: Factory;
type Error: std::error::Error;

/// Get a factory for a specific service
fn get_factory(&self, service_name: ServiceName, service_id: Uuid) -> Self::Output;
async fn get_factory(
&self,
service_name: ServiceName,
service_id: Uuid,
) -> Result<Self::Output, Self::Error>;
}

/// An abstract factory that makes factories which uses provisioner
#[derive(Clone)]
pub struct AbstractProvisionerFactory<R: ResourceRecorder, S: SecretGetter> {
provisioner_client: ProvisionerClient<Channel>,
provisioner_uri: Endpoint,
resource_recorder: R,
secret_getter: S,
}

#[async_trait]
impl<R: ResourceRecorder, S: SecretGetter> AbstractFactory for AbstractProvisionerFactory<R, S> {
type Output = ProvisionerFactory<R, S>;
type Error = ProvisionerError;

async fn get_factory(
&self,
service_name: ServiceName,
service_id: Uuid,
) -> Result<Self::Output, Self::Error> {
let provisioner_client = ProvisionerClient::connect(self.provisioner_uri.clone()).await?;

fn get_factory(&self, service_name: ServiceName, service_id: Uuid) -> Self::Output {
ProvisionerFactory::new(
self.provisioner_client.clone(),
Ok(ProvisionerFactory::new(
provisioner_client,
service_name,
service_id,
self.resource_recorder.clone(),
self.secret_getter.clone(),
)
))
}
}

impl<R: ResourceRecorder, S: SecretGetter> AbstractProvisionerFactory<R, S> {
pub fn new(
provisioner_client: ProvisionerClient<Channel>,
resource_recorder: R,
secret_getter: S,
) -> Self {
pub fn new(provisioner_uri: Endpoint, resource_recorder: R, secret_getter: S) -> Self {
Self {
provisioner_client,
provisioner_uri,
resource_recorder,
secret_getter,
}
}
}

#[derive(Error, Debug)]
pub enum ProvisionerError {
#[error("failed to connect to provisioner: {0}")]
TonicClient(#[from] tonic::transport::Error),
}

/// A factory (service locator) which goes through the provisioner crate
pub struct ProvisionerFactory<R: ResourceRecorder, S: SecretGetter> {
service_name: ServiceName,
Expand Down
11 changes: 10 additions & 1 deletion deployer/src/deployment/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,16 @@ pub async fn task(
continue;
}
};
let mut factory = abstract_factory.get_factory(service_name, built.service_id);
let mut factory = match abstract_factory
.get_factory(service_name, built.service_id)
.await
{
Ok(factory) => factory,
Err(err) => {
start_crashed_cleanup(&id, err);
continue;
}
};
let logger = logger_factory.get_logger(id);

let old_deployments_killer = kill_old_deployments(
Expand Down
12 changes: 2 additions & 10 deletions deployer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use shuttle_deployer::{
start, start_proxy, AbstractProvisionerFactory, Args, DeployLayer, Persistence,
RuntimeLoggerFactory,
};
use shuttle_proto::provisioner::provisioner_client::ProvisionerClient;
use tokio::select;
use tonic::transport::Endpoint;
use tracing::trace;
Expand Down Expand Up @@ -43,15 +42,8 @@ async fn main() {
))
.expect("provisioner uri is not valid");

let provisioner_client = ProvisionerClient::connect(provisioner_uri)
.await
.expect("failed to connect to provisioner");

let abstract_factory = AbstractProvisionerFactory::new(
provisioner_client,
persistence.clone(),
persistence.clone(),
);
let abstract_factory =
AbstractProvisionerFactory::new(provisioner_uri, persistence.clone(), persistence.clone());

let runtime_logger_factory = RuntimeLoggerFactory::new(persistence.get_log_sender());

Expand Down
3 changes: 2 additions & 1 deletion provisioner/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::net::SocketAddr;
use std::{net::SocketAddr, time::Duration};

use clap::Parser;
use shuttle_provisioner::{Args, MyProvisioner, ProvisionerServer};
Expand Down Expand Up @@ -31,6 +31,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

println!("starting provisioner on {}", addr);
Server::builder()
.http2_keepalive_interval(Some(Duration::from_secs(30))) // Prevent deployer clients from loosing connection #ENG-219
.add_service(ProvisionerServer::new(provisioner))
.serve(addr)
.await?;
Expand Down