Skip to content

Commit

Permalink
feat(config): support monitoring external config
Browse files Browse the repository at this point in the history
Signed-off-by: Brooks Townsend <[email protected]>

don't delete non-managed config

Signed-off-by: Brooks Townsend <[email protected]>
  • Loading branch information
brooksmtownsend committed Apr 16, 2024
1 parent 6a35dd0 commit 90c3eeb
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 49 deletions.
105 changes: 76 additions & 29 deletions src/scaler/configscaler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub struct ConfigScaler<ConfigSource> {
// NOTE(#263): Introducing storing the entire configuration in-memory has the potential to get
// fairly heavy if the configuration is large. We should consider a more efficient way to store
// this by fetching configuration from the manifest when it's needed, for example.
config: HashMap<String, String>,
config: Option<HashMap<String, String>>,
status: RwLock<StatusInfo>,
}

Expand Down Expand Up @@ -72,20 +72,38 @@ impl<C: ConfigSource + Send + Sync + Clone> Scaler for ConfigScaler<C> {
#[instrument(level = "trace", skip_all, scaler_id = %self.id)]
async fn reconcile(&self) -> Result<Vec<Command>> {
debug!(self.config_name, "Fetching configuration");
match self.config_bucket.get_config(&self.config_name).await {
Ok(Some(config)) if config == self.config => {
match (
self.config_bucket.get_config(&self.config_name).await,
self.config.as_ref(),
) {
// If configuration is not supplied to the scaler, we just ensure that it exists
(Ok(Some(_config)), None) => {
*self.status.write().await = StatusInfo::deployed("");
Ok(Vec::new())
}
Ok(_config) => {
// If configuration is not supplied and doesn't exist, we enter a failed state
(Ok(None), None) => {
*self.status.write().await = StatusInfo::failed(&format!(
"Specified configuration {} does not exist",
self.config_name
));
Ok(Vec::new())
}
// If configuration matches what's supplied, this scaler is deployed
(Ok(Some(config)), Some(scaler_config)) if &config == scaler_config => {
*self.status.write().await = StatusInfo::deployed("");
Ok(Vec::new())
}
// If configuration is out of sync, we put the configuration
(Ok(_config), Some(scaler_config)) => {
debug!(self.config_name, "Putting configuration");
*self.status.write().await = StatusInfo::reconciling("Configuration out of sync");
Ok(vec![Command::PutConfig(PutConfig {
config_name: self.config_name.clone(),
config: self.config.clone(),
config: scaler_config.clone(),
})])
}
Err(e) => {
(Err(e), _) => {
error!(error = %e, "Configscaler failed to fetch configuration");
*self.status.write().await = StatusInfo::failed(&e.to_string());
Ok(Vec::new())
Expand All @@ -95,25 +113,37 @@ impl<C: ConfigSource + Send + Sync + Clone> Scaler for ConfigScaler<C> {

#[instrument(level = "trace", skip_all)]
async fn cleanup(&self) -> Result<Vec<Command>> {
Ok(vec![Command::DeleteConfig(DeleteConfig {
config_name: self.config_name.clone(),
})])
if self.config.is_some() {
Ok(vec![Command::DeleteConfig(DeleteConfig {
config_name: self.config_name.clone(),
})])
} else {
// This configuration is externally managed, don't delete it
Ok(Vec::new())
}
}
}

impl<C: ConfigSource> ConfigScaler<C> {
/// Construct a new ConfigScaler with specified values
pub fn new(config_bucket: C, config_name: &str, config: &HashMap<String, String>) -> Self {
pub fn new(
config_bucket: C,
config_name: &str,
config: Option<&HashMap<String, String>>,
) -> Self {
let mut id = config_name.to_string();
// Hash the config to generate a unique id, used to compare scalers for uniqueness when updating
let mut config_hasher = std::collections::hash_map::DefaultHasher::new();
BTreeMap::from_iter(config.iter()).hash(&mut config_hasher);
let id = format!("{config_name}-{}", config_hasher.finish());
if let Some(config) = config.as_ref() {
let mut config_hasher = std::collections::hash_map::DefaultHasher::new();
BTreeMap::from_iter(config.iter()).hash(&mut config_hasher);
id.extend(format!("-{}", config_hasher.finish()).chars());
}

Self {
config_bucket,
id,
config_name: config_name.to_string(),
config: config.clone(),
config: config.cloned(),
status: RwLock::new(StatusInfo::reconciling("")),
}
}
Expand Down Expand Up @@ -150,11 +180,8 @@ mod test {
)])),
};

let config_scaler = ConfigScaler::new(
lattice,
&config.name,
&config.properties.clone().expect("properties not found"),
);
let config_scaler =
ConfigScaler::new(lattice.clone(), &config.name, config.properties.as_ref());

assert_eq!(
config_scaler.status().await.status_type,
Expand Down Expand Up @@ -262,11 +289,7 @@ mod test {
)]),
};

let config_scaler2 = ConfigScaler::new(
lattice2,
&config.name,
&config.properties.clone().expect("properties not found"),
);
let config_scaler2 = ConfigScaler::new(lattice2, &config.name, config.properties.as_ref());

assert_eq!(
config_scaler2
Expand Down Expand Up @@ -312,11 +335,8 @@ mod test {
HashMap::from_iter(vec![("key".to_string(), "wrong_value".to_string())]),
)]),
};
let config_scaler3 = ConfigScaler::new(
lattice3,
&config.name,
&config.properties.clone().expect("properties not found"),
);
let config_scaler3 =
ConfigScaler::new(lattice3.clone(), &config.name, config.properties.as_ref());

assert_eq!(
config_scaler3
Expand All @@ -332,5 +352,32 @@ mod test {
config_scaler3.status().await.status_type,
crate::server::StatusType::Reconciling
);

// Test supplied name but not supplied config
let config_scaler4 = ConfigScaler::new(lattice3, &config.name, None);
assert_eq!(
config_scaler4
.reconcile()
.await
.expect("reconcile should succeed"),
vec![]
);
assert_eq!(
config_scaler4.status().await.status_type,
crate::server::StatusType::Deployed
);

let config_scaler5 = ConfigScaler::new(lattice, &config.name, None);
assert_eq!(
config_scaler5
.reconcile()
.await
.expect("reconcile should succeed"),
vec![]
);
assert_eq!(
config_scaler5.status().await.status_type,
crate::server::StatusType::Failed
);
}
}
28 changes: 10 additions & 18 deletions src/scaler/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,28 +842,20 @@ fn config_to_scalers<C: ConfigSource + Send + Sync + Clone>(
model_name: &str,
configs: &[ConfigProperty],
) -> (Vec<ConfigScaler<C>>, Vec<String>) {
let (config_scalers, names): (Vec<Option<ConfigScaler<C>>>, Vec<String>) = configs
configs
.iter()
.map(|config| {
// We're mapping the component properties here in a `filter_map` as we're only interested
// in a [ConfigScaler] if the [ConfigProperty] has a `name` and `properties` field.
// If the [ConfigProperty] only lists a name, it's managed externally.
config.properties.as_ref().map_or_else(
|| (None, config.name.clone()),
|properties| {
// NOTE: Here we are explicitly taking
let name = compute_component_id(model_name, None, &config.name);
(
Some(ConfigScaler::new(config_source.clone(), &name, properties)),
name,
)
},
let name = if config.properties.is_some() {
compute_component_id(model_name, None, &config.name)
} else {
config.name.clone()
};
(
ConfigScaler::new(config_source.clone(), &name, config.properties.as_ref()),
name,
)
})
.unzip();

// Filter out None values from the scalers, keeping all names
(config_scalers.into_iter().flatten().collect(), names)
.unzip()
}

/// Based on the name of the model and the optionally provided ID, returns a unique ID for the
Expand Down
2 changes: 0 additions & 2 deletions src/server/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,8 +636,6 @@ impl<P: Publisher> Handler<P> {
}
}

// TODO(#253): Validate that named configurations managed outside of wadm exist?

if !manifests.deploy(req.version) {
trace!("Requested version does not exist");
self.send_reply(
Expand Down
1 change: 1 addition & 0 deletions test/data/complex.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ spec:
- name: defaultcode
properties:
http: "404"
- name: blobby-default-configuration-values
traits:
- type: spreadscaler
properties:
Expand Down
10 changes: 10 additions & 0 deletions tests/e2e_multiple_hosts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,16 @@ async fn test_complex_app(client_info: &ClientInfo) {
"Shouldn't have errored when creating manifest: {resp:?}"
);

// Put configuration that's mentioned in manifest but without properties
client_info
.ctl_client("default")
.put_config(
"blobby-default-configuration-values",
HashMap::from_iter([("littleblobby".to_string(), "tables".to_string())]),
)
.await
.expect("should be able to put blobby config");

// Deploy manifest
let resp = client_info
.deploy_manifest("complex", None, None, None)
Expand Down

0 comments on commit 90c3eeb

Please sign in to comment.