Skip to content

Commit

Permalink
paginate list endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
ddboline committed Oct 19, 2024
1 parent d3993df commit b83597d
Show file tree
Hide file tree
Showing 34 changed files with 609 additions and 781 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "garmin_rust"
version = "0.14.22"
version = "0.14.23"
authors = ["Daniel Boline <[email protected]>"]
edition = "2018"

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ all:
cp Dockerfile.build.ubuntu20.04 build/Dockerfile && \
cp -a Cargo.toml src scripts Makefile templates garmin_cli \
garmin_lib garmin_http fitbit_lib fitbit_bot strava_lib \
garmin_connect_lib race_result_analysis garmin_reports build/ && \
race_result_analysis garmin_reports build/ && \
cd build/ && \
docker build -t garmin_rust/build_rust:ubuntu20.04 . && \
cd ../ && \
Expand Down
2 changes: 1 addition & 1 deletion fitbit_bot/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fitbit_bot"
version = "0.14.22"
version = "0.14.23"
authors = ["Daniel Boline <[email protected]>"]
edition = "2018"

Expand Down
2 changes: 1 addition & 1 deletion fitbit_lib/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fitbit_lib"
version = "0.14.22"
version = "0.14.23"
authors = ["Daniel Boline <[email protected]>"]
edition = "2018"

Expand Down
32 changes: 17 additions & 15 deletions fitbit_lib/src/fitbit_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ impl FitbitClient {
/// Returns error if api call fails
pub async fn remove_duplicate_entries(&self, pool: &PgPool) -> Result<Vec<StackString>, Error> {
let mut last_entry = None;
let futures = FitbitActivity::read_from_db(pool, None, None)
let futures = FitbitActivity::read_from_db(pool, None, None, None, None)
.await?
.into_iter()
.map(|activity| {
Expand Down Expand Up @@ -876,7 +876,7 @@ impl FitbitClient {

// Get existing activities
let mut existing_activities: HashMap<_, _> =
FitbitActivity::read_from_db(pool, Some(date), None)
FitbitActivity::read_from_db(pool, Some(date), None, None, None)
.await?
.into_iter()
.map(|activity| (activity.log_id, activity))
Expand Down Expand Up @@ -984,14 +984,15 @@ impl FitbitClient {
.collect();
existing_map.shrink_to_fit();

let mut measurements: Vec<_> = ScaleMeasurement::read_from_db(pool, Some(start_date), None)
.await?
.into_iter()
.filter(|entry| {
let date = entry.datetime.to_timezone(local).date();
!existing_map.contains_key(&date)
})
.collect();
let mut measurements: Vec<_> =
ScaleMeasurement::read_from_db(pool, Some(start_date), None, None, None)
.await?
.into_iter()
.filter(|entry| {
let date = entry.datetime.to_timezone(local).date();
!existing_map.contains_key(&date)
})
.collect();
measurements.shrink_to_fit();
self.update_fitbit_bodyweightfat(&measurements).await?;

Expand Down Expand Up @@ -1242,11 +1243,12 @@ mod tests {
let config = GarminConfig::get_config(None)?;
let client = FitbitClient::with_auth(config.clone()).await?;
let pool = PgPool::new(&config.pgurl)?;
let mut activities: HashMap<_, _> = FitbitActivity::read_from_db(&pool, None, None)
.await?
.into_iter()
.map(|activity| (activity.log_id, activity))
.collect();
let mut activities: HashMap<_, _> =
FitbitActivity::read_from_db(&pool, None, None, None, None)
.await?
.into_iter()
.map(|activity| (activity.log_id, activity))
.collect();
activities.shrink_to_fit();

let offset = client.get_offset();
Expand Down
92 changes: 71 additions & 21 deletions fitbit_lib/src/fitbit_statistics_summary.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use anyhow::Error;
use futures::Stream;
use postgres_query::{query, Error as PqError, FromSqlRow};
use postgres_query::{query, query_dyn, Error as PqError, FromSqlRow, Parameter, Query};
use serde::{Deserialize, Serialize};
use stack_string::format_sstr;
use statistical::{mean, median, standard_deviation};
use time::{Date, Duration, OffsetDateTime};
use std::convert::TryInto;
use time::Date;
use time_tz::OffsetDateTimeExt;

use garmin_lib::date_time_wrapper::DateTimeWrapper;
Expand Down Expand Up @@ -65,35 +67,83 @@ impl FitbitStatisticsSummary {
query.fetch_opt(&conn).await.map_err(Into::into)
}

fn get_fitbit_statistics_query<'a>(
select_str: &'a str,
start_date: &'a Option<Date>,
end_date: &'a Option<Date>,
offset: Option<usize>,
limit: Option<usize>,
order_str: &'a str,
) -> Result<Query<'a>, PqError> {
let mut conditions = Vec::new();
let mut query_bindings = Vec::new();
if let Some(start_date) = start_date {
conditions.push("date >= $start_date");
query_bindings.push(("start_date", start_date as Parameter));
}
if let Some(end_date) = end_date {
conditions.push("date <= $end_date");
query_bindings.push(("end_date", end_date as Parameter));
}

let mut query = format_sstr!(
"SELECT {select_str} FROM heartrate_statistics_summary {} {order_str}",
if conditions.is_empty() {
"".into()
} else {
format_sstr!("WHERE {}", conditions.join(" AND "))
}
);
if let Some(offset) = &offset {
query.push_str(&format_sstr!(" OFFSET {offset}"));
}
if let Some(limit) = &limit {
query.push_str(&format_sstr!(" LIMIT {limit}"));
}
query_dyn!(&query, ..query_bindings)
}

/// # Errors
/// Returns error if db query fails
pub async fn read_from_db(
pool: &PgPool,
start_date: Option<Date>,
end_date: Option<Date>,
pool: &PgPool,
offset: Option<usize>,
limit: Option<usize>,
) -> Result<impl Stream<Item = Result<Self, PqError>>, Error> {
let local = DateTimeWrapper::local_tz();
let start_date = start_date.unwrap_or_else(|| {
(OffsetDateTime::now_utc() - Duration::days(365))
.to_timezone(local)
.date()
});
let end_date =
end_date.unwrap_or_else(|| OffsetDateTime::now_utc().to_timezone(local).date());

let query = query!(
r#"
SELECT * FROM heartrate_statistics_summary
WHERE date >= $start_date AND date <= $end_date
ORDER BY date
"#,
start_date = start_date,
end_date = end_date
);
let query = Self::get_fitbit_statistics_query(
"*",
&start_date,
&end_date,
offset,
limit,
"ORDER BY date",
)?;
let conn = pool.get().await?;
query.fetch_streaming(&conn).await.map_err(Into::into)
}

/// # Errors
/// Returns error if db query fails
pub async fn get_total(
pool: &PgPool,
start_date: Option<Date>,
end_date: Option<Date>,
) -> Result<usize, Error> {
#[derive(FromSqlRow)]
struct Count {
count: i64,
}

let query =
Self::get_fitbit_statistics_query("count(*)", &start_date, &end_date, None, None, "")?;
let conn = pool.get().await?;
let count: Count = query.fetch_one(&conn).await?;

Ok(count.count.try_into()?)
}

/// # Errors
/// Returns error if db query fails
pub async fn upsert_entry(&self, pool: &PgPool) -> Result<(), Error> {
Expand Down
85 changes: 64 additions & 21 deletions fitbit_lib/src/scale_measurement.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use anyhow::{format_err, Error};
use futures::{stream::FuturesUnordered, TryStreamExt};
use log::debug;
use postgres_query::{query, query_dyn, FromSqlRow, Parameter};
use postgres_query::{query, query_dyn, Error as PqError, FromSqlRow, Parameter, Query};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use stack_string::format_sstr;
use std::{collections::HashSet, fmt, sync::Arc};
use std::{collections::HashSet, convert::TryInto, fmt, sync::Arc};
use time::{Date, OffsetDateTime};
use uuid::Uuid;

Expand Down Expand Up @@ -245,48 +245,91 @@ impl ScaleMeasurement {
Ok(result)
}

/// # Errors
/// Returns error if db query fails
pub async fn read_from_db(
pool: &PgPool,
start_date: Option<Date>,
end_date: Option<Date>,
) -> Result<Vec<Self>, Error> {
let query = "SELECT * FROM scale_measurements";
fn get_scale_measurement_query<'a>(
select_str: &'a str,
start_date: &'a Option<Date>,
end_date: &'a Option<Date>,
offset: Option<usize>,
limit: Option<usize>,
order_str: &'a str,
) -> Result<Query<'a>, PqError> {
let mut conditions = Vec::new();
let mut bindings = Vec::new();
let mut query_bindings = Vec::new();
if let Some(d) = start_date {
conditions.push("date(datetime) >= $start_date");
bindings.push(("start_date", d));
query_bindings.push(("start_date", d as Parameter));
}
if let Some(d) = end_date {
conditions.push("date(datetime) <= $end_date");
bindings.push(("end_date", d));
query_bindings.push(("end_date", d as Parameter));
}
let query = format_sstr!(
"{query} {c} ORDER BY datetime",
c = if conditions.is_empty() {
let mut query = format_sstr!(
"SELECT {select_str} FROM scale_measurements {} {order_str}",
if conditions.is_empty() {
"".into()
} else {
format_sstr!("WHERE {}", conditions.join(" AND "))
}
);
let mut query_bindings: Vec<_> =
bindings.iter().map(|(k, v)| (*k, v as Parameter)).collect();
if let Some(offset) = offset {
query.push_str(&format_sstr!(" OFFSET {offset}"));
}
if let Some(limit) = limit {
query.push_str(&format_sstr!(" LIMIT {limit}"));
}
query_bindings.shrink_to_fit();
debug!("query:\n{}", query);
let query = query_dyn!(&query, ..query_bindings)?;
query_dyn!(&query, ..query_bindings)
}

/// # Errors
/// Returns error if db query fails
pub async fn read_from_db(
pool: &PgPool,
start_date: Option<Date>,
end_date: Option<Date>,
offset: Option<usize>,
limit: Option<usize>,
) -> Result<Vec<Self>, Error> {
let query = Self::get_scale_measurement_query(
"*",
&start_date,
&end_date,
offset,
limit,
"ORDER BY datetime",
)?;
let conn = pool.get().await?;
query.fetch(&conn).await.map_err(Into::into)
}

/// # Errors
/// Return error if db query fails
pub async fn get_total(
pool: &PgPool,
start_date: Option<Date>,
end_date: Option<Date>,
) -> Result<usize, Error> {
#[derive(FromSqlRow)]
struct Count {
count: i64,
}

let query =
Self::get_scale_measurement_query("count(*)", &start_date, &end_date, None, None, "")?;
let conn = pool.get().await?;
let count: Count = query.fetch_one(&conn).await?;

Ok(count.count.try_into()?)
}

/// # Errors
/// Returns error if db query fails
pub async fn merge_updates<'a, T>(measurements: T, pool: &PgPool) -> Result<(), Error>
where
T: IntoIterator<Item = &'a mut Self>,
{
let mut measurement_set: HashSet<_> = ScaleMeasurement::read_from_db(pool, None, None)
let mut measurement_set: HashSet<_> = Self::read_from_db(pool, None, None, None, None)
.await?
.into_par_iter()
.map(|d| d.datetime)
Expand Down Expand Up @@ -380,7 +423,7 @@ mod tests {

assert_eq!(exp, obs);

let measurements = ScaleMeasurement::read_from_db(&pool, None, None).await?;
let measurements = ScaleMeasurement::read_from_db(&pool, None, None, None, None).await?;
assert!(measurements.len() > 0);
let first = measurements[0];
debug!("{:#?}", first);
Expand Down
2 changes: 1 addition & 1 deletion garmin_cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "garmin_cli"
version = "0.14.22"
version = "0.14.23"
authors = ["Daniel Boline <[email protected]>"]
edition = "2018"

Expand Down
Loading

0 comments on commit b83597d

Please sign in to comment.