@@ -0,0 +1,4 @@
-- New field to track when stats were updated from the tracker
ALTER TABLE torrust_torrent_tracker_stats ADD COLUMN updated_at DATETIME DEFAULT NULL;
UPDATE torrust_torrent_tracker_stats SET updated_at = '1000-01-01 00:00:00';
ALTER TABLE torrust_torrent_tracker_stats MODIFY COLUMN updated_at DATETIME NOT NULL;
@@ -0,0 +1,2 @@
-- New field to track when stats were updated from the tracker
ALTER TABLE torrust_torrent_tracker_stats ADD COLUMN updated_at TEXT DEFAULT "1000-01-01 00:00:00";
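Both migrations backfill existing rows with the sentinel timestamp `1000-01-01 00:00:00` instead of leaving them NULL. MySQL needs three statements (add the column as nullable, backfill, then MODIFY it to NOT NULL), while SQLite attaches the default directly to ADD COLUMN, which populates existing rows in one step. Either way, every pre-migration row looks maximally stale, so the "not updated since" query introduced below picks them all up on its first pass. A minimal sketch of that property, assuming the `%Y-%m-%d %H:%M:%S` layout used elsewhere in this diff:

```rust
use chrono::NaiveDateTime;

fn main() {
    // The backfill sentinel parses with the same layout the queries bind
    // and predates any realistic cutoff, so pre-migration rows always
    // qualify as "not updated since <cutoff>".
    let sentinel = NaiveDateTime::parse_from_str("1000-01-01 00:00:00", "%Y-%m-%d %H:%M:%S").unwrap();
    let cutoff = NaiveDateTime::parse_from_str("2024-01-01 00:00:00", "%Y-%m-%d %H:%M:%S").unwrap();
    assert!(sentinel < cutoff);
}
```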
55 changes: 44 additions & 11 deletions src/console/cronjobs/tracker_statistics_importer.rs
@@ -17,12 +17,14 @@ use axum::extract::State;
use axum::routing::{get, post};
use axum::{Json, Router};
use chrono::{DateTime, Utc};
-use log::{error, info};
+use log::{debug, error, info};
use serde_json::{json, Value};
use text_colorizer::Colorize;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;

use crate::tracker::statistics_importer::StatisticsImporter;
use crate::utils::clock::seconds_ago_utc;

const IMPORTER_API_IP: &str = "127.0.0.1";

@@ -41,7 +43,7 @@ struct ImporterState {
#[must_use]
pub fn start(
importer_port: u16,
-torrent_info_update_interval: u64,
+torrent_stats_update_interval: u64,
tracker_statistics_importer: &Arc<StatisticsImporter>,
) -> JoinHandle<()> {
let weak_tracker_statistics_importer = Arc::downgrade(tracker_statistics_importer);
@@ -54,7 +56,7 @@ pub fn start(
let _importer_api_handle = tokio::spawn(async move {
let import_state = Arc::new(ImporterState {
last_heartbeat: Arc::new(Mutex::new(Utc::now())),
-torrent_info_update_interval,
+torrent_info_update_interval: torrent_stats_update_interval,
});

let app = Router::new()
@@ -81,25 +83,56 @@ pub fn start(

info!("Tracker statistics importer cronjob starting ...");

-let interval = std::time::Duration::from_secs(torrent_info_update_interval);
-let mut interval = tokio::time::interval(interval);
+// code-review: we set an execution interval to avoid polling the
+// database too aggressively. Without the interval we would be constantly
+// querying whether there are torrent stats pending to update, even when
+// there are no torrents to update. Maybe we should only sleep for 100
+// milliseconds when the latest execution did not update any torrents.
+// With the current limit we can import at most 50 torrent stats every 100
+// milliseconds, which is 500 torrents per second (1,800,000 torrents per
+// hour), assuming the tracker can handle each request in 100 milliseconds.

-interval.tick().await; // first tick is immediate...
+let execution_interval_in_milliseconds = 100;
+let execution_interval_duration = std::time::Duration::from_millis(execution_interval_in_milliseconds);
+let mut execution_interval = tokio::time::interval(execution_interval_duration);

-loop {
-interval.tick().await;
+execution_interval.tick().await; // first tick is immediate...

-info!("Running tracker statistics importer ...");
+info!("Running tracker statistics importer every {execution_interval_in_milliseconds} milliseconds ...");

+loop {
if let Err(e) = send_heartbeat(importer_port).await {
error!("Failed to send heartbeat from importer cronjob: {}", e);
}

-if let Some(tracker) = weak_tracker_statistics_importer.upgrade() {
-drop(tracker.import_all_torrents_statistics().await);
+if let Some(statistics_importer) = weak_tracker_statistics_importer.upgrade() {
+let one_interval_ago = seconds_ago_utc(
+torrent_stats_update_interval
+.try_into()
+.expect("update interval should be a positive integer"),
+);
+let limit = 50;
+
+debug!(
+"Importing torrents statistics not updated since {} limited to a maximum of {} torrents ...",
+one_interval_ago.to_string().yellow(),
+limit.to_string().yellow()
+);
+
+match statistics_importer
+.import_torrents_statistics_not_updated_since(one_interval_ago, limit)
+.await
+{
+Ok(()) => {}
+Err(e) => error!("Failed to import statistics: {:?}", e),
+}
+
+drop(statistics_importer);
} else {
break;
}

+execution_interval.tick().await;
}
})
}
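The cronjob imports `seconds_ago_utc`, and the database changes below import `datetime_now` and `DATETIME_FORMAT`, all from `src/utils/clock`, which this diff does not touch. A hedged sketch of what those helpers presumably look like — the names come from the imports, the bodies and the exact format string are assumptions:

```rust
use chrono::{DateTime, Duration, Utc};

/// Presumed layout, matching the migrations' sentinel value.
pub const DATETIME_FORMAT: &str = "%Y-%m-%d %H:%M:%S";

/// Current UTC time formatted for the `updated_at` column.
pub fn datetime_now() -> String {
    Utc::now().format(DATETIME_FORMAT).to_string()
}

/// UTC instant `seconds` ago; the cronjob builds the
/// "not updated since" cutoff from the configured interval.
pub fn seconds_ago_utc(seconds: i64) -> DateTime<Utc> {
    Utc::now() - Duration::seconds(seconds)
}
```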
9 changes: 8 additions & 1 deletion src/databases/database.rs
@@ -1,5 +1,5 @@
use async_trait::async_trait;
-use chrono::NaiveDateTime;
+use chrono::{DateTime, NaiveDateTime, Utc};
use serde::{Deserialize, Serialize};

use crate::databases::mysql::Mysql;
@@ -292,6 +292,13 @@ pub trait Database: Sync + Send {
/// Get all torrents as `Vec<TorrentCompact>`.
async fn get_all_torrents_compact(&self) -> Result<Vec<TorrentCompact>, Error>;

/// Get torrents whose stats have not been imported from the tracker since a given datetime.
async fn get_torrents_with_stats_not_updated_since(
&self,
datetime: DateTime<Utc>,
limit: i64,
) -> Result<Vec<TorrentCompact>, Error>;

/// Update a torrent's title with `torrent_id` and `title`.
async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), Error>;

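This new trait method is the query behind `import_torrents_statistics_not_updated_since`, which the cronjob calls but which is not itself part of the diff. A rough sketch of how the pieces plausibly fit together — the function shape and the omitted persistence step are assumptions:

```rust
use chrono::{DateTime, Utc};

use crate::databases::database::{Database, Error};
use crate::tracker::service::Service;

// Hedged sketch only: the real method lives on StatisticsImporter.
async fn import_stale_stats(
    database: &dyn Database,
    tracker_service: &Service,
    cutoff: DateTime<Utc>,
    limit: i64,
) -> Result<(), Error> {
    // Up to `limit` torrents whose stats are stale or missing, oldest first.
    let torrents = database
        .get_torrents_with_stats_not_updated_since(cutoff, limit)
        .await?;

    if torrents.is_empty() {
        return Ok(()); // nothing stale; the cronjob sleeps ~100 ms and retries
    }

    // One batched tracker request instead of one request per torrent.
    let info_hashes: Vec<String> = torrents.iter().map(|t| t.info_hash.clone()).collect();

    if let Ok(stats) = tracker_service.get_torrents_info(&info_hashes).await {
        for torrent in &torrents {
            if let Some(stat) = stats.iter().find(|s| s.info_hash == torrent.info_hash) {
                // Persist stat.seeders / stat.leechers for torrent.torrent_id here;
                // the REPLACE INTO below also stamps updated_at = datetime_now().
                let _ = stat;
            }
        }
    }

    Ok(())
}
```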
28 changes: 25 additions & 3 deletions src/databases/mysql.rs
@@ -2,7 +2,7 @@ use std::str::FromStr;
use std::time::Duration;

use async_trait::async_trait;
-use chrono::NaiveDateTime;
+use chrono::{DateTime, NaiveDateTime, Utc};
use sqlx::mysql::{MySqlConnectOptions, MySqlPoolOptions};
use sqlx::{query, query_as, Acquire, ConnectOptions, MySqlPool};

@@ -20,7 +20,7 @@ use crate::models::torrent_tag::{TagId, TorrentTag};
use crate::models::tracker_key::TrackerKey;
use crate::models::user::{User, UserAuthentication, UserCompact, UserId, UserProfile};
use crate::services::torrent::{CanonicalInfoHashGroup, DbTorrentInfoHash};
-use crate::utils::clock;
+use crate::utils::clock::{self, datetime_now, DATETIME_FORMAT};
use crate::utils::hex::from_bytes;

pub struct Mysql {
@@ -884,6 +884,27 @@ impl Database for Mysql {
.map_err(|_| database::Error::Error)
}

async fn get_torrents_with_stats_not_updated_since(
&self,
datetime: DateTime<Utc>,
limit: i64,
) -> Result<Vec<TorrentCompact>, database::Error> {
query_as::<_, TorrentCompact>(
"SELECT tt.torrent_id, tt.info_hash
FROM torrust_torrents tt
LEFT JOIN torrust_torrent_tracker_stats tts ON tt.torrent_id = tts.torrent_id
WHERE tts.updated_at < ? OR tts.updated_at IS NULL
ORDER BY tts.updated_at ASC
LIMIT ?
",
)
.bind(datetime.format(DATETIME_FORMAT).to_string())
.bind(limit)
.fetch_all(&self.pool)
.await
.map_err(|_| database::Error::Error)
}

async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), database::Error> {
query("UPDATE torrust_torrent_info SET title = ? WHERE torrent_id = ?")
.bind(title)
@@ -1055,11 +1076,12 @@ impl Database for Mysql {
seeders: i64,
leechers: i64,
) -> Result<(), database::Error> {
query("REPLACE INTO torrust_torrent_tracker_stats (torrent_id, tracker_url, seeders, leechers) VALUES (?, ?, ?, ?)")
query("REPLACE INTO torrust_torrent_tracker_stats (torrent_id, tracker_url, seeders, leechers, updated_at) VALUES (?, ?, ?, ?, ?)")
.bind(torrent_id)
.bind(tracker_url)
.bind(seeders)
.bind(leechers)
+.bind(datetime_now())
.execute(&self.pool)
.await
.map(|_| ())
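Two details of this query are worth calling out: the LEFT JOIN combined with `tts.updated_at IS NULL` also catches torrents that have no stats row at all, and because both MySQL and SQLite sort NULLs first under `ORDER BY ... ASC`, never-imported torrents are served before merely stale ones. A hypothetical call site (the cutoff and limit are invented):

```rust
// Everything whose stats are older than one hour, oldest first, capped at 50.
let cutoff = seconds_ago_utc(3600);
let stale = database
    .get_torrents_with_stats_not_updated_since(cutoff, 50)
    .await?;

for torrent in &stale {
    println!("stats pending for {} ({})", torrent.torrent_id, torrent.info_hash);
}
```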
28 changes: 25 additions & 3 deletions src/databases/sqlite.rs
@@ -2,7 +2,7 @@ use std::str::FromStr;
use std::time::Duration;

use async_trait::async_trait;
-use chrono::NaiveDateTime;
+use chrono::{DateTime, NaiveDateTime, Utc};
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
use sqlx::{query, query_as, Acquire, ConnectOptions, SqlitePool};

@@ -20,7 +20,7 @@ use crate::models::torrent_tag::{TagId, TorrentTag};
use crate::models::tracker_key::TrackerKey;
use crate::models::user::{User, UserAuthentication, UserCompact, UserId, UserProfile};
use crate::services::torrent::{CanonicalInfoHashGroup, DbTorrentInfoHash};
-use crate::utils::clock;
+use crate::utils::clock::{self, datetime_now, DATETIME_FORMAT};
use crate::utils::hex::from_bytes;

pub struct Sqlite {
@@ -876,6 +876,27 @@ impl Database for Sqlite {
.map_err(|_| database::Error::Error)
}

async fn get_torrents_with_stats_not_updated_since(
&self,
datetime: DateTime<Utc>,
limit: i64,
) -> Result<Vec<TorrentCompact>, database::Error> {
query_as::<_, TorrentCompact>(
"SELECT tt.torrent_id, tt.info_hash
FROM torrust_torrents tt
LEFT JOIN torrust_torrent_tracker_stats tts ON tt.torrent_id = tts.torrent_id
WHERE tts.updated_at < ? OR tts.updated_at IS NULL
ORDER BY tts.updated_at ASC
LIMIT ?
",
)
.bind(datetime.format(DATETIME_FORMAT).to_string())
.bind(limit)
.fetch_all(&self.pool)
.await
.map_err(|_| database::Error::Error)
}

async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), database::Error> {
query("UPDATE torrust_torrent_info SET title = $1 WHERE torrent_id = $2")
.bind(title)
Expand Down Expand Up @@ -1047,11 +1068,12 @@ impl Database for Sqlite {
seeders: i64,
leechers: i64,
) -> Result<(), database::Error> {
query("REPLACE INTO torrust_torrent_tracker_stats (torrent_id, tracker_url, seeders, leechers) VALUES ($1, $2, $3, $4)")
query("REPLACE INTO torrust_torrent_tracker_stats (torrent_id, tracker_url, seeders, leechers, updated_at) VALUES ($1, $2, $3, $4, $5)")
.bind(torrent_id)
.bind(tracker_url)
.bind(seeders)
.bind(leechers)
+.bind(datetime_now())
.execute(&self.pool)
.await
.map(|_| ())
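Note that `REPLACE INTO` deletes and re-inserts the whole row, so `updated_at` must be bound explicitly on every import rather than relying on the column default. An `ON CONFLICT` upsert would be a possible alternative; a sketch only, assuming `torrent_id` is the table's conflict target, which this diff does not confirm:

```rust
// Not what the PR does; an illustrative SQLite upsert instead of REPLACE INTO.
query(
    "INSERT INTO torrust_torrent_tracker_stats (torrent_id, tracker_url, seeders, leechers, updated_at)
     VALUES ($1, $2, $3, $4, $5)
     ON CONFLICT(torrent_id) DO UPDATE SET
        tracker_url = excluded.tracker_url,
        seeders = excluded.seeders,
        leechers = excluded.leechers,
        updated_at = excluded.updated_at",
)
.bind(torrent_id)
.bind(tracker_url)
.bind(seeders)
.bind(leechers)
.bind(datetime_now())
.execute(&self.pool)
.await
.map(|_| ())
.map_err(|_| database::Error::Error)
```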
25 changes: 23 additions & 2 deletions src/tracker/api.rs
@@ -15,6 +15,8 @@ impl ConnectionInfo {
}
}

const TOKEN_PARAM_NAME: &str = "token";

pub struct Client {
pub connection_info: ConnectionInfo,
api_base_url: String,
@@ -29,7 +31,7 @@ impl Client {
pub fn new(connection_info: ConnectionInfo) -> Result<Self, Error> {
let base_url = format!("{}/api/v1", connection_info.url);
let client = reqwest::Client::builder().timeout(Duration::from_secs(5)).build()?;
let token_param = [("token".to_string(), connection_info.token.to_string())];
let token_param = [(TOKEN_PARAM_NAME.to_string(), connection_info.token.to_string())];

Ok(Self {
connection_info,
@@ -72,7 +74,7 @@ impl Client {
self.client.post(request_url).query(&self.token_param).send().await
}

-/// Retrieve the info for a torrent.
+/// Retrieve the info for one torrent.
///
/// # Errors
///
@@ -82,4 +84,23 @@

self.client.get(request_url).query(&self.token_param).send().await
}

/// Retrieve the info for multiple torrents at the same time.
///
/// # Errors
///
/// Will return an error if the HTTP request fails.
pub async fn get_torrents_info(&self, info_hashes: &[String]) -> Result<Response, Error> {
let request_url = format!("{}/torrents", self.api_base_url);

let mut query_params: Vec<(String, String)> = Vec::with_capacity(info_hashes.len() + 1);

query_params.push((TOKEN_PARAM_NAME.to_string(), self.connection_info.token.clone()));

for info_hash in info_hashes {
query_params.push(("info_hash".to_string(), info_hash.clone()));
}

self.client.get(request_url).query(&query_params).send().await
}
}
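`reqwest` serializes a `Vec` of pairs as repeated query keys, so the batched call produces a single request of the form `GET /api/v1/torrents?token=<token>&info_hash=<hash1>&info_hash=<hash2>`. A hypothetical call (the info-hashes are invented):

```rust
// One batched request replacing N single get_torrent_info calls.
let response = client
    .get_torrents_info(&[
        "443c7602b4fde83d1154d6d9da48808418b181b6".to_string(),
        "5989a4ccab1af5a34a1d7b4b5b2c4bea853b2a59".to_string(),
    ])
    .await?;

assert!(response.status().is_success());
```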
56 changes: 56 additions & 0 deletions src/tracker/service.rs
@@ -48,6 +48,14 @@ pub struct TorrentInfo {
pub peers: Vec<Peer>,
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct TorrentBasicInfo {
pub info_hash: String,
pub seeders: i64,
pub completed: i64,
pub leechers: i64,
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct Peer {
pub peer_id: Option<PeerId>,
@@ -259,6 +267,54 @@ impl Service {
}
}

/// Get torrent info from the tracker in batches.
///
/// # Errors
///
/// Will return an error if the HTTP request to get torrent info fails or
/// if the response cannot be parsed.
pub async fn get_torrents_info(&self, info_hashes: &[String]) -> Result<Vec<TorrentBasicInfo>, TrackerAPIError> {
debug!(target: "tracker-service", "get torrents info");

let maybe_response = self.api_client.get_torrents_info(info_hashes).await;

debug!(target: "tracker-service", "get torrents info response result: {:?}", maybe_response);

match maybe_response {
Ok(response) => {
let status: StatusCode = map_status_code(response.status());

let body = response.text().await.map_err(|_| {
error!(target: "tracker-service", "response without body");
TrackerAPIError::MissingResponseBody
})?;

match status {
StatusCode::OK => serde_json::from_str(&body).map_err(|e| {
error!(
target: "tracker-service", "Failed to parse torrents info from tracker response. Body: {}, Error: {}",
body, e
);
TrackerAPIError::FailedToParseTrackerResponse { body }
}),
StatusCode::INTERNAL_SERVER_ERROR => {
if body == Self::invalid_token_body() {
Err(TrackerAPIError::InvalidToken)
} else {
error!(target: "tracker-service", "get torrents info 500 response: status {status}, body: {body}");
Err(TrackerAPIError::InternalServerError)
}
}
_ => {
error!(target: "tracker-service", "get torrents info unhandled response: status {status}, body: {body}");
Err(TrackerAPIError::UnexpectedResponseStatus)
}
}
}
Err(_) => Err(TrackerAPIError::TrackerOffline),
}
}

/// Issue a new tracker key from tracker.
async fn retrieve_new_tracker_key(&self, user_id: i64) -> Result<TrackerKey, TrackerAPIError> {
debug!(target: "tracker-service", "retrieve key: {user_id}");
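For reference, the `StatusCode::OK` branch expects a body that deserializes directly into `Vec<TorrentBasicInfo>`. A minimal round-trip sketch (the JSON values are invented):

```rust
use crate::tracker::service::TorrentBasicInfo;

let body = r#"[
    {
        "info_hash": "443c7602b4fde83d1154d6d9da48808418b181b6",
        "seeders": 5,
        "completed": 10,
        "leechers": 2
    }
]"#;

// Mirrors the serde_json::from_str call in the OK branch above.
let torrents: Vec<TorrentBasicInfo> = serde_json::from_str(body).unwrap();
assert_eq!(torrents[0].seeders, 5);
```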