Skip to content

Commit

Permalink
Add key item cache for garmin sync (#45)
Browse files Browse the repository at this point in the history
* keep track of files in key_item_cache table
  • Loading branch information
ddboline authored Nov 19, 2023
1 parent 4c157d4 commit 067d903
Show file tree
Hide file tree
Showing 21 changed files with 438 additions and 227 deletions.
18 changes: 9 additions & 9 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ jobs:
- name: Clippy
run: cargo clippy -- -W clippy::pedantic

- name: Outdated
run: |
cargo install cargo-outdated && \
cargo outdated -d2
# - name: Outdated
# run: |
# cargo install cargo-outdated && \
# cargo outdated -d2

- name: Unused Deps
run: |
rustup update nightly && \
cargo +nightly install cargo-udeps && \
cargo +nightly udeps
# - name: Unused Deps
# run: |
# rustup update nightly && \
# cargo +nightly install cargo-udeps && \
# cargo +nightly udeps

- name: Build
run: |
Expand Down
2 changes: 1 addition & 1 deletion fitbit_bot/src/telegram_bot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl TelegramBot {
)
.await
{
Err(_) | Ok(Ok(_)) => FAILURE_COUNT.reset()?,
Err(_) | Ok(Ok(())) => FAILURE_COUNT.reset()?,
Ok(Err(_)) => FAILURE_COUNT.increment()?,
}
}
Expand Down
4 changes: 2 additions & 2 deletions fitbit_lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ rayon = "1.5"
log = "0.4"
lazy_static = "1.4"
parking_lot = "0.12"
itertools = "0.11"
itertools = "0.12"
avro-rs = {version = "0.13", features = ["snappy"]}
uuid = { version = "1.0", features = ["serde", "v4"] }
postgres_query = {git = "https://github.com/ddboline/rust-postgres-query", tag = "0.3.5", features=["deadpool"]}
Expand All @@ -36,7 +36,7 @@ fitparser = {git="https://github.com/ddboline/fitparse-rs.git", branch="time-0.3
smallvec = "1.6"
crossbeam-utils = "0.8"
derive_more = "0.99"
polars = {version="0.33", features=["temporal", "parquet", "lazy"]}
polars = {version="0.35", features=["temporal", "parquet", "lazy"]}
stack-string = { git = "https://github.com/ddboline/stack-string-rs.git", features=["postgres_types"], tag="0.9.2" }

[dev-dependencies]
Expand Down
4 changes: 2 additions & 2 deletions garmin_cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ rayon = "1.5"
regex = "1.4"
tempdir = "0.3"
serde_json = "1.0"
itertools = "0.11"
itertools = "0.12"
smallvec = "1.6"
tokio-postgres = {version = "0.7", features = ["with-time-0_3"]}
refinery = {version="0.8", features=["tokio-postgres"]}
stack-string = { git = "https://github.com/ddboline/stack-string-rs.git", features=["postgres_types"], tag="0.9.2" }
stdout-channel = "0.6"
aws-config = "0.56"
aws-config = {version="0.101", features=["behavior-version-latest"]}
24 changes: 10 additions & 14 deletions garmin_cli/src/garmin_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use garmin_reports::{

#[derive(Debug, PartialEq, Clone, Eq)]
pub enum GarminCliOptions {
Sync(bool),
Sync,
All,
Bootstrap,
FileNames(Vec<PathBuf>),
Expand Down Expand Up @@ -143,7 +143,7 @@ impl GarminCli {
let pool = self.get_pool();
GarminSummary::write_summary_to_postgres(&summary_list, &pool)
.await
.map(|_| Vec::new())
.map(|()| Vec::new())
}
}

Expand Down Expand Up @@ -219,12 +219,12 @@ impl GarminCli {
/// # Errors
/// Return error if `sync_everything` fails
pub async fn run_bootstrap(&self) -> Result<Vec<StackString>, Error> {
self.sync_everything(true).await
self.sync_everything().await
}

/// # Errors
/// Return error if `sync_dir` fails
pub async fn sync_everything(&self, check_md5: bool) -> Result<Vec<StackString>, Error> {
pub async fn sync_everything(&self) -> Result<Vec<StackString>, Error> {
let sdk_config = aws_config::load_from_env().await;
let gsync = GarminSync::new(&sdk_config);

Expand All @@ -233,34 +233,30 @@ impl GarminCli {
"Syncing GPS files",
&self.get_config().gps_dir,
&self.get_config().gps_bucket,
check_md5,
),
(
"Syncing CACHE files",
&self.get_config().cache_dir,
&self.get_config().cache_bucket,
check_md5,
),
(
"Syncing Fitbit Cache",
&self.get_config().fitbit_cachedir,
&self.get_config().fitbit_bucket,
check_md5,
),
(
"Syncing Fitbit Archive",
&self.get_config().fitbit_archivedir,
&self.get_config().fitbit_archive_bucket,
check_md5,
),
];

let futures = options
.into_iter()
.map(|(title, local_dir, s3_bucket, check_md5)| {
debug!("{}", title);
gsync.sync_dir(title, local_dir, s3_bucket, check_md5)
});
let futures = options.into_iter().map(|(title, local_dir, s3_bucket)| {
debug!("{}", title);
let pool = self.pool.clone();
let gsync = gsync.clone();
async move { gsync.sync_dir(title, local_dir, s3_bucket, &pool).await }
});
try_join_all(futures).await
}

Expand Down
15 changes: 6 additions & 9 deletions garmin_cli/src/garmin_cli_opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,7 @@ pub enum GarminCliOpts {
#[clap(short, long)]
end_date: Option<DateType>,
},
Sync {
#[clap(short, long)]
md5sum: bool,
},
Sync,
#[clap(alias = "fit")]
Fitbit {
#[clap(short, long)]
Expand Down Expand Up @@ -160,7 +157,7 @@ impl GarminCliOpts {
.process_opts(&config)
.await?;
Self::Strava.process_opts(&config).await?;
Self::Sync { md5sum: false }.process_opts(&config).await
Self::Sync.process_opts(&config).await
} else {
opts.process_opts(&config).await
}
Expand Down Expand Up @@ -191,7 +188,7 @@ impl GarminCliOpts {
start_date: start_date.map(Into::into),
end_date: end_date.map(Into::into),
},
Self::Sync { md5sum } => GarminCliOptions::Sync(md5sum),
Self::Sync => GarminCliOptions::Sync,
Self::SyncAll => {
return Ok(());
}
Expand Down Expand Up @@ -225,7 +222,7 @@ impl GarminCliOpts {
let filenames = client.sync_tcx(start_date).await?;
if !filenames.is_empty() {
let mut buf = cli.proc_everything().await?;
buf.extend_from_slice(&cli.sync_everything(false).await?);
buf.extend_from_slice(&cli.sync_everything().await?);
}
let filenames = filenames
.into_iter()
Expand Down Expand Up @@ -473,7 +470,7 @@ impl GarminCliOpts {
cli.proc_everything().await
}
Some(GarminCliOptions::Bootstrap) => cli.run_bootstrap().await,
Some(GarminCliOptions::Sync(check_md5)) => cli.sync_everything(*check_md5).await,
Some(GarminCliOptions::Sync) => cli.sync_everything().await,
Some(GarminCliOptions::Connect {
data_directory,
start_date,
Expand All @@ -484,7 +481,7 @@ impl GarminCliOpts {
Self::sync_with_garmin_connect(cli, data_directory, *start_date, *end_date)
.await?;
if !filenames.is_empty() || !input_files.is_empty() || !dates.is_empty() {
buf.extend_from_slice(&cli.sync_everything(false).await?);
buf.extend_from_slice(&cli.sync_everything().await?);
if let Ok(client) = FitbitClient::with_auth(cli.config.clone()).await {
let result = client.sync_everything(&cli.pool).await?;
buf.push(format_sstr!(
Expand Down
2 changes: 1 addition & 1 deletion garmin_connect_lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ hyper = "0.14"
derive_more = "0.99"
bytes = "1.0"
smallvec = "1.6"
itertools = "0.11"
itertools = "0.12"
http = "0.2"
stack-string = { git = "https://github.com/ddboline/stack-string-rs.git", features=["postgres_types"], tag="0.9.2" }
5 changes: 2 additions & 3 deletions garmin_http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ garmin_lib = {path = "../garmin_lib"}
garmin_cli = {path = "../garmin_cli"}
garmin_reports = {path="../garmin_reports"}
race_result_analysis = {path="../race_result_analysis"}
http = "0.2"
tokio = {version="1.32", features=["full"]}
tokio-stream = "0.1"
time = {version="0.3", features=["serde-human-readable", "macros", "formatting", "parsing"]}
Expand All @@ -29,8 +28,8 @@ thiserror = "1.0"
async-trait = "0.1"
rayon = "1.5"
handlebars = "4.0"
itertools = "0.11"
cookie = {version="0.17", features=["percent-encode"]}
itertools = "0.12"
cookie = {version="0.18", features=["percent-encode"]}
base64 = "0.21"
url = "2.3"
serde_yaml = "0.9"
Expand Down
2 changes: 1 addition & 1 deletion garmin_http/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Error as AnyhowError;
use base64::DecodeError;
use handlebars::RenderError;
use http::StatusCode;
use rweb::http::StatusCode;
use log::error;
use postgres_query::Error as PqError;
use rweb::{
Expand Down
4 changes: 2 additions & 2 deletions garmin_http/src/garmin_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl StravaSyncRequest {

if !filenames.is_empty() {
gcli.process_filenames(&filenames).await?;
gcli.sync_everything(false).await?;
gcli.sync_everything().await?;
gcli.proc_everything().await?;
}
StravaActivity::fix_summary_id_in_db(pool).await?;
Expand Down Expand Up @@ -145,7 +145,7 @@ impl FitbitTcxSyncRequest {
let filenames = client.sync_tcx(start_date).await?;

let gcli = GarminCli::from_pool(pool)?;
gcli.sync_everything(false).await?;
gcli.sync_everything().await?;
gcli.proc_everything().await?;
Ok(filenames)
}
Expand Down
2 changes: 1 addition & 1 deletion garmin_http/src/garmin_rust_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub async fn start_app() -> Result<(), Error> {
{
if !filenames.is_empty() || !input_files.is_empty() || !dates.is_empty() {
info!("processed {filenames:?} and {input_files:?}");
for line in cli.sync_everything(false).await.unwrap_or(Vec::new()) {
for line in cli.sync_everything().await.unwrap_or(Vec::new()) {
info!("{line}");
}
if let Ok(client) = FitbitClient::with_auth(cli.config.clone()).await {
Expand Down
14 changes: 8 additions & 6 deletions garmin_http/src/garmin_rust_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ async fn garmin_upload_body(
let gcli = GarminCli::from_pool(&state.db)?;
let filenames = vec![filename];
let datetimes = gcli.process_filenames(&filenames).await?;
gcli.sync_everything(false).await?;
gcli.sync_everything().await?;
gcli.proc_everything().await?;

let query = FilterRequest {
Expand All @@ -332,11 +332,16 @@ async fn garmin_upload_body(
async fn save_file(file_path: &str, field: Part) -> Result<u64, anyhow::Error> {
let mut file = File::create(file_path).await?;
let mut stream = field.stream();
let mut buf_size = 0usize;

while let Some(chunk) = stream.next().await {
file.write_all(chunk?.chunk()).await?;
let chunk = chunk?;
let chunk = chunk.chunk();
buf_size += chunk.len();
file.write_all(chunk).await?;
}
let file_size = file.metadata().await?.len();
debug_assert!(buf_size as u64 == file_size);
Ok(file_size)
}

Expand All @@ -360,10 +365,7 @@ pub async fn garmin_sync(
#[data] state: AppState,
) -> WarpResult<GarminSyncResponse> {
let gcli = GarminCli::from_pool(&state.db).map_err(Into::<Error>::into)?;
let mut body = gcli
.sync_everything(false)
.await
.map_err(Into::<Error>::into)?;
let mut body = gcli.sync_everything().await.map_err(Into::<Error>::into)?;
body.extend_from_slice(&gcli.proc_everything().await.map_err(Into::<Error>::into)?);
let body = body.join("\n").into();
let body = table_body(body).into();
Expand Down
1 change: 1 addition & 0 deletions garmin_http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#![allow(clippy::similar_names)]
#![allow(clippy::unused_async)]
#![allow(clippy::unsafe_derive_deserialize)]
#![allow(clippy::ignored_unit_patterns)]

pub mod errors;
pub mod garmin_elements;
Expand Down
4 changes: 2 additions & 2 deletions garmin_http/src/logged_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,11 @@ impl Session {
pub fn get_jwt_cookie(&self, domain: &str) -> Cookie<'static> {
let history_str = self.history.join(";");
let token = STANDARD.encode(history_str);
Cookie::build("session", token)
Cookie::build(("session", token))
.http_only(true)
.path("/")
.domain(domain.to_string())
.finish()
.build()
}
}

Expand Down
8 changes: 4 additions & 4 deletions garmin_lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ edition = "2018"
[dependencies]
anyhow = "1.0"
avro-rs = {version = "0.13", features = ["snappy"]}
aws-config = "0.56"
aws-sdk-s3 = "0.31"
aws-config = "0.101"
aws-sdk-s3 = "0.38"
base64 = "0.21"
bytes = "1.0"
deadpool = "0.10"
Expand All @@ -20,7 +20,7 @@ envy = "0.4"
fitparser = {git="https://github.com/ddboline/fitparse-rs.git", branch="time-0.3-0.5.2"}
flate2 = "1.0"
futures = "0.3"
itertools = "0.11"
itertools = "0.12"
json = "0.12"
lazy_static = "1.4"
log = "0.4"
Expand All @@ -31,7 +31,7 @@ postgres-types = {version="0.2", features=["with-time-0_3", "with-uuid-1", "with
rand = "0.8"
rayon = "1.5"
regex = "1.4"
roxmltree = "0.18"
roxmltree = "0.19"
rustls-native-certs = {version="0.6", default_features=false}
serde = {version="1.0", features=["derive"]}
serde_json = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion garmin_lib/src/common/garmin_correction_lap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ impl GarminCorrectionLap {
pub async fn fix_corrections_in_db(pool: &PgPool) -> Result<(), Error> {
let query = query!(
"
UPDATE garmin_corrections_laps_backup SET summary_id = (
UPDATE garmin_corrections_laps SET summary_id = (
SELECT id FROM garmin_summary a WHERE a.begin_datetime = start_time
)
WHERE summary_id IS NULL
Expand Down
Loading

0 comments on commit 067d903

Please sign in to comment.