Skip to content

Commit

Permalink
[BREAKING] Add concurrency limit to GRPC (#627)
Browse files Browse the repository at this point in the history
Currently we apply no concurrency limits for GRPC calls to GrpcStore
or GrpcScheduler.  This can cause issues for a heavily loaded upstream
service that will start to build up reset streams and break.

This change restructures the configuration so that each endpoint carries its
own settings, and adds an optional per-endpoint setting that limits the
upstream concurrency.
  • Loading branch information
chrisstaite-menlo authored Jan 26, 2024
1 parent 39c6678 commit b47f39b
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 37 deletions.
8 changes: 6 additions & 2 deletions deployment-examples/docker-compose/scheduler.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@
// Note: This file is used to test GRPC store.
"grpc": {
"instance_name": "main",
"endpoints": ["grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"],
"endpoints": [
{"address": "grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"}
],
"store_type": "cas"
}
},
"GRPC_LOCAL_AC_STORE": {
// Note: This file is used to test GRPC store.
"grpc": {
"instance_name": "main",
"endpoints": ["grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"],
"endpoints": [
{"address": "grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"}
],
"store_type": "ac"
}
}
Expand Down
8 changes: 6 additions & 2 deletions deployment-examples/docker-compose/worker.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@
// Note: This file is used to test GRPC store.
"grpc": {
"instance_name": "main",
"endpoints": ["grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"],
"endpoints": [
{"address": "grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"}
],
"store_type": "cas"
}
},
"GRPC_LOCAL_AC_STORE": {
// Note: This file is used to test GRPC store.
"grpc": {
"instance_name": "main",
"endpoints": ["grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"],
"endpoints": [
{"address": "grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"}
],
"store_type": "ac"
}
},
Expand Down
8 changes: 6 additions & 2 deletions deployment-examples/kubernetes/scheduler.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@
// Note: This file is used to test GRPC store.
"grpc": {
"instance_name": "main",
"endpoints": ["grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"],
"endpoints": [
{"address": "grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"}
],
"store_type": "cas"
}
},
"GRPC_LOCAL_AC_STORE": {
// Note: This file is used to test GRPC store.
"grpc": {
"instance_name": "main",
"endpoints": ["grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"],
"endpoints": [
{"address": "grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"}
],
"store_type": "ac"
}
}
Expand Down
8 changes: 6 additions & 2 deletions deployment-examples/kubernetes/worker.json.template
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@
// Note: This file is used to test GRPC store.
"grpc": {
"instance_name": "main",
"endpoints": ["grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"],
"endpoints": [
{"address": "grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"}
],
"store_type": "cas"
}
},
"GRPC_LOCAL_AC_STORE": {
// Note: This file is used to test GRPC store.
"grpc": {
"instance_name": "main",
"endpoints": ["grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"],
"endpoints": [
{"address": "grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"}
],
"store_type": "ac"
}
},
Expand Down
12 changes: 4 additions & 8 deletions nativelink-config/src/schedulers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use std::collections::HashMap;

use serde::Deserialize;

use crate::serde_utils::{convert_numeric_with_shellexpand, convert_string_with_shellexpand};
use crate::stores::{ClientTlsConfig, Retry, StoreRefName};
use crate::serde_utils::convert_numeric_with_shellexpand;
use crate::stores::{GrpcEndpoint, Retry, StoreRefName};

#[allow(non_camel_case_types)]
#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -125,19 +125,15 @@ pub struct SimpleScheduler {
/// is useful to use when doing some kind of local action cache or CAS away from
/// the main cluster of workers. In general, it's more efficient to point the
/// build at the main scheduler directly though.
#[derive(Deserialize, Debug, Default)]
#[derive(Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct GrpcScheduler {
/// The upstream scheduler to forward requests to.
#[serde(deserialize_with = "convert_string_with_shellexpand")]
pub endpoint: String,
pub endpoint: GrpcEndpoint,

/// Retry configuration to use when a network request fails.
#[serde(default)]
pub retry: Retry,

/// The TLS configuration to use to connect to the endpoint.
pub tls_config: Option<ClientTlsConfig>,
}

#[derive(Deserialize, Debug)]
Expand Down
18 changes: 13 additions & 5 deletions nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,18 @@ pub struct ClientTlsConfig {
pub key_file: Option<String>,
}

/// Connection settings for a single upstream GRPC endpoint.
///
/// Unknown keys are rejected during deserialization (`deny_unknown_fields`).
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct GrpcEndpoint {
/// The endpoint address (e.g. grpc(s)://example.com:443).
///
/// Deserialized through `convert_string_with_shellexpand`; the deployment
/// examples rely on this with values such as
/// `grpc://${CAS_ENDPOINT:-127.0.0.1}:50051`.
#[serde(deserialize_with = "convert_string_with_shellexpand")]
pub address: String,
/// The TLS configuration to use to connect to the endpoint (if grpcs).
/// Optional; may be omitted from the configuration.
pub tls_config: Option<ClientTlsConfig>,
/// The maximum concurrency to allow on this endpoint.
/// When unset, `tls_utils::endpoint` applies no concurrency limit.
pub concurrency_limit: Option<usize>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct GrpcStore {
Expand All @@ -526,18 +538,14 @@ pub struct GrpcStore {
pub instance_name: String,

/// The endpoint of the grpc connection.
#[serde(default)]
pub endpoints: Vec<String>,
pub endpoints: Vec<GrpcEndpoint>,

/// The type of the upstream store, this ensures that the correct server calls are made.
pub store_type: StoreType,

/// Retry configuration to use when a network request fails.
#[serde(default)]
pub retry: Retry,

/// The TLS configuration to use to connect to the endpoints.
pub tls_config: Option<ClientTlsConfig>,
}

/// Retry configuration. This configuration is exponential and each iteration
Expand Down
5 changes: 1 addition & 4 deletions nativelink-scheduler/src/grpc_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,7 @@ impl GrpcScheduler {
config: &nativelink_config::schedulers::GrpcScheduler,
jitter_fn: Box<dyn Fn(Duration) -> Duration + Send + Sync>,
) -> Result<Self, Error> {
let channel = transport::Channel::balance_list(std::iter::once(tls_utils::endpoint_from(
&config.endpoint,
tls_utils::load_client_config(&config.tls_config)?,
)?));
let channel = transport::Channel::balance_list(std::iter::once(tls_utils::endpoint(&config.endpoint)?));
Ok(Self {
capabilities_client: CapabilitiesClient::new(channel.clone()),
execution_client: ExecutionClient::new(channel),
Expand Down
13 changes: 2 additions & 11 deletions nativelink-store/src/grpc_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,10 @@ impl GrpcStore {
jitter_fn: Box<dyn Fn(Duration) -> Duration + Send + Sync>,
) -> Result<Self, Error> {
error_if!(config.endpoints.is_empty(), "Expected at least 1 endpoint in GrpcStore");
let tls_config = tls_utils::load_client_config(&config.tls_config)?;
let mut endpoints = Vec::with_capacity(config.endpoints.len());
for endpoint in &config.endpoints {
// TODO(allada) This should be moved to be done in utils/serde_utils.rs like the others.
// We currently don't have a way to handle those helpers with vectors.
let endpoint = shellexpand::env(&endpoint)
.map_err(|e| make_input_err!("{e}"))
.err_tip(|| "Could not expand endpoint in GrpcStore")?
.to_string();

let endpoint = tls_utils::endpoint_from(&endpoint, tls_config.clone())
for endpoint_config in &config.endpoints {
let endpoint = tls_utils::endpoint(endpoint_config)
.map_err(|e| make_input_err!("Invalid URI for GrpcStore endpoint : {e:?}"))?;

endpoints.push(endpoint);
}

Expand Down
14 changes: 13 additions & 1 deletion nativelink-util/src/tls_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use nativelink_config::stores::ClientTlsConfig;
use nativelink_config::stores::{ClientTlsConfig, GrpcEndpoint};
use nativelink_error::{make_err, make_input_err, Code, Error};
use tonic::transport::Uri;

Expand Down Expand Up @@ -86,3 +86,15 @@ pub fn endpoint_from(

Ok(endpoint_transport)
}

pub fn endpoint(endpoint_config: &GrpcEndpoint) -> Result<tonic::transport::Endpoint, Error> {
let endpoint = endpoint_from(
&endpoint_config.address,
load_client_config(&endpoint_config.tls_config)?,
)?;
if let Some(concurrency_limit) = endpoint_config.concurrency_limit {
Ok(endpoint.concurrency_limit(concurrency_limit))
} else {
Ok(endpoint)
}
}

0 comments on commit b47f39b

Please sign in to comment.