Skip to content

Commit b24e1f4

Browse files
committed
milestone cea-sec#1
1 parent 9701007 commit b24e1f4

File tree

7 files changed

+342
-10
lines changed

7 files changed

+342
-10
lines changed

Cargo.lock

+33-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ chrono = { version = "0.4.26", default-features = false, features = ["clock"] }
2323
encoding_rs = "0.8.32"
2424
deadpool-postgres = "0.14.0"
2525
deadpool-sqlite = "0.5.0"
26+
deadpool-redis = "0.18.0"
2627
openssl = "0.10.66"
2728
postgres-openssl = "0.5.0"
2829
strum = { version = "0.26.1", features = ["derive"] }

common/src/database/mod.rs

+8
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::{
77
bookmark::BookmarkData,
88
database::postgres::PostgresDatabase,
99
database::sqlite::SQLiteDatabase,
10+
database::redis::RedisDatabase,
1011
heartbeat::{HeartbeatData, HeartbeatsCache},
1112
settings::Settings,
1213
subscription::{
@@ -21,6 +22,7 @@ use self::schema::{Migration, Version};
2122
pub mod postgres;
2223
pub mod schema;
2324
pub mod sqlite;
25+
pub mod redis;
2426

2527
pub type Db = Arc<dyn Database + Send + Sync>;
2628

@@ -40,6 +42,12 @@ pub async fn db_from_settings(settings: &Settings) -> Result<Db> {
4042
schema::postgres::register_migrations(&mut db);
4143
Ok(Arc::new(db))
4244
}
45+
crate::settings::Database::Redis(redis) => {
46+
let db = RedisDatabase::new(redis.connection_string())
47+
.await
48+
.context("Failed to initialize Redis client")?;
49+
Ok(Arc::new(db))
50+
}
4351
}
4452
}
4553

common/src/database/redis.rs

+278
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
// Some of the following code is inspired from
2+
// https://github.com/SkylerLipthay/schemamama_postgres. As stated by its
3+
// license (MIT), we include below its copyright notice and permission notice:
4+
//
5+
// The MIT License (MIT)
6+
//
7+
// Copyright (c) 2015 Skyler Lipthay
8+
//
9+
// Permission is hereby granted, free of charge, to any person obtaining a copy
10+
// of this software and associated documentation files (the "Software"), to deal
11+
// in the Software without restriction, including without limitation the rights
12+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13+
// copies of the Software, and to permit persons to whom the Software is
14+
// furnished to do so, subject to the following conditions:
15+
//
16+
// The above copyright notice and this permission notice shall be included in all
17+
// copies or substantial portions of the Software.
18+
//
19+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25+
// SOFTWARE.
26+
//
27+
//
28+
#![allow(unused_imports)]
29+
use anyhow::{anyhow, ensure, Context, Error, Result};
30+
use async_trait::async_trait;
31+
use deadpool_redis::redis::AsyncCommands;
32+
// use deadpool_sqlite::{Config, Pool, Runtime};
33+
use deadpool_redis::{Config, Pool, Runtime};
34+
use log::warn;
35+
use serde::de::value;
36+
// use rusqlite::{named_params, params, Connection, OptionalExtension, Row};
37+
use uuid::Uuid;
38+
use std::borrow::Borrow;
39+
use std::collections::btree_map::Entry::Vacant;
40+
use std::collections::{BTreeMap, BTreeSet};
41+
use std::str::FromStr;
42+
use std::sync::Arc;
43+
use std::time::SystemTime;
44+
45+
use crate::bookmark::BookmarkData;
46+
use crate::database::Database;
47+
use crate::heartbeat::{HeartbeatData, HeartbeatsCache};
48+
use crate::subscription::{
49+
ContentFormat, InternalVersion, PrincsFilter, SubscriptionData, SubscriptionMachine, SubscriptionMachineState, SubscriptionStatsCounters, SubscriptionUuid
50+
};
51+
52+
use super::schema::{Migration, MigrationBase, Version};
53+
54+
pub enum RedisKey {
55+
Subscriptions,
56+
Machines,
57+
Heartbeats,
58+
BookMarks
59+
}
60+
61+
impl RedisKey {
62+
pub fn as_str(&self) -> &str {
63+
match self {
64+
RedisKey::Subscriptions => "subscriptions",
65+
RedisKey::Machines => "machines",
66+
RedisKey::Heartbeats => "heartbeats",
67+
RedisKey::BookMarks => "bookmarks",
68+
}
69+
}
70+
}
71+
72+
73+
#[allow(unused)]
74+
pub struct RedisDatabase {
75+
pool: Pool,
76+
}
77+
78+
impl RedisDatabase {
79+
pub async fn new(connection_string: &str) -> Result<RedisDatabase> {
80+
let config = Config::from_url(connection_string);
81+
let pool = config.create_pool(Some(Runtime::Tokio1))?;
82+
let db = RedisDatabase {
83+
pool,
84+
};
85+
86+
Ok(db)
87+
}
88+
}
89+
90+
#[allow(unused)]
91+
#[async_trait]
92+
impl Database for RedisDatabase {
93+
async fn get_bookmark(&self, machine: &str, subscription: &str) -> Result<Option<String>> {
94+
let mut conn = self.pool.get().await.context("Failed to get Redis connection")?;
95+
let key = format!("{}:{}", RedisKey::BookMarks.as_str(), subscription);
96+
let machine_s = machine.to_string();
97+
let res : Option<String> = conn.hget(key, machine_s).await.context("Failed to get bookmark data")?;
98+
Ok(res)
99+
}
100+
101+
async fn get_bookmarks(&self, subscription: &str) -> Result<Vec<BookmarkData>> {
102+
todo!()
103+
}
104+
105+
async fn store_bookmark(
106+
&self,
107+
machine: &str,
108+
subscription: &str,
109+
bookmark: &str,
110+
) -> Result<()> {
111+
let machine_s = machine.to_string();
112+
let bookmark_s = bookmark.to_string();
113+
let mut conn = self.pool.get().await.context("Failed to get Redis connection")?;
114+
let key = format!("{}:{}", RedisKey::BookMarks.as_str(), subscription);
115+
let _ : String = conn.hset(key, machine_s, bookmark_s).await.context("Failed to store bookmark data")?;
116+
Ok(())
117+
}
118+
119+
async fn delete_bookmarks(
120+
&self,
121+
machine: Option<&str>,
122+
subscription: Option<&str>,
123+
) -> Result<()> {
124+
todo!()
125+
}
126+
127+
async fn get_heartbeats_by_machine(
128+
&self,
129+
machine: &str,
130+
subscription: Option<&str>,
131+
) -> Result<Vec<HeartbeatData>> {
132+
todo!()
133+
}
134+
135+
async fn get_heartbeats_by_ip(
136+
&self,
137+
ip: &str,
138+
subscription: Option<&str>,
139+
) -> Result<Vec<HeartbeatData>> {
140+
todo!()
141+
}
142+
143+
async fn get_heartbeats(&self) -> Result<Vec<HeartbeatData>> {
144+
todo!()
145+
}
146+
147+
async fn get_heartbeats_by_subscription(
148+
&self,
149+
subscription: &str,
150+
) -> Result<Vec<HeartbeatData>> {
151+
todo!()
152+
}
153+
154+
async fn store_heartbeat(
155+
&self,
156+
machine: &str,
157+
ip: String,
158+
subscription: &str,
159+
is_event: bool,
160+
) -> Result<()> {
161+
todo!()
162+
}
163+
164+
async fn store_heartbeats(&self, heartbeats: &HeartbeatsCache) -> Result<()> {
165+
todo!()
166+
}
167+
168+
async fn get_subscriptions(&self) -> Result<Vec<SubscriptionData>> {
169+
let mut conn = self.pool.get().await.context("Failed to get Redis connection")?;
170+
171+
let keys: Vec<String> = conn.hkeys(RedisKey::Subscriptions.as_str()).await.context("Failed to get subscription keys")?;
172+
173+
let mut subscriptions = Vec::new();
174+
175+
for key in keys {
176+
let subscription_json: Option<String> = conn.hget(RedisKey::Subscriptions.as_str(), &key).await.context("Failed to get subscription data")?;
177+
178+
if let Some(subscription_json) = subscription_json {
179+
match serde_json::from_str::<SubscriptionData>(&subscription_json) {
180+
Ok(subscription) => subscriptions.push(subscription),
181+
Err(err) => {
182+
log::warn!("Failed to deserialize subscription data for key {}: {}", key, err);
183+
}
184+
}
185+
} else {
186+
log::warn!("No subscription found for key: {}", key);
187+
}
188+
}
189+
190+
Ok(subscriptions)
191+
}
192+
193+
async fn get_subscription_by_identifier(
194+
&self,
195+
identifier: &str,
196+
) -> Result<Option<SubscriptionData>> {
197+
let mut conn = self.pool.get().await.context("Failed to get Redis connection")?;
198+
let key = identifier;
199+
let result: Option<String> = conn.hget(RedisKey::Subscriptions.as_str(), key).await.context("Failed to get subscription data")?;
200+
if result.is_none() {
201+
return Ok(None);
202+
}
203+
let subscription: SubscriptionData = serde_json::from_str(&result.unwrap()).context("Failed to deserialize subscription data")?;
204+
Ok(Some(subscription))
205+
}
206+
207+
async fn store_subscription(&self, subscription: &SubscriptionData) -> Result<()> {
208+
let mut conn = self.pool.get().await.context("Failed to get Redis connection")?;
209+
let key = format!("{}", subscription.uuid());
210+
// let key = format!("{}", subscription.name());
211+
let subs_json = serde_json::to_string(subscription).context("Failed to serialize subscription data")?;
212+
let _ : String = conn.hset(RedisKey::Subscriptions.as_str(), key, &subs_json).await.context("Failed to store subscription data")?;
213+
Ok(())
214+
}
215+
216+
async fn delete_subscription(&self, uuid: &str) -> Result<()> {
217+
let mut conn = self.pool.get().await.context("Failed to get Redis connection")?;
218+
219+
let key = uuid;
220+
221+
let result: usize = conn.hdel(RedisKey::Subscriptions.as_str(), key)
222+
.await
223+
.context("Failed to delete subscription data")?;
224+
225+
if result == 0 {
226+
log::warn!("No subscription found for UUID: {}", uuid);
227+
}
228+
229+
Ok(())
230+
}
231+
232+
/// Fails if `setup_schema` hasn't previously been called or if the query otherwise fails.
233+
async fn current_version(&self) -> Result<Option<Version>> {
234+
todo!()
235+
}
236+
237+
/// Fails if `setup_schema` hasn't previously been called or if the query otherwise fails.
238+
async fn migrated_versions(&self) -> Result<BTreeSet<Version>> {
239+
Ok(BTreeSet::<i64>::new())
240+
}
241+
242+
/// Fails if `setup_schema` hasn't previously been called or if the migration otherwise fails.
243+
async fn apply_migration(&self, version: Version) -> Result<()> {
244+
todo!()
245+
}
246+
247+
/// Fails if `setup_schema` hasn't previously been called or if the migration otherwise fails.
248+
async fn revert_migration(&self, version: Version) -> Result<()> {
249+
todo!()
250+
}
251+
252+
/// Create the tables required to keep track of schema state. If the tables already
253+
/// exist, this function has no operation.
254+
async fn setup_schema(&self) -> Result<()> {
255+
todo!()
256+
}
257+
258+
async fn migrations(&self) -> BTreeMap<Version, Arc<dyn Migration + Send + Sync>> {
259+
BTreeMap::new()
260+
}
261+
262+
async fn get_stats(
263+
&self,
264+
subscription: &str,
265+
start_time: i64,
266+
) -> Result<SubscriptionStatsCounters> {
267+
todo!()
268+
}
269+
270+
async fn get_machines(
271+
&self,
272+
subscription: &str,
273+
start_time: i64,
274+
stat_type: Option<SubscriptionMachineState>,
275+
) -> Result<Vec<SubscriptionMachine>> {
276+
todo!()
277+
}
278+
}

common/src/heartbeat.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use std::collections::HashMap;
22

3-
use serde::{ser::SerializeStruct, Serialize, Serializer};
3+
use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer};
44

55
use crate::{subscription::SubscriptionData, utils, utils::Timestamp};
66

7-
#[derive(Debug, Serialize, PartialEq, Eq, Clone)]
7+
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
88
pub struct HeartbeatData {
99
machine: String,
1010
ip: String,

0 commit comments

Comments
 (0)