Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BREAKING] Add concurrency limit to GRPC #627

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions deployment-examples/docker-compose/scheduler.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@
// Note: This file is used to test GRPC store.
"grpc": {
"instance_name": "main",
"endpoints": ["grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"],
"endpoints": [
{"address": "grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"}
],
"store_type": "cas"
}
},
"GRPC_LOCAL_AC_STORE": {
// Note: This file is used to test GRPC store.
"grpc": {
"instance_name": "main",
"endpoints": ["grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"],
"endpoints": [
{"address": "grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"}
],
"store_type": "ac"
}
}
Expand Down
8 changes: 6 additions & 2 deletions deployment-examples/docker-compose/worker.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@
// Note: This file is used to test GRPC store.
"grpc": {
"instance_name": "main",
"endpoints": ["grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"],
"endpoints": [
{"address": "grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"}
],
"store_type": "cas"
}
},
"GRPC_LOCAL_AC_STORE": {
// Note: This file is used to test GRPC store.
"grpc": {
"instance_name": "main",
"endpoints": ["grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"],
"endpoints": [
{"address": "grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"}
],
"store_type": "ac"
}
},
Expand Down
8 changes: 6 additions & 2 deletions deployment-examples/kubernetes/scheduler.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@
// Note: This file is used to test GRPC store.
"grpc": {
"instance_name": "main",
"endpoints": ["grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"],
"endpoints": [
{"address": "grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"}
],
"store_type": "cas"
}
},
"GRPC_LOCAL_AC_STORE": {
// Note: This file is used to test GRPC store.
"grpc": {
"instance_name": "main",
"endpoints": ["grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"],
"endpoints": [
{"address": "grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"}
],
"store_type": "ac"
}
}
Expand Down
8 changes: 6 additions & 2 deletions deployment-examples/kubernetes/worker.json.template
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@
// Note: This file is used to test GRPC store.
"grpc": {
"instance_name": "main",
"endpoints": ["grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"],
"endpoints": [
{"address": "grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"}
],
"store_type": "cas"
}
},
"GRPC_LOCAL_AC_STORE": {
// Note: This file is used to test GRPC store.
"grpc": {
"instance_name": "main",
"endpoints": ["grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"],
"endpoints": [
{"address": "grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"}
],
"store_type": "ac"
}
},
Expand Down
12 changes: 4 additions & 8 deletions nativelink-config/src/schedulers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use std::collections::HashMap;

use serde::Deserialize;

use crate::serde_utils::{convert_numeric_with_shellexpand, convert_string_with_shellexpand};
use crate::stores::{ClientTlsConfig, Retry, StoreRefName};
use crate::serde_utils::convert_numeric_with_shellexpand;
use crate::stores::{GrpcEndpoint, Retry, StoreRefName};

#[allow(non_camel_case_types)]
#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -125,19 +125,15 @@ pub struct SimpleScheduler {
/// is useful to use when doing some kind of local action cache or CAS away from
/// the main cluster of workers. In general, it's more efficient to point the
/// build at the main scheduler directly though.
#[derive(Deserialize, Debug, Default)]
#[derive(Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct GrpcScheduler {
/// The upstream scheduler to forward requests to.
#[serde(deserialize_with = "convert_string_with_shellexpand")]
pub endpoint: String,
pub endpoint: GrpcEndpoint,

/// Retry configuration to use when a network request fails.
#[serde(default)]
pub retry: Retry,

/// The TLS configuration to use to connect to the endpoint.
pub tls_config: Option<ClientTlsConfig>,
}

#[derive(Deserialize, Debug)]
Expand Down
18 changes: 13 additions & 5 deletions nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,18 @@ pub struct ClientTlsConfig {
pub key_file: Option<String>,
}

/// Configuration for a single upstream gRPC endpoint.
///
/// Unknown keys are rejected during deserialization (`deny_unknown_fields`).
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct GrpcEndpoint {
/// The endpoint address (e.g. grpc(s)://example.com:443).
/// Deserialized through `convert_string_with_shellexpand`; presumably this
/// expands shell-style variables such as `${CAS_ENDPOINT:-127.0.0.1}` --
/// confirm against `serde_utils`.
#[serde(deserialize_with = "convert_string_with_shellexpand")]
pub address: String,
/// The TLS configuration to use to connect to the endpoint (if grpcs).
/// Optional; may be omitted from the configuration.
pub tls_config: Option<ClientTlsConfig>,
/// The maximum concurrency to allow on this endpoint.
/// Optional; when omitted, no concurrency limit is configured.
pub concurrency_limit: Option<usize>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct GrpcStore {
Expand All @@ -526,18 +538,14 @@ pub struct GrpcStore {
pub instance_name: String,

/// The endpoints of the gRPC connection. At least one endpoint is required.
#[serde(default)]
pub endpoints: Vec<String>,
pub endpoints: Vec<GrpcEndpoint>,

/// The type of the upstream store, this ensures that the correct server calls are made.
pub store_type: StoreType,

/// Retry configuration to use when a network request fails.
#[serde(default)]
pub retry: Retry,

/// The TLS configuration to use to connect to the endpoints.
pub tls_config: Option<ClientTlsConfig>,
}

/// Retry configuration. This configuration is exponential and each iteration
Expand Down
5 changes: 1 addition & 4 deletions nativelink-scheduler/src/grpc_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,7 @@ impl GrpcScheduler {
config: &nativelink_config::schedulers::GrpcScheduler,
jitter_fn: Box<dyn Fn(Duration) -> Duration + Send + Sync>,
) -> Result<Self, Error> {
let channel = transport::Channel::balance_list(std::iter::once(tls_utils::endpoint_from(
&config.endpoint,
tls_utils::load_client_config(&config.tls_config)?,
)?));
let channel = transport::Channel::balance_list(std::iter::once(tls_utils::endpoint(&config.endpoint)?));
Ok(Self {
capabilities_client: CapabilitiesClient::new(channel.clone()),
execution_client: ExecutionClient::new(channel),
Expand Down
13 changes: 2 additions & 11 deletions nativelink-store/src/grpc_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,10 @@ impl GrpcStore {
jitter_fn: Box<dyn Fn(Duration) -> Duration + Send + Sync>,
) -> Result<Self, Error> {
error_if!(config.endpoints.is_empty(), "Expected at least 1 endpoint in GrpcStore");
let tls_config = tls_utils::load_client_config(&config.tls_config)?;
let mut endpoints = Vec::with_capacity(config.endpoints.len());
for endpoint in &config.endpoints {
// TODO(allada) This should be moved to be done in utils/serde_utils.rs like the others.
// We currently don't have a way to handle those helpers with vectors.
let endpoint = shellexpand::env(&endpoint)
.map_err(|e| make_input_err!("{e}"))
.err_tip(|| "Could not expand endpoint in GrpcStore")?
.to_string();

let endpoint = tls_utils::endpoint_from(&endpoint, tls_config.clone())
for endpoint_config in &config.endpoints {
let endpoint = tls_utils::endpoint(endpoint_config)
.map_err(|e| make_input_err!("Invalid URI for GrpcStore endpoint : {e:?}"))?;

endpoints.push(endpoint);
}

Expand Down
14 changes: 13 additions & 1 deletion nativelink-util/src/tls_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use nativelink_config::stores::ClientTlsConfig;
use nativelink_config::stores::{ClientTlsConfig, GrpcEndpoint};
use nativelink_error::{make_err, make_input_err, Code, Error};
use tonic::transport::Uri;

Expand Down Expand Up @@ -86,3 +86,15 @@ pub fn endpoint_from(

Ok(endpoint_transport)
}

/// Builds a `tonic` transport endpoint from a [`GrpcEndpoint`] configuration.
///
/// The client TLS configuration is loaded first, then the endpoint is created
/// from the configured address via [`endpoint_from`]. If a concurrency limit
/// is configured it is applied to the resulting endpoint; otherwise the
/// endpoint is returned unchanged.
///
/// # Errors
///
/// Propagates any error returned by [`load_client_config`] or
/// [`endpoint_from`].
///
/// NOTE(review): a configured limit of `0` is forwarded as-is without
/// validation -- confirm whether that value should be rejected upstream.
pub fn endpoint(endpoint_config: &GrpcEndpoint) -> Result<tonic::transport::Endpoint, Error> {
    let tls_config = load_client_config(&endpoint_config.tls_config)?;
    let transport = endpoint_from(&endpoint_config.address, tls_config)?;
    Ok(match endpoint_config.concurrency_limit {
        Some(limit) => transport.concurrency_limit(limit),
        None => transport,
    })
}