refactor key_item_cache
ddboline committed Nov 2, 2023
1 parent 4e88500 commit 9935f45
Showing 3 changed files with 106 additions and 84 deletions.
13 changes: 8 additions & 5 deletions migrations/V10__key_item_cache.sql
@@ -1,8 +1,11 @@
 CREATE TABLE key_item_cache (
     s3_key TEXT NOT NULL UNIQUE PRIMARY KEY,
-    etag TEXT NOT NULL,
-    s3_timestamp BIGINT NOT NULL,
-    s3_size BIGINT NOT NULL,
-    has_local BOOLEAN NOT NULL DEFAULT false,
-    has_remote BOOLEAN NOT NULL DEFAULT false
+    s3_etag TEXT,
+    s3_timestamp BIGINT,
+    s3_size BIGINT,
+    local_etag TEXT,
+    local_timestamp BIGINT,
+    local_size BIGINT,
+    do_download BOOLEAN NOT NULL DEFAULT false,
+    do_upload BOOLEAN NOT NULL DEFAULT false
 )
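The migration replaces the single etag/timestamp/size trio and the has_local/has_remote presence booleans with per-side metadata (nullable, since a key may exist on only one side) plus explicit action flags. A minimal sketch of the state model this enables (not part of the commit; plain String stands in for StackString), mirroring the etag comparisons in src/s3_sync.rs below:

#[derive(Default, Debug)]
struct Entry {
    s3_etag: Option<String>,    // None until the key has been listed in S3
    local_etag: Option<String>, // None until the key has been seen on disk
    do_download: bool,
    do_upload: bool,
}

// Hypothetical helper: equal etags retire both flags, a mismatch flags both
// directions for later reconciliation, and a missing side schedules the
// transfer that would fill it in.
fn reconcile(e: &mut Entry) {
    match (&e.s3_etag, &e.local_etag) {
        (Some(remote), Some(local)) if remote == local => {
            e.do_download = false;
            e.do_upload = false;
        }
        (Some(_), Some(_)) => {
            e.do_download = true;
            e.do_upload = true;
        }
        (Some(_), None) => e.do_download = true,
        (None, Some(_)) => e.do_upload = true,
        (None, None) => {}
    }
}

fn main() {
    let mut e = Entry { s3_etag: Some("abc123".into()), ..Entry::default() };
    reconcile(&mut e);
    assert!(e.do_download && !e.do_upload);
}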
79 changes: 51 additions & 28 deletions src/models.rs
@@ -671,14 +671,17 @@ impl SystemdLogMessages {
     }
 }
 
-#[derive(FromSqlRow, Serialize, Deserialize, Debug, Clone)]
+#[derive(FromSqlRow, Serialize, Deserialize, Debug, Clone, Default)]
 pub struct KeyItemCache {
     pub s3_key: StackString,
-    pub etag: StackString,
-    pub s3_timestamp: i64,
-    pub s3_size: i64,
-    pub has_local: bool,
-    pub has_remote: bool,
+    pub s3_etag: Option<StackString>,
+    pub s3_timestamp: Option<i64>,
+    pub s3_size: Option<i64>,
+    pub local_etag: Option<StackString>,
+    pub local_timestamp: Option<i64>,
+    pub local_size: Option<i64>,
+    pub do_download: bool,
+    pub do_upload: bool,
 }
 
 impl KeyItemCache {
@@ -697,17 +700,25 @@ impl KeyItemCache {
     /// Return error if db query fails
     pub async fn get_files(
         pool: &PgPool,
-        has_remote: bool,
-        has_local: bool,
+        do_download: Option<bool>,
+        do_upload: Option<bool>,
     ) -> Result<impl Stream<Item = Result<Self, PgError>>, Error> {
-        let query = query!(
-            r#"
-                SELECT * FROM key_item_cache
-                WHERE has_remote = $has_remote AND has_local = $has_local
-            "#,
-            has_remote = has_remote,
-            has_local = has_local,
-        );
+        let mut bindings = Vec::new();
+        let mut constraints = Vec::new();
+        if let Some(do_download) = &do_download {
+            constraints.push(format_sstr!("do_download=$do_download"));
+            bindings.push(("do_download", do_download as Parameter));
+        }
+        if let Some(do_upload) = &do_upload {
+            constraints.push(format_sstr!("do_upload=$do_upload"));
+            bindings.push(("do_upload", do_upload as Parameter));
+        }
+        let query = if constraints.is_empty() {
+            query!("SELECT * FROM key_item_cache")
+        } else {
+            let query = format_sstr!("SELECT * FROM key_item_cache WHERE {}", constraints.join(" AND "));
+            query_dyn!(&query, ..bindings)?
+        };
         let conn = pool.get().await?;
         query.fetch_streaming(&conn).await.map_err(Into::into)
     }
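With both filters now optional, the SELECT is assembled at runtime instead of being a fixed query! literal. A standalone sketch of the same assembly (assumptions: plain String and format! stand in for the crate's StackString and format_sstr!, and parameter binding is elided):

fn build_query(do_download: Option<bool>, do_upload: Option<bool>) -> String {
    // Each Some(_) filter contributes one WHERE constraint; the real code
    // also collects a matching parameter binding for query_dyn!.
    let mut constraints = Vec::new();
    if do_download.is_some() {
        constraints.push("do_download=$do_download");
    }
    if do_upload.is_some() {
        constraints.push("do_upload=$do_upload");
    }
    if constraints.is_empty() {
        "SELECT * FROM key_item_cache".to_string()
    } else {
        format!("SELECT * FROM key_item_cache WHERE {}", constraints.join(" AND "))
    }
}

fn main() {
    assert_eq!(build_query(None, None), "SELECT * FROM key_item_cache");
    assert_eq!(
        build_query(Some(true), None),
        "SELECT * FROM key_item_cache WHERE do_download=$do_download"
    );
}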
@@ -719,31 +730,43 @@ impl KeyItemCache {
            r#"
                INSERT INTO key_item_cache (
                    s3_key,
-                   etag,
+                   s3_etag,
                    s3_timestamp,
                    s3_size,
-                   has_local,
-                   has_remote
+                   local_etag,
+                   local_timestamp,
+                   local_size,
+                   do_download,
+                   do_upload
                ) VALUES (
                    $s3_key,
-                   $etag,
+                   $s3_etag,
                    $s3_timestamp,
                    $s3_size,
-                   $has_local,
-                   $has_remote
+                   $local_etag,
+                   $local_timestamp,
+                   $local_size,
+                   $do_download,
+                   $do_upload
                ) ON CONFLICT (s3_key) DO UPDATE
-               SET etag=$etag,
+               SET s3_etag=$s3_etag,
                    s3_timestamp=$s3_timestamp,
                    s3_size=$s3_size,
-                   has_local=$has_local,
-                   has_remote=$has_remote
+                   local_etag=$local_etag,
+                   local_timestamp=$local_timestamp,
+                   local_size=$local_size,
+                   do_download=$do_download,
+                   do_upload=$do_upload
            "#,
            s3_key = self.s3_key,
-           etag = self.etag,
+           s3_etag = self.s3_etag,
            s3_timestamp = self.s3_timestamp,
            s3_size = self.s3_size,
-           has_local = self.has_local,
-           has_remote = self.has_remote,
+           local_etag = self.local_etag,
+           local_timestamp = self.local_timestamp,
+           local_size = self.local_size,
+           do_download = self.do_download,
+           do_upload = self.do_upload,
        );
        let conn = pool.get().await?;
        query.execute(&conn).await.map_err(Into::into)
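The ON CONFLICT (s3_key) DO UPDATE clause keeps insert as the single write path: the first call creates the row and every later call refreshes it, so no call site has to branch between insert and update. Together with the new Default derive, callers can also populate just one side of the record and leave the rest at defaults, as the KeyItemCache { ..., ..KeyItemCache::default() } construction in src/s3_sync.rs below does.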
98 changes: 47 additions & 51 deletions src/s3_sync.rs
@@ -13,7 +13,6 @@ use rand::{
 use stack_string::{format_sstr, StackString};
 use std::{
     borrow::Borrow,
-    cmp::Ordering,
     convert::{TryFrom, TryInto},
     fs,
     hash::{Hash, Hasher},
@@ -56,27 +55,15 @@ impl KeyItem {
     }
 }
 
-impl From<KeyItemCache> for KeyItem {
-    fn from(value: KeyItemCache) -> Self {
-        Self {
-            key: value.s3_key,
-            etag: value.etag,
-            timestamp: value.s3_timestamp,
-            size: value.s3_size as u64,
-        }
-    }
-}
-
 impl TryFrom<KeyItem> for KeyItemCache {
     type Error = Error;
     fn try_from(value: KeyItem) -> Result<Self, Self::Error> {
         Ok(Self {
             s3_key: value.key,
-            etag: value.etag,
-            s3_timestamp: value.timestamp,
-            s3_size: value.size.try_into()?,
-            has_local: false,
-            has_remote: false,
+            s3_etag: Some(value.etag),
+            s3_timestamp: Some(value.timestamp),
+            s3_size: Some(value.size.try_into()?),
+            ..Self::default()
         })
     }
 }
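Note that the companion impl From<KeyItemCache> for KeyItem is removed outright rather than updated: with the metadata split into Option fields, a cache row no longer guarantees the concrete values a KeyItem needs. The surviving conversion fills only the s3_* side and lets ..Self::default() leave the local_* fields as None and both flags false, to be set by the comparison logic below.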
@@ -151,21 +138,21 @@ impl S3Sync {
             if let Some(key) = KeyItem::from_s3_object(object) {
                 if let Some(mut key_item) = KeyItemCache::get_by_key(pool, &key.key).await?
                 {
-                    key_item.has_remote = true;
-                    if key.timestamp != key_item.s3_timestamp && key.etag != key_item.etag {
-                        let key_size: i64 = key.size.try_into()?;
-                        match key_size.cmp(&key_item.s3_size) {
-                            Ordering::Greater | Ordering::Less => {
-                                key_item = key.try_into()?;
-                                key_item.has_remote = true;
-                            }
-                            Ordering::Equal => {}
-                        }
+                    key_item.s3_etag = Some(key.etag);
+                    key_item.s3_size = Some(key.size.try_into()?);
+                    key_item.s3_timestamp = Some(key.timestamp);
+
+                    if key_item.s3_etag == key_item.local_etag {
+                        key_item.do_download = false;
+                        key_item.do_upload = false;
+                    } else {
+                        key_item.do_download = true;
+                        key_item.do_upload = true;
                     }
                     key_item.insert(pool).await?;
                 } else {
                     let mut key_item: KeyItemCache = key.try_into()?;
-                    key_item.has_remote = true;
+                    key_item.do_download = true;
                     key_item.insert(pool).await?;
                 };
                 nkeys += 1;
@@ -197,23 +184,23 @@ impl S3Sync {
                 if let Some(file_name) = f.file_name() {
                     let key: StackString = file_name.to_string_lossy().as_ref().into();
                     if let Some(mut key_item) = KeyItemCache::get_by_key(pool, &key).await? {
-                        if modified != key_item.s3_timestamp && size != key_item.s3_size {
+                        if Some(size) != key_item.local_size {
+                            key_item.local_size = Some(size);
                             let etag = get_md5sum(&f).await?;
-                            if etag != key_item.etag {
-                                key_item.has_local = true;
-                                key_item.has_remote = false;
-                                key_item.insert(pool).await?;
-                            }
+                            key_item.local_etag = Some(etag);
+                            key_item.local_timestamp = Some(modified);
+                            key_item.do_upload = true;
+                            key_item.insert(pool).await?;
                         }
                     } else {
                         let etag = get_md5sum(&f).await?;
                         KeyItemCache {
                             s3_key: key,
-                            etag,
-                            s3_timestamp: modified,
-                            s3_size: size,
-                            has_local: true,
-                            has_remote: false,
+                            local_etag: Some(etag),
+                            local_timestamp: Some(modified),
+                            local_size: Some(size),
+                            do_upload: true,
+                            ..KeyItemCache::default()
                         }
                         .insert(pool)
                         .await?;
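The directory walk's change-detection also shifts here: the old code hashed a file only when both its timestamp and size disagreed with the cached values, and then compared etags before writing; the new code treats any size change as a modification, recomputes the md5, and unconditionally flags the file for upload. Files whose size is unchanged now skip hashing entirely.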
@@ -238,7 +225,7 @@ impl S3Sync {
         let mut number_uploaded = 0;
         let mut number_downloaded = 0;
 
-        let mut stream = Box::pin(KeyItemCache::get_files(pool, true, false).await?);
+        let mut stream = Box::pin(KeyItemCache::get_files(pool, Some(true), None).await?);
 
         while let Some(mut key_item) = stream.try_next().await? {
             let local_file = local_dir.join(&key_item.s3_key);
@@ -251,27 +238,36 @@ impl S3Sync {
                     .duration_since(SystemTime::UNIX_EPOCH)?
                     .as_secs()
                     .try_into()?;
-                key_item.etag = get_md5sum(&local_file).await?;
-                key_item.s3_size = metadata.len().try_into()?;
-                key_item.s3_timestamp = modified;
-                key_item.has_local = true;
-                key_item.has_remote = false;
+                key_item.local_etag = Some(get_md5sum(&local_file).await?);
+                key_item.local_size = Some(metadata.len().try_into()?);
+                key_item.local_timestamp = Some(modified);
+                key_item.do_download = false;
+                if key_item.s3_etag != key_item.local_etag {
+                    key_item.do_upload = true;
+                }
+
                 key_item.insert(pool).await?;
             }
 
-            let mut stream = Box::pin(KeyItemCache::get_files(pool, false, true).await?);
+            let mut stream = Box::pin(KeyItemCache::get_files(pool, None, Some(true)).await?);
 
             while let Some(mut key_item) = stream.try_next().await? {
                 let local_file = local_dir.join(&key_item.s3_key);
                 if !local_file.exists() {
-                    key_item.has_local = false;
+                    key_item.do_upload = false;
                     key_item.insert(pool).await?;
                     continue;
                 }
-                key_item.etag = self
+                let s3_etag = self
                     .upload_file(&local_file, s3_bucket, &key_item.s3_key)
                     .await?;
+                if Some(&s3_etag) != key_item.local_etag.as_ref() {
+                    return Err(format_err!("Uploaded etag does not match local"));
+                }
+                key_item.s3_etag = Some(s3_etag);
+                key_item.s3_size = key_item.local_size;
+                key_item.s3_timestamp = key_item.local_timestamp;
                 number_uploaded += 1;
-                key_item.has_remote = true;
+                key_item.do_upload = false;
                 key_item.insert(pool).await?;
             }
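Read together, the two loops form the new reconciliation cycle. The first pass walks rows flagged do_download and, after the elided download and metadata fetch, refreshes local_etag, local_size, and local_timestamp, clears do_download, and re-flags do_upload only if the two etags still disagree. The second pass walks rows flagged do_upload, drops the flag for files that have vanished locally, and otherwise uploads, checks the etag returned by upload_file against the local md5, mirrors the local_* values into s3_*, and retires the flag. A row whose etags match therefore ends each sync with both flags false.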

@@ -387,15 +383,15 @@ mod tests {
             .get_and_process_keys(&config.s3_bucket, &pool)
             .await?;
 
-        KeyItemCache::get_files(&pool, true, false)
+        KeyItemCache::get_files(&pool, Some(true), None)
             .await?
             .try_for_each(|key_item| async move {
                 println!("upload {}", key_item.s3_key);
                 Ok(())
             })
             .await?;
 
-        KeyItemCache::get_files(&pool, false, true)
+        KeyItemCache::get_files(&pool, None, Some(true))
             .await?
             .try_for_each(|key_item| async move {
                 println!("download {}", key_item.s3_key);
