From 4a4f0f2835ff3b085e15017767c75e363aa367cd Mon Sep 17 00:00:00 2001 From: Daniel Boline Date: Sat, 23 Nov 2024 01:15:15 -0500 Subject: [PATCH] refactoring --- Cargo.toml | 8 +- .../V34__authorized_users_created_deleted.sql | 2 + movie_collection_http/Cargo.toml | 10 +- movie_collection_http/src/lib.rs | 2 +- movie_collection_http/src/logged_user.rs | 66 ++++++++--- .../src/movie_queue_routes.rs | 112 +++++------------- movie_collection_lib/Cargo.toml | 8 +- movie_collection_lib/src/mkv_utils.rs | 3 +- movie_collection_lib/src/movie_collection.rs | 42 +++---- movie_collection_lib/src/movie_queue.rs | 5 +- movie_collection_lib/src/trakt_utils.rs | 6 +- movie_collection_lib/src/transcode_service.rs | 49 ++++---- movie_collection_lib/src/utils.rs | 63 ++++++---- src/movie_queue_cli.rs | 27 +++-- transcode_lib/Cargo.toml | 4 +- 15 files changed, 211 insertions(+), 196 deletions(-) create mode 100644 migrations/V34__authorized_users_created_deleted.sql diff --git a/Cargo.toml b/Cargo.toml index 4f4bc72..8bb8c13 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "movie_collection_rust" -version = "0.10.52" +version = "0.11.0" authors = ["Daniel Boline "] edition = "2018" @@ -14,7 +14,7 @@ Utilities for maintaining a collection of videos.""" [dependencies] anyhow = "1.0" -clap = {version="4.0", features=["derive"]} +clap = {version="4.5", features=["derive"]} derive_more = {version="1.0", features=["full"]} env_logger = "0.11" futures = "0.3" @@ -23,11 +23,11 @@ movie_collection_lib = {path = "movie_collection_lib"} movie_collection_http = {path = "movie_collection_http"} refinery = {version="0.8", features=["tokio-postgres"]} serde_json = "1.0" -stack-string = { git = "https://github.com/ddboline/stack-string-rs.git", features=["postgres_types"], tag="0.9.3" } +stack-string = { git = "https://github.com/ddboline/stack-string-rs.git", features=["postgres_types"], tag="1.0.2" } stdout-channel = "0.6" time = {version="0.3", features=["serde-human-readable", "macros", "formatting"]} time-tz = {version="2.0", features=["system"]} -tokio = {version="1.40", features=["rt", "macros", "rt-multi-thread"]} +tokio = {version="1.41", features=["rt", "macros", "rt-multi-thread"]} transcode_lib = {path = "transcode_lib"} [workspace] diff --git a/migrations/V34__authorized_users_created_deleted.sql b/migrations/V34__authorized_users_created_deleted.sql new file mode 100644 index 0000000..8c461f1 --- /dev/null +++ b/migrations/V34__authorized_users_created_deleted.sql @@ -0,0 +1,2 @@ +ALTER TABLE authorized_users ADD COLUMN created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(); +ALTER TABLE authorized_users ADD COLUMN deleted_at TIMESTAMP WITH TIME ZONE; diff --git a/movie_collection_http/Cargo.toml b/movie_collection_http/Cargo.toml index ae3dd15..535f90d 100644 --- a/movie_collection_http/Cargo.toml +++ b/movie_collection_http/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "movie_collection_http" -version = "0.10.52" +version = "0.11.0" authors = ["Daniel Boline "] edition = "2018" @@ -9,7 +9,7 @@ anyhow = "1.0" async-graphql = {version="7.0", features=["dataloader", "time"]} async-graphql-warp = "7.0" async-trait = "0.1" -authorized_users = { git = "https://github.com/ddboline/auth_server_rust.git", tag="0.11.15"} +authorized_users = { git = "https://github.com/ddboline/auth_server_rust.git", tag="0.12.0"} bytes = "1.0" derive_more = {version="1.0", features=["full"]} dioxus = "0.5" @@ -33,12 +33,12 @@ rweb = {git = "https://github.com/ddboline/rweb.git", features=["openapi"], defa rweb-helper = { git = "https://github.com/ddboline/rweb_helper.git", tag="0.5.3" } serde_yaml = "0.9" smallvec = {version="1.6", features=["serde", "write"]} -stack-string = { git = "https://github.com/ddboline/stack-string-rs.git", features=["postgres_types", "rweb-openapi", "async_graphql"], tag="0.9.3" } +stack-string = { git = "https://github.com/ddboline/stack-string-rs.git", features=["postgres_types", "rweb-openapi", "async_graphql"], tag="1.0.2" } stdout-channel = "0.6" -thiserror = "1.0" +thiserror = "2.0" time = {version="0.3", features=["serde-human-readable", "macros", "formatting"]} time-tz = {version="2.0", features=["system"]} -tokio = {version="1.40", features=["rt", "macros", "rt-multi-thread"]} +tokio = {version="1.41", features=["rt", "macros", "rt-multi-thread"]} tokio-stream = "0.1" uuid = "1.0" url = "2.3" diff --git a/movie_collection_http/src/lib.rs b/movie_collection_http/src/lib.rs index 8f93d7d..c1efc90 100644 --- a/movie_collection_http/src/lib.rs +++ b/movie_collection_http/src/lib.rs @@ -171,7 +171,7 @@ derive_rweb_schema!(PlexEventWrapper, _PlexEventWrapper); #[schema(component = "PlexEvent")] struct _PlexEventWrapper { #[schema(description = "ID")] - id: StackString, + id: UuidWrapper, #[schema(description = "Event")] event: StackString, #[schema(description = "Account")] diff --git a/movie_collection_http/src/logged_user.rs b/movie_collection_http/src/logged_user.rs index 07d8359..1b220ed 100644 --- a/movie_collection_http/src/logged_user.rs +++ b/movie_collection_http/src/logged_user.rs @@ -1,24 +1,27 @@ pub use authorized_users::{ - get_random_key, get_secrets, token::Token, AuthorizedUser, AUTHORIZED_USERS, JWT_SECRET, - KEY_LENGTH, LOGIN_HTML, SECRET_KEY, TRIGGER_DB_UPDATE, + get_random_key, get_secrets, token::Token, AuthorizedUser as ExternalUser, AUTHORIZED_USERS, + JWT_SECRET, KEY_LENGTH, LOGIN_HTML, SECRET_KEY, TRIGGER_DB_UPDATE, }; +use futures::TryStreamExt; use log::debug; -use maplit::hashset; +use maplit::hashmap; use reqwest::Client; use rweb::{filters::cookie::cookie, Filter, Rejection, Schema}; use rweb_helper::UuidWrapper; use serde::{Deserialize, Serialize}; use stack_string::{format_sstr, StackString}; use std::{ + collections::HashMap, convert::{TryFrom, TryInto}, env::var, str::FromStr, }; +use time::OffsetDateTime; use tokio::task::{spawn, JoinHandle}; use url::Url; use uuid::Uuid; -use movie_collection_lib::{config::Config, pgpool::PgPool, utils::get_authorized_users}; +use movie_collection_lib::{config::Config, pgpool::PgPool, utils::AuthorizedUsers}; use crate::errors::ServiceError as Error; @@ -68,7 +71,7 @@ impl LoggedUser { config: &Config, ) -> Result, anyhow::Error> { let base_url: Url = format_sstr!("https://{}", config.domain).parse()?; - let session: Option = AuthorizedUser::get_session_data( + let session: Option = ExternalUser::get_session_data( &base_url, self.session.into(), &self.secret_key, @@ -91,7 +94,7 @@ impl LoggedUser { let session = SessionData { movie_last_url: set_url.into(), }; - AuthorizedUser::set_session_data( + ExternalUser::set_session_data( &base_url, self.session.into(), &self.secret_key, @@ -118,8 +121,8 @@ impl LoggedUser { } } -impl From for LoggedUser { - fn from(user: AuthorizedUser) -> Self { +impl From for LoggedUser { + fn from(user: ExternalUser) -> Self { Self { email: user.email, session: user.session.into(), @@ -154,17 +157,46 @@ impl FromStr for LoggedUser { /// # Errors /// Returns error if `get_authorized_users` fails pub async fn fill_from_db(pool: &PgPool) -> Result<(), Error> { - debug!("{:?}", *TRIGGER_DB_UPDATE); - let users = if TRIGGER_DB_UPDATE.check() { - get_authorized_users(pool).await? - } else { - AUTHORIZED_USERS.get_users() - }; if let Ok("true") = var("TESTENV").as_ref().map(String::as_str) { - AUTHORIZED_USERS.update_users(hashset! {"user@test".into()}); + AUTHORIZED_USERS.update_users(hashmap! { + "user@test".into() => ExternalUser { + email: "user@test".into(), + session: Uuid::new_v4(), + secret_key: StackString::default(), + created_at: Some(OffsetDateTime::now_utc()) + } + }); + return Ok(()); + } + let (created_at, deleted_at) = AuthorizedUsers::get_most_recent(pool).await?; + let most_recent_user_db = created_at.max(deleted_at); + let existing_users = AUTHORIZED_USERS.get_users(); + let most_recent_user = existing_users.values().map(|i| i.created_at).max(); + debug!("most_recent_user_db {most_recent_user_db:?} most_recent_user {most_recent_user:?}"); + if most_recent_user_db.is_some() + && most_recent_user.is_some() + && most_recent_user_db <= most_recent_user + { + return Ok(()); } - AUTHORIZED_USERS.update_users(users); - debug!("{:?}", *AUTHORIZED_USERS); + let result: Result, _> = AuthorizedUsers::get_authorized_users(pool) + .await? + .map_ok(|u| { + ( + u.email.clone(), + ExternalUser { + email: u.email, + session: Uuid::new_v4(), + secret_key: StackString::default(), + created_at: Some(u.created_at), + }, + ) + }) + .try_collect() + .await; + let users = result?; + AUTHORIZED_USERS.update_users(users); + debug!("AUTHORIZED_USERS {:?}", *AUTHORIZED_USERS); Ok(()) } diff --git a/movie_collection_http/src/movie_queue_routes.rs b/movie_collection_http/src/movie_queue_routes.rs index 8a09ca7..ae0f459 100644 --- a/movie_collection_http/src/movie_queue_routes.rs +++ b/movie_collection_http/src/movie_queue_routes.rs @@ -142,12 +142,9 @@ pub async fn movie_queue_show( #[filter = "LoggedUser::filter"] user: LoggedUser, #[data] state: AppState, ) -> WarpResult { + let url = format_sstr!("/list/queue/{path}"); let task = user - .store_url_task( - state.trakt.get_client(), - &state.config, - &format_sstr!("/list/queue/{path}"), - ) + .store_url_task(state.trakt.get_client(), &state.config, &url) .await; let patterns = vec![path]; @@ -183,12 +180,9 @@ pub async fn movie_queue_delete( #[filter = "LoggedUser::filter"] user: LoggedUser, #[data] state: AppState, ) -> WarpResult { + let url = format_sstr!("/list/delete/{path}"); let task = user - .store_url_task( - state.trakt.get_client(), - &state.config, - &format_sstr!("/list/delete/{path}"), - ) + .store_url_task(state.trakt.get_client(), &state.config, &url) .await; let mock_stdout = MockStdout::new(); let stdout = StdoutChannel::with_mock_stdout(mock_stdout.clone(), mock_stdout.clone()); @@ -257,12 +251,9 @@ pub async fn movie_queue_transcode( #[filter = "LoggedUser::filter"] user: LoggedUser, #[data] state: AppState, ) -> WarpResult { + let url = format_sstr!("/list/transcode/queue/{path}"); let task = user - .store_url_task( - state.trakt.get_client(), - &state.config, - &format_sstr!("/list/transcode/queue/{path}"), - ) + .store_url_task(state.trakt.get_client(), &state.config, &url) .await; let patterns = vec![path]; @@ -283,12 +274,9 @@ pub async fn movie_queue_transcode_directory( #[filter = "LoggedUser::filter"] user: LoggedUser, #[data] state: AppState, ) -> WarpResult { + let url = format_sstr!("/list/transcode/queue/{directory}/{file}"); let task = user - .store_url_task( - state.trakt.get_client(), - &state.config, - &format_sstr!("/list/transcode/queue/{directory}/{file}"), - ) + .store_url_task(state.trakt.get_client(), &state.config, &url) .await; let patterns = vec![file]; @@ -322,12 +310,9 @@ pub async fn movie_queue_play( .get_url(state.trakt.get_client(), &state.config) .await .map_err(Into::::into)?; + let url = format_sstr!("/list/play/{idx}"); let task = user - .store_url_task( - state.trakt.get_client(), - &state.config, - &format_sstr!("/list/play/{idx}"), - ) + .store_url_task(state.trakt.get_client(), &state.config, &url) .await; let mock_stdout = MockStdout::new(); @@ -358,12 +343,9 @@ pub async fn imdb_show( #[filter = "LoggedUser::filter"] user: LoggedUser, #[data] state: AppState, ) -> WarpResult { + let url = format_sstr!("/list/imdb/{show}"); let task = user - .store_url_task( - state.trakt.get_client(), - &state.config, - &format_sstr!("/list/imdb/{show}"), - ) + .store_url_task(state.trakt.get_client(), &state.config, &url) .await; let query = query.into_inner(); let req = ImdbShowRequest { show, query }; @@ -986,12 +968,9 @@ pub async fn movie_queue_transcode_file( #[filter = "LoggedUser::filter"] user: LoggedUser, #[data] state: AppState, ) -> WarpResult { + let url = format_sstr!("/list/transcode/file/{filename}"); let task = user - .store_url_task( - state.trakt.get_client(), - &state.config, - &format_sstr!("/list/transcode/file/{filename}"), - ) + .store_url_task(state.trakt.get_client(), &state.config, &url) .await; let mock_stdout = MockStdout::new(); let stdout = StdoutChannel::with_mock_stdout(mock_stdout.clone(), mock_stdout.clone()); @@ -1028,12 +1007,9 @@ pub async fn movie_queue_remcom_file( #[filter = "LoggedUser::filter"] user: LoggedUser, #[data] state: AppState, ) -> WarpResult { + let url = format_sstr!("/list/transcode/remcom/file/{filename}"); let task = user - .store_url_task( - state.trakt.get_client(), - &state.config, - &format_sstr!("/list/transcode/remcom/file/{filename}"), - ) + .store_url_task(state.trakt.get_client(), &state.config, &url) .await; let mock_stdout = MockStdout::new(); let stdout = StdoutChannel::with_mock_stdout(mock_stdout.clone(), mock_stdout.clone()); @@ -1078,12 +1054,9 @@ pub async fn movie_queue_remcom_directory_file( #[filter = "LoggedUser::filter"] user: LoggedUser, #[data] state: AppState, ) -> WarpResult { + let url = format_sstr!("/list/transcode/remcom/directory/{directory}/{filename}"); let task = user - .store_url_task( - state.trakt.get_client(), - &state.config, - &format_sstr!("/list/transcode/remcom/directory/{directory}/{filename}"), - ) + .store_url_task(state.trakt.get_client(), &state.config, &url) .await; let mock_stdout = MockStdout::new(); let stdout = StdoutChannel::with_mock_stdout(mock_stdout.clone(), mock_stdout.clone()); @@ -1130,12 +1103,9 @@ pub async fn movie_queue_transcode_cleanup( #[filter = "LoggedUser::filter"] user: LoggedUser, #[data] state: AppState, ) -> WarpResult { + let url = format_sstr!("/list/transcode/cleanup/{path}"); let task = user - .store_url_task( - state.trakt.get_client(), - &state.config, - &format_sstr!("/list/transcode/cleanup/{path}"), - ) + .store_url_task(state.trakt.get_client(), &state.config, &url) .await; let movie_path = state .config @@ -1240,12 +1210,9 @@ pub async fn trakt_watchlist_action( #[filter = "LoggedUser::filter"] user: LoggedUser, #[data] state: AppState, ) -> WarpResult { + let url = format_sstr!("/trakt/watchlist/{action}/{show}"); let task = user - .store_url_task( - state.trakt.get_client(), - &state.config, - &format_sstr!("/trakt/watchlist/{action}/{show}"), - ) + .store_url_task(state.trakt.get_client(), &state.config, &url) .await; let mock_stdout = MockStdout::new(); let stdout = StdoutChannel::with_mock_stdout(mock_stdout.clone(), mock_stdout.clone()); @@ -1265,12 +1232,9 @@ pub async fn trakt_watched_seasons( #[filter = "LoggedUser::filter"] user: LoggedUser, #[data] state: AppState, ) -> WarpResult { + let url = format_sstr!("/trakt/watched/list/{imdb_url}"); let task = user - .store_url_task( - state.trakt.get_client(), - &state.config, - &format_sstr!("/trakt/watched/list/{imdb_url}"), - ) + .store_url_task(state.trakt.get_client(), &state.config, &url) .await; let show_opt = ImdbRatings::get_show_by_link(&imdb_url, &state.db) .await @@ -1300,12 +1264,9 @@ pub async fn trakt_watched_list( #[filter = "LoggedUser::filter"] user: LoggedUser, #[data] state: AppState, ) -> WarpResult { + let url = format_sstr!("/trakt/watched/list/{imdb_url}/{season}"); let task = user - .store_url_task( - state.trakt.get_client(), - &state.config, - &format_sstr!("/trakt/watched/list/{imdb_url}/{season}"), - ) + .store_url_task(state.trakt.get_client(), &state.config, &url) .await; let mock_stdout = MockStdout::new(); let stdout = StdoutChannel::with_mock_stdout(mock_stdout.clone(), mock_stdout.clone()); @@ -1335,12 +1296,9 @@ pub async fn trakt_watched_action( #[filter = "LoggedUser::filter"] user: LoggedUser, #[data] state: AppState, ) -> WarpResult { + let url = format_sstr!("/trakt/watched/{action}/{imdb_url}/{season}/{episode}"); let task = user - .store_url_task( - state.trakt.get_client(), - &state.config, - &format_sstr!("/trakt/watched/{action}/{imdb_url}/{season}/{episode}"), - ) + .store_url_task(state.trakt.get_client(), &state.config, &url) .await; let mock_stdout = MockStdout::new(); let stdout = StdoutChannel::with_mock_stdout(mock_stdout.clone(), mock_stdout.clone()); @@ -1726,12 +1684,9 @@ pub async fn plex_detail( query: Query, ) -> WarpResult { let query = query.into_inner(); + let url = format_sstr!("/list/plex/{id}"); let task = user - .store_url_task( - state.trakt.get_client(), - &state.config, - &format_sstr!("/list/plex/{id}"), - ) + .store_url_task(state.trakt.get_client(), &state.config, &url) .await; let body = if let Some(event) = PlexEvent::get_event_by_id(&state.db, id.into()) .await @@ -2030,12 +1985,9 @@ pub async fn movie_queue_extract_subtitle( #[filter = "LoggedUser::filter"] user: LoggedUser, #[data] state: AppState, ) -> WarpResult { + let url = format_sstr!("/list/transcode/subtitle/{filename}/{index}"); let task = user - .store_url_task( - state.trakt.get_client(), - &state.config, - &format_sstr!("/list/transcode/subtitle/{filename}/{index}"), - ) + .store_url_task(state.trakt.get_client(), &state.config, &url) .await; let input_path = state diff --git a/movie_collection_lib/Cargo.toml b/movie_collection_lib/Cargo.toml index 77cc727..e7489be 100644 --- a/movie_collection_lib/Cargo.toml +++ b/movie_collection_lib/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "movie_collection_lib" -version = "0.10.52" +version = "0.11.0" authors = ["Daniel Boline "] edition = "2018" @@ -34,12 +34,12 @@ select = "0.6" serde = {version="1.0", features=["derive"]} serde_json = "1.0" smallvec = {version="1.6", features=["serde", "write"]} -stack-string = { git = "https://github.com/ddboline/stack-string-rs.git", features=["postgres_types"], tag="0.9.3" } +stack-string = { git = "https://github.com/ddboline/stack-string-rs.git", features=["postgres_types"], tag="1.0.2" } stdout-channel = "0.6" -thiserror = "1.0" +thiserror = "2.0" time = {version="0.3", features=["serde-human-readable", "macros", "formatting"]} time-tz = {version="2.0", features=["system"]} tokio-postgres = {version="0.7", features=["with-uuid-1", "with-time-0_3", "with-serde_json-1"]} -tokio = {version="1.40", features=["rt", "macros", "rt-multi-thread", "process"]} +tokio = {version="1.41", features=["rt", "macros", "rt-multi-thread", "process"]} tokio-stream = {version="0.1", features=["fs"]} uuid = "1.0" diff --git a/movie_collection_lib/src/mkv_utils.rs b/movie_collection_lib/src/mkv_utils.rs index e775013..a64f6aa 100644 --- a/movie_collection_lib/src/mkv_utils.rs +++ b/movie_collection_lib/src/mkv_utils.rs @@ -139,8 +139,9 @@ impl MkvTrack { if index < 1 { return Err(format_err!("Index must be greater than 0")); } + let srt_path = format_sstr!("{}:{srt_path}", index - 1); let output = Command::new("/usr/bin/mkvextract") - .args([fpath, "tracks", &format_sstr!("{}:{srt_path}", index - 1)]) + .args([fpath, "tracks", &srt_path]) .output() .await?; if !output.status.success() && output.status.code() != Some(1) { diff --git a/movie_collection_lib/src/movie_collection.rs b/movie_collection_lib/src/movie_collection.rs index 02a83f7..9c94824 100644 --- a/movie_collection_lib/src/movie_collection.rs +++ b/movie_collection_lib/src/movie_collection.rs @@ -203,19 +203,18 @@ impl MovieCollection { rating: f64, } - let query = query_dyn!( - &format_sstr!( - r#" - SELECT index, show, title, link, rating - FROM imdb_ratings - WHERE link is not null AND - rating is not null AND - show = $show {} - "#, - if istv { "AND istv" } else { "" }, - ), - show = show - )?; + let query = format_sstr!( + r#" + SELECT index, show, title, link, rating + FROM imdb_ratings + WHERE link is not null AND + rating is not null AND + show = $show {} + "#, + if istv { "AND istv" } else { "" }, + ); + + let query = query_dyn!(&query, show = show)?; let conn = self.pool.get().await?; let results: Vec = query.fetch(&conn).await?; let results: Vec<_> = results @@ -284,7 +283,7 @@ impl MovieCollection { write!(search_constr, "AND ({search_strs})")?; } - let query = query_dyn!(&format_sstr!( + let query = format_sstr!( r#" SELECT a.path, a.show, COALESCE(b.rating, -1) as rating, @@ -294,7 +293,9 @@ impl MovieCollection { LEFT JOIN imdb_ratings b ON a.show_id = b.index WHERE a.is_deleted = false {search_constr} "# - ),)?; + ); + + let query = query_dyn!(&query,)?; let conn = self.pool.get().await?; let results: Vec = query.fetch(&conn).await?; @@ -1029,9 +1030,8 @@ impl MovieCollection { } else { format_sstr!("WHERE {constr}") }; - let query = query_dyn!(&format_sstr!( - "SELECT path FROM movie_collection {where_str}" - ))?; + let query = format_sstr!("SELECT path FROM movie_collection {where_str}"); + let query = query_dyn!(&query)?; let conn = self.pool.get().await?; query .query_streaming(&conn) @@ -1072,13 +1072,15 @@ impl LastModifiedResponse { struct Count { count: i64, } - let query = query_dyn!(&format_sstr!("SELECT count(*) AS count FROM {table}"))?; + let query = format_sstr!("SELECT count(*) AS count FROM {table}"); + let query = query_dyn!(&query)?; let conn = pool.get().await?; let count: Count = query.fetch_one(&conn).await?; if count.count == 0 { return Ok(None); } - let query = query_dyn!(&format_sstr!("SELECT max(last_modified) FROM {table}"))?; + let query = format_sstr!("SELECT max(last_modified) FROM {table}"); + let query = query_dyn!(&query)?; if let Some((last_modified,)) = query.fetch_opt(&conn).await? { let last_modified: OffsetDateTime = last_modified; Ok(Some(LastModifiedResponse { diff --git a/movie_collection_lib/src/movie_queue.rs b/movie_collection_lib/src/movie_queue.rs index 262e6d0..e0fafcc 100644 --- a/movie_collection_lib/src/movie_queue.rs +++ b/movie_collection_lib/src/movie_queue.rs @@ -258,7 +258,7 @@ impl MovieQueueDB { write!(limit_str, "LIMIT {limit}")?; } let order_by = order_by.unwrap_or(OrderBy::Desc); - let query = query_dyn!(&format_sstr!( + let query = &format_sstr!( r#" SELECT a.idx, b.path, c.link, c.istv FROM movie_queue a @@ -269,7 +269,8 @@ impl MovieQueueDB { {offset_str} {limit_str} "# - ),)?; + ); + let query = query_dyn!(query,)?; let conn = self.pool.get().await?; let results: Vec = query.fetch(&conn).await?; diff --git a/movie_collection_lib/src/trakt_utils.rs b/movie_collection_lib/src/trakt_utils.rs index b669635..afc06c3 100644 --- a/movie_collection_lib/src/trakt_utils.rs +++ b/movie_collection_lib/src/trakt_utils.rs @@ -478,8 +478,7 @@ pub async fn get_watched_shows_db( if !where_vec.is_empty() { write!(where_str, "WHERE {}", where_vec.join(" AND "))?; } - - let query = query_dyn!(&format_sstr!( + let query = format_sstr!( r#" SELECT a.link as imdb_url, c.show, @@ -492,7 +491,8 @@ pub async fn get_watched_shows_db( {where_str} ORDER BY 3,4,5 "# - ))?; + ); + let query = query_dyn!(&query)?; let conn = pool.get().await?; query.fetch_streaming(&conn).await.map_err(Into::into) } diff --git a/movie_collection_lib/src/transcode_service.rs b/movie_collection_lib/src/transcode_service.rs index 21c2439..742dcd8 100644 --- a/movie_collection_lib/src/transcode_service.rs +++ b/movie_collection_lib/src/transcode_service.rs @@ -183,7 +183,7 @@ impl TranscodeServiceRequest { }; let input_path = path.to_path_buf(); - let output_path = output_dir.join(&format_sstr!("{prefix}.mp4")); + let output_path = output_dir.join(format_sstr!("{prefix}.mp4")); Ok(Self { job_type: JobType::Move, @@ -230,7 +230,7 @@ impl TranscodeServiceRequest { match self.job_type { JobType::Transcode => job_dir(config).join(prefix).with_extension("json"), JobType::Move => job_dir(config) - .join(&format_sstr!("{prefix}_copy")) + .join(format_sstr!("{prefix}_copy")) .with_extension("json"), } } @@ -356,7 +356,7 @@ impl TranscodeService { .join("Documents") .join("movies") .join(output_path); - let debug_output_path = log_dir(&self.config).join(&format_sstr!("{prefix}_mp4")); + let debug_output_path = log_dir(&self.config).join(format_sstr!("{prefix}_mp4")); let stdout_path = debug_output_path.with_extension("out"); let stderr_path = debug_output_path.with_extension("err"); @@ -406,7 +406,7 @@ impl TranscodeService { f.write_all(&stdout).await?; } } - let new_debug_output_path = tmp_avi_path.join(&format_sstr!("{prefix}_mp4.out")); + let new_debug_output_path = tmp_avi_path.join(format_sstr!("{prefix}_mp4.out")); fs::rename(&stderr_path, &new_debug_output_path).await?; fs::remove_file(&stdout_path).await?; } @@ -420,7 +420,7 @@ impl TranscodeService { output_file: &Path, ) -> Result<(), Error> { let script_file = job_dir(&self.config) - .join(&format_sstr!("{show}_copy")) + .join(format_sstr!("{show}_copy")) .with_extension("json"); if script_file.exists() { fs::remove_file(&script_file).await?; @@ -439,7 +439,7 @@ impl TranscodeService { return Err(format_err!("{input_file:?} does not exist")); } - let debug_output_path = log_dir(&self.config).join(&format_sstr!("{show}_copy.out")); + let debug_output_path = log_dir(&self.config).join(format_sstr!("{show}_copy.out")); let mut debug_output_file = File::create(&debug_output_path).await?; let show_path = self @@ -447,7 +447,7 @@ impl TranscodeService { .home_dir .join("Documents") .join("movies") - .join(&format_sstr!("{show}.mp4")); + .join(format_sstr!("{show}.mp4")); if !show_path.exists() { return Ok(()); } @@ -519,7 +519,7 @@ impl TranscodeService { .home_dir .join("Documents") .join("movies") - .join(&format_sstr!("{show}.srt")); + .join(format_sstr!("{show}.srt")); if srt_path.exists() { let new_path = output_file.with_extension("srt"); let mut buf = StackString::new(); @@ -537,8 +537,7 @@ impl TranscodeService { debug_output_file.flush().await?; if debug_output_path.exists() { - let new_debug_output_path = - tmp_dir(&self.config).join(&format_sstr!("{show}_copy.out")); + let new_debug_output_path = tmp_dir(&self.config).join(format_sstr!("{show}_copy.out")); fs::rename(&debug_output_path, &new_debug_output_path).await?; } @@ -711,25 +710,21 @@ impl TranscodeStatus { impl fmt::Display for TranscodeStatus { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { if !self.procs.is_empty() { - write!( - f, - "Running procs:\n\n{}\n\n", - self.procs - .iter() - .sorted_by_key(|p| p.pid) - .map(|p| format_sstr!("{p}")) - .join("\n") - )?; + let s = self + .procs + .iter() + .sorted_by_key(|p| p.pid) + .map(|p| format_sstr!("{p}")) + .join("\n"); + write!(f, "Running procs:\n\n{s}\n\n",)?; } if !self.upcoming_jobs.is_empty() { - write!( - f, - "Upcoming jobs:\n\n{}\n\n", - self.upcoming_jobs - .iter() - .map(StackString::from_display) - .join("\n") - )?; + let s = self + .upcoming_jobs + .iter() + .map(StackString::from_display) + .join("\n"); + write!(f, "Upcoming jobs:\n\n{s}\n\n")?; } if !self.current_jobs.is_empty() { write!( diff --git a/movie_collection_lib/src/utils.rs b/movie_collection_lib/src/utils.rs index af0d02a..83f0bcd 100644 --- a/movie_collection_lib/src/utils.rs +++ b/movie_collection_lib/src/utils.rs @@ -1,9 +1,9 @@ use anyhow::{format_err, Error}; use async_trait::async_trait; -use futures::TryStreamExt; +use futures::Stream; use jwalk::WalkDir; use log::error; -use postgres_query::{query, Error as PqError}; +use postgres_query::{query, Error as PqError, FromSqlRow}; use rand::{ distributions::{Distribution, Uniform}, thread_rng, @@ -12,10 +12,8 @@ use reqwest::{Client, Response, Url}; use serde::{Deserialize, Serialize}; use smallvec::SmallVec; use stack_string::{format_sstr, StackString}; -use std::{ - collections::HashSet, - path::{Path, PathBuf}, -}; +use std::path::{Path, PathBuf}; +use time::OffsetDateTime; use tokio::{ process::Command, time::{sleep, Duration}, @@ -161,19 +159,42 @@ pub trait ExponentialRetry { } } -/// # Errors -/// Return error if db query fails -pub async fn get_authorized_users(pool: &PgPool) -> Result, Error> { - let query = query!("SELECT email FROM authorized_users"); - let conn = pool.get().await?; - query - .query_streaming(&conn) - .await? - .and_then(|row| async move { - let email: StackString = row.try_get(0).map_err(PqError::BeginTransaction)?; - Ok(email) - }) - .try_collect() - .await - .map_err(Into::into) +#[derive(FromSqlRow, Clone, Debug)] +pub struct AuthorizedUsers { + pub email: StackString, + pub created_at: OffsetDateTime, +} + +impl AuthorizedUsers { + /// # Errors + /// Return error if db query fails + pub async fn get_authorized_users( + pool: &PgPool, + ) -> Result>, Error> { + let query = query!("SELECT * FROM authorized_users WHERE deleted_at IS NULL"); + let conn = pool.get().await?; + query.fetch_streaming(&conn).await.map_err(Into::into) + } + + /// # Errors + /// Returns error if db query fails + pub async fn get_most_recent( + pool: &PgPool, + ) -> Result<(Option, Option), Error> { + #[derive(FromSqlRow)] + struct CreatedDeleted { + created_at: Option, + deleted_at: Option, + } + + let query = query!( + "SELECT max(created_at) as created_at, max(deleted_at) as deleted_at FROM users" + ); + let conn = pool.get().await?; + let result: Option = query.fetch_opt(&conn).await?; + match result { + Some(result) => Ok((result.created_at, result.deleted_at)), + None => Ok((None, None)), + } + } } diff --git a/src/movie_queue_cli.rs b/src/movie_queue_cli.rs index fe84760..feb105b 100644 --- a/src/movie_queue_cli.rs +++ b/src/movie_queue_cli.rs @@ -245,7 +245,8 @@ impl MovieQueueCli { match table.as_str() { "last_modified" => { let last_modified = LastModifiedResponse::get_last_modified(&pool).await?; - file.write_all(&serde_json::to_vec(&last_modified)?).await?; + let v = serde_json::to_vec(&last_modified)?; + file.write_all(&v).await?; } "imdb_ratings" => { let shows: Vec<_> = ImdbRatings::get_shows_after_timestamp( @@ -257,7 +258,8 @@ impl MovieQueueCli { .await? .try_collect() .await?; - file.write_all(&serde_json::to_vec(&shows)?).await?; + let v = serde_json::to_vec(&shows)?; + file.write_all(&v).await?; } "imdb_episodes" => { let episodes: Vec<_> = ImdbEpisodes::get_episodes_after_timestamp( @@ -269,7 +271,8 @@ impl MovieQueueCli { .await? .try_collect() .await?; - file.write_all(&serde_json::to_vec(&episodes)?).await?; + let v = serde_json::to_vec(&episodes)?; + file.write_all(&v).await?; } "plex_event" => { let events: Vec<_> = @@ -277,7 +280,8 @@ impl MovieQueueCli { .await? .try_collect() .await?; - file.write_all(&serde_json::to_vec(&events)?).await?; + let v = serde_json::to_vec(&events)?; + file.write_all(&v).await?; } "plex_filename" => { let filenames: Vec<_> = @@ -285,7 +289,8 @@ impl MovieQueueCli { .await? .try_collect() .await?; - file.write_all(&serde_json::to_vec(&filenames)?).await?; + let v = serde_json::to_vec(&filenames)?; + file.write_all(&v).await?; } "plex_metadata" => { let metadatas: Vec<_> = @@ -293,14 +298,16 @@ impl MovieQueueCli { .await? .try_collect() .await?; - file.write_all(&serde_json::to_vec(&metadatas)?).await?; + let v = serde_json::to_vec(&metadatas)?; + file.write_all(&v).await?; } "movie_collection" => { let mc = MovieCollection::new(&config, &pool, &stdout); let entries = mc .get_collection_after_timestamp(Some(start_timestamp), None, None) .await?; - file.write_all(&serde_json::to_vec(&entries)?).await?; + let v = serde_json::to_vec(&entries)?; + file.write_all(&v).await?; } "music_collection" => { let entries: Vec<_> = @@ -308,14 +315,16 @@ impl MovieQueueCli { .await? .try_collect() .await?; - file.write_all(&serde_json::to_vec(&entries)?).await?; + let v = serde_json::to_vec(&entries)?; + file.write_all(&v).await?; } "movie_queue" => { let mq = MovieQueueDB::new(&config, &pool, &stdout); let entries = mq .get_queue_after_timestamp(Some(start_timestamp), None, None) .await?; - file.write_all(&serde_json::to_vec(&entries)?).await?; + let v = serde_json::to_vec(&entries)?; + file.write_all(&v).await?; } _ => {} } diff --git a/transcode_lib/Cargo.toml b/transcode_lib/Cargo.toml index 33fd49b..91f67b2 100644 --- a/transcode_lib/Cargo.toml +++ b/transcode_lib/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "transcode_lib" -version = "0.10.52" +version = "0.11.0" authors = ["Daniel Boline "] edition = "2018" @@ -18,6 +18,6 @@ serde = {version="1.0", features=["derive"]} serde_json = "1.0" [dev-dependencies] -tokio = {version="1.40", features=["full"]} +tokio = {version="1.41", features=["full"]} stdout-channel = "0.6" movie_collection_lib = {path = "../movie_collection_lib"}