Skip to content

Commit

Permalink
refactor: decouple tracker::StatisticsImporter from tracker::Service
Browse files Browse the repository at this point in the history
  • Loading branch information
josecelano committed May 9, 2023
1 parent 0198361 commit 63aefcf
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 97 deletions.
13 changes: 8 additions & 5 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::databases::database::connect_database;
use crate::mailer::MailerService;
use crate::routes;
use crate::tracker::service::Service;
use crate::tracker::statistics_importer::StatisticsImporter;

pub struct Running {
pub api_server: Server,
Expand Down Expand Up @@ -45,6 +46,7 @@ pub async fn run(configuration: Configuration) -> Running {
let database = Arc::new(connect_database(&database_connect_url).await.expect("Database error."));
let auth = Arc::new(AuthorizationService::new(cfg.clone(), database.clone()));
let tracker_service = Arc::new(Service::new(cfg.clone(), database.clone()).await);
let tracker_statistics_importer = Arc::new(StatisticsImporter::new(cfg.clone(), database.clone()).await);
let mailer_service = Arc::new(MailerService::new(cfg.clone()).await);
let image_cache_service = Arc::new(ImageCacheService::new(cfg.clone()).await);

Expand All @@ -55,23 +57,24 @@ pub async fn run(configuration: Configuration) -> Running {
database.clone(),
auth.clone(),
tracker_service.clone(),
tracker_statistics_importer.clone(),
mailer_service,
image_cache_service,
));

// Start a repeating task that imports tracker torrent data and updates
// the seeders and leechers info in the database.

let weak_tracker_service = Arc::downgrade(&tracker_service);
let weak_tracker_statistics_importer = Arc::downgrade(&tracker_statistics_importer);

let tracker_data_importer_handle = tokio::spawn(async move {
let tracker_statistics_importer_handle = tokio::spawn(async move {
let interval = std::time::Duration::from_secs(database_torrent_info_update_interval);
let mut interval = tokio::time::interval(interval);
interval.tick().await; // first tick is immediate...
loop {
interval.tick().await;
if let Some(tracker) = weak_tracker_service.upgrade() {
let _ = tracker.update_torrents().await;
if let Some(tracker) = weak_tracker_statistics_importer.upgrade() {
let _ = tracker.import_all_torrents_statistics().await;
} else {
break;
}
Expand Down Expand Up @@ -105,6 +108,6 @@ pub async fn run(configuration: Configuration) -> Running {
Running {
api_server: running_server,
socket_address,
tracker_data_importer_handle,
tracker_data_importer_handle: tracker_statistics_importer_handle,
}
}
10 changes: 7 additions & 3 deletions src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::config::Configuration;
use crate::databases::database::Database;
use crate::mailer::MailerService;
use crate::tracker::service::Service;
use crate::tracker::statistics_importer::StatisticsImporter;

pub type Username = String;

Expand All @@ -15,7 +16,8 @@ pub struct AppData {
pub cfg: Arc<Configuration>,
pub database: Arc<Box<dyn Database>>,
pub auth: Arc<AuthorizationService>,
pub tracker: Arc<Service>,
pub tracker_service: Arc<Service>,
pub tracker_statistics_importer: Arc<StatisticsImporter>,
pub mailer: Arc<MailerService>,
pub image_cache_manager: Arc<ImageCacheService>,
}
Expand All @@ -25,15 +27,17 @@ impl AppData {
cfg: Arc<Configuration>,
database: Arc<Box<dyn Database>>,
auth: Arc<AuthorizationService>,
tracker: Arc<Service>,
tracker_service: Arc<Service>,
tracker_statistics_importer: Arc<StatisticsImporter>,
mailer: Arc<MailerService>,
image_cache_manager: Arc<ImageCacheService>,
) -> AppData {
AppData {
cfg,
database,
auth,
tracker,
tracker_service,
tracker_statistics_importer,
mailer,
image_cache_manager,
}
Expand Down
6 changes: 3 additions & 3 deletions src/console/commands/import_tracker_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use text_colorizer::*;
use crate::bootstrap::config::init_configuration;
use crate::bootstrap::logging;
use crate::databases::database::connect_database;
use crate::tracker::service::Service;
use crate::tracker::statistics_importer::StatisticsImporter;

const NUMBER_OF_ARGUMENTS: usize = 0;

Expand Down Expand Up @@ -76,7 +76,7 @@ pub async fn import(_args: &Arguments) {
.expect("Database error."),
);

let tracker_service = Arc::new(Service::new(cfg.clone(), database.clone()).await);
let tracker_statistics_importer = Arc::new(StatisticsImporter::new(cfg.clone(), database.clone()).await);

tracker_service.update_torrents().await.unwrap();
tracker_statistics_importer.import_all_torrents_statistics().await.unwrap();
}
16 changes: 8 additions & 8 deletions src/routes/torrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,15 @@ pub async fn upload_torrent(req: HttpRequest, payload: Multipart, app_data: WebA

// update torrent tracker stats
let _ = app_data
.tracker
.update_torrent_tracker_stats(torrent_id, &torrent_request.torrent.info_hash())
.tracker_statistics_importer
.import_torrent_statistics(torrent_id, &torrent_request.torrent.info_hash())
.await;

// whitelist info hash on tracker
// code-review: why do we always try to whitelist the torrent on the tracker?
// shouldn't we only do this if the torrent is in "Listed" mode?
if let Err(e) = app_data
.tracker
.tracker_service
.whitelist_info_hash(torrent_request.torrent.info_hash())
.await
{
Expand Down Expand Up @@ -146,7 +146,7 @@ pub async fn download_torrent_handler(req: HttpRequest, app_data: WebAppData) ->
match user {
Ok(user) => {
let personal_announce_url = app_data
.tracker
.tracker_service
.get_personal_announce_url(user.user_id)
.await
.unwrap_or(tracker_url);
Expand Down Expand Up @@ -210,7 +210,7 @@ pub async fn get_torrent_handler(req: HttpRequest, app_data: WebAppData) -> Serv
Ok(user) => {
// if no user owned tracker key can be found, use default tracker url
let personal_announce_url = app_data
.tracker
.tracker_service
.get_personal_announce_url(user.user_id)
.await
.unwrap_or(tracker_url);
Expand Down Expand Up @@ -240,8 +240,8 @@ pub async fn get_torrent_handler(req: HttpRequest, app_data: WebAppData) -> Serv

// get realtime seeders and leechers
if let Ok(torrent_info) = app_data
.tracker
.get_torrent_info(torrent_response.torrent_id, &torrent_response.info_hash)
.tracker_statistics_importer
.import_torrent_statistics(torrent_response.torrent_id, &torrent_response.info_hash)
.await
{
torrent_response.seeders = torrent_info.seeders;
Expand Down Expand Up @@ -310,7 +310,7 @@ pub async fn delete_torrent_handler(req: HttpRequest, app_data: WebAppData) -> S

// remove info_hash from tracker whitelist
let _ = app_data
.tracker
.tracker_service
.remove_info_hash_from_whitelist(torrent_listing.info_hash)
.await;

Expand Down
1 change: 1 addition & 0 deletions src/tracker/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod api;
pub mod service;
pub mod statistics_importer;
78 changes: 0 additions & 78 deletions src/tracker/service.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,11 @@
use std::sync::Arc;

use log::{error, info};
use serde::{Deserialize, Serialize};

use super::api::{Client, ConnectionInfo};
use crate::config::Configuration;
use crate::databases::database::Database;
use crate::errors::ServiceError;
use crate::models::tracker_key::TrackerKey;

/// Torrent data deserialized from the tracker API response.
/// (Moved to `tracker::statistics_importer` by this commit.)
#[derive(Debug, Serialize, Deserialize)]
pub struct TorrentInfo {
pub info_hash: String,
pub seeders: i64,
pub completed: i64,
pub leechers: i64,
pub peers: Vec<Peer>,
}

/// A single peer entry as returned by the tracker API.
/// All fields are optional to tolerate partial JSON payloads.
#[derive(Debug, Serialize, Deserialize)]
pub struct Peer {
pub peer_id: Option<PeerId>,
pub peer_addr: Option<String>,
pub updated: Option<i64>,
pub uploaded: Option<i64>,
pub downloaded: Option<i64>,
pub left: Option<i64>,
pub event: Option<String>,
}

/// Peer identifier as reported by the tracker API.
#[derive(Debug, Serialize, Deserialize)]
pub struct PeerId {
pub id: Option<String>,
pub client: Option<String>,
}

pub struct Service {
database: Arc<Box<dyn Database>>,
api_client: Client,
Expand Down Expand Up @@ -153,53 +124,4 @@ impl Service {
// return tracker key
Ok(tracker_key)
}

/// Get torrent info from tracker API
///
/// # Errors
///
/// Will return an error if the HTTP request failed or the torrent is not
/// found.
pub async fn get_torrent_info(&self, torrent_id: i64, info_hash: &str) -> Result<TorrentInfo, ServiceError> {
// Query the tracker REST API; any transport error is collapsed into a
// generic internal error.
let response = self
.api_client
.get_torrent_info(info_hash)
.await
.map_err(|_| ServiceError::InternalServerError)?;

if let Ok(torrent_info) = response.json::<TorrentInfo>().await {
// Best-effort cache of seeders/leechers in the database; a write
// failure is deliberately ignored.
let _ = self
.database
.update_tracker_info(torrent_id, &self.tracker_url, torrent_info.seeders, torrent_info.leechers)
.await;
Ok(torrent_info)
} else {
// NOTE(review): a JSON decode failure is reported as `TorrentNotFound`
// and resets the cached stats to zero — it also covers malformed
// API responses, not only missing torrents.
let _ = self.database.update_tracker_info(torrent_id, &self.tracker_url, 0, 0).await;
Err(ServiceError::TorrentNotFound)
}
}

/// Refresh tracker stats for every torrent in the database.
/// A failure for one torrent is logged and does not abort the loop.
pub async fn update_torrents(&self) -> Result<(), ServiceError> {
info!("Updating torrents ...");
let torrents = self.database.get_all_torrents_compact().await?;

for torrent in torrents {
info!("Updating torrent {} ...", torrent.torrent_id);
let ret = self
.update_torrent_tracker_stats(torrent.torrent_id, &torrent.info_hash)
.await;
if let Some(err) = ret.err() {
error!(
"Error updating torrent tracker stats for torrent {}: {:?}",
torrent.torrent_id, err
);
}
}

Ok(())
}

// Thin alias kept for callers; delegates to `get_torrent_info`.
pub async fn update_torrent_tracker_stats(&self, torrent_id: i64, info_hash: &str) -> Result<TorrentInfo, ServiceError> {
self.get_torrent_info(torrent_id, info_hash).await
}
}
122 changes: 122 additions & 0 deletions src/tracker/statistics_importer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use std::sync::Arc;

use log::{error, info};
use serde::{Deserialize, Serialize};

use super::api::{Client, ConnectionInfo};
use crate::config::Configuration;
use crate::databases::database::{Database, DatabaseError};
use crate::errors::ServiceError;

// If `TorrentInfo` struct is used in the future for other purposes, it should
// be moved to a separate file. Maybe a `ClientWrapper` struct which returns
// `TorrentInfo` and `TrackerKey` structs instead of `Response` structs.

/// Torrent data deserialized from the tracker API response
/// (see `StatisticsImporter::import_torrent_statistics`).
#[derive(Debug, Serialize, Deserialize)]
pub struct TorrentInfo {
// Hex info-hash string identifying the torrent.
pub info_hash: String,
// Seeder count as reported by the tracker.
pub seeders: i64,
// Completed-download count as reported by the tracker.
pub completed: i64,
// Leecher count as reported by the tracker.
pub leechers: i64,
pub peers: Vec<Peer>,
}

/// A single peer entry as returned by the tracker API.
/// Every field is optional so partial JSON payloads still deserialize.
#[derive(Debug, Serialize, Deserialize)]
pub struct Peer {
pub peer_id: Option<PeerId>,
pub peer_addr: Option<String>,
pub updated: Option<i64>,
pub uploaded: Option<i64>,
pub downloaded: Option<i64>,
pub left: Option<i64>,
pub event: Option<String>,
}

/// Peer identifier as reported by the tracker API.
#[derive(Debug, Serialize, Deserialize)]
pub struct PeerId {
pub id: Option<String>,
pub client: Option<String>,
}

/// Imports torrent statistics (seeders/leechers) from the tracker API and
/// persists them in the index database via `update_tracker_info`.
pub struct StatisticsImporter {
// Database handle used to persist the imported stats.
database: Arc<Box<dyn Database>>,
// HTTP client for the tracker REST API.
api_client: Client,
// Public tracker URL, stored alongside the stats in the database.
tracker_url: String,
}

impl StatisticsImporter {
    /// Builds an importer from the application configuration.
    ///
    /// Reads the tracker API connection settings (`api_url`, `token`) and the
    /// public tracker URL from `cfg`, and keeps the database handle used to
    /// persist imported statistics.
    pub async fn new(cfg: Arc<Configuration>, database: Arc<Box<dyn Database>>) -> Self {
        let settings = cfg.settings.read().await;
        let api_client = Client::new(ConnectionInfo::new(
            settings.tracker.api_url.clone(),
            settings.tracker.token.clone(),
        ));
        let tracker_url = settings.tracker.url.clone();
        // Release the settings read lock before constructing `Self`.
        drop(settings);
        Self {
            database,
            api_client,
            tracker_url,
        }
    }

    /// Import torrents statistics from tracker and update them in database.
    ///
    /// A failure for a single torrent is logged and does not abort the whole
    /// import run.
    ///
    /// # Errors
    ///
    /// Will return an error if the database query failed.
    pub async fn import_all_torrents_statistics(&self) -> Result<(), DatabaseError> {
        info!("Importing torrents statistics from tracker ...");
        let torrents = self.database.get_all_torrents_compact().await?;

        for torrent in torrents {
            info!("Updating torrent {} ...", torrent.torrent_id);

            let ret = self.import_torrent_statistics(torrent.torrent_id, &torrent.info_hash).await;

            // code-review: should we treat each error case differently? The
            // tracker API could be temporarily offline, or there could be a
            // tracker misconfiguration.
            //
            // This is the log when the torrent is not found in the tracker:
            //
            // ```
            // 2023-05-09T13:31:24.497465723+00:00 [torrust_index_backend::tracker::statistics_importer][ERROR] Error updating torrent tracker stats for torrent with id 140: TorrentNotFound
            // ```

            // Idiomatic `if let Err` instead of `ret.err()` + `Some` matching.
            if let Err(err) = ret {
                error!(
                    "Error updating torrent tracker stats for torrent with id {}: {:?}",
                    torrent.torrent_id, err
                );
            }
        }

        Ok(())
    }

    /// Import torrent statistics from tracker and update them in database.
    ///
    /// # Errors
    ///
    /// Will return an error if the HTTP request failed or the torrent is not
    /// found.
    pub async fn import_torrent_statistics(&self, torrent_id: i64, info_hash: &str) -> Result<TorrentInfo, ServiceError> {
        // Any transport-level failure is collapsed into a generic internal error.
        let response = self
            .api_client
            .get_torrent_info(info_hash)
            .await
            .map_err(|_| ServiceError::InternalServerError)?;

        if let Ok(torrent_info) = response.json::<TorrentInfo>().await {
            // Persistence failures are deliberately ignored: stats are
            // best-effort and will be refreshed on the next import cycle.
            let _ = self
                .database
                .update_tracker_info(torrent_id, &self.tracker_url, torrent_info.seeders, torrent_info.leechers)
                .await;
            Ok(torrent_info)
        } else {
            // code-review: a deserialization failure is reported as
            // `TorrentNotFound`, which also covers malformed API responses,
            // and the cached stats are reset to zero.
            let _ = self.database.update_tracker_info(torrent_id, &self.tracker_url, 0, 0).await;
            Err(ServiceError::TorrentNotFound)
        }
    }
}

0 comments on commit 63aefcf

Please sign in to comment.