From 36f27d2a43d79e269260b326afae03d609816f13 Mon Sep 17 00:00:00 2001 From: Thomas Gardner Date: Wed, 26 Aug 2020 15:27:24 +1000 Subject: [PATCH] Add a `torrent announce` subcommand This command makes use of a partially-implemented UDP tracker client to announce an infohash and list the response. type: added --- Cargo.toml | 7 +- bin/gen/config.yaml | 4 + src/common.rs | 6 +- src/error.rs | 19 ++ src/host_port.rs | 44 +++- src/infohash.rs | 6 + src/lib.rs | 1 + src/subcommand/torrent.rs | 3 + src/subcommand/torrent/announce.rs | 291 ++++++++++++++++++++++++++ src/tracker.rs | 12 ++ src/tracker/action.rs | 18 ++ src/tracker/announce.rs | 276 +++++++++++++++++++++++++ src/tracker/client.rs | 320 +++++++++++++++++++++++++++++ src/tracker/connect.rs | 187 +++++++++++++++++ src/tracker/request.rs | 9 + src/tracker/response.rs | 11 + 16 files changed, 1205 insertions(+), 9 deletions(-) create mode 100644 src/subcommand/torrent/announce.rs create mode 100644 src/tracker.rs create mode 100644 src/tracker/action.rs create mode 100644 src/tracker/announce.rs create mode 100644 src/tracker/client.rs create mode 100644 src/tracker/connect.rs create mode 100644 src/tracker/request.rs create mode 100644 src/tracker/response.rs diff --git a/Cargo.toml b/Cargo.toml index ee445216..b5b734f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ default-run = "imdl" [features] default = [] -bench = ["rand"] +bench = [] [dependencies] ansi_term = "0.12.0" @@ -29,6 +29,7 @@ lexiclean = "0.0.1" libc = "0.2.0" log = "0.4.8" md5 = "0.7.0" +rand = "0.7.3" open = "1.4.0" pretty_assertions = "0.6.0" pretty_env_logger = "0.4.0" @@ -65,10 +66,6 @@ features = ["default", "wrap_help"] version = "2.1.1" features = ["serde"] -[dependencies.rand] -version = "0.7.3" -optional = true - [dev-dependencies] criterion = "0.3.0" temptree = "0.0.0" diff --git a/bin/gen/config.yaml b/bin/gen/config.yaml index 9df907a6..2e8ca0d0 100644 --- a/bin/gen/config.yaml +++ b/bin/gen/config.yaml @@ -11,6 +11,10 @@ examples: text: "BitTorrent metainfo related functionality is under the `torrent` subcommand:" code: "imdl torrent --help" +- command: imdl torrent announce + text: "Announce a torrent to trackers and print returned peers:" + code: "imdl torrent announce --input foo.torrent" + - command: imdl torrent create text: "Intermodal can be used to create `.torrent` files:" code: "imdl torrent create --input foo" diff --git a/src/common.rs b/src/common.rs index 6726f7e1..79a84bb3 100644 --- a/src/common.rs +++ b/src/common.rs @@ -12,6 +12,7 @@ pub(crate) use std::{ hash::Hash, io::{self, BufRead, BufReader, Cursor, Read, Write}, iter::{self, Sum}, + net::{IpAddr, SocketAddr, ToSocketAddrs, UdpSocket}, num::{ParseFloatError, ParseIntError, TryFromIntError}, ops::{AddAssign, Div, DivAssign, Mul, MulAssign, Sub, SubAssign}, path::{self, Path, PathBuf}, @@ -19,7 +20,7 @@ pub(crate) use std::{ str::{self, FromStr}, string::FromUtf8Error, sync::Once, - time::{SystemTime, SystemTimeError}, + time::{Duration, SystemTime, SystemTimeError}, usize, }; @@ -31,6 +32,7 @@ pub(crate) use ignore::WalkBuilder; pub(crate) use indicatif::{ProgressBar, ProgressStyle}; pub(crate) use lexiclean::Lexiclean; pub(crate) use libc::EXIT_FAILURE; +pub(crate) use rand::Rng; pub(crate) use regex::{Regex, RegexSet}; pub(crate) use serde::{de::Error as _, Deserialize, Deserializer, Serialize, Serializer}; pub(crate) use serde_hex::SerHex; @@ -52,7 +54,7 @@ pub(crate) use url::{Host, Url}; pub(crate) use log::trace; // modules -pub(crate) use crate::{consts, error, host_port_parse_error, magnet_link_parse_error}; +pub(crate) use crate::{consts, error, host_port_parse_error, magnet_link_parse_error, tracker}; // functions pub(crate) use crate::xor_args::xor_args; diff --git a/src/error.rs b/src/error.rs index 9993e7e8..73ce60a0 100644 --- a/src/error.rs +++ b/src/error.rs @@ -32,6 +32,8 @@ pub(crate) enum Error { FileSearch { source: ignore::Error }, #[snafu(display("Invalid glob: {}", source))] GlobParse { source: globset::Error }, + #[snafu(display("Failed to parse host and port: {}", source))] + HostPortParse { source: HostPortParseError }, #[snafu(display("Failed to serialize torrent info dictionary: {}", source))] InfoSerialize { source: bendy::serde::Error }, #[snafu(display("Input target empty"))] @@ -54,6 +56,8 @@ pub(crate) enum Error { source: bendy::serde::Error, input: InputTarget, }, + #[snafu(display("A peer source is required"))] + MetainfoMissingTrackers, #[snafu(display("Failed to serialize torrent metainfo: {}", source))] MetainfoSerialize { source: bendy::serde::Error }, #[snafu(display("Failed to decode metainfo bencode from {}: {}", input, error))] @@ -66,6 +70,8 @@ pub(crate) enum Error { input: InputTarget, source: MetainfoError, }, + #[snafu(display("Network I/O error: {}", source))] + Network { source: io::Error }, #[snafu(display("Failed to invoke opener: {}", source))] OpenerInvoke { source: io::Error }, #[snafu(display("Opener failed: {}", exit_status))] @@ -136,6 +142,19 @@ pub(crate) enum Error { SymlinkRoot { root: PathBuf }, #[snafu(display("Failed to retrieve system time: {}", source))] SystemTime { source: SystemTimeError }, + #[snafu(display("Failed to resolve UDP tracker `{}`: {}.", host_port, source))] + TrackerDnsResolution { + host_port: HostPort, + source: io::Error, + }, + #[snafu(display("UDP tracker resolved `{}` to no useable addresses.", host_port))] + TrackerNoHosts { host_port: HostPort }, + #[snafu(display("Connection to UDP tracker `{}` timed out.", host_port))] + TrackerTimeout { host_port: HostPort }, + #[snafu(display("Malformed response from UDP tracker."))] + TrackerResponse, + #[snafu(display("Response from UDP tracker wrong length: got {}; want {}.", got, want))] + TrackerResponseLength { want: usize, got: usize }, #[snafu(display( "Feature `{}` cannot be used without passing the `--unstable` flag", feature diff --git a/src/host_port.rs b/src/host_port.rs index 6e2f55fc..f16c289a 100644 --- a/src/host_port.rs +++ b/src/host_port.rs @@ -2,8 +2,34 @@ use crate::common::*; #[derive(Debug, PartialEq, Clone)] pub(crate) struct HostPort { - host: Host, - port: u16, + pub host: Host, + pub port: u16, +} + +impl HostPort { + pub(crate) fn from_url(url: &Url) -> Option { + match (url.host(), url.port()) { + (Some(host), Some(port)) => Some(HostPort { + host: host.to_owned(), + port, + }), + _ => None, + } + } +} + +impl ToSocketAddrs for HostPort { + type Iter = std::vec::IntoIter; + + fn to_socket_addrs(&self) -> io::Result { + let address = match &self.host { + Host::Domain(domain) => return (domain.clone(), self.port).to_socket_addrs(), + Host::Ipv4(address) => IpAddr::V4(*address), + Host::Ipv6(address) => IpAddr::V6(*address), + }; + + Ok(vec![SocketAddr::new(address, self.port)].into_iter()) + } } impl FromStr for HostPort { @@ -156,4 +182,18 @@ mod tests { "l39:1234:5678:9abc:def0:1234:5678:9abc:def0i65000ee", ); } + + #[test] + fn test_from_url() { + let url = Url::parse("udp://imdl.io:12345").unwrap(); + let host_port = HostPort::from_url(&url).unwrap(); + assert_eq!(host_port.host, Host::Domain("imdl.io".into())); + assert_eq!(host_port.port, 12345); + } + + #[test] + fn test_from_url_no_port() { + let url = Url::parse("udp://imdl.io").unwrap(); + assert!(HostPort::from_url(&url).is_none()); + } } diff --git a/src/infohash.rs b/src/infohash.rs index c2d5c733..9425787b 100644 --- a/src/infohash.rs +++ b/src/infohash.rs @@ -62,6 +62,12 @@ impl From for Infohash { } } +impl From for [u8; 20] { + fn from(infohash: Infohash) -> Self { + infohash.inner.bytes() + } +} + impl From for Sha1Digest { fn from(infohash: Infohash) -> Sha1Digest { infohash.inner diff --git a/src/lib.rs b/src/lib.rs index d50d3757..a812a269 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -89,6 +89,7 @@ mod style; mod subcommand; mod table; mod torrent_summary; +mod tracker; mod use_color; mod verifier; mod walker; diff --git a/src/subcommand/torrent.rs b/src/subcommand/torrent.rs index fa895a18..49775409 100644 --- a/src/subcommand/torrent.rs +++ b/src/subcommand/torrent.rs @@ -1,5 +1,6 @@ use crate::common::*; +mod announce; mod create; mod link; mod piece_length; @@ -14,6 +15,7 @@ mod verify; about("Subcommands related to the BitTorrent protocol.") )] pub(crate) enum Torrent { + Announce(announce::Announce), Create(create::Create), Link(link::Link), #[structopt(alias = "piece-size")] @@ -26,6 +28,7 @@ pub(crate) enum Torrent { impl Torrent { pub(crate) fn run(self, env: &mut Env, options: &Options) -> Result<(), Error> { match self { + Self::Announce(announce) => announce.run(env), Self::Create(create) => create.run(env, options), Self::Link(link) => link.run(env), Self::PieceLength(piece_length) => piece_length.run(env), diff --git a/src/subcommand/torrent/announce.rs b/src/subcommand/torrent/announce.rs new file mode 100644 index 00000000..ef18de34 --- /dev/null +++ b/src/subcommand/torrent/announce.rs @@ -0,0 +1,291 @@ +use crate::common::*; + +const INPUT_HELP: &str = + "Read torrent metainfo from `INPUT`. If `INPUT` is `-`, read metainfo from standard input."; + +const INPUT_FLAG: &str = "input-flag"; + +const INPUT_POSITIONAL: &str = ""; + +#[derive(StructOpt)] +#[structopt( + help_message(consts::HELP_MESSAGE), + version_message(consts::VERSION_MESSAGE), + about("Announce a .torrent file.") +)] +pub(crate) struct Announce { + #[structopt( + name = INPUT_FLAG, + long = "input", + short = "i", + value_name = "INPUT", + empty_values(false), + parse(try_from_os_str = InputTarget::try_from_os_str), + help = INPUT_HELP, + )] + input_flag: Option, + #[structopt( + name = INPUT_POSITIONAL, + value_name = "INPUT", + empty_values(false), + parse(try_from_os_str = InputTarget::try_from_os_str), + required_unless = INPUT_FLAG, + conflicts_with = INPUT_FLAG, + help = INPUT_HELP, + )] + input_positional: Option, +} + +impl Announce { + pub(crate) fn run(self, env: &mut Env) -> Result<(), Error> { + let target = xor_args( + "input_flag", + &self.input_flag, + "input_positional", + &self.input_positional, + )?; + let input = env.read(target)?; + + let mut peer_list = Vec::new(); + let mut trackers = Vec::new(); + let infohash = Infohash::from_input(&input)?; + let metainfo = Metainfo::from_input(&input)?; + + for result in metainfo.trackers() { + match result { + Ok(tracker) => trackers.push(tracker), + Err(err) => errln!(env, "Skipping malformed tracker URL: {}", err)?, + } + } + + if trackers.is_empty() { + return Err(Error::MetainfoMissingTrackers); + } + + for tracker in trackers { + if tracker.scheme() != "udp" { + errln!( + env, + "Only UDP trackers are supported; skipping {}.", + tracker + )?; + continue; + } + + let hostport = if let Some(hostport) = HostPort::from_url(&tracker) { + hostport + } else { + errln!(env, "Tracker URL `{}` is not well formed.", tracker)?; + continue; + }; + + let client = match tracker::Client::connect(&hostport) { + Err(err) => { + errln!(env, "Couldn't connect to tracker: {}", err)?; + continue; + } + Ok(client) => client, + }; + + match client.announce(infohash) { + Ok(subswarm) => { + errln!(env, "Successful announce to tracker `{}`.", tracker)?; + peer_list.extend(subswarm); + } + Err(err) => errln!(env, "Announce failed with tracker `{}`: {}", tracker, err)?, + } + } + + for peer in &peer_list { + outln!(env, "{}", peer)?; + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[cfg(test)] + pub(crate) fn new_dummy_metainfo() -> Metainfo { + Metainfo { + announce: None, + announce_list: None, + nodes: None, + comment: None, + created_by: None, + creation_date: None, + encoding: None, + info: Info { + private: None, + piece_length: Bytes(16 * 1024), + source: None, + name: "testing".into(), + pieces: PieceList::from_pieces(["test", "data"]), + mode: Mode::Single { + length: Bytes(2 * 16 * 1024), + md5sum: None, + }, + update_url: None, + }, + } + } + + #[test] + fn input_required() { + test_env! { + args: [ + "torrent", + "announce", + ], + tree: { + }, + matches: Err(Error::Clap { .. }), + }; + } + + #[test] + fn input_arguments_positional() { + let mut env = test_env! { + args: [ + "torrent", + "announce", + "foo", + ], + tree: {}, + }; + assert_matches!(env.run(), Err(error::Error::Filesystem { .. })); + } + + #[test] + fn input_arguments_flag() { + let mut env = test_env! { + args: [ + "torrent", + "announce", + "--input", + "foo", + ], + tree: {}, + }; + assert_matches!(env.run(), Err(error::Error::Filesystem { .. })); + } + + #[test] + fn input_arguments_conflict() { + let mut env = test_env! { + args: [ + "torrent", + "announce", + "--input", + "foo", + "bar", + ], + tree: {}, + }; + assert_matches!(env.run(), Err(Error::Clap { .. })); + } + + #[test] + fn metainfo_missing_trackers() { + let mut env = test_env! { + args: [ + "torrent", + "announce", + "--input", + "test.torrent", + ], + tree: {}, + }; + let metainfo = new_dummy_metainfo(); + env.write("test.torrent", metainfo.serialize().unwrap()); + assert_matches!(env.run(), Err(Error::MetainfoMissingTrackers)); + } + + #[test] + fn metainfo_non_udp_tracker() { + let mut env = test_env! { + args: [ + "torrent", + "announce", + "--input", + "test.torrent", + ], + tree: {}, + }; + let https_tracker_url = "https://intermodal.io/tracker/announce"; + let metainfo = Metainfo { + announce: None, + announce_list: Some(vec![vec![https_tracker_url.into()]]), + nodes: None, + comment: None, + created_by: None, + creation_date: None, + encoding: None, + info: Info { + private: None, + piece_length: Bytes(16 * 1024), + source: None, + name: "testing".into(), + pieces: PieceList::from_pieces(["test", "data"]), + mode: Mode::Single { + length: Bytes(2 * 16 * 1024), + md5sum: None, + }, + update_url: None, + }, + }; + env.write("test.torrent", metainfo.serialize().unwrap()); + env.run().unwrap(); + assert_eq!( + env.err(), + format!( + "Only UDP trackers are supported; skipping {}.\n", + https_tracker_url + ) + ); + } + + #[test] + fn tracker_host_port_not_well_formed() { + let mut env = test_env! { + args: [ + "torrent", + "announce", + "--input", + "test.torrent", + ], + tree: {}, + }; + let tracker_url = "udp://1.2.3.4/announce:1333337"; + let metainfo = Metainfo { + announce: None, + announce_list: Some(vec![vec![tracker_url.into()]]), + nodes: None, + comment: None, + created_by: None, + creation_date: None, + encoding: None, + info: Info { + private: None, + piece_length: Bytes(16 * 1024), + source: None, + name: "testing".into(), + pieces: PieceList::from_pieces(["test", "data"]), + mode: Mode::Single { + length: Bytes(2 * 16 * 1024), + md5sum: None, + }, + update_url: None, + }, + }; + env.write("test.torrent", metainfo.serialize().unwrap()); + env.run().unwrap(); + assert_eq!( + env.err(), + format!("Tracker URL `{}` is not well formed.\n", tracker_url) + ); + } +} diff --git a/src/tracker.rs b/src/tracker.rs new file mode 100644 index 00000000..dd0b4861 --- /dev/null +++ b/src/tracker.rs @@ -0,0 +1,12 @@ +use request::Request; +use response::Response; + +pub(crate) use client::Client; + +mod action; +mod client; +mod request; +mod response; + +mod announce; +mod connect; diff --git a/src/tracker/action.rs b/src/tracker/action.rs new file mode 100644 index 00000000..5fef29a2 --- /dev/null +++ b/src/tracker/action.rs @@ -0,0 +1,18 @@ +#[derive(Debug)] +pub enum Action { + Connect, + Announce, + Scrape, + Error, +} + +impl From for u32 { + fn from(a: Action) -> Self { + match a { + Action::Connect => 0, + Action::Announce => 1, + Action::Scrape => 2, + Action::Error => 3, + } + } +} diff --git a/src/tracker/announce.rs b/src/tracker/announce.rs new file mode 100644 index 00000000..0a289f19 --- /dev/null +++ b/src/tracker/announce.rs @@ -0,0 +1,276 @@ +use crate::common::*; + +#[derive(Debug, PartialEq)] +pub(crate) struct Request { + pub(crate) connection_id: u64, // 8 bytes + pub(crate) action: u32, // 12 + pub(crate) transaction_id: u32, // 16 + pub(crate) infohash: [u8; 20], // 36 + pub(crate) peer_id: [u8; 20], // 56 + pub(crate) downloaded: u64, // 64 + pub(crate) left: u64, // 72 + pub(crate) uploaded: u64, // 80 + pub(crate) event: u64, // 88 + pub(crate) ip_address: u32, // 92 + pub(crate) num_want: u32, // 96 + pub(crate) port: u16, // 98 +} + +impl Request { + pub(crate) const LENGTH: usize = 98; +} + +#[derive(Debug, PartialEq)] +pub(crate) struct Response { + pub(crate) action: u32, // 4 bytes + pub(crate) transaction_id: u32, // 8 + pub(crate) interval: u32, // 12 + pub(crate) leechers: u32, // 16 + pub(crate) seeders: u32, // 20 +} + +impl Response { + pub(crate) const LENGTH: usize = 20; +} + +impl super::Request for Request { + type Response = Response; + + fn serialize(&self) -> Vec { + let mut msg = Vec::new(); + + msg.extend_from_slice(&self.connection_id.to_be_bytes()); + msg.extend_from_slice(&self.action.to_be_bytes()); + msg.extend_from_slice(&self.transaction_id.to_be_bytes()); + msg.extend_from_slice(&self.infohash); + msg.extend_from_slice(&self.peer_id); + msg.extend_from_slice(&self.downloaded.to_be_bytes()); + msg.extend_from_slice(&self.left.to_be_bytes()); + msg.extend_from_slice(&self.uploaded.to_be_bytes()); + msg.extend_from_slice(&self.event.to_be_bytes()); + msg.extend_from_slice(&self.ip_address.to_be_bytes()); + msg.extend_from_slice(&self.num_want.to_be_bytes()); + msg.extend_from_slice(&self.port.to_be_bytes()); + + msg + } + + fn transaction_id(&self) -> u32 { + self.transaction_id + } + + fn action(&self) -> u32 { + self.action + } +} + +impl super::Response for Request { + fn deserialize(buf: &[u8]) -> Result<(Self, &[u8])> { + if buf.len() != Request::LENGTH { + return Err(Error::TrackerResponseLength { + got: buf.len(), + want: Request::LENGTH, + }); + } + + Ok(( + Request { + connection_id: u64::from_be_bytes( + buf[0..8] + .try_into() + .invariant_unwrap("bounds guaranteed OK by type system"), + ), + action: u32::from_be_bytes( + buf[8..12] + .try_into() + .invariant_unwrap("bounds guaranteed OK by type system"), + ), + transaction_id: u32::from_be_bytes( + buf[12..16] + .try_into() + .invariant_unwrap("bounds guaranteed OK by type system"), + ), + infohash: buf[16..36] + .try_into() + .invariant_unwrap("bounds guaranteed OK by type system"), + peer_id: buf[36..56] + .try_into() + .invariant_unwrap("bounds guaranteed OK by type system"), + downloaded: u64::from_be_bytes( + buf[56..64] + .try_into() + .invariant_unwrap("bounds guaranteed OK by type system"), + ), + left: u64::from_be_bytes( + buf[64..72] + .try_into() + .invariant_unwrap("bounds guaranteed OK by type system"), + ), + uploaded: u64::from_be_bytes( + buf[72..80] + .try_into() + .invariant_unwrap("bounds guaranteed OK by type system"), + ), + event: u64::from_be_bytes( + buf[80..88] + .try_into() + .invariant_unwrap("bounds guaranteed OK by type system"), + ), + ip_address: u32::from_be_bytes( + buf[88..92] + .try_into() + .invariant_unwrap("bounds guaranteed OK by type system"), + ), + num_want: u32::from_be_bytes( + buf[92..96] + .try_into() + .invariant_unwrap("bounds guaranteed OK by type system"), + ), + port: u16::from_be_bytes( + buf[96..98] + .try_into() + .invariant_unwrap("bounds guaranteed OK by type system"), + ), + }, + &buf[Self::LENGTH..], + )) + } + + fn transaction_id(&self) -> u32 { + self.transaction_id + } + + fn action(&self) -> u32 { + self.action + } +} + +impl super::Response for Response { + fn deserialize(buf: &[u8]) -> Result<(Self, &[u8])> { + if buf.len() < Response::LENGTH { + return Err(Error::TrackerResponseLength { + want: Response::LENGTH, + got: buf.len(), + }); + } + + Ok(( + Response { + action: u32::from_be_bytes( + buf[0..4] + .try_into() + .invariant_unwrap("bounds are checked manually above"), + ), + transaction_id: u32::from_be_bytes( + buf[4..8] + .try_into() + .invariant_unwrap("bounds are checked manually above"), + ), + interval: u32::from_be_bytes( + buf[8..12] + .try_into() + .invariant_unwrap("bounds are checked manually above"), + ), + leechers: u32::from_be_bytes( + buf[12..16] + .try_into() + .invariant_unwrap("bounds are checked manually above"), + ), + seeders: u32::from_be_bytes( + buf[16..20] + .try_into() + .invariant_unwrap("bounds are checked manually above"), + ), + }, + &buf[Self::LENGTH..], + )) + } + + fn transaction_id(&self) -> u32 { + self.transaction_id + } + + fn action(&self) -> u32 { + self.action + } +} + +impl super::Request for Response { + type Response = Request; + + #[allow(dead_code)] + fn serialize(&self) -> Vec { + let mut msg = Vec::new(); + + msg.extend_from_slice(&self.action.to_be_bytes()); + msg.extend_from_slice(&self.transaction_id.to_be_bytes()); + msg.extend_from_slice(&self.interval.to_be_bytes()); + msg.extend_from_slice(&self.leechers.to_be_bytes()); + msg.extend_from_slice(&self.seeders.to_be_bytes()); + + msg + } + + fn transaction_id(&self) -> u32 { + self.transaction_id + } + + fn action(&self) -> u32 { + self.action + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::tracker::{announce, request::Request, response::Response}; + + #[test] + pub(crate) fn announce_request_roundtrip() { + let req = announce::Request { + connection_id: 0x01, + action: 0x02, + transaction_id: 0x03, + infohash: [0x04; 20], + peer_id: [0x05; 20], + downloaded: 0x06, + left: 0x07, + uploaded: 0x08, + event: 0x09, + ip_address: 0x0a, + num_want: 0x0b, + port: 0x0c, + }; + let buf = req.serialize(); + let (req2, _) = announce::Request::deserialize(&buf).unwrap(); + assert_eq!(req, req2); + } + + #[test] + pub(crate) fn announce_response_roundtrip() { + let resp = announce::Response { + action: 0x01, + transaction_id: 0x02, + interval: 0x03, + leechers: 0x04, + seeders: 0x05, + }; + let buf = resp.serialize(); + let (resp2, _) = announce::Response::deserialize(&buf).unwrap(); + assert_eq!(resp, resp2); + } + + #[test] + pub(crate) fn announce_request_bad_deserialize() { + let buf = [0x01, 0x02, 0x03, 0x04, 0x05]; + let err = announce::Request::deserialize(&buf); + assert_matches!(err, Err(Error::TrackerResponseLength { .. })); + } + + #[test] + pub(crate) fn announce_response_bad_deserialize() { + let buf = [0x01, 0x02, 0x03, 0x04, 0x05]; + let err = announce::Response::deserialize(&buf); + assert_matches!(err, Err(Error::TrackerResponseLength { .. })); + } +} diff --git a/src/tracker/client.rs b/src/tracker/client.rs new file mode 100644 index 00000000..6c4a629c --- /dev/null +++ b/src/tracker/client.rs @@ -0,0 +1,320 @@ +use super::*; +use crate::common::*; + +pub(crate) struct Client { + connection_id: Option, + peer_id: [u8; 20], + is_ipv6: bool, + sock: UdpSocket, + host_port: HostPort, +} + +impl<'a> Client { + const UDP_TRACKER_MAGIC: u64 = 0x0000_0417_2710_1980; + + pub fn connect(host_port: &HostPort) -> Result { + let mut rng = rand::thread_rng(); + + let is_ipv6; + let sock; + let mut addrs = host_port + .to_socket_addrs() + // TODO: Unit test this. + .context(error::TrackerDnsResolution { + host_port: host_port.clone(), + })?; + if let Some(addr) = addrs.next() { + sock = match addr { + SocketAddr::V4(_) => { + is_ipv6 = false; + UdpSocket::bind("0.0.0.0:0").context(error::Network)? + } + SocketAddr::V6(_) => { + is_ipv6 = true; + UdpSocket::bind("[::]:0").context(error::Network)? + } + }; + sock.connect(addr).context(error::Network)?; + sock + // TODO: Implement some better backoff logic. + .set_read_timeout(Some(Duration::new(3, 0))) + .context(error::Network)?; + } else { + // TODO: Unit test this. + return Err(Error::TrackerNoHosts { + host_port: host_port.clone(), + }); + } + + let mut client = Client { + peer_id: rng.gen(), + connection_id: None, + sock, + is_ipv6, + host_port: host_port.clone(), + }; + + let req = connect::Request { + protocol_id: Self::UDP_TRACKER_MAGIC, + action: 0, + transaction_id: rng.gen(), + }; + let mut buf = [0u8; announce::Response::LENGTH]; + let (resp, _) = client.exchange(&req, &mut buf)?; + client.connection_id.replace(resp.connection_id); + + Ok(client) + } + + pub fn announce(&self, btinh: Infohash) -> Result> { + let mut rng = rand::thread_rng(); + let connection_id; + if let Some(id) = self.connection_id { + connection_id = id; + } else { + return Err(Error::TrackerResponse); + } + let req = announce::Request { + connection_id, + action: 0x0001, + transaction_id: rng.gen(), + infohash: btinh.into(), + peer_id: self.peer_id, + downloaded: 0x0000, + left: u64::MAX, + uploaded: 0x0000, + event: 0x0000, + ip_address: 0x0000, + num_want: u32::MAX, + port: self.sock.local_addr().context(error::Network)?.port(), + }; + let mut buf = [0u8; 1024]; + let (_, payload) = self.exchange(&req, &mut buf)?; + + Client::parse_compact_peer_list(self.is_ipv6, payload) + } + + fn exchange(&self, req: &T, rxbuf: &'a mut [u8]) -> Result<(T::Response, &'a [u8])> { + let msg = req.serialize(); + let mut len_read: usize = 0; + + for _ in 0..3 { + self.sock.send(&msg).context(error::Network)?; + if let Ok(len) = self.sock.recv(rxbuf) { + len_read = len; + break; + } + } + + if len_read == 0 { + return Err(Error::TrackerTimeout { + host_port: self.host_port.clone(), + }); + } + + let (resp, payload) = T::Response::deserialize(&rxbuf[..len_read])?; + if resp.transaction_id() != req.transaction_id() || resp.action() != req.action() { + return Err(Error::TrackerResponse); + } + + Ok((resp, payload)) + } + + fn parse_compact_peer_list(is_ipv6: bool, buf: &[u8]) -> Result> { + let mut subswarm = Vec::::new(); + let stride = if is_ipv6 { 18 } else { 6 }; + + let chunks = buf.chunks_exact(stride); + if !chunks.remainder().is_empty() { + return Err(Error::TrackerResponse); + } + + for hostpost in chunks { + let (ip, port) = hostpost.split_at(stride - 2); + let ip = if is_ipv6 { + let buf: [u8; 16] = ip[0..16] + .try_into() + .invariant_unwrap("iterator guarantees bounds are OK"); + IpAddr::from(std::net::Ipv6Addr::from(buf)) + } else { + IpAddr::from(std::net::Ipv4Addr::new(ip[0], ip[1], ip[2], ip[3])) + }; + + let port = u16::from_be_bytes( + port + .try_into() + .invariant_unwrap("iterator guarantees bounds are OK"), + ); + + subswarm.push((ip, port).into()); + } + + Ok(subswarm) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use action::Action; + use std::{thread, time::Duration}; + + #[cfg(test)] + pub(crate) fn new_dummy_metainfo() -> Metainfo { + Metainfo { + announce: None, + announce_list: None, + nodes: None, + comment: None, + created_by: None, + creation_date: None, + encoding: None, + info: Info { + private: None, + piece_length: Bytes(16 * 1024), + source: None, + name: "testing".into(), + pieces: PieceList::from_pieces(["test", "data"]), + mode: Mode::Single { + length: Bytes(2 * 16 * 1024), + md5sum: None, + }, + update_url: None, + }, + } + } + + fn simulate_tracker_announce_response_helper(server: UdpSocket, targets: Vec, slowly: bool) { + let mut rng = rand::thread_rng(); + let mut buf = [0u8; 8192]; + + // connect exchange + { + let (n, peer) = server.recv_from(&mut buf).unwrap(); + let (req, _) = connect::Request::deserialize(buf[..n].try_into().unwrap()).unwrap(); + let req = connect::Response { + action: Action::Connect.into(), + transaction_id: req.transaction_id, + connection_id: rng.gen(), + } + .serialize(); + if slowly { + thread::sleep(Duration::new(10, 0)); + } + server.send_to(&req, peer).unwrap(); + } + + // announce exchange + { + let (n, peer) = server.recv_from(&mut buf).unwrap(); + let (req, _) = announce::Request::deserialize(&buf[..n]).unwrap(); + let mut req: Vec = announce::Response { + action: Action::Announce.into(), + transaction_id: req.transaction_id, + interval: 0x1337_1337, + leechers: 0xcafe_babe, + seeders: 0xdead_beef, + } + .serialize(); + req.extend_from_slice(&targets); + if slowly { + thread::sleep(Duration::new(10, 0)); + } + server.send_to(&req, peer).unwrap(); + } + } + + fn simulate_tracker_announce_response(server: UdpSocket, targets: Vec) { + simulate_tracker_announce_response_helper(server, targets, false); + } + + fn simulate_tracker_announce_response_slowly(server: UdpSocket, targets: Vec) { + simulate_tracker_announce_response_helper(server, targets, true); + } + + fn client_end_to_end_announce_helper(addr: &str, slowly: bool) { + let server = UdpSocket::bind(addr).unwrap(); + server.set_read_timeout(Some(Duration::new(15, 0))).unwrap(); + + let mut metainfo = new_dummy_metainfo(); + let server_local_addr = server.local_addr().unwrap(); + metainfo.announce = Some(format!("udp://{}", server_local_addr)); + let mut env = test_env! { + args: [ + "torrent", + "announce", + "--input", + "test.torrent", + ], + tree: { + } + }; + env.write("test.torrent", metainfo.serialize().unwrap()); + + let targets = if server_local_addr.is_ipv6() { + vec![ + 0x13, 0x37, 0x00, 0x00, // ip + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xca, + 0xfe, // port + ] + } else { + vec![0x13, 0x37, 0xca, 0xfe, 0xba, 0xbe] + }; + + let target_sockaddrs = Client::parse_compact_peer_list(server_local_addr.is_ipv6(), &targets) + .unwrap() + .into_iter() + .map(|i| i.to_string()) + .collect::>() + .join("\n"); + + // simulate a tracker response + if slowly { + thread::spawn(move || { + simulate_tracker_announce_response_slowly(server, targets); + }); + } else { + thread::spawn(move || { + simulate_tracker_announce_response(server, targets); + }); + } + env.run().unwrap(); + + if slowly { + assert_eq!( + env.err(), + format!( + "Couldn't connect to tracker: Connection to UDP tracker `{}` timed out.\n", + server_local_addr + ) + ); + } else { + assert_eq!(env.out(), format!("{}\n", target_sockaddrs)); + } + } + + #[test] + fn client_end_to_end_announce_hostname() { + client_end_to_end_announce_helper("localhost:0", false); + } + + #[test] + fn client_end_to_end_announce_ipv4() { + client_end_to_end_announce_helper("127.0.0.1:0", false); + } + + #[test] + fn client_end_to_end_announce_ipv6() { + client_end_to_end_announce_helper("[::1]:0", false); + } + + #[test] + fn client_end_to_end_announce_ipv6_timeout() { + client_end_to_end_announce_helper("[::1]:0", true); + } + + #[test] + fn client_end_to_end_announce_timeout() { + client_end_to_end_announce_helper("127.0.0.1:0", true); + } +} diff --git a/src/tracker/connect.rs b/src/tracker/connect.rs new file mode 100644 index 00000000..e3e10e1b --- /dev/null +++ b/src/tracker/connect.rs @@ -0,0 +1,187 @@ +use crate::common::*; + +#[derive(Clone, Copy, Debug, PartialEq)] +pub(crate) struct Request { + pub(crate) protocol_id: u64, + pub(crate) action: u32, + pub(crate) transaction_id: u32, +} + +impl Request { + pub(crate) const LENGTH: usize = 16; +} + +#[derive(Debug, PartialEq)] +pub(crate) struct Response { + pub(crate) action: u32, + pub(crate) transaction_id: u32, + pub(crate) connection_id: u64, +} + +impl Response { + pub(crate) const LENGTH: usize = 16; +} + +impl super::Request for Request { + type Response = Response; + + fn serialize(&self) -> Vec { + let mut msg = Vec::new(); + + msg.extend_from_slice(&self.protocol_id.to_be_bytes()); + msg.extend_from_slice(&self.action.to_be_bytes()); + msg.extend_from_slice(&self.transaction_id.to_be_bytes()); + + msg + } + + fn transaction_id(&self) -> u32 { + self.transaction_id + } + + fn action(&self) -> u32 { + self.action + } +} + +impl super::Response for Request { + fn deserialize(buf: &[u8]) -> Result<(Self, &[u8])> { + if buf.len() != Self::LENGTH { + return Err(Error::TrackerResponse); + } + + Ok(( + Request { + protocol_id: u64::from_be_bytes( + buf[0..8] + .try_into() + .invariant_unwrap("incoming type guarantees bounds are OK"), + ), + action: u32::from_be_bytes( + buf[8..12] + .try_into() + .invariant_unwrap("incoming type guarantees bounds are OK"), + ), + transaction_id: u32::from_be_bytes( + buf[12..16] + .try_into() + .invariant_unwrap("incoming type guarantees bounds are OK"), + ), + }, + &buf[Self::LENGTH..], + )) + } + + fn transaction_id(&self) -> u32 { + self.transaction_id + } + + fn action(&self) -> u32 { + self.action + } +} + +impl super::Request for Response { + type Response = Request; + + fn serialize(&self) -> Vec { + let mut msg = Vec::new(); + + msg.extend_from_slice(&self.action.to_be_bytes()); + msg.extend_from_slice(&self.transaction_id.to_be_bytes()); + msg.extend_from_slice(&self.connection_id.to_be_bytes()); + + msg + } + + fn transaction_id(&self) -> u32 { + self.transaction_id + } + + fn action(&self) -> u32 { + self.action + } +} + +impl super::Response for Response { + fn deserialize(buf: &[u8]) -> Result<(Self, &[u8])> { + if buf.len() < Self::LENGTH { + return Err(Error::TrackerResponse); + } + + Ok(( + Self { + action: u32::from_be_bytes( + buf[0..4] + .try_into() + .invariant_unwrap("bounds are checked manually above"), + ), + transaction_id: u32::from_be_bytes( + buf[4..8] + .try_into() + .invariant_unwrap("bounds are checked manually above"), + ), + connection_id: u64::from_be_bytes( + buf[8..16] + .try_into() + .invariant_unwrap("bounds are checked manually above"), + ), + }, + &buf[Self::LENGTH..], + )) + } + + fn transaction_id(&self) -> u32 { + self.transaction_id + } + + fn action(&self) -> u32 { + self.action + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::tracker::{connect, request::Request, response::Response}; + + #[test] + pub(crate) fn connect_request_roundtrip() { + let req = connect::Request { + protocol_id: 0x1337_beef_babe_cafe, + action: 50, + transaction_id: 1234, + }; + + let buf = req.serialize(); + let (req2, _) = connect::Request::deserialize(&buf).unwrap(); + assert_eq!(req, req2); + } + + #[test] + pub(crate) fn connect_response_roundtrip() { + let resp = connect::Response { + action: 50, + transaction_id: 1234, + connection_id: 0x1337_beef_babe_cafe, + }; + + let buf = resp.serialize(); + let (resp2, _) = connect::Response::deserialize(&buf).unwrap(); + assert_eq!(resp, resp2); + } + + #[test] + pub(crate) fn connect_request_datagram_size() { + let buf = [0x01, 0x02, 0x03]; + let err = connect::Request::deserialize(&buf); + assert_matches!(err, Err(Error::TrackerResponse)); + } + + #[test] + pub(crate) fn connect_response_datagram_size() { + let buf = [0x01, 0x02, 0x03]; + let err = connect::Response::deserialize(&buf); + assert_matches!(err, Err(Error::TrackerResponse)); + } +} diff --git a/src/tracker/request.rs b/src/tracker/request.rs new file mode 100644 index 00000000..615889ba --- /dev/null +++ b/src/tracker/request.rs @@ -0,0 +1,9 @@ +use super::Response; + +pub(crate) trait Request { + type Response: Response; + fn serialize(&self) -> Vec; + + fn transaction_id(&self) -> u32; + fn action(&self) -> u32; +} diff --git a/src/tracker/response.rs b/src/tracker/response.rs new file mode 100644 index 00000000..366ddcf3 --- /dev/null +++ b/src/tracker/response.rs @@ -0,0 +1,11 @@ +use crate::common::*; + +pub(crate) trait Response { + // Deserialize the response into a Response object and payload. + fn deserialize(buf: &[u8]) -> Result<(Self, &[u8])> + where + Self: std::marker::Sized; + + fn transaction_id(&self) -> u32; + fn action(&self) -> u32; +}