Skip to content

Commit

Permalink
feat: Missing messages emission for rules and minor message refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
lfbrehm committed May 7, 2024
1 parent 3909bbd commit 57e616c
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 49 deletions.
76 changes: 42 additions & 34 deletions components/server/src/caching/notifications_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use time::OffsetDateTime;

use crate::database::dsls::pub_key_dsl::PubKey;
use crate::database::dsls::rule_dsl::{Rule, RuleBinding};
use crate::notification::natsio_handler::{Action, CacheUpdate, ServerEvents, UpdateResource};
use crate::notification::natsio_handler::{Action, Created, Deleted, ServerEvents, Updated};
use crate::notification::utils::build_rule;
use crate::search::meilisearch_client::{MeilisearchClient, ObjectDocument};
use crate::utils::search_utils;
Expand Down Expand Up @@ -371,39 +371,47 @@ async fn process_server_event(
)
}
}
ServerEvents::CACHEUPDATE(CacheUpdate { resource, action }) => match action {
Action::Created | Action::Updated => match resource {
UpdateResource::Rule(id) => {
let client = db_handler.get_client().await?;
let rule = Rule::get(id, &client)
.await?
.ok_or_else(|| anyhow!("Rule not found"))?;
let cached = build_rule(rule)?;
cache.insert_rule(&id, cached);
}
UpdateResource::RuleBinding { binding: id, .. } => {
let client = db_handler.get_client().await?;
let binding = RuleBinding::get(id, &client)
.await?
.ok_or_else(|| anyhow!("Rule not found"))?;
let resource_ids = if binding.cascading {
let mut ids = vec![binding.origin_id];
ids.append(&mut cache.get_subresources(&binding.origin_id)?);
ids
} else {
vec![binding.origin_id]
};
cache.insert_rule_binding(resource_ids, binding);
}
},
Action::Deleted => match resource {
UpdateResource::Rule(id) => {
cache.delete_rule(&id);
}
UpdateResource::RuleBinding { resource, rule, .. } => {
cache.remove_rule_bindings(resource, rule);
}
},
ServerEvents::CACHEUPDATE(action) => match action {
Action::Created(Created::Rule(id)) | Action::Updated(Updated::Rule(id)) => {
let client = db_handler.get_client().await?;
let rule = Rule::get(id, &client)
.await?
.ok_or_else(|| anyhow!("Rule not found"))?;
let cached = build_rule(rule)?;
cache.insert_rule(&id, cached);
}
Action::Created(Created::RuleBinding {
rule_id,
origin_id,
resource_id,
})
| Action::Updated(Updated::RuleBinding {
rule_id,
origin_id,
resource_id,
}) => {
let client = db_handler.get_client().await?;
let binding = RuleBinding::get_by(rule_id, origin_id, resource_id, &client)
.await?
.ok_or_else(|| anyhow!("Rule not found"))?;
let resource_ids = if binding.cascading {
let mut ids = vec![binding.origin_id];
ids.append(&mut cache.get_subresources(&binding.origin_id)?);
ids
} else {
vec![binding.origin_id]
};
cache.insert_rule_binding(resource_ids, binding);
}
Action::Deleted(Deleted::Rule(id)) => {
cache.delete_rule(&id);
}
Action::Deleted(Deleted::RuleBinding {
resource_id,
rule_id,
}) => {
cache.remove_rule_bindings(resource_id, rule_id);
}
},
}

Expand Down
14 changes: 14 additions & 0 deletions components/server/src/database/dsls/rule_dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,18 @@ impl RuleBinding {
client.execute(&prepared, &[&rule_id, &origin_id]).await?;
Ok(())
}

pub async fn get_by(
rule_id: DieselUlid,
origin_id: DieselUlid,
resource_id: DieselUlid,
client: &Client,
) -> Result<Option<RuleBinding>> {
let query = "SELECT * FROM rule_bindings WHERE rule_id = $1 AND origin_id = $2 AND resource_id = $3;";
let prepared = client.prepare(query).await?;
Ok(client
.query_opt(&prepared, &[&rule_id, &origin_id, &resource_id])
.await?
.map(|e| RuleBinding::from_row(&e)))
}
}
78 changes: 77 additions & 1 deletion components/server/src/middlelayer/rule_db_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::caching::structs::CachedRule;
use crate::database::dsls::rule_dsl::RuleBinding;
use crate::database::{crud::CrudDb, dsls::object_dsl::Object};
use crate::middlelayer::rule_request_types::{CreateRuleBinding, DeleteRuleBinding, UpdateRule};
use crate::notification::natsio_handler::{Action, Created, Deleted, ServerEvents, Updated};
use ahash::HashSet;
use anyhow::{anyhow, Ok, Result};
use cel_interpreter::Value;
Expand Down Expand Up @@ -54,6 +55,22 @@ impl DatabaseHandler {
}
}
}
for binding in inherited_rules {
if let Err(err) = self
.natsio_handler
.register_server_event(ServerEvents::CACHEUPDATE(Action::Updated(
Updated::RuleBinding {
rule_id: binding.rule_id,
origin_id: binding.origin_id,
resource_id: binding.object_id,
},
)))
.await
{
log::error!("{}", err);
return Err(anyhow::anyhow!("Notification emission failed"));
}
}
Ok(())
}

Expand Down Expand Up @@ -85,6 +102,16 @@ impl DatabaseHandler {
let id = rule.rule.id;
rule.rule.create(&client).await?;
self.cache.insert_rule(&id, rule.clone());
if let Err(err) = self
.natsio_handler
.register_server_event(ServerEvents::CACHEUPDATE(Action::Created(Created::Rule(
id,
))))
.await
{
log::error!("{}", err);
return Err(anyhow::anyhow!("Notification emission failed"));
}
Ok(id)
}

Expand All @@ -97,13 +124,34 @@ impl DatabaseHandler {
let updated = request.merge(&rule)?;
updated.rule.update(&client).await?;
self.cache.insert_rule(&updated.rule.id, updated.clone());
// TODO: Update rule event
if let Err(err) = self
.natsio_handler
.register_server_event(ServerEvents::CACHEUPDATE(Action::Updated(Updated::Rule(
updated.rule.id,
))))
.await
{
log::error!("{}", err);
return Err(anyhow::anyhow!("Notification emission failed"));
}
Ok(updated)
}

pub async fn delete_rule(&self, rule: &CachedRule) -> Result<()> {
let client = self.database.get_client().await?;
rule.rule.delete(&client).await?;
self.cache.delete_rule(&rule.rule.id);
if let Err(err) = self
.natsio_handler
.register_server_event(ServerEvents::CACHEUPDATE(Action::Deleted(Deleted::Rule(
rule.rule.id,
))))
.await
{
log::error!("{}", err);
return Err(anyhow::anyhow!("Notification emission failed"));
}
Ok(())
}

Expand All @@ -118,14 +166,42 @@ impl DatabaseHandler {
} else {
vec![binding.origin_id]
};
self.cache.insert_rule_binding(resource_ids, binding);
self.cache
.insert_rule_binding(resource_ids, binding.clone());
if let Err(err) = self
.natsio_handler
.register_server_event(ServerEvents::CACHEUPDATE(Action::Created(
Created::RuleBinding {
rule_id: binding.rule_id,
origin_id: binding.origin_id,
resource_id: binding.object_id,
},
)))
.await
{
log::error!("{}", err);
return Err(anyhow::anyhow!("Notification emission failed"));
}
Ok(())
}
pub async fn delete_rule_binding(&self, request: DeleteRuleBinding) -> Result<()> {
let client = self.database.get_client().await?;
let (resource_id, rule_id) = request.get_ids()?;
RuleBinding::delete_by(rule_id, resource_id, &client).await?;
self.cache.remove_rule_bindings(resource_id, rule_id);
if let Err(err) = self
.natsio_handler
.register_server_event(ServerEvents::CACHEUPDATE(Action::Deleted(
Deleted::RuleBinding {
rule_id,
resource_id,
},
)))
.await
{
log::error!("{}", err);
return Err(anyhow::anyhow!("Notification emission failed"));
}
Ok(())
}
async fn evaluate_additional_rules(
Expand Down
39 changes: 25 additions & 14 deletions components/server/src/notification/natsio_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,31 +49,42 @@ pub const STREAM_SUBJECTS: [&str; 5] = [
// Enum for internal events that are only of interest for the ArunaServer instances
pub enum ServerEvents {
MVREFRESH(i64), // UTC timestamp_seconds
CACHEUPDATE(CacheUpdate),
CACHEUPDATE(Action),
}

#[derive(Deserialize, Serialize)]
pub struct CacheUpdate {
pub action: Action,
pub resource: UpdateResource,
pub enum Action {
Created(Created),
Updated(Updated),
Deleted(Deleted),
}
#[derive(Deserialize, Serialize)]
pub enum Action {
Created,
Updated,
Deleted,
pub enum Created {
Rule(DieselUlid),
RuleBinding {
rule_id: DieselUlid,
origin_id: DieselUlid,
resource_id: DieselUlid,
},
}
#[derive(Deserialize, Serialize)]
pub enum UpdateResource {
pub enum Updated {
Rule(DieselUlid),
RuleBinding {
binding: DieselUlid,
resource: DieselUlid,
rule: DieselUlid,
rule_id: DieselUlid,
origin_id: DieselUlid,
resource_id: DieselUlid,
},
}
#[derive(Deserialize, Serialize)]
pub enum Deleted {
Rule(DieselUlid),
RuleBinding {
rule_id: DieselUlid,
resource_id: DieselUlid,
},
}
// ----------------------------------------------------------- //

// ----------------------------------------------------------- //
pub struct NatsIoHandler {
jetstream_context: Context,
stream: Stream,
Expand Down

0 comments on commit 57e616c

Please sign in to comment.