Skip to content

Commit

Permalink
fix: fallback for
Browse files Browse the repository at this point in the history
  • Loading branch information
lostb1t committed Jul 26, 2024
1 parent 7be6138 commit 51dcbc5
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 41 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ reqwest-retry = "0.6"
reqwest-middleware = "0.3"
memory-stats = "1.2.0"
graphql_client = { version = "0.14", features = ["reqwest"] }
percent-encoding = "2.3.1"
#format_serde_error = "0.3"

[dev-dependencies]
Expand Down
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ docker-run:
run:
REPLEX_DISABLE_CONTINUE_WATCHING=0 \
REPLEX_AUTO_SELECT_VERSION=0 \
REPLEX_VIDEO_TRANSCODE_FALLBACK_FOR="4k" \
REPLEX_FORCE_MAXIMUM_QUALITY=0 \
REPLEX_CACHE_ROWS=0 \
REPLEX_CACHE_ROWS_REFRESH=0 \
Expand All @@ -32,7 +33,8 @@ run:
REPLEX_ENABLE_CONSOLE=0 \
REPLEX_CACHE_TTL=0 \
REPLEX_HUB_RESTRICTIONS=1 \
RUST_LOG="info,replex=info" \
RUST_BACKTRACE=0 \
RUST_LOG="info,replex=debug" \
REPLEX_NTF_WATCHLIST_FORCE=0 \
RUSTFLAGS=-Awarnings \
cargo watch -w src -x run
Expand Down
2 changes: 1 addition & 1 deletion src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl Handler for Logger {
let status = res.status_code.unwrap_or(StatusCode::OK);
tracing::debug!(
status = %status,
path = %req.uri(),
path = %req.uri().path(),
duration = ?duration,
"Response"
);
Expand Down
66 changes: 45 additions & 21 deletions src/plex_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub struct PlexClient {
pub context: PlexContext,
pub host: String, // TODO: Dont think this supposed to be here. Should be higher up
pub cache: Cache<String, MediaContainerWrapper<MediaContainer>>,
pub default_headers: header::HeaderMap,
}

impl PlexClient {
Expand All @@ -81,20 +82,25 @@ impl PlexClient {

pub async fn request(
&self,
req: &mut Request,
req: &Request,
) -> Result<reqwest::Response, Error> {
let url = format!(
"{}{}",
self.host,
&req.uri_mut().path_and_query().unwrap()
&req.uri().clone().path_and_query().unwrap()
);

let mut headers = req.headers_mut().clone();
headers.remove(ACCEPT); // remove accept as we always do json request

let mut headers = self.default_headers.clone();
for (key, value) in req.headers().iter() {
if key != ACCEPT {
headers.insert(key, value.clone());
}
}
//let mut headers = req.headers_mut().clone();
//headers.remove(ACCEPT); // remove accept as we always do json request
//dbg!(&headers);
let res = self
.http_client
.request(req.method_mut().to_owned(), url)
.request(req.method().clone(), url)
.headers(headers)
.send()
.await
Expand All @@ -103,16 +109,33 @@ impl PlexClient {
Ok(res)
}

// pub async fn proxy_request(
// &self,
// req: &mut Request,
// ) -> Result<reqwest::Response, Error> {
// self.request(req)
// }
pub async fn proxy_request(
&self,
req: &Request,
) -> Result<reqwest::Response, Error> {
let url = format!(
"{}{}?{}",
self.host,
encode_url_path(&url_path_getter(req).unwrap()),
url_query_getter(req).unwrap()
);
//dbg!(&req);
//dbg!(&url);
//dbg!(&req.uri().clone().query().unwrap().to_string());
let mut headers = req.headers().clone();
//headers.remove(ACCEPT); // remove accept as we always do json request

// pub fn request(&self, req) -> hyper::client::ResponseFuture {
// self.http_client.request(req)
// }
let res = self
.http_client
.request(req.method().clone(), url)
//.execute(req)
.headers(headers)
.send()
.await
.map_err(Error::other)?;
//dbg!(&res);
Ok(res)
}

pub async fn get_section_collections(
&self,
Expand Down Expand Up @@ -239,19 +262,19 @@ impl PlexClient {
&self,
id: i32,
) -> Result<MediaContainerWrapper<MediaContainer>> {
let resp = self.get("/hubs".to_string()).await.unwrap();
let res = self.get("/hubs".to_string()).await.unwrap();
let container: MediaContainerWrapper<MediaContainer> =
from_reqwest_response(resp).await.unwrap();
from_reqwest_response(res).await.unwrap();
Ok(container)
}

pub async fn get_item_by_key(
self,
key: String,
) -> Result<MediaContainerWrapper<MediaContainer>> {
let resp = self.get(key).await.unwrap();
let res = self.get(key).await.unwrap();
let container: MediaContainerWrapper<MediaContainer> =
from_reqwest_response(resp).await.unwrap();
from_reqwest_response(res).await.unwrap();
Ok(container)
}

Expand Down Expand Up @@ -446,13 +469,14 @@ impl PlexClient {
Self {
http_client: reqwest_middleware::ClientBuilder::new(
reqwest::Client::builder()
.default_headers(headers)
//.default_headers(headers)
.gzip(true)
.timeout(Duration::from_secs(30))
.build()
.unwrap(),
)
.build(),
default_headers: headers,
host: config.host.unwrap(),
context: context.clone(),
//x_plex_token: token,
Expand Down
16 changes: 13 additions & 3 deletions src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ pub async fn default_transform(
match upstream_res.status() {
reqwest::StatusCode::OK => (),
status => {
tracing::error!(status = ?status, res = ?upstream_res, "Failed to get plex response");
tracing::error!(status = ?status, res = ?upstream_res, req = ?req, "Failed to get plex response");
return Err(
salvo::http::StatusError::internal_server_error().into()
);
Expand Down Expand Up @@ -987,9 +987,19 @@ async fn get_transcoding_for_request(
) -> Result<TranscodingStatus, anyhow::Error> {
let context: PlexContext = req.extract().await.unwrap();
let plex_client = PlexClient::from_context(&context);
let response = plex_client.request(req).await?;

let mut res = &mut Response::new();
let mut depot = &mut Depot::new();
let mut ctrl = &mut FlowCtrl::new(vec![]);
proxy_for_transform.handle(req, depot, res, ctrl).await;
//dbg!(&res_upstream);

//let res = plex_client.proxy_request(&req).await?;
//dbg!(&res);
//dbg!(&req);

let transcode: MediaContainerWrapper<MediaContainer> =
from_reqwest_response(response).await?;
from_salvo_response(res).await?;
let mut is_transcoding = false;

if transcode.media_container.size.is_some()
Expand Down
46 changes: 33 additions & 13 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use strum_macros::Display as EnumDisplay;
use strum_macros::EnumString;
// use http_body::{Limited, Full};
use http_body_util::BodyExt;
use percent_encoding::{utf8_percent_encode, CONTROLS};
use url::Url;
use tokio::time::Duration;
use yaserde::ser::to_string as to_xml_str;
Expand All @@ -30,23 +31,30 @@ use salvo::{

use crate::models::*;

pub fn default_url_path_getter(
pub fn url_path_getter(
req: &Request,
_depot: &Depot,
) -> Option<String> {
//dbg!(&req);
Some(req.uri().path().to_string())
}

pub fn default_url_query_getter(
pub fn url_query_getter(
req: &Request,
) -> Option<String> {
req.uri().query().map(Into::into)
}

pub fn salvo_url_path_getter(
req: &Request,
_depot: &Depot,
) -> Option<String> {
//dbg!(&req.uri().query());
match req.uri().query() {
Some(i) => Some(i.to_string()),
_ => None
}
url_path_getter(req)
}

pub fn salvo_url_query_getter(
req: &Request,
_depot: &Depot,
) -> Option<String> {
url_query_getter(req)
}

// Proxy to plex instance
Expand All @@ -59,8 +67,8 @@ pub fn default_proxy() -> Proxy<String, ReqwestClient> {
.build()
.unwrap())
);
proxy = proxy.url_path_getter(default_url_path_getter);
proxy = proxy.url_query_getter(default_url_query_getter);
proxy = proxy.url_path_getter(salvo_url_path_getter);
proxy = proxy.url_query_getter(salvo_url_query_getter);
proxy
}

Expand All @@ -71,12 +79,14 @@ pub fn proxy(upstream: String) -> Proxy<String, ReqwestClient> {
.build()
.unwrap())
);
proxy = proxy.url_path_getter(default_url_path_getter);
proxy = proxy.url_query_getter(default_url_query_getter);
proxy = proxy.url_path_getter(salvo_url_path_getter);
proxy = proxy.url_query_getter(salvo_url_query_getter);

proxy
}

//pub fn proxy_request()

pub fn test_proxy(upstream: String) -> Proxy<String, ReqwestClient> {
let mut proxy = Proxy::new(
upstream,
Expand All @@ -88,6 +98,15 @@ pub fn test_proxy(upstream: String) -> Proxy<String, ReqwestClient> {
proxy
}

/// Encode url path. This can be used when build your custom url path getter.
#[inline]
pub fn encode_url_path(path: &str) -> String {
path.split('/')
.map(|s| utf8_percent_encode(s, CONTROLS).to_string())
.collect::<Vec<_>>()
.join("/")
}

pub fn get_collection_id_from_child_path(path: String) -> i32 {
let mut path = path.replace("/library/collections/", "");
path = path.replace("/children", "");
Expand Down Expand Up @@ -224,6 +243,7 @@ pub async fn from_reqwest_response(
res: reqwest::Response,
) -> Result<MediaContainerWrapper<MediaContainer>, Error> {
let bytes = res.bytes().await.unwrap();
//dbg!(&bytes);
from_bytes(bytes)
}

Expand Down

0 comments on commit 51dcbc5

Please sign in to comment.