paginate endpoints
ddboline committed Oct 19, 2024
1 parent 1dc773a commit 5cbddc2
Showing 9 changed files with 300 additions and 102 deletions.
10 changes: 5 additions & 5 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "security_log_analysis_rust"
version = "0.11.9"
version = "0.11.10"
authors = ["Daniel Boline <[email protected]>"]
edition = "2018"

@@ -19,13 +19,13 @@ aws-config = {version="1.0", features=["behavior-version-latest"]}
aws-sdk-s3 = "1.1"
aws-sdk-ses = "1.1"
bytes = "1.0"
cached = {version="0.52", features=["async", "async_tokio_rt_multi_thread"]}
cached = {version="0.53", features=["async", "async_tokio_rt_multi_thread"]}
chrono = "0.4"
clap = {version="4.0", features=["derive"]}
deadpool = {version = "0.12", features=["serde", "rt_tokio_1"]}
deadpool-postgres = {version="0.14", features=["serde"]}
deadqueue = "0.2"
derive_more = "0.99"
derive_more = {version="1.0", features = ["full"]}
dioxus = "0.5"
dioxus-core = "0.5"
dioxus-ssr = "0.5"
@@ -40,13 +40,13 @@ itertools = "0.13"
log = "0.4"
maplit = "1.0"
parking_lot = "0.12"
polars = {version="0.41", features=["temporal", "parquet", "lazy"]}
polars = {version="0.43", features=["temporal", "parquet", "lazy"]}
postgres_query = {git = "https://github.com/ddboline/rust-postgres-query", tag = "0.3.8", features=["deadpool"]}
postgres-types = {version="0.2", features=["with-time-0_3", "with-uuid-1", "with-serde_json-1"]}
rand = "0.8"
rayon = "1.5"
refinery = {version="0.8", features=["tokio-postgres"]}
reqwest = {version="0.12", features=["json", "rustls-tls"], default_features=false}
reqwest = {version="0.12", features=["json", "rustls-tls"], default-features=false}
serde = { version="1.0", features=["derive"]}
serde_json = "1.0"
serde_yaml = "0.9"
2 changes: 2 additions & 0 deletions scripts/bootstrap_db.sh
@@ -11,6 +11,8 @@ sudo -u postgres createuser -E -e $USER
sudo -u postgres psql -c "CREATE ROLE $USER PASSWORD '$PASSWORD' NOSUPERUSER NOCREATEDB NOCREATEROLE INHERIT LOGIN;"
sudo -u postgres psql -c "ALTER ROLE $USER PASSWORD '$PASSWORD' NOSUPERUSER NOCREATEDB NOCREATEROLE INHERIT LOGIN;"
sudo -u postgres createdb $DB
sudo -u postgres psql -c "GRANT ALL PRIVILEGES ON DATABASE $DB TO $USER;"
sudo -u postgres psql $DB -c "GRANT ALL ON SCHEMA public TO $USER;"

cat > ${HOME}/.config/security_log_analysis_rust/config.env <<EOL
DATABASE_URL=postgresql://$USER:$PASSWORD@localhost:5432/$DB
3 changes: 1 addition & 2 deletions src/errors.rs
@@ -8,10 +8,9 @@ use rweb::{
},
reject::Reject,
};
use std::borrow::Cow;
use std::{borrow::Cow, fmt::Error as FmtError};
use thiserror::Error;
use tokio::task::JoinError;
use std::fmt::Error as FmtError;

#[derive(Error, Debug)]
pub enum ServiceError {
235 changes: 177 additions & 58 deletions src/models.rs
@@ -2,13 +2,14 @@ use anyhow::{format_err, Error};
use bytes::BytesMut;
use derive_more::Into;
use futures::{Stream, TryStreamExt};
use log::debug;
use postgres_query::{
client::GenericClient, query, query_dyn, Error as PgError, FromSqlRow, Parameter,
client::GenericClient, query, query_dyn, Error as PgError, FromSqlRow, Parameter, Query,
};
use rweb::Schema;
use serde::{Deserialize, Serialize};
use stack_string::{format_sstr, StackString};
use std::{cmp::Ordering, fmt, net::ToSocketAddrs, str::FromStr};
use std::{cmp::Ordering, convert::TryInto, fmt, net::ToSocketAddrs, str::FromStr};
use time::OffsetDateTime;
use tokio_postgres::types::{FromSql, IsNull, ToSql, Type};
use uuid::Uuid;
@@ -65,6 +66,22 @@ impl HostCountry {
})
}

/// # Errors
/// Return error if db query fails
pub async fn get_host_country_total(pool: &PgPool) -> Result<usize, Error> {
#[derive(FromSqlRow)]
struct Count {
count: i64,
}

let query = query!("SELECT count(*) FROM host_country");

let conn = pool.get().await?;
let count: Count = query.fetch_one(&conn).await?;

Ok(count.count.try_into()?)
}
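
A minimal sketch (not part of this commit) of how the new total might be consumed by a caller; the helper name and page-size handling are illustrative only and assume the imports already present in models.rs:

// Sketch only: derive how many pages a paginated host_country listing needs.
async fn host_country_page_count(pool: &PgPool, page_size: usize) -> Result<usize, Error> {
    let total = HostCountry::get_host_country_total(pool).await?;
    // Round up so a final partial page is still counted.
    Ok((total + page_size - 1) / page_size)
}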

/// # Errors
/// Return error if db query fails
pub async fn get_host_country(
@@ -250,21 +267,18 @@ impl IntrusionLog {
Ok(result.map(Into::into))
}

/// # Errors
/// Return error if db query fails
pub async fn get_intrusion_log_filtered(
pool: &PgPool,
service: Option<Service>,
server: Option<Host>,
min_datetime: Option<OffsetDateTime>,
max_datetime: Option<OffsetDateTime>,
limit: Option<usize>,
fn get_intrusion_log_filtered_query<'a>(
select_str: &'a str,
order_str: &'a str,
service: &'a Option<StackString>,
server: &'a Option<StackString>,
min_datetime: &'a Option<OffsetDateTime>,
max_datetime: &'a Option<OffsetDateTime>,
offset: Option<usize>,
) -> Result<impl Stream<Item = Result<Self, PgError>>, Error> {
limit: Option<usize>,
) -> Result<Query<'a>, PgError> {
let mut bindings = Vec::new();
let mut constraints = Vec::new();
let service = service.map(Service::to_str);
let server = server.map(Host::to_str);
if let Some(service) = &service {
constraints.push(format_sstr!("service=$service"));
bindings.push(("service", service as Parameter));
@@ -286,29 +300,85 @@ impl IntrusionLog {
} else {
format_sstr!("WHERE {}", constraints.join(" AND "))
};
let limit = if let Some(limit) = limit {
format_sstr!("LIMIT {limit}")
} else {
"".into()
};
let offset = if let Some(offset) = offset {
format_sstr!("OFFSET {offset}")
} else {
"".into()
};
let query = format_sstr!(
let mut query = format_sstr!(
r#"
SELECT * FROM intrusion_log
SELECT {select_str} FROM intrusion_log
{where_str}
ORDER BY datetime DESC
{limit} {offset}
{order_str}
"#,
);
let query = query_dyn!(&query, ..bindings)?;
if let Some(offset) = &offset {
query.push_str(&format_sstr!(" OFFSET {offset}"));
}
if let Some(limit) = &limit {
query.push_str(&format_sstr!(" LIMIT {limit}"));
}
bindings.shrink_to_fit();
debug!("query:\n{}", query);
query_dyn!(&query, ..bindings)
}
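
For illustration only (argument values assumed, not taken from the commit), the builder above appends the paging clauses after the ORDER BY while leaving the filters as bound parameters:

// Hypothetical example: get_intrusion_log_filtered_query("*", "ORDER BY datetime DESC",
// &Some("ssh".into()), &None, &None, &None, Some(20), Some(10)) assembles roughly:
//
//   SELECT * FROM intrusion_log
//   WHERE service=$service
//   ORDER BY datetime DESC
//    OFFSET 20 LIMIT 10
//
// $service stays a bound parameter via query_dyn!; only the offset and limit
// values are interpolated into the SQL text.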

/// # Errors
/// Return error if db query fails
pub async fn get_intrusion_log_filtered(
pool: &PgPool,
service: Option<Service>,
server: Option<Host>,
min_datetime: Option<OffsetDateTime>,
max_datetime: Option<OffsetDateTime>,
offset: Option<usize>,
limit: Option<usize>,
) -> Result<impl Stream<Item = Result<Self, PgError>>, Error> {
let service = service.map(Service::to_str).map(Into::into);
let server = server.map(Host::to_str).map(Into::into);

let query = Self::get_intrusion_log_filtered_query(
"*",
"ORDER BY datetime DESC",
&service,
&server,
&min_datetime,
&max_datetime,
offset,
limit,
)?;
let conn = pool.get().await?;
query.fetch_streaming(&conn).await.map_err(Into::into)
}

/// # Errors
/// Return error if db query fails
pub async fn get_intrusion_log_filtered_total(
pool: &PgPool,
service: Option<Service>,
server: Option<Host>,
min_datetime: Option<OffsetDateTime>,
max_datetime: Option<OffsetDateTime>,
) -> Result<usize, Error> {
#[derive(FromSqlRow)]
struct Count {
count: i64,
}

let service = service.map(Service::to_str).map(Into::into);
let server = server.map(Host::to_str).map(Into::into);

let query = Self::get_intrusion_log_filtered_query(
"count(*)",
"",
&service,
&server,
&min_datetime,
&max_datetime,
None,
None,
)?;
let conn = pool.get().await?;
let count: Count = query.fetch_one(&conn).await?;

Ok(count.count.try_into()?)
}
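
A hedged usage sketch (not part of this commit) combining the new total with the offset/limit arguments to page through results; the function name and page size are assumptions, and the imports are those already used in models.rs:

// Sketch only: stream intrusion_log rows 100 at a time.
async fn dump_all_intrusion_logs(pool: &PgPool) -> Result<(), Error> {
    let total =
        IntrusionLog::get_intrusion_log_filtered_total(pool, None, None, None, None).await?;
    let page_size = 100;
    for offset in (0..total).step_by(page_size) {
        let page: Vec<IntrusionLog> = IntrusionLog::get_intrusion_log_filtered(
            pool, None, None, None, None, Some(offset), Some(page_size),
        )
        .await?
        .try_collect()
        .await?;
        debug!("fetched {} rows at offset {offset}", page.len());
    }
    Ok(())
}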

/// # Errors
/// Return error if db query fails
pub async fn insert_single<C>(&self, conn: &C) -> Result<u64, Error>
@@ -613,32 +683,31 @@ impl SystemdLogMessages {
query.execute(&conn).await.map_err(Into::into)
}

/// # Errors
/// Return error if db query fails
pub async fn get_systemd_messages(
pool: &PgPool,
log_level: Option<LogLevel>,
log_unit: Option<&str>,
min_timestamp: Option<DateTimeType>,
max_timestamp: Option<DateTimeType>,
limit: Option<usize>,
fn get_systemd_messages_query<'a>(
select_str: &'a str,
order_str: &'a str,
log_level: &'a Option<LogLevel>,
log_unit: &'a Option<&str>,
min_timestamp: &'a Option<DateTimeType>,
max_timestamp: &'a Option<DateTimeType>,
offset: Option<usize>,
) -> Result<impl Stream<Item = Result<Self, PgError>>, Error> {
limit: Option<usize>,
) -> Result<Query<'a>, PgError> {
let mut constraints = Vec::new();
let mut bindings = Vec::new();
if let Some(log_level) = &log_level {
if let Some(log_level) = log_level {
constraints.push(format_sstr!("log_level=$log_level"));
bindings.push(("log_level", log_level as Parameter));
}
if let Some(log_unit) = &log_unit {
if let Some(log_unit) = log_unit {
constraints.push(format_sstr!("log_unit=$log_unit"));
bindings.push(("log_unit", log_unit as Parameter));
}
if let Some(min_timestamp) = &min_timestamp {
if let Some(min_timestamp) = min_timestamp {
constraints.push(format_sstr!("log_timestamp > $min_timestamp"));
bindings.push(("min_timestamp", min_timestamp as Parameter));
}
if let Some(max_timestamp) = &max_timestamp {
if let Some(max_timestamp) = max_timestamp {
constraints.push(format_sstr!("log_timestamp > $max_timestamp"));
bindings.push(("max_timestamp", max_timestamp as Parameter));
}
@@ -647,25 +716,75 @@ impl SystemdLogMessages {
} else {
format_sstr!("WHERE {}", constraints.join(" AND "))
};
let limit = if let Some(limit) = limit {
format_sstr!("LIMIT {limit}")
} else {
"".into()
};
let offset = if let Some(offset) = offset {
format_sstr!("OFFSET {offset}")
} else {
"".into()
};
let query = format_sstr!(
let mut query = format_sstr!(
r#"
SELECT * FROM systemd_log_messages
SELECT {select_str} FROM systemd_log_messages
{where_str}
ORDER BY log_timestamp
{limit} {offset}
{order_str}
"#,
);
let query = query_dyn!(&query, ..bindings)?;
if let Some(offset) = offset {
query.push_str(&format_sstr!(" OFFSET {offset}"));
}
if let Some(limit) = limit {
query.push_str(&format_sstr!(" LIMIT {limit}"));
}
bindings.shrink_to_fit();
debug!("query:\n{}", query);
query_dyn!(&query, ..bindings)
}

/// # Errors
/// Return error if db query fails
pub async fn get_total(
pool: &PgPool,
log_level: Option<LogLevel>,
log_unit: Option<&str>,
min_timestamp: Option<DateTimeType>,
max_timestamp: Option<DateTimeType>,
) -> Result<usize, Error> {
#[derive(FromSqlRow)]
struct Count {
count: i64,
}

let query = Self::get_systemd_messages_query(
"count(*)",
"",
&log_level,
&log_unit,
&min_timestamp,
&max_timestamp,
None,
None,
)?;
let conn = pool.get().await?;
let count: Count = query.fetch_one(&conn).await?;

Ok(count.count.try_into()?)
}

/// # Errors
/// Return error if db query fails
pub async fn get_systemd_messages(
pool: &PgPool,
log_level: Option<LogLevel>,
log_unit: Option<&str>,
min_timestamp: Option<DateTimeType>,
max_timestamp: Option<DateTimeType>,
offset: Option<usize>,
limit: Option<usize>,
) -> Result<impl Stream<Item = Result<Self, PgError>>, Error> {
let query = Self::get_systemd_messages_query(
"*",
"ORDER BY log_timestamp",
&log_level,
&log_unit,
&min_timestamp,
&max_timestamp,
offset,
limit,
)?;
let conn = pool.get().await?;
query.fetch_streaming(&conn).await.map_err(Into::into)
}
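
Similarly, a brief illustrative snippet (assumed, not from this commit) fetching one page of systemd messages inside an async fn that returns Result<(), Error>:

// Sketch only: second page of 50 systemd log messages, all filters left unset.
let total = SystemdLogMessages::get_total(pool, None, None, None, None).await?;
let page: Vec<SystemdLogMessages> =
    SystemdLogMessages::get_systemd_messages(pool, None, None, None, None, Some(50), Some(50))
        .await?
        .try_collect()
        .await?;
debug!("{} of {total} messages fetched", page.len());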
2 changes: 1 addition & 1 deletion src/parse_opts.rs
@@ -278,8 +278,8 @@ impl ParseOpts {
None,
None,
None,
Some(1000),
None,
Some(1000),
)
.await?
.try_collect()
5 changes: 1 addition & 4 deletions src/pgpool.rs
@@ -44,10 +44,7 @@ impl PgPool {
config.dbname.replace(db.to_string());
}

let pool = config
.builder(NoTls)?
.max_size(4)
.build()?;
let pool = config.builder(NoTls)?.max_size(4).build()?;

Ok(Self {
pgurl: Arc::new(pgurl.into()),