Skip to content

Commit 0c1b7bf

Browse files
author
Devdutt Shenoi
committed
refactor: store correlations as a mapping
1 parent 6f237ce commit 0c1b7bf

File tree

3 files changed

+105
-97
lines changed

3 files changed

+105
-97
lines changed

src/correlation.rs

Lines changed: 94 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,25 @@
1616
*
1717
*/
1818

19-
use std::collections::HashSet;
19+
use std::collections::{HashMap, HashSet};
2020

2121
use actix_web::{http::header::ContentType, Error};
2222
use chrono::Utc;
2323
use datafusion::error::DataFusionError;
2424
use http::StatusCode;
2525
use itertools::Itertools;
2626
use once_cell::sync::Lazy;
27+
use relative_path::RelativePathBuf;
2728
use serde::{Deserialize, Serialize};
2829
use serde_json::Error as SerdeError;
2930
use tokio::sync::RwLock;
30-
use tracing::{error, trace, warn};
31+
use tracing::error;
3132

3233
use crate::{
33-
handlers::http::rbac::RBACError,
34+
handlers::http::{
35+
rbac::RBACError,
36+
users::{CORRELATION_DIR, USERS_ROOT_DIR},
37+
},
3438
option::CONFIG,
3539
query::QUERY_SESSION,
3640
rbac::{map::SessionKey, Users},
@@ -41,28 +45,32 @@ use crate::{
4145

4246
pub static CORRELATIONS: Lazy<Correlation> = Lazy::new(Correlation::default);
4347

48+
type CorrelationMap = HashMap<CorrelationId, CorrelationConfig>;
49+
4450
#[derive(Debug, Default, derive_more::Deref)]
45-
pub struct Correlation(RwLock<Vec<CorrelationConfig>>);
51+
pub struct Correlation(RwLock<HashMap<UserId, CorrelationMap>>);
4652

4753
impl Correlation {
4854
// Load correlations from storage
4955
pub async fn load(&self) -> anyhow::Result<()> {
5056
let store = CONFIG.storage().get_object_store();
5157
let all_correlations = store.get_all_correlations().await.unwrap_or_default();
5258

53-
let correlations: Vec<CorrelationConfig> = all_correlations
54-
.into_iter()
55-
.flat_map(|(_, correlations_bytes)| correlations_bytes)
56-
.filter_map(|correlation| {
57-
serde_json::from_slice(&correlation)
58-
.inspect_err(|e| {
59-
error!("Unable to load correlation: {e}");
60-
})
61-
.ok()
62-
})
63-
.collect();
64-
65-
self.write().await.extend(correlations);
59+
for correlations_bytes in all_correlations.values().flatten() {
60+
let Ok(correlation) = serde_json::from_slice::<CorrelationConfig>(correlations_bytes)
61+
.inspect_err(|e| {
62+
error!("Unable to load correlation file : {e}");
63+
})
64+
else {
65+
continue;
66+
};
67+
68+
self.write()
69+
.await
70+
.entry(correlation.user_id.to_owned())
71+
.or_insert_with(HashMap::new)
72+
.insert(correlation.id.to_owned(), correlation);
73+
}
6674

6775
Ok(())
6876
}
@@ -72,21 +80,26 @@ impl Correlation {
7280
session_key: &SessionKey,
7381
user_id: &str,
7482
) -> Result<Vec<CorrelationConfig>, CorrelationError> {
75-
let correlations = self.read().await.iter().cloned().collect_vec();
83+
let Some(correlations) = self.read().await.get(user_id).cloned() else {
84+
return Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!(
85+
"Unable to find correlations for user - {user_id}"
86+
))));
87+
};
7688

7789
let mut user_correlations = vec![];
7890
let permissions = Users.get_permissions(session_key);
7991

80-
for c in correlations {
81-
let tables = &c
92+
for correlation in correlations.values() {
93+
let tables = &correlation
8294
.table_configs
8395
.iter()
8496
.map(|t| t.table_name.clone())
8597
.collect_vec();
86-
if user_auth_for_query(&permissions, tables).is_ok() && c.user_id == user_id {
87-
user_correlations.push(c);
98+
if user_auth_for_query(&permissions, tables).is_ok() && correlation.user_id == user_id {
99+
user_correlations.push(correlation.clone());
88100
}
89101
}
102+
90103
Ok(user_correlations)
91104
}
92105

@@ -95,45 +108,57 @@ impl Correlation {
95108
correlation_id: &str,
96109
user_id: &str,
97110
) -> Result<CorrelationConfig, CorrelationError> {
98-
let read = self.read().await;
99-
let correlation = read
100-
.iter()
101-
.find(|c| c.id == correlation_id && c.user_id == user_id)
102-
.cloned();
103-
104-
correlation.ok_or_else(|| {
105-
CorrelationError::AnyhowError(anyhow::Error::msg(format!(
106-
"Unable to find correlation with ID- {correlation_id}"
107-
)))
108-
})
111+
self.read()
112+
.await
113+
.get(user_id)
114+
.and_then(|correlations| correlations.get(correlation_id))
115+
.cloned()
116+
.ok_or_else(|| {
117+
CorrelationError::AnyhowError(anyhow::Error::msg(format!(
118+
"Unable to find correlation with ID- {correlation_id}"
119+
)))
120+
})
109121
}
110122

123+
/// Insert new or replace existing correlation for the user and with the same ID
111124
pub async fn update(&self, correlation: &CorrelationConfig) -> Result<(), CorrelationError> {
112-
// save to memory
113-
let mut s = self.write().await;
114-
s.retain(|c| c.id != correlation.id);
115-
s.push(correlation.clone());
125+
// Update in storage
126+
let correlation_bytes = serde_json::to_vec(&correlation)?.into();
127+
let path = correlation.path();
128+
CONFIG
129+
.storage()
130+
.get_object_store()
131+
.put_object(&path, correlation_bytes)
132+
.await?;
133+
134+
// Update in memory
135+
self.write()
136+
.await
137+
.entry(correlation.user_id.to_owned())
138+
.or_insert_with(HashMap::new)
139+
.insert(correlation.id.to_owned(), correlation.clone());
140+
116141
Ok(())
117142
}
118143

119-
pub async fn delete(&self, correlation_id: &str) -> Result<(), CorrelationError> {
120-
// now delete from memory
121-
let read_access = self.read().await;
144+
/// Delete correlation from memory and storage
145+
pub async fn delete(&self, correlation: &CorrelationConfig) -> Result<(), CorrelationError> {
146+
// Delete from memory
147+
self.write()
148+
.await
149+
.entry(correlation.user_id.to_owned())
150+
.and_modify(|correlations| {
151+
correlations.remove(&correlation.id);
152+
});
153+
154+
// Delete from storage
155+
let path = correlation.path();
156+
CONFIG
157+
.storage()
158+
.get_object_store()
159+
.delete_object(&path)
160+
.await?;
122161

123-
let index = read_access
124-
.iter()
125-
.enumerate()
126-
.find(|(_, c)| c.id == correlation_id)
127-
.to_owned();
128-
129-
if let Some((index, _)) = index {
130-
// drop the read access in order to get exclusive write access
131-
drop(read_access);
132-
self.0.write().await.remove(index);
133-
trace!("removed correlation from memory");
134-
} else {
135-
warn!("Correlation ID- {correlation_id} not found in memory!");
136-
}
137162
Ok(())
138163
}
139164
}
@@ -144,21 +169,33 @@ pub enum CorrelationVersion {
144169
V1,
145170
}
146171

172+
type CorrelationId = String;
173+
type UserId = String;
174+
147175
#[derive(Debug, Clone, Serialize, Deserialize)]
148176
#[serde(rename_all = "camelCase")]
149177
pub struct CorrelationConfig {
150178
pub version: CorrelationVersion,
151179
pub title: String,
152-
pub id: String,
153-
pub user_id: String,
180+
pub id: CorrelationId,
181+
pub user_id: UserId,
154182
pub table_configs: Vec<TableConfig>,
155183
pub join_config: JoinConfig,
156184
pub filter: Option<FilterQuery>,
157185
pub start_time: Option<String>,
158186
pub end_time: Option<String>,
159187
}
160188

161-
impl CorrelationConfig {}
189+
impl CorrelationConfig {
190+
pub fn path(&self) -> RelativePathBuf {
191+
RelativePathBuf::from_iter([
192+
USERS_ROOT_DIR,
193+
&self.user_id,
194+
CORRELATION_DIR,
195+
&format!("{}.json", self.id),
196+
])
197+
}
198+
}
162199

163200
#[derive(Debug, Clone, Serialize, Deserialize)]
164201
#[serde(rename_all = "camelCase")]

src/handlers/http/correlation.rs

Lines changed: 10 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@
1616
*
1717
*/
1818

19+
use actix_web::web::Path;
1920
use actix_web::{web, HttpRequest, HttpResponse, Responder};
2021
use anyhow::Error;
2122
use bytes::Bytes;
2223
use itertools::Itertools;
2324

2425
use crate::rbac::Users;
25-
use crate::storage::object_storage::correlation_path;
2626
use crate::utils::{get_hash, get_user_from_request, user_auth_for_query};
2727
use crate::{option::CONFIG, utils::actix::extract_session_key_from_req};
2828

@@ -76,24 +76,12 @@ pub async fn get(req: HttpRequest) -> Result<impl Responder, CorrelationError> {
7676
pub async fn post(req: HttpRequest, body: Bytes) -> Result<impl Responder, CorrelationError> {
7777
let session_key = extract_session_key_from_req(&req)
7878
.map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
79-
let user_id = get_user_from_request(&req)
80-
.map(|s| get_hash(&s.to_string()))
81-
.map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;
8279

8380
let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?;
8481

8582
correlation_request.validate(&session_key).await?;
8683

87-
let mut correlation: CorrelationConfig = correlation_request.into();
88-
correlation.user_id.clone_from(&user_id);
89-
let correlation_id = &correlation.id;
90-
let path = correlation_path(&user_id, &format!("{}.json", correlation_id));
91-
92-
let store = CONFIG.storage().get_object_store();
93-
let correlation_bytes = serde_json::to_vec(&correlation)?;
94-
store
95-
.put_object(&path, Bytes::from(correlation_bytes))
96-
.await?;
84+
let correlation: CorrelationConfig = correlation_request.into();
9785

9886
// Save to memory
9987
CORRELATIONS.update(&correlation).await?;
@@ -132,8 +120,7 @@ pub async fn modify(req: HttpRequest, body: Bytes) -> Result<impl Responder, Cor
132120
let correlation =
133121
correlation_request.generate_correlation_config(correlation_id.to_owned(), user_id.clone());
134122

135-
let correlation_id = &correlation.id;
136-
let path = correlation_path(&user_id, &format!("{}.json", correlation_id));
123+
let path = correlation.path();
137124

138125
let store = CONFIG.storage().get_object_store();
139126
let correlation_bytes = serde_json::to_vec(&correlation)?;
@@ -147,20 +134,19 @@ pub async fn modify(req: HttpRequest, body: Bytes) -> Result<impl Responder, Cor
147134
Ok(web::Json(correlation))
148135
}
149136

150-
pub async fn delete(req: HttpRequest) -> Result<impl Responder, CorrelationError> {
137+
pub async fn delete(
138+
req: HttpRequest,
139+
correlation_id: Path<String>,
140+
) -> Result<impl Responder, CorrelationError> {
141+
let correlation_id = correlation_id.into_inner();
151142
let session_key = extract_session_key_from_req(&req)
152143
.map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
153144
let user_id = get_user_from_request(&req)
154145
.map(|s| get_hash(&s.to_string()))
155146
.map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;
156147

157-
let correlation_id = req
158-
.match_info()
159-
.get("correlation_id")
160-
.ok_or(CorrelationError::Metadata("No correlation ID Provided"))?;
161-
162148
let correlation = CORRELATIONS
163-
.get_correlation(correlation_id, &user_id)
149+
.get_correlation(&correlation_id, &user_id)
164150
.await?;
165151

166152
// validate user's query auth
@@ -173,13 +159,7 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, CorrelationError
173159

174160
user_auth_for_query(&permissions, tables)?;
175161

176-
let correlation_id = &correlation.id;
177-
let path = correlation_path(&user_id, &format!("{}.json", correlation_id));
178-
179-
let store = CONFIG.storage().get_object_store();
180-
store.delete_object(&path).await?;
162+
CORRELATIONS.delete(&correlation).await?;
181163

182-
// Delete from memory
183-
CORRELATIONS.delete(correlation_id).await?;
184164
Ok(HttpResponse::Ok().finish())
185165
}

src/storage/object_storage.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use super::{
2727

2828
use crate::event::format::LogSource;
2929
use crate::handlers::http::modal::ingest_server::INGESTOR_META;
30-
use crate::handlers::http::users::{CORRELATION_DIR, DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
30+
use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
3131
use crate::metadata::SchemaVersion;
3232
use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE};
3333
use crate::option::Mode;
@@ -698,15 +698,6 @@ pub fn filter_path(user_id: &str, stream_name: &str, filter_file_name: &str) ->
698698
])
699699
}
700700

701-
pub fn correlation_path(user_id: &str, correlation_file_name: &str) -> RelativePathBuf {
702-
RelativePathBuf::from_iter([
703-
USERS_ROOT_DIR,
704-
user_id,
705-
CORRELATION_DIR,
706-
correlation_file_name,
707-
])
708-
}
709-
710701
/// path will be ".parseable/.parsable.json"
711702
#[inline(always)]
712703
pub fn parseable_json_path() -> RelativePathBuf {

0 commit comments

Comments
 (0)