diff --git a/migrations/mysql/20240312130530_torrust_add_update_data_to_tracker_stats.sql b/migrations/mysql/20240312130530_torrust_add_update_data_to_tracker_stats.sql new file mode 100644 index 00000000..76b306de --- /dev/null +++ b/migrations/mysql/20240312130530_torrust_add_update_data_to_tracker_stats.sql @@ -0,0 +1,4 @@ +-- New field to track when stats were updated from the tracker +ALTER TABLE torrust_torrent_tracker_stats ADD COLUMN updated_at DATETIME DEFAULT NULL; +UPDATE torrust_torrent_tracker_stats SET updated_at = '1000-01-01 00:00:00'; +ALTER TABLE torrust_torrent_tracker_stats MODIFY COLUMN updated_at DATETIME NOT NULL; \ No newline at end of file diff --git a/migrations/sqlite3/20240312130530_torrust_add_update_data_to_tracker_stats.sql b/migrations/sqlite3/20240312130530_torrust_add_update_data_to_tracker_stats.sql new file mode 100644 index 00000000..b376d945 --- /dev/null +++ b/migrations/sqlite3/20240312130530_torrust_add_update_data_to_tracker_stats.sql @@ -0,0 +1,2 @@ +-- New field to track when stats were updated from the tracker +ALTER TABLE torrust_torrent_tracker_stats ADD COLUMN updated_at TEXT DEFAULT "1000-01-01 00:00:00"; diff --git a/src/console/cronjobs/tracker_statistics_importer.rs b/src/console/cronjobs/tracker_statistics_importer.rs index 0d32ba34..970fd7ca 100644 --- a/src/console/cronjobs/tracker_statistics_importer.rs +++ b/src/console/cronjobs/tracker_statistics_importer.rs @@ -17,12 +17,14 @@ use axum::extract::State; use axum::routing::{get, post}; use axum::{Json, Router}; use chrono::{DateTime, Utc}; -use log::{error, info}; +use log::{debug, error, info}; use serde_json::{json, Value}; +use text_colorizer::Colorize; use tokio::net::TcpListener; use tokio::task::JoinHandle; use crate::tracker::statistics_importer::StatisticsImporter; +use crate::utils::clock::seconds_ago_utc; const IMPORTER_API_IP: &str = "127.0.0.1"; @@ -41,7 +43,7 @@ struct ImporterState { #[must_use] pub fn start( importer_port: u16, - torrent_info_update_interval: u64, + torrent_stats_update_interval: u64, tracker_statistics_importer: &Arc, ) -> JoinHandle<()> { let weak_tracker_statistics_importer = Arc::downgrade(tracker_statistics_importer); @@ -54,7 +56,7 @@ pub fn start( let _importer_api_handle = tokio::spawn(async move { let import_state = Arc::new(ImporterState { last_heartbeat: Arc::new(Mutex::new(Utc::now())), - torrent_info_update_interval, + torrent_info_update_interval: torrent_stats_update_interval, }); let app = Router::new() @@ -81,25 +83,56 @@ pub fn start( info!("Tracker statistics importer cronjob starting ..."); - let interval = std::time::Duration::from_secs(torrent_info_update_interval); - let mut interval = tokio::time::interval(interval); + // code-review: we set an execution interval to avoid intense polling to + // the database. If we remove the interval we would be constantly + // queering if there are torrent stats pending to update, unless there + // are torrents to update. Maybe we should only sleep for 100 milliseconds + // if we did not update any torrents in the latest execution. + // With this current limit we can only import 50 torrent stats every 100 + // milliseconds which is 500 torrents per second (1800000 torrents per hour). + // If the tracker can handle a request in 100 milliseconds. - interval.tick().await; // first tick is immediate... + let execution_interval_in_milliseconds = 100; + let execution_interval_duration = std::time::Duration::from_millis(execution_interval_in_milliseconds); + let mut execution_interval = tokio::time::interval(execution_interval_duration); - loop { - interval.tick().await; + execution_interval.tick().await; // first tick is immediate... - info!("Running tracker statistics importer ..."); + info!("Running tracker statistics importer every {execution_interval_in_milliseconds} milliseconds ..."); + loop { if let Err(e) = send_heartbeat(importer_port).await { error!("Failed to send heartbeat from importer cronjob: {}", e); } - if let Some(tracker) = weak_tracker_statistics_importer.upgrade() { - drop(tracker.import_all_torrents_statistics().await); + if let Some(statistics_importer) = weak_tracker_statistics_importer.upgrade() { + let one_interval_ago = seconds_ago_utc( + torrent_stats_update_interval + .try_into() + .expect("update interval should be a positive integer"), + ); + let limit = 50; + + debug!( + "Importing torrents statistics not updated since {} limited to a maximum of {} torrents ...", + one_interval_ago.to_string().yellow(), + limit.to_string().yellow() + ); + + match statistics_importer + .import_torrents_statistics_not_updated_since(one_interval_ago, limit) + .await + { + Ok(()) => {} + Err(e) => error!("Failed to import statistics: {:?}", e), + } + + drop(statistics_importer); } else { break; } + + execution_interval.tick().await; } }) } diff --git a/src/databases/database.rs b/src/databases/database.rs index 2d56e22f..a19970be 100644 --- a/src/databases/database.rs +++ b/src/databases/database.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use chrono::NaiveDateTime; +use chrono::{DateTime, NaiveDateTime, Utc}; use serde::{Deserialize, Serialize}; use crate::databases::mysql::Mysql; @@ -292,6 +292,13 @@ pub trait Database: Sync + Send { /// Get all torrents as `Vec`. async fn get_all_torrents_compact(&self) -> Result, Error>; + /// Get torrents whose stats have not been imported from the tracker at least since a given datetime. + async fn get_torrents_with_stats_not_updated_since( + &self, + datetime: DateTime, + limit: i64, + ) -> Result, Error>; + /// Update a torrent's title with `torrent_id` and `title`. async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), Error>; diff --git a/src/databases/mysql.rs b/src/databases/mysql.rs index b964a1e8..d28f1ae7 100644 --- a/src/databases/mysql.rs +++ b/src/databases/mysql.rs @@ -2,7 +2,7 @@ use std::str::FromStr; use std::time::Duration; use async_trait::async_trait; -use chrono::NaiveDateTime; +use chrono::{DateTime, NaiveDateTime, Utc}; use sqlx::mysql::{MySqlConnectOptions, MySqlPoolOptions}; use sqlx::{query, query_as, Acquire, ConnectOptions, MySqlPool}; @@ -20,7 +20,7 @@ use crate::models::torrent_tag::{TagId, TorrentTag}; use crate::models::tracker_key::TrackerKey; use crate::models::user::{User, UserAuthentication, UserCompact, UserId, UserProfile}; use crate::services::torrent::{CanonicalInfoHashGroup, DbTorrentInfoHash}; -use crate::utils::clock; +use crate::utils::clock::{self, datetime_now, DATETIME_FORMAT}; use crate::utils::hex::from_bytes; pub struct Mysql { @@ -884,6 +884,27 @@ impl Database for Mysql { .map_err(|_| database::Error::Error) } + async fn get_torrents_with_stats_not_updated_since( + &self, + datetime: DateTime, + limit: i64, + ) -> Result, database::Error> { + query_as::<_, TorrentCompact>( + "SELECT tt.torrent_id, tt.info_hash + FROM torrust_torrents tt + LEFT JOIN torrust_torrent_tracker_stats tts ON tt.torrent_id = tts.torrent_id + WHERE tts.updated_at < ? OR tts.updated_at IS NULL + ORDER BY tts.updated_at ASC + LIMIT ? + ", + ) + .bind(datetime.format(DATETIME_FORMAT).to_string()) + .bind(limit) + .fetch_all(&self.pool) + .await + .map_err(|_| database::Error::Error) + } + async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), database::Error> { query("UPDATE torrust_torrent_info SET title = ? WHERE torrent_id = ?") .bind(title) @@ -1055,11 +1076,12 @@ impl Database for Mysql { seeders: i64, leechers: i64, ) -> Result<(), database::Error> { - query("REPLACE INTO torrust_torrent_tracker_stats (torrent_id, tracker_url, seeders, leechers) VALUES (?, ?, ?, ?)") + query("REPLACE INTO torrust_torrent_tracker_stats (torrent_id, tracker_url, seeders, leechers, updated_at) VALUES (?, ?, ?, ?, ?)") .bind(torrent_id) .bind(tracker_url) .bind(seeders) .bind(leechers) + .bind(datetime_now()) .execute(&self.pool) .await .map(|_| ()) diff --git a/src/databases/sqlite.rs b/src/databases/sqlite.rs index 18b799f3..421292d6 100644 --- a/src/databases/sqlite.rs +++ b/src/databases/sqlite.rs @@ -2,7 +2,7 @@ use std::str::FromStr; use std::time::Duration; use async_trait::async_trait; -use chrono::NaiveDateTime; +use chrono::{DateTime, NaiveDateTime, Utc}; use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; use sqlx::{query, query_as, Acquire, ConnectOptions, SqlitePool}; @@ -20,7 +20,7 @@ use crate::models::torrent_tag::{TagId, TorrentTag}; use crate::models::tracker_key::TrackerKey; use crate::models::user::{User, UserAuthentication, UserCompact, UserId, UserProfile}; use crate::services::torrent::{CanonicalInfoHashGroup, DbTorrentInfoHash}; -use crate::utils::clock; +use crate::utils::clock::{self, datetime_now, DATETIME_FORMAT}; use crate::utils::hex::from_bytes; pub struct Sqlite { @@ -876,6 +876,27 @@ impl Database for Sqlite { .map_err(|_| database::Error::Error) } + async fn get_torrents_with_stats_not_updated_since( + &self, + datetime: DateTime, + limit: i64, + ) -> Result, database::Error> { + query_as::<_, TorrentCompact>( + "SELECT tt.torrent_id, tt.info_hash + FROM torrust_torrents tt + LEFT JOIN torrust_torrent_tracker_stats tts ON tt.torrent_id = tts.torrent_id + WHERE tts.updated_at < ? OR tts.updated_at IS NULL + ORDER BY tts.updated_at ASC + LIMIT ? + ", + ) + .bind(datetime.format(DATETIME_FORMAT).to_string()) + .bind(limit) + .fetch_all(&self.pool) + .await + .map_err(|_| database::Error::Error) + } + async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), database::Error> { query("UPDATE torrust_torrent_info SET title = $1 WHERE torrent_id = $2") .bind(title) @@ -1047,11 +1068,12 @@ impl Database for Sqlite { seeders: i64, leechers: i64, ) -> Result<(), database::Error> { - query("REPLACE INTO torrust_torrent_tracker_stats (torrent_id, tracker_url, seeders, leechers) VALUES ($1, $2, $3, $4)") + query("REPLACE INTO torrust_torrent_tracker_stats (torrent_id, tracker_url, seeders, leechers, updated_at) VALUES ($1, $2, $3, $4, $5)") .bind(torrent_id) .bind(tracker_url) .bind(seeders) .bind(leechers) + .bind(datetime_now()) .execute(&self.pool) .await .map(|_| ()) diff --git a/src/tracker/api.rs b/src/tracker/api.rs index d3c00188..c81a745e 100644 --- a/src/tracker/api.rs +++ b/src/tracker/api.rs @@ -15,6 +15,8 @@ impl ConnectionInfo { } } +const TOKEN_PARAM_NAME: &str = "token"; + pub struct Client { pub connection_info: ConnectionInfo, api_base_url: String, @@ -29,7 +31,7 @@ impl Client { pub fn new(connection_info: ConnectionInfo) -> Result { let base_url = format!("{}/api/v1", connection_info.url); let client = reqwest::Client::builder().timeout(Duration::from_secs(5)).build()?; - let token_param = [("token".to_string(), connection_info.token.to_string())]; + let token_param = [(TOKEN_PARAM_NAME.to_string(), connection_info.token.to_string())]; Ok(Self { connection_info, @@ -72,7 +74,7 @@ impl Client { self.client.post(request_url).query(&self.token_param).send().await } - /// Retrieve the info for a torrent. + /// Retrieve the info for one torrent. /// /// # Errors /// @@ -82,4 +84,23 @@ impl Client { self.client.get(request_url).query(&self.token_param).send().await } + + /// Retrieve the info for multiple torrents at the same time. + /// + /// # Errors + /// + /// Will return an error if the HTTP request fails. + pub async fn get_torrents_info(&self, info_hashes: &[String]) -> Result { + let request_url = format!("{}/torrents", self.api_base_url); + + let mut query_params: Vec<(String, String)> = Vec::with_capacity(info_hashes.len() + 1); + + query_params.push((TOKEN_PARAM_NAME.to_string(), self.connection_info.token.clone())); + + for info_hash in info_hashes { + query_params.push(("info_hash".to_string(), info_hash.clone())); + } + + self.client.get(request_url).query(&query_params).send().await + } } diff --git a/src/tracker/service.rs b/src/tracker/service.rs index 598e35fd..3036ce89 100644 --- a/src/tracker/service.rs +++ b/src/tracker/service.rs @@ -48,6 +48,14 @@ pub struct TorrentInfo { pub peers: Vec, } +#[derive(Debug, Serialize, Deserialize, PartialEq)] +pub struct TorrentBasicInfo { + pub info_hash: String, + pub seeders: i64, + pub completed: i64, + pub leechers: i64, +} + #[derive(Debug, Serialize, Deserialize, PartialEq)] pub struct Peer { pub peer_id: Option, @@ -259,6 +267,54 @@ impl Service { } } + /// Get torrent info from tracker in batches. + /// + /// # Errors + /// + /// Will return an error if the HTTP request to get torrent info fails or + /// if the response cannot be parsed. + pub async fn get_torrents_info(&self, info_hashes: &[String]) -> Result, TrackerAPIError> { + debug!(target: "tracker-service", "get torrents info"); + + let maybe_response = self.api_client.get_torrents_info(info_hashes).await; + + debug!(target: "tracker-service", "get torrents info response result: {:?}", maybe_response); + + match maybe_response { + Ok(response) => { + let status: StatusCode = map_status_code(response.status()); + + let body = response.text().await.map_err(|_| { + error!(target: "tracker-service", "response without body"); + TrackerAPIError::MissingResponseBody + })?; + + match status { + StatusCode::OK => serde_json::from_str(&body).map_err(|e| { + error!( + target: "tracker-service", "Failed to parse torrents info from tracker response. Body: {}, Error: {}", + body, e + ); + TrackerAPIError::FailedToParseTrackerResponse { body } + }), + StatusCode::INTERNAL_SERVER_ERROR => { + if body == Self::invalid_token_body() { + Err(TrackerAPIError::InvalidToken) + } else { + error!(target: "tracker-service", "get torrents info 500 response: status {status}, body: {body}"); + Err(TrackerAPIError::InternalServerError) + } + } + _ => { + error!(target: "tracker-service", "get torrents info unhandled response: status {status}, body: {body}"); + Err(TrackerAPIError::UnexpectedResponseStatus) + } + } + } + Err(_) => Err(TrackerAPIError::TrackerOffline), + } + } + /// Issue a new tracker key from tracker. async fn retrieve_new_tracker_key(&self, user_id: i64) -> Result { debug!(target: "tracker-service", "retrieve key: {user_id}"); diff --git a/src/tracker/statistics_importer.rs b/src/tracker/statistics_importer.rs index 996008f3..b9842855 100644 --- a/src/tracker/statistics_importer.rs +++ b/src/tracker/statistics_importer.rs @@ -1,7 +1,8 @@ use std::sync::Arc; use std::time::Instant; -use log::{error, info}; +use chrono::{DateTime, Utc}; +use log::{debug, error, info}; use text_colorizer::Colorize; use super::service::{Service, TorrentInfo, TrackerAPIError}; @@ -36,13 +37,17 @@ impl StatisticsImporter { pub async fn import_all_torrents_statistics(&self) -> Result<(), database::Error> { let torrents = self.database.get_all_torrents_compact().await?; + if torrents.is_empty() { + return Ok(()); + } + info!(target: LOG_TARGET, "Importing {} torrents statistics from tracker {} ...", torrents.len().to_string().yellow(), self.tracker_url.yellow()); // Start the timer before the loop let start_time = Instant::now(); for torrent in torrents { - info!(target: LOG_TARGET, "Importing torrent #{} ...", torrent.torrent_id.to_string().yellow()); + info!(target: LOG_TARGET, "Importing torrent #{} statistics ...", torrent.torrent_id.to_string().yellow()); let ret = self.import_torrent_statistics(torrent.torrent_id, &torrent.info_hash).await; @@ -53,6 +58,7 @@ impl StatisticsImporter { torrent.torrent_id, torrent.info_hash, err ); error!(target: "statistics_importer", "{}", message); + // todo: return a service error that can be a tracker API error or a database error. } } } @@ -64,6 +70,74 @@ impl StatisticsImporter { Ok(()) } + /// Import torrents statistics not updated recently.. + /// + /// # Errors + /// + /// Will return an error if the database query failed. + pub async fn import_torrents_statistics_not_updated_since( + &self, + datetime: DateTime, + limit: i64, + ) -> Result<(), database::Error> { + debug!(target: LOG_TARGET, "Importing torrents statistics not updated since {} limited to a maximum of {} torrents ...", datetime.to_string().yellow(), limit.to_string().yellow()); + + let torrents = self + .database + .get_torrents_with_stats_not_updated_since(datetime, limit) + .await?; + + if torrents.is_empty() { + return Ok(()); + } + + info!(target: LOG_TARGET, "Importing {} torrents statistics from tracker {} ...", torrents.len().to_string().yellow(), self.tracker_url.yellow()); + + // Import stats for all torrents in one request + + let info_hashes: Vec = torrents.iter().map(|t| t.info_hash.clone()).collect(); + + let torrent_info_vec = match self.tracker_service.get_torrents_info(&info_hashes).await { + Ok(torrents_info) => torrents_info, + Err(err) => { + let message = format!("Error getting torrents tracker stats. Error: {err:?}"); + error!(target: LOG_TARGET, "{}", message); + // todo: return a service error that can be a tracker API error or a database error. + return Ok(()); + } + }; + + // Update stats for all torrents + + for torrent in torrents { + match torrent_info_vec.iter().find(|t| t.info_hash == torrent.info_hash) { + None => { + // No stats for this torrent in the tracker + drop( + self.database + .update_tracker_info(torrent.torrent_id, &self.tracker_url, 0, 0) + .await, + ); + } + Some(torrent_info) => { + // Update torrent stats for this tracker + drop( + self.database + .update_tracker_info( + torrent.torrent_id, + &self.tracker_url, + torrent_info.seeders, + torrent_info.leechers, + ) + .await, + ); + } + } + } + + Ok(()) + } + /// Import torrent statistics from tracker and update them in database. /// /// # Errors diff --git a/src/utils/clock.rs b/src/utils/clock.rs index b17ee48b..42269eeb 100644 --- a/src/utils/clock.rs +++ b/src/utils/clock.rs @@ -1,3 +1,7 @@ +use chrono::{DateTime, Duration, Utc}; + +pub const DATETIME_FORMAT: &str = "%Y-%m-%d %H:%M:%S"; + /// Returns the current timestamp in seconds. /// /// # Panics @@ -8,3 +12,17 @@ pub fn now() -> u64 { u64::try_from(chrono::prelude::Utc::now().timestamp()).expect("timestamp should be positive") } + +/// Returns the datetime some seconds ago. +#[must_use] +pub fn seconds_ago_utc(seconds: i64) -> DateTime { + Utc::now() - Duration::seconds(seconds) +} + +/// Returns the current time in database format. +/// +/// For example: `2024-03-12 15:56:24`. +#[must_use] +pub fn datetime_now() -> String { + Utc::now().format(DATETIME_FORMAT).to_string() +}