From b412afa2a9a2eddd15895bb3a3e429d40d14aee0 Mon Sep 17 00:00:00 2001
From: Daniel Boline 
Date: Sun, 1 Oct 2023 15:18:02 -0400
Subject: [PATCH] bump versions, rusoto -> aws-sdk

---
 Cargo.toml             |  76 +++++++++++-----------
 src/parse_logs.rs      |   4 +-
 src/parse_opts.rs      |   3 +-
 src/polars_analysis.rs |   6 +-
 src/s3_sync.rs         | 144 ++++++++++++++++++++---------------------
 src/ses_client.rs      |  84 +++++++++++-------------
 6 files changed, 153 insertions(+), 164 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 64db303..a5d532f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "security_log_analysis_rust"
-version = "0.10.10"
+version = "0.11.0"
 authors = ["Daniel Boline "]
 edition = "2018"
 
@@ -13,55 +13,53 @@ extended-description = """\
 Analyze Auth Logs."""
 
 [dependencies]
-http = "0.2"
-deadpool = {version = "0.9", features=["serde", "rt_tokio_1"]}
-deadpool-postgres = {version="0.10", features=["serde"]}
-rusoto_core = {version = "0.48", default_features = false, features=["rustls"]}
-rusoto_s3 = {version = "0.48", default_features = false, features=["rustls"]}
-rusoto_ses = {version = "0.48", default_features = false, features=["rustls"]}
-s3-ext = "0.5"
-sts_profile_auth = "0.7"
-time = {version="0.3", features=["serde-human-readable", "macros", "formatting"]}
-time-tz = {version="2.0", features=["system"]}
-rayon = "1.5"
-serde = { version="1.0", features=["derive"]}
-serde_json = "1.0"
+anyhow = "1.0"
+authorized_users = { git = "https://github.com/ddboline/auth_server_rust.git", tag="0.11.0"}
+aws-config = "0.56"
+aws-sdk-s3 = "0.31"
+aws-sdk-ses = "0.31"
+bytes = "1.0"
+cached = {version="0.46", features=["async", "async_tokio_rt_multi_thread"]}
+chrono = "0.4"
+clap = {version="4.0", features=["derive"]}
+deadpool = {version = "0.10", features=["serde", "rt_tokio_1"]}
+deadpool-postgres = {version="0.11", features=["serde"]}
+deadqueue = "0.2"
+derive_more = "0.99"
+dioxus = "0.4"
+dioxus-ssr = "0.4"
+dirs = "5.0"
 dotenv = "0.15"
-glob = "0.3"
+envy = "0.4"
+env_logger = "0.10"
 flate2 = "1.0"
-clap = {version="4.0", features=["derive"]}
+futures = "0.3"
+glob = "0.3"
+http = "0.2"
+itertools = "0.11"
+log = "0.4"
+maplit = "1.0"
 parking_lot = "0.12"
-tokio-postgres = {version="0.7", features=["with-time-0_3", "with-uuid-1", "with-serde_json-1"]}
+polars = {version="0.33", features=["temporal", "parquet", "lazy"]}
+postgres_query = {git = "https://github.com/ddboline/rust-postgres-query", tag = "0.3.5", features=["deadpool"]}
 postgres-types = {version="0.2", features=["with-time-0_3", "with-uuid-1", "with-serde_json-1"]}
 rand = "0.8"
-log = "0.4"
-env_logger = "0.10"
-anyhow = "1.0"
-thiserror = "1.0"
-dirs = "5.0"
+rayon = "1.5"
+refinery = {version="0.8", features=["tokio-postgres"]}
 reqwest = {version="0.11", features=["json", "rustls-tls"], default_features=false}
-futures = "0.3"
-tokio = {version="1.21", features=["full"]}
-derive_more = "0.99"
-bytes = "1.0"
-envy = "0.4"
+serde = { version="1.0", features=["derive"]}
+serde_json = "1.0"
 smallvec = "1.6"
-deadqueue = "0.2"
-itertools = "0.11"
+stack-string = { git = "https://github.com/ddboline/stack-string-rs.git", features=["postgres_types", "rweb-openapi"], tag="0.9.2" }
 stdout-channel = "0.6"
-maplit = "1.0"
-dioxus = "0.4"
-dioxus-ssr = "0.4"
-refinery = {version="0.8", features=["tokio-postgres"]}
-postgres_query = {git = "https://github.com/ddboline/rust-postgres-query", tag = "0.3.4", features=["deadpool"]}
-stack-string = { git = "https://github.com/ddboline/stack-string-rs.git", features=["postgres_types", "rweb-openapi"], tag="0.9.1" }
-chrono = "0.4"
-polars = {version="0.32", features=["temporal", "parquet", "lazy"]}
-authorized_users = { git = "https://github.com/ddboline/auth_server_rust.git", tag="0.10.8"}
+thiserror = "1.0"
+time = {version="0.3", features=["serde-human-readable", "macros", "formatting"]}
+time-tz = {version="2.0", features=["system"]}
+tokio-postgres = {version="0.7", features=["with-time-0_3", "with-uuid-1", "with-serde_json-1"]}
+tokio = {version="1.21", features=["full"]}
 rweb = {git = "https://github.com/ddboline/rweb.git", features=["openapi"], default-features=false, tag="0.15.1-1"}
 rweb-helper = { git = "https://github.com/ddboline/rweb_helper.git", tag="0.5.0-1" }
 uuid = { version = "1.0", features = ["serde", "v4"] }
-cached = "0.44"
 
 [[bin]]
 name = "security-log-parse-rust"
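
The rusoto_core/rusoto_s3/rusoto_ses and sts_profile_auth crates are replaced by aws-config plus per-service aws-sdk-* crates, and every client is now built from a single shared SdkConfig. A minimal sketch of that wiring, assuming the aws-config 0.56 / aws-sdk 0.31 APIs used elsewhere in this patch (build_clients is a hypothetical helper, not code from the repository):

// Sketch only: building both service clients from one shared SdkConfig.
// Assumes aws-config 0.56 / aws-sdk-* 0.31 as pinned above; `build_clients`
// is a hypothetical helper, not part of this patch.
use aws_config::SdkConfig;
use aws_sdk_s3::Client as S3Client;
use aws_sdk_ses::Client as SesClient;

async fn build_clients() -> (S3Client, SesClient) {
    // Region and credentials are resolved from the environment and shared AWS
    // config files, replacing the sts_profile_auth + Region::UsEast1 setup.
    let sdk_config: SdkConfig = aws_config::load_from_env().await;
    let s3 = S3Client::from_conf((&sdk_config).into());
    let ses = SesClient::from_conf((&sdk_config).into());
    (s3, ses)
}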
features=["postgres_types", "rweb-openapi"], tag="0.9.1" } -chrono = "0.4" -polars = {version="0.32", features=["temporal", "parquet", "lazy"]} -authorized_users = { git = "https://github.com/ddboline/auth_server_rust.git", tag="0.10.8"} +thiserror = "1.0" +time = {version="0.3", features=["serde-human-readable", "macros", "formatting"]} +time-tz = {version="2.0", features=["system"]} +tokio-postgres = {version="0.7", features=["with-time-0_3", "with-uuid-1", "with-serde_json-1"]} +tokio = {version="1.21", features=["full"]} rweb = {git = "https://github.com/ddboline/rweb.git", features=["openapi"], default-features=false, tag="0.15.1-1"} rweb-helper = { git = "https://github.com/ddboline/rweb_helper.git", tag="0.5.0-1" } uuid = { version = "1.0", features = ["serde", "v4"] } -cached = "0.44" [[bin]] name = "security-log-parse-rust" diff --git a/src/parse_logs.rs b/src/parse_logs.rs index d5dfc72..1863e16 100644 --- a/src/parse_logs.rs +++ b/src/parse_logs.rs @@ -447,8 +447,8 @@ impl fmt::Display for ServiceLogEntry { pub async fn process_systemd_logs(config: &Config, pool: &PgPool) -> Result<(), Error> { let alert_log_delay = config.alert_log_delay.unwrap_or(60); let alert_buffer_size = config.alert_buffer_size.unwrap_or(10_000); - - let ses_instance = SesInstance::new(None); + let sdk_config = aws_config::load_from_env().await; + let ses_instance = SesInstance::new(&sdk_config); let Some(sending_email_address) = &config.sending_email_address else { error!("No sending email given"); return Err(format_err!("No sending email given")); diff --git a/src/parse_opts.rs b/src/parse_opts.rs index cfcd66b..c1a358d 100644 --- a/src/parse_opts.rs +++ b/src/parse_opts.rs @@ -146,7 +146,8 @@ impl ParseOpts { } } ParseOpts::Sync { directory } => { - let sync = S3Sync::new(); + let sdk_config = aws_config::load_from_env().await; + let sync = S3Sync::new(&sdk_config); let directory = directory.unwrap_or_else(|| config.cache_dir.clone()); stdout.send( sync.sync_dir("security-log-analysis", &directory, &config.s3_bucket, true) diff --git a/src/polars_analysis.rs b/src/polars_analysis.rs index 281e45e..b9d2d04 100644 --- a/src/polars_analysis.rs +++ b/src/polars_analysis.rs @@ -119,7 +119,7 @@ pub async fn insert_db_into_parquet( let d = row.datetime.to_offset(UtcOffset::UTC); let datetime = NaiveDateTime::from_timestamp_opt(d.unix_timestamp(), d.nanosecond()) - .expect("Invalid timestamp"); + .unwrap_or_default(); acc.datetime.push(datetime); acc.host.push(row.host); acc.username.push(row.username); @@ -230,7 +230,7 @@ pub fn read_parquet_files( } let df = df .lazy() - .groupby(["country"]) + .group_by(["country"]) .agg([col("country_count").sum().alias("count")]) .sort( "count", @@ -281,7 +281,7 @@ fn get_country_count( ); } let df = df - .groupby(["country"]) + .group_by(["country"]) .agg([col("datetime").count().alias("country_count")]) .collect()?; Ok(df) diff --git a/src/s3_sync.rs b/src/s3_sync.rs index 2d197c9..cdcc370 100644 --- a/src/s3_sync.rs +++ b/src/s3_sync.rs @@ -1,16 +1,15 @@ -use anyhow::Error; -use futures::{ - future::{join_all, try_join_all}, - stream::{StreamExt, TryStreamExt}, +use anyhow::{format_err, Error}; +use aws_config::SdkConfig; +use aws_sdk_s3::{ + operation::list_objects::ListObjectsOutput, primitives::ByteStream, types::Object as S3Object, + Client as S3Client, }; +use futures::future::{join_all, try_join_all}; use log::debug; use rand::{ distributions::{Alphanumeric, DistString}, thread_rng, }; -use rusoto_core::Region; -use rusoto_s3::{GetObjectRequest, Object as 
diff --git a/src/s3_sync.rs b/src/s3_sync.rs
index 2d197c9..cdcc370 100644
--- a/src/s3_sync.rs
+++ b/src/s3_sync.rs
@@ -1,16 +1,15 @@
-use anyhow::Error;
-use futures::{
-    future::{join_all, try_join_all},
-    stream::{StreamExt, TryStreamExt},
+use anyhow::{format_err, Error};
+use aws_config::SdkConfig;
+use aws_sdk_s3::{
+    operation::list_objects::ListObjectsOutput, primitives::ByteStream, types::Object as S3Object,
+    Client as S3Client,
 };
+use futures::future::{join_all, try_join_all};
 use log::debug;
 use rand::{
     distributions::{Alphanumeric, DistString},
     thread_rng,
 };
-use rusoto_core::Region;
-use rusoto_s3::{GetObjectRequest, Object as S3Object, PutObjectRequest, S3Client};
-use s3_ext::S3Ext;
 use stack_string::{format_sstr, StackString};
 use std::{
     borrow::Borrow,
@@ -21,17 +20,10 @@ use std::{
     sync::Arc,
     time::SystemTime,
 };
-use sts_profile_auth::get_client_sts;
-use time::{format_description::well_known::Rfc3339, OffsetDateTime};
-use tokio::task::spawn_blocking;
+use tokio::{fs::File, task::spawn_blocking};
 
 use crate::{exponential_retry, get_md5sum, polars_analysis::merge_parquet_files};
 
-#[must_use]
-fn get_s3_client() -> S3Client {
-    get_client_sts!(S3Client, Region::UsEast1).expect("Failed to obtain client")
-}
-
 #[derive(Clone)]
 pub struct S3Sync {
     s3_client: S3Client,
@@ -68,32 +60,28 @@ impl Borrow<str> for &KeyItem {
 
 impl Default for S3Sync {
     fn default() -> Self {
-        Self::new()
+        let sdk_config = SdkConfig::builder().build();
+        Self::new(&sdk_config)
     }
 }
 
 fn process_s3_item(mut item: S3Object) -> Option<KeyItem> {
-    item.key.take().and_then(|key| {
-        item.e_tag.take().and_then(|etag| {
-            item.last_modified.as_ref().and_then(|last_mod| {
-                OffsetDateTime::parse(last_mod, &Rfc3339)
-                    .ok()
-                    .map(|lm| KeyItem {
-                        key: key.into(),
-                        etag: etag.trim_matches('"').into(),
-                        timestamp: lm.unix_timestamp(),
-                        size: item.size.unwrap_or(0) as u64,
-                    })
-            })
-        })
+    let key = item.key.take()?;
+    let etag = item.e_tag.take()?;
+    let last_mod = item.last_modified.as_ref()?;
+    Some(KeyItem {
+        key: key.into(),
+        etag: etag.trim_matches('"').into(),
+        timestamp: last_mod.as_secs_f64() as i64,
+        size: item.size as u64,
     })
 }
 
 impl S3Sync {
     #[must_use]
-    pub fn new() -> Self {
+    pub fn new(sdk_config: &SdkConfig) -> Self {
         Self {
-            s3_client: get_s3_client(),
+            s3_client: S3Client::from_conf(sdk_config.into()),
         }
     }
 
@@ -104,20 +92,41 @@ impl S3Sync {
         }
     }
 
+    async fn list_keys(
+        &self,
+        bucket: &str,
+        marker: Option<impl AsRef<str>>,
+    ) -> Result<ListObjectsOutput, Error> {
+        let mut builder = self.s3_client.list_objects().bucket(bucket);
+        if let Some(marker) = marker {
+            builder = builder.marker(marker.as_ref());
+        }
+        builder.send().await.map_err(Into::into)
+    }
+
     /// # Errors
     /// Return error if db query fails
     pub async fn get_list_of_keys(&self, bucket: &str) -> Result<Vec<KeyItem>, Error> {
-        let results: Result<Vec<_>, _> = exponential_retry(|| async move {
-            self.s3_client
-                .stream_objects(bucket)
-                .map(|res| res.map(process_s3_item))
-                .try_collect()
-                .await
-                .map_err(Into::into)
+        exponential_retry(|| async move {
+            let mut marker: Option<StackString> = None;
+            let mut list_of_keys = Vec::new();
+            loop {
+                let mut output = self.list_keys(bucket, marker.as_ref()).await?;
+                if let Some(contents) = output.contents.take() {
+                    if let Some(last) = contents.last() {
+                        if let Some(key) = &last.key {
+                            marker.replace(key.into());
+                        }
+                    }
+                    list_of_keys.extend(contents.into_iter().filter_map(process_s3_item));
+                }
+                if !output.is_truncated {
+                    break;
+                }
+            }
+            Ok(list_of_keys)
         })
-        .await;
-        let list_of_keys = results?.into_iter().flatten().collect();
-        Ok(list_of_keys)
+        .await
     }
 
     /// # Errors
@@ -251,20 +260,20 @@ impl S3Sync {
         let etag: Result<StackString, Error> = exponential_retry(|| {
             let tmp_path = tmp_path.clone();
             async move {
-                let etag = self
+                let resp = self
                     .s3_client
-                    .download_to_file(
-                        GetObjectRequest {
-                            bucket: s3_bucket.to_string(),
-                            key: s3_key.to_string(),
-                            ..GetObjectRequest::default()
-                        },
-                        &tmp_path,
-                    )
-                    .await?
-                    .e_tag
-                    .unwrap_or_default();
-                Ok(etag.into())
+                    .get_object()
+                    .bucket(s3_bucket)
+                    .key(s3_key)
+                    .send()
+                    .await?;
+                let etag: StackString = resp.e_tag().ok_or_else(|| format_err!("No etag"))?.into();
+                tokio::io::copy(
+                    &mut resp.body.into_async_read(),
+                    &mut File::create(tmp_path).await?,
+                )
+                .await?;
+                Ok(etag)
             }
         })
         .await;
@@ -294,31 +303,18 @@ impl S3Sync {
         local_file: &Path,
         s3_bucket: &str,
         s3_key: &str,
-    ) -> Result<(), Error> {
-        self.upload_file_acl(local_file, s3_bucket, s3_key).await
-    }
-
-    /// # Errors
-    /// Return error if db query fails
-    pub async fn upload_file_acl(
-        &self,
-        local_file: &Path,
-        s3_bucket: &str,
-        s3_key: &str,
     ) -> Result<(), Error> {
         exponential_retry(|| async move {
+            let body = ByteStream::read_from().path(local_file).build().await?;
             self.s3_client
-                .upload_from_file(
-                    local_file,
-                    PutObjectRequest {
-                        bucket: s3_bucket.to_string(),
-                        key: s3_key.to_string(),
-                        ..PutObjectRequest::default()
-                    },
-                )
+                .put_object()
+                .bucket(s3_bucket)
+                .key(s3_key)
+                .body(body)
+                .send()
                 .await
-                .map_err(Into::into)
                 .map(|_| ())
+                .map_err(Into::into)
         })
         .await
     }
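
aws-sdk-s3 has no direct replacement for s3-ext's download_to_file/upload_from_file, so file transfer is now done by hand through ByteStream. The same two patterns in isolation, assuming the aws-sdk-s3 0.31 API used above (put_path and get_path are illustrative helper names, not repository code):

// Sketch only: the ByteStream plumbing used above, as standalone helpers.
use anyhow::Error;
use aws_sdk_s3::{primitives::ByteStream, Client as S3Client};
use std::path::Path;
use tokio::fs::File;

async fn put_path(client: &S3Client, bucket: &str, key: &str, path: &Path) -> Result<(), Error> {
    // ByteStream::read_from() streams the file instead of buffering it in memory.
    let body = ByteStream::read_from().path(path).build().await?;
    client
        .put_object()
        .bucket(bucket)
        .key(key)
        .body(body)
        .send()
        .await?;
    Ok(())
}

async fn get_path(client: &S3Client, bucket: &str, key: &str, path: &Path) -> Result<(), Error> {
    let resp = client.get_object().bucket(bucket).key(key).send().await?;
    // The response body is a ByteStream; into_async_read() adapts it to tokio's AsyncRead.
    tokio::io::copy(
        &mut resp.body.into_async_read(),
        &mut File::create(path).await?,
    )
    .await?;
    Ok(())
}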
diff --git a/src/ses_client.rs b/src/ses_client.rs
index a75d7d4..9194af3 100644
--- a/src/ses_client.rs
+++ b/src/ses_client.rs
@@ -1,9 +1,11 @@
 use anyhow::Error;
-use rusoto_core::Region;
-use rusoto_ses::{Body, Content, Destination, Message, SendEmailRequest, Ses, SesClient};
+use aws_config::SdkConfig;
+use aws_sdk_ses::{
+    types::{Body, Content, Destination, Message},
+    Client as SesClient,
+};
 use std::fmt;
-use sts_profile_auth::get_client_sts;
-use time::{format_description::well_known::Rfc3339, OffsetDateTime};
+use time::OffsetDateTime;
 
 #[derive(Clone)]
 pub struct SesInstance {
@@ -18,16 +20,16 @@ impl fmt::Debug for SesInstance {
 
 impl Default for SesInstance {
     fn default() -> Self {
-        Self::new(None)
+        let sdk_config = SdkConfig::builder().build();
+        Self::new(&sdk_config)
     }
 }
 
 impl SesInstance {
     #[must_use]
-    pub fn new(region: Option<Region>) -> Self {
-        let region = region.unwrap_or(Region::UsEast1);
+    pub fn new(sdk_config: &SdkConfig) -> Self {
         Self {
-            ses_client: get_client_sts!(SesClient, region).expect("Failed to open SesClient"),
+            ses_client: SesClient::from_conf(sdk_config.into()),
         }
     }
 
@@ -40,29 +42,22 @@ impl SesInstance {
         sub: &str,
         msg: &str,
     ) -> Result<(), Error> {
-        let req = SendEmailRequest {
-            source: src.to_string(),
-            destination: Destination {
-                to_addresses: Some(vec![dest.to_string()]),
-                ..Destination::default()
-            },
-            message: Message {
-                subject: Content {
-                    data: sub.to_string(),
-                    ..Content::default()
-                },
-                body: Body {
-                    html: Some(Content {
-                        data: msg.to_string(),
-                        ..Content::default()
-                    }),
-                    ..Body::default()
-                },
-            },
-            ..SendEmailRequest::default()
-        };
         self.ses_client
-            .send_email(req)
+            .send_email()
+            .source(src)
+            .destination(Destination::builder().to_addresses(dest).build())
+            .message(
+                Message::builder()
+                    .subject(Content::builder().data(sub).build())
+                    .body(
+                        Body::builder()
+                            .text(Content::builder().data(msg).build())
+                            .html(Content::builder().data(msg).build())
+                            .build(),
+                    )
+                    .build(),
+            )
+            .send()
             .await
             .map_err(Into::into)
            .map(|_| ())
@@ -71,25 +66,24 @@ impl SesInstance {
     /// # Errors
     /// Returns error if api call fails
     pub async fn get_statistics(&self) -> Result<(SesQuotas, EmailStats), Error> {
-        let quota = self.ses_client.get_send_quota().await?;
+        let quota = self.ses_client.get_send_quota().send().await?;
         let stats = self
             .ses_client
             .get_send_statistics()
+            .send()
             .await?
             .send_data_points
             .unwrap_or_default()
             .into_iter()
-            .filter_map(|point| {
-                Some(EmailStats {
-                    bounces: point.bounces?,
-                    complaints: point.complaints?,
-                    delivery_attempts: point.delivery_attempts?,
-                    rejects: point.rejects?,
-                    min_timestamp: point
-                        .timestamp
-                        .and_then(|s| OffsetDateTime::parse(&s, &Rfc3339).ok()),
-                    ..EmailStats::default()
-                })
+            .map(|point| EmailStats {
+                bounces: point.bounces,
+                complaints: point.complaints,
+                delivery_attempts: point.delivery_attempts,
+                rejects: point.rejects,
+                min_timestamp: point
+                    .timestamp
+                    .and_then(|t| OffsetDateTime::from_unix_timestamp(t.as_secs_f64() as i64).ok()),
+                ..EmailStats::default()
             })
             .fold(EmailStats::default(), |mut stats, point| {
                 stats.bounces += point.bounces;
@@ -107,9 +101,9 @@ impl SesInstance {
             stats
         });
         let quota = SesQuotas {
-            max_24_hour_send: quota.max_24_hour_send.unwrap_or(0.0),
-            max_send_rate: quota.max_send_rate.unwrap_or(0.0),
-            sent_last_24_hours: quota.sent_last_24_hours.unwrap_or(0.0),
+            max_24_hour_send: quota.max24_hour_send,
+            max_send_rate: quota.max_send_rate,
+            sent_last_24_hours: quota.sent_last24_hours,
         };
         Ok((quota, stats))
     }
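
aws-sdk-ses returns the send quota as plain f64 fields (note the generated max24_hour_send / sent_last24_hours names) and SendDataPoint.timestamp as an SDK DateTime instead of an RFC 3339 string, which is why the statistics code now goes through from_unix_timestamp. A minimal sketch of that conversion, assuming the aws_sdk_ses::primitives re-export of DateTime (to_offset_datetime is an illustrative helper, not repository code):

// Sketch only: converting the SDK timestamp into time::OffsetDateTime, mirroring the
// `as_secs_f64() as i64` truncation above (sub-second precision is dropped).
use aws_sdk_ses::primitives::DateTime;
use time::OffsetDateTime;

fn to_offset_datetime(ts: DateTime) -> Option<OffsetDateTime> {
    OffsetDateTime::from_unix_timestamp(ts.as_secs_f64() as i64).ok()
}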