Skip to content

Commit

Permalink
fix(clients): use the --client-timeout flag as the period we allow …
Browse files Browse the repository at this point in the history
…retries with exponential backoff + jitter (#2019)

- we were _always_ defaulting our retries to 10s; that might have made
sense in the synchronous world, but in the coming asynchronous world we
can use the full time the client is alive for retries
- I don't love this approach with its passing-of-args-everywhere, but I
made a ticket for a more thorough refactor rather than start going down
that route now
- ~~**still adding tests**~~ I can't think of any more valuable tests to
add, but definitely open to hearing what others might find useful here
  • Loading branch information
aaronArinder authored Aug 5, 2024
1 parent 48183b3 commit 3b74d9f
Show file tree
Hide file tree
Showing 17 changed files with 204 additions and 51 deletions.
37 changes: 28 additions & 9 deletions crates/rover-client/src/blocking/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,25 @@ use crate::error::{EndpointKind, RoverClientError};

pub(crate) const JSON_CONTENT_TYPE: &str = "application/json";

const MAX_ELAPSED_TIME: Option<Duration> =
Some(Duration::from_secs(if cfg!(test) { 2 } else { 10 }));

/// Represents a generic GraphQL client for making HTTP requests.
///
/// All three fields are supplied by the caller through [`GraphQLClient::new`].
pub struct GraphQLClient {
/// Endpoint the client was constructed with, stored as an owned `String`.
graphql_endpoint: String,
/// The `reqwest` client handed to the constructor.
client: ReqwestClient,
/// Used as `max_elapsed_time` of the `ExponentialBackoff` strategy that is
/// built when a request should be retried. This replaces the previously
/// hard-coded `MAX_ELAPSED_TIME` constant (10s, or 2s under `cfg!(test)`).
retry_period: Option<Duration>,
}

impl GraphQLClient {
/// Construct a new [Client] from a `graphql_endpoint`.
/// This client is used for generic GraphQL requests, such as introspection.
pub fn new(graphql_endpoint: &str, client: ReqwestClient) -> GraphQLClient {
pub fn new(
graphql_endpoint: &str,
client: ReqwestClient,
retry_period: Option<Duration>,
) -> GraphQLClient {
GraphQLClient {
graphql_endpoint: graphql_endpoint.to_string(),
client,
retry_period,
}
}

Expand Down Expand Up @@ -151,7 +154,7 @@ impl GraphQLClient {

if should_retry {
let backoff_strategy = ExponentialBackoff {
max_elapsed_time: MAX_ELAPSED_TIME,
max_elapsed_time: self.retry_period,
..Default::default()
};

Expand Down Expand Up @@ -323,7 +326,11 @@ mod tests {
});

let client = ReqwestClient::new();
let graphql_client = GraphQLClient::new(&server.url(success_path), client);
let graphql_client = GraphQLClient::new(
&server.url(success_path),
client,
Some(Duration::from_secs(3)),
);

let response = graphql_client.execute(
"{}".to_string(),
Expand All @@ -348,7 +355,11 @@ mod tests {
});

let client = ReqwestClient::new();
let graphql_client = GraphQLClient::new(&server.url(internal_server_error_path), client);
let graphql_client = GraphQLClient::new(
&server.url(internal_server_error_path),
client,
Some(Duration::from_secs(3)),
);

let response = graphql_client.execute(
"{}".to_string(),
Expand All @@ -373,7 +384,11 @@ mod tests {
});

let client = ReqwestClient::new();
let graphql_client = GraphQLClient::new(&server.url(not_found_path), client);
let graphql_client = GraphQLClient::new(
&server.url(not_found_path),
client,
Some(Duration::from_secs(3)),
);

let response = graphql_client.execute(
"{}".to_string(),
Expand Down Expand Up @@ -405,7 +420,11 @@ mod tests {
.timeout(Duration::from_secs(1))
.build()
.unwrap();
let graphql_client = GraphQLClient::new(&server.url(timeout_path), client);
let graphql_client = GraphQLClient::new(
&server.url(timeout_path),
client,
Some(Duration::from_secs(3)),
);

let response = graphql_client.execute(
"{}".to_string(),
Expand Down
4 changes: 3 additions & 1 deletion crates/rover-client/src/blocking/studio_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
};

use houston::{Credential, CredentialOrigin};
use std::time::Duration;

use graphql_client::GraphQLQuery;
use reqwest::blocking::Client as ReqwestClient;
Expand All @@ -27,10 +28,11 @@ impl StudioClient {
version: &str,
is_sudo: bool,
client: ReqwestClient,
retry_period: Option<Duration>,
) -> StudioClient {
StudioClient {
credential,
client: GraphQLClient::new(graphql_endpoint, client),
client: GraphQLClient::new(graphql_endpoint, client, retry_period),
version: version.to_string(),
is_sudo,
}
Expand Down
1 change: 1 addition & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ impl Rover {
config,
is_sudo,
self.get_reqwest_client_builder(),
Some(self.client_timeout.get_duration()),
))
}

Expand Down
4 changes: 2 additions & 2 deletions src/command/dev/do_dev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl Dev {
subgraph_watchers.into_iter().for_each(|mut watcher| {
std::thread::spawn(move || {
let _ = watcher
.watch_subgraph_for_changes()
.watch_subgraph_for_changes(client_config.retry_period)
.map_err(log_err_and_continue);
});
});
Expand Down Expand Up @@ -166,7 +166,7 @@ impl Dev {

// watch for subgraph changes on the main thread
// it will take care of updating the main `rover dev` session
subgraph_refresher.watch_subgraph_for_changes()?;
subgraph_refresher.watch_subgraph_for_changes(client_config.retry_period)?;
}

unreachable!("watch_subgraph_for_changes never returns")
Expand Down
15 changes: 12 additions & 3 deletions src/command/dev/introspect.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use anyhow::anyhow;
use reqwest::blocking::Client;

Expand Down Expand Up @@ -29,17 +31,22 @@ impl UnknownIntrospectRunner {
}
}

pub fn run(&self) -> RoverResult<(SubgraphSdl, IntrospectRunnerKind)> {
pub fn run(
&self,
retry_period: Option<Duration>,
) -> RoverResult<(SubgraphSdl, IntrospectRunnerKind)> {
let subgraph_runner = SubgraphIntrospectRunner {
endpoint: self.endpoint.clone(),
client: self.client.clone(),
headers: self.headers.clone(),
retry_period,
};

let graph_runner = GraphIntrospectRunner {
endpoint: self.endpoint.clone(),
client: self.client.clone(),
headers: self.headers.clone(),
retry_period,
};

// we _could_ run these in parallel
Expand Down Expand Up @@ -102,6 +109,7 @@ pub struct SubgraphIntrospectRunner {
endpoint: SubgraphUrl,
client: Client,
headers: Option<Vec<(String, String)>>,
retry_period: Option<Duration>,
}

impl SubgraphIntrospectRunner {
Expand All @@ -117,7 +125,7 @@ impl SubgraphIntrospectRunner {
watch: false,
},
}
.exec(&self.client, true)
.exec(&self.client, true, self.retry_period)
}
}

Expand All @@ -126,6 +134,7 @@ pub struct GraphIntrospectRunner {
endpoint: SubgraphUrl,
client: Client,
headers: Option<Vec<(String, String)>>,
retry_period: Option<Duration>,
}

impl GraphIntrospectRunner {
Expand All @@ -141,6 +150,6 @@ impl GraphIntrospectRunner {
watch: false,
},
}
.exec(&self.client, true)
.exec(&self.client, true, self.retry_period)
}
}
1 change: 1 addition & 0 deletions src/command/dev/router/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ mod tests {
houston::Config::new(None::<&Utf8PathBuf>, None).unwrap(),
false,
ClientBuilder::new(),
Some(Duration::from_secs(3)),
),
);

Expand Down
28 changes: 19 additions & 9 deletions src/command/dev/watcher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::collections::HashMap;
use std::str::FromStr;
use std::{collections::HashMap, time::Duration};

use anyhow::{anyhow, Context};
use apollo_federation_types::build::SubgraphDefinition;
Expand Down Expand Up @@ -161,6 +161,7 @@ impl SubgraphSchemaWatcher {

pub fn get_subgraph_definition_and_maybe_new_runner(
&self,
retry_period: Option<Duration>,
) -> RoverResult<(SubgraphDefinition, Option<SubgraphSchemaWatcherKind>)> {
let (name, url) = self.subgraph_key.clone();
let (sdl, refresher) = match &self.schema_watcher_kind {
Expand All @@ -175,7 +176,7 @@ impl SubgraphSchemaWatcher {
(sdl, None)
}
IntrospectRunnerKind::Unknown(unknown_runner) => {
let (sdl, specific_runner) = unknown_runner.run()?;
let (sdl, specific_runner) = unknown_runner.run(retry_period)?;
(
sdl,
Some(SubgraphSchemaWatcherKind::Introspect(
Expand All @@ -198,8 +199,14 @@ impl SubgraphSchemaWatcher {
Ok((subgraph_definition, refresher))
}

fn update_subgraph(&mut self, last_message: Option<&String>) -> RoverResult<Option<String>> {
let maybe_update_message = match self.get_subgraph_definition_and_maybe_new_runner() {
fn update_subgraph(
&mut self,
last_message: Option<&String>,
retry_period: Option<Duration>,
) -> RoverResult<Option<String>> {
let maybe_update_message = match self
.get_subgraph_definition_and_maybe_new_runner(retry_period)
{
Ok((subgraph_definition, maybe_new_refresher)) => {
if let Some(new_refresher) = maybe_new_refresher {
self.set_schema_refresher(new_refresher);
Expand Down Expand Up @@ -260,7 +267,10 @@ impl SubgraphSchemaWatcher {
///
/// This function will block forever for `SubgraphSchemaWatcherKind` that poll for changes—so it
/// should be started in a separate thread.
pub fn watch_subgraph_for_changes(&mut self) -> RoverResult<()> {
pub fn watch_subgraph_for_changes(
&mut self,
retry_period: Option<Duration>,
) -> RoverResult<()> {
let mut last_message = None;
match self.schema_watcher_kind.clone() {
SubgraphSchemaWatcherKind::Introspect(introspect_runner_kind, polling_interval) => {
Expand All @@ -276,13 +286,13 @@ impl SubgraphSchemaWatcher {
}
);
loop {
last_message = self.update_subgraph(last_message.as_ref())?;
last_message = self.update_subgraph(last_message.as_ref(), retry_period)?;
std::thread::sleep(std::time::Duration::from_secs(polling_interval));
}
}
SubgraphSchemaWatcherKind::File(path) => {
// populate the schema for the first time (last_message is always None to start)
last_message = self.update_subgraph(last_message.as_ref())?;
last_message = self.update_subgraph(last_message.as_ref(), retry_period)?;

let (tx, rx) = unbounded();

Expand All @@ -296,11 +306,11 @@ impl SubgraphSchemaWatcher {
Ok(Err(err)) => return Err(anyhow::Error::from(err).into()),
Err(err) => return Err(anyhow::Error::from(err).into()),
}
last_message = self.update_subgraph(last_message.as_ref())?;
last_message = self.update_subgraph(last_message.as_ref(), retry_period)?;
}
}
SubgraphSchemaWatcherKind::Once(_) => {
self.update_subgraph(None)?;
self.update_subgraph(None, retry_period)?;
}
}
Ok(())
Expand Down
31 changes: 23 additions & 8 deletions src/command/graph/introspect.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use clap::Parser;
use reqwest::blocking::Client;
use serde::Serialize;
use std::collections::HashMap;
use std::{collections::HashMap, time::Duration};

use rover_client::{
blocking::GraphQLClient,
Expand All @@ -20,17 +20,27 @@ pub struct Introspect {
}

impl Introspect {
pub fn run(&self, client: Client, output_opts: &OutputOpts) -> RoverResult<RoverOutput> {
pub fn run(
&self,
client: Client,
output_opts: &OutputOpts,
retry_period: Option<Duration>,
) -> RoverResult<RoverOutput> {
if self.opts.watch {
self.exec_and_watch(&client, output_opts)
self.exec_and_watch(&client, output_opts, retry_period)
} else {
let sdl = self.exec(&client, true)?;
let sdl = self.exec(&client, true, retry_period)?;
Ok(RoverOutput::Introspection(sdl))
}
}

pub fn exec(&self, client: &Client, should_retry: bool) -> RoverResult<String> {
let client = GraphQLClient::new(self.opts.endpoint.as_ref(), client.clone());
pub fn exec(
&self,
client: &Client,
should_retry: bool,
retry_period: Option<Duration>,
) -> RoverResult<String> {
let client = GraphQLClient::new(self.opts.endpoint.as_ref(), client.clone(), retry_period);

// add the flag headers to a hashmap to pass along to rover-client
let mut headers = HashMap::new();
Expand All @@ -43,8 +53,13 @@ impl Introspect {
Ok(introspect::run(GraphIntrospectInput { headers }, &client, should_retry)?.schema_sdl)
}

pub fn exec_and_watch(&self, client: &Client, output_opts: &OutputOpts) -> ! {
pub fn exec_and_watch(
&self,
client: &Client,
output_opts: &OutputOpts,
retry_period: Option<Duration>,
) -> ! {
self.opts
.exec_and_watch(|| self.exec(client, false), output_opts)
.exec_and_watch(|| self.exec(client, false, retry_period), output_opts)
}
}
8 changes: 5 additions & 3 deletions src/command/graph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ impl Graph {
Command::Fetch(command) => command.run(client_config),
Command::Lint(command) => command.run(client_config),
Command::Publish(command) => command.run(client_config, git_context),
Command::Introspect(command) => {
command.run(client_config.get_reqwest_client()?, output_opts)
}
Command::Introspect(command) => command.run(
client_config.get_reqwest_client()?,
output_opts,
client_config.retry_period,
),
}
}
}
Loading

0 comments on commit 3b74d9f

Please sign in to comment.