Skip to content

Commit

Permalink
bump versions, rusoto -> aws-sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
ddboline committed Oct 1, 2023
1 parent 6fe3875 commit b412afa
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 164 deletions.
76 changes: 37 additions & 39 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "security_log_analysis_rust"
version = "0.10.10"
version = "0.11.0"
authors = ["Daniel Boline <[email protected]>"]
edition = "2018"

Expand All @@ -13,55 +13,53 @@ extended-description = """\
Analyze Auth Logs."""

[dependencies]
http = "0.2"
deadpool = {version = "0.9", features=["serde", "rt_tokio_1"]}
deadpool-postgres = {version="0.10", features=["serde"]}
rusoto_core = {version = "0.48", default_features = false, features=["rustls"]}
rusoto_s3 = {version = "0.48", default_features = false, features=["rustls"]}
rusoto_ses = {version = "0.48", default_features = false, features=["rustls"]}
s3-ext = "0.5"
sts_profile_auth = "0.7"
time = {version="0.3", features=["serde-human-readable", "macros", "formatting"]}
time-tz = {version="2.0", features=["system"]}
rayon = "1.5"
serde = { version="1.0", features=["derive"]}
serde_json = "1.0"
anyhow = "1.0"
authorized_users = { git = "https://github.com/ddboline/auth_server_rust.git", tag="0.11.0"}
aws-config = "0.56"
aws-sdk-s3 = "0.31"
aws-sdk-ses = "0.31"
bytes = "1.0"
cached = {version="0.46", features=["async", "async_tokio_rt_multi_thread"]}
chrono = "0.4"
clap = {version="4.0", features=["derive"]}
deadpool = {version = "0.10", features=["serde", "rt_tokio_1"]}
deadpool-postgres = {version="0.11", features=["serde"]}
deadqueue = "0.2"
derive_more = "0.99"
dioxus = "0.4"
dioxus-ssr = "0.4"
dirs = "5.0"
dotenv = "0.15"
glob = "0.3"
envy = "0.4"
env_logger = "0.10"
flate2 = "1.0"
clap = {version="4.0", features=["derive"]}
futures = "0.3"
glob = "0.3"
http = "0.2"
itertools = "0.11"
log = "0.4"
maplit = "1.0"
parking_lot = "0.12"
tokio-postgres = {version="0.7", features=["with-time-0_3", "with-uuid-1", "with-serde_json-1"]}
polars = {version="0.33", features=["temporal", "parquet", "lazy"]}
postgres_query = {git = "https://github.com/ddboline/rust-postgres-query", tag = "0.3.5", features=["deadpool"]}
postgres-types = {version="0.2", features=["with-time-0_3", "with-uuid-1", "with-serde_json-1"]}
rand = "0.8"
log = "0.4"
env_logger = "0.10"
anyhow = "1.0"
thiserror = "1.0"
dirs = "5.0"
rayon = "1.5"
refinery = {version="0.8", features=["tokio-postgres"]}
reqwest = {version="0.11", features=["json", "rustls-tls"], default_features=false}
futures = "0.3"
tokio = {version="1.21", features=["full"]}
derive_more = "0.99"
bytes = "1.0"
envy = "0.4"
serde = { version="1.0", features=["derive"]}
serde_json = "1.0"
smallvec = "1.6"
deadqueue = "0.2"
itertools = "0.11"
stack-string = { git = "https://github.com/ddboline/stack-string-rs.git", features=["postgres_types", "rweb-openapi"], tag="0.9.2" }
stdout-channel = "0.6"
maplit = "1.0"
dioxus = "0.4"
dioxus-ssr = "0.4"
refinery = {version="0.8", features=["tokio-postgres"]}
postgres_query = {git = "https://github.com/ddboline/rust-postgres-query", tag = "0.3.4", features=["deadpool"]}
stack-string = { git = "https://github.com/ddboline/stack-string-rs.git", features=["postgres_types", "rweb-openapi"], tag="0.9.1" }
chrono = "0.4"
polars = {version="0.32", features=["temporal", "parquet", "lazy"]}
authorized_users = { git = "https://github.com/ddboline/auth_server_rust.git", tag="0.10.8"}
thiserror = "1.0"
time = {version="0.3", features=["serde-human-readable", "macros", "formatting"]}
time-tz = {version="2.0", features=["system"]}
tokio-postgres = {version="0.7", features=["with-time-0_3", "with-uuid-1", "with-serde_json-1"]}
tokio = {version="1.21", features=["full"]}
rweb = {git = "https://github.com/ddboline/rweb.git", features=["openapi"], default-features=false, tag="0.15.1-1"}
rweb-helper = { git = "https://github.com/ddboline/rweb_helper.git", tag="0.5.0-1" }
uuid = { version = "1.0", features = ["serde", "v4"] }
cached = "0.44"

[[bin]]
name = "security-log-parse-rust"
Expand Down
4 changes: 2 additions & 2 deletions src/parse_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,8 @@ impl fmt::Display for ServiceLogEntry {
pub async fn process_systemd_logs(config: &Config, pool: &PgPool) -> Result<(), Error> {
let alert_log_delay = config.alert_log_delay.unwrap_or(60);
let alert_buffer_size = config.alert_buffer_size.unwrap_or(10_000);

let ses_instance = SesInstance::new(None);
let sdk_config = aws_config::load_from_env().await;
let ses_instance = SesInstance::new(&sdk_config);
let Some(sending_email_address) = &config.sending_email_address else {
error!("No sending email given");
return Err(format_err!("No sending email given"));
Expand Down
3 changes: 2 additions & 1 deletion src/parse_opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ impl ParseOpts {
}
}
ParseOpts::Sync { directory } => {
let sync = S3Sync::new();
let sdk_config = aws_config::load_from_env().await;
let sync = S3Sync::new(&sdk_config);
let directory = directory.unwrap_or_else(|| config.cache_dir.clone());
stdout.send(
sync.sync_dir("security-log-analysis", &directory, &config.s3_bucket, true)
Expand Down
6 changes: 3 additions & 3 deletions src/polars_analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ pub async fn insert_db_into_parquet(
let d = row.datetime.to_offset(UtcOffset::UTC);
let datetime =
NaiveDateTime::from_timestamp_opt(d.unix_timestamp(), d.nanosecond())
.expect("Invalid timestamp");
.unwrap_or_default();
acc.datetime.push(datetime);
acc.host.push(row.host);
acc.username.push(row.username);
Expand Down Expand Up @@ -230,7 +230,7 @@ pub fn read_parquet_files(
}
let df = df
.lazy()
.groupby(["country"])
.group_by(["country"])
.agg([col("country_count").sum().alias("count")])
.sort(
"count",
Expand Down Expand Up @@ -281,7 +281,7 @@ fn get_country_count(
);
}
let df = df
.groupby(["country"])
.group_by(["country"])
.agg([col("datetime").count().alias("country_count")])
.collect()?;
Ok(df)
Expand Down
144 changes: 70 additions & 74 deletions src/s3_sync.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use anyhow::Error;
use futures::{
future::{join_all, try_join_all},
stream::{StreamExt, TryStreamExt},
use anyhow::{format_err, Error};
use aws_config::SdkConfig;
use aws_sdk_s3::{
operation::list_objects::ListObjectsOutput, primitives::ByteStream, types::Object as S3Object,
Client as S3Client,
};
use futures::future::{join_all, try_join_all};
use log::debug;
use rand::{
distributions::{Alphanumeric, DistString},
thread_rng,
};
use rusoto_core::Region;
use rusoto_s3::{GetObjectRequest, Object as S3Object, PutObjectRequest, S3Client};
use s3_ext::S3Ext;
use stack_string::{format_sstr, StackString};
use std::{
borrow::Borrow,
Expand All @@ -21,17 +20,10 @@ use std::{
sync::Arc,
time::SystemTime,
};
use sts_profile_auth::get_client_sts;
use time::{format_description::well_known::Rfc3339, OffsetDateTime};
use tokio::task::spawn_blocking;
use tokio::{fs::File, task::spawn_blocking};

use crate::{exponential_retry, get_md5sum, polars_analysis::merge_parquet_files};

#[must_use]
fn get_s3_client() -> S3Client {
get_client_sts!(S3Client, Region::UsEast1).expect("Failed to obtain client")
}

#[derive(Clone)]
pub struct S3Sync {
s3_client: S3Client,
Expand Down Expand Up @@ -68,32 +60,28 @@ impl Borrow<str> for &KeyItem {

impl Default for S3Sync {
fn default() -> Self {
Self::new()
let sdk_config = SdkConfig::builder().build();
Self::new(&sdk_config)
}
}

fn process_s3_item(mut item: S3Object) -> Option<KeyItem> {
item.key.take().and_then(|key| {
item.e_tag.take().and_then(|etag| {
item.last_modified.as_ref().and_then(|last_mod| {
OffsetDateTime::parse(last_mod, &Rfc3339)
.ok()
.map(|lm| KeyItem {
key: key.into(),
etag: etag.trim_matches('"').into(),
timestamp: lm.unix_timestamp(),
size: item.size.unwrap_or(0) as u64,
})
})
})
let key = item.key.take()?;
let etag = item.e_tag.take()?;
let last_mod = item.last_modified.as_ref()?;
Some(KeyItem {
key: key.into(),
etag: etag.trim_matches('"').into(),
timestamp: last_mod.as_secs_f64() as i64,
size: item.size as u64,
})
}

impl S3Sync {
#[must_use]
pub fn new() -> Self {
pub fn new(sdk_config: &SdkConfig) -> Self {
Self {
s3_client: get_s3_client(),
s3_client: S3Client::from_conf(sdk_config.into()),
}
}

Expand All @@ -104,20 +92,41 @@ impl S3Sync {
}
}

/// Fetch a single page of object listings from `bucket`.
///
/// Uses the legacy (non-V2) `ListObjects` S3 API: pagination is driven by
/// the caller, who passes the last key of the previous page as `marker`
/// and keeps requesting pages while the response's `is_truncated` flag is
/// set. NOTE(review): without a `max_keys` override, page size is the S3
/// default (1000 keys) — confirm that is acceptable for large buckets.
///
/// # Errors
/// Returns an error if the underlying `ListObjects` request fails; the SDK
/// error is converted into `anyhow::Error` via `map_err(Into::into)`.
async fn list_keys(
&self,
bucket: &str,
marker: Option<impl AsRef<str>>,
) -> Result<ListObjectsOutput, Error> {
let mut builder = self.s3_client.list_objects().bucket(bucket);
// Only set the marker when resuming a paginated listing; the first
// request omits it to start from the beginning of the bucket.
if let Some(marker) = marker {
builder = builder.marker(marker.as_ref());
}
builder.send().await.map_err(Into::into)
}

/// # Errors
/// Return error if db query fails
pub async fn get_list_of_keys(&self, bucket: &str) -> Result<Vec<KeyItem>, Error> {
let results: Result<Vec<_>, _> = exponential_retry(|| async move {
self.s3_client
.stream_objects(bucket)
.map(|res| res.map(process_s3_item))
.try_collect()
.await
.map_err(Into::into)
exponential_retry(|| async move {
let mut marker: Option<String> = None;
let mut list_of_keys = Vec::new();
loop {
let mut output = self.list_keys(bucket, marker.as_ref()).await?;
if let Some(contents) = output.contents.take() {
if let Some(last) = contents.last() {
if let Some(key) = &last.key {
marker.replace(key.into());
}
}
list_of_keys.extend(contents.into_iter().filter_map(process_s3_item));
}
if !output.is_truncated {
break;
}
}
Ok(list_of_keys)
})
.await;
let list_of_keys = results?.into_iter().flatten().collect();
Ok(list_of_keys)
.await
}

/// # Errors
Expand Down Expand Up @@ -251,20 +260,20 @@ impl S3Sync {
let etag: Result<StackString, Error> = exponential_retry(|| {
let tmp_path = tmp_path.clone();
async move {
let etag = self
let resp = self
.s3_client
.download_to_file(
GetObjectRequest {
bucket: s3_bucket.to_string(),
key: s3_key.to_string(),
..GetObjectRequest::default()
},
&tmp_path,
)
.await?
.e_tag
.unwrap_or_default();
Ok(etag.into())
.get_object()
.bucket(s3_bucket)
.key(s3_key)
.send()
.await?;
let etag: StackString = resp.e_tag().ok_or_else(|| format_err!("No etag"))?.into();
tokio::io::copy(
&mut resp.body.into_async_read(),
&mut File::create(tmp_path).await?,
)
.await?;
Ok(etag)
}
})
.await;
Expand Down Expand Up @@ -294,31 +303,18 @@ impl S3Sync {
local_file: &Path,
s3_bucket: &str,
s3_key: &str,
) -> Result<(), Error> {
self.upload_file_acl(local_file, s3_bucket, s3_key).await
}

/// # Errors
/// Return error if db query fails
pub async fn upload_file_acl(
&self,
local_file: &Path,
s3_bucket: &str,
s3_key: &str,
) -> Result<(), Error> {
exponential_retry(|| async move {
let body = ByteStream::read_from().path(local_file).build().await?;
self.s3_client
.upload_from_file(
local_file,
PutObjectRequest {
bucket: s3_bucket.to_string(),
key: s3_key.to_string(),
..PutObjectRequest::default()
},
)
.put_object()
.bucket(s3_bucket)
.key(s3_key)
.body(body)
.send()
.await
.map_err(Into::into)
.map(|_| ())
.map_err(Into::into)
})
.await
}
Expand Down
Loading

0 comments on commit b412afa

Please sign in to comment.