Skip to content
This repository has been archived by the owner on Apr 5, 2024. It is now read-only.

Commit

Permalink
Update the rest of the dependencies and code to modern versions (async)
Browse files Browse the repository at this point in the history
Notably updated actix, tokio, and juniper.

Juniper now requires Send + Sync for context, prompting me to open
graphql-rust/juniper#840

The file streaming code has been replaced by the ones provided by actix,
as they support everything we need now.

Signed-off-by: Mcat12 <[email protected]>
  • Loading branch information
AzureMarker committed Jan 2, 2021
1 parent 7eb123a commit 23e7400
Show file tree
Hide file tree
Showing 31 changed files with 1,491 additions and 2,251 deletions.
2,125 changes: 1,004 additions & 1,121 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,24 @@ diesel_migrations = "1.4"
diesel = { version = "1.4", features = ["sqlite", "chrono"] }
chrono = "0.4"
uuid = { version = "0.8", features = ["v4", "serde"] }
actix = "0.7"
actix-web = "0.7"
actix-files = "0.5"
actix-web = "3.3"
http-range = "0.1"
bytes = "0.4"
bytes = "1.0"
rand = "0.8"
lru-disk-cache = "0.4"
futures-cpupool = "0.1"
tokio-process = "0.2"
serde = "1.0"
serde_json = "1.0"
juniper = "0.9"
tokio = { version = "0.2", features = ["process"] }
juniper = "0.15"
juniper_actix = "0.2"
mime_guess = "2.0"
taglib2-sys = { path = "taglib2-sys" }
image = "0.23"
walkdir = "2.3"
indicatif = "0.15"
structopt = "0.3"
app_dirs = "1.2"
futures = "0.1"
futures = "0.3"
error-chain = "0.12"
rust-embed = { version = "5.7", optional = true }
5 changes: 1 addition & 4 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ extern crate error_chain;
#[macro_use]
extern crate diesel_migrations;

#[macro_use]
extern crate futures;

#[cfg(feature = "embed_web")]
#[macro_use]
extern crate rust_embed;
Expand Down Expand Up @@ -112,7 +109,7 @@ fn run() -> Result<()> {
let transcode_cache = make_transcode_cache(app_dir)?;
let temporary_files = TemporaryFiles::new("forte")?;

server::serve(pool, &host, transcode_cache, temporary_files);
server::serve(pool, &host, transcode_cache, temporary_files)?;
}
Command::Sync { directory } => {
let mut artwork_directory = app_dir;
Expand Down
72 changes: 0 additions & 72 deletions src/bin/server/files.rs

This file was deleted.

125 changes: 19 additions & 106 deletions src/bin/server/graphql.rs
Original file line number Diff line number Diff line change
@@ -1,124 +1,37 @@
use crate::server::transcoder::Transcoder;
use actix::prelude::*;
use actix_web::error;
use actix_web::AsyncResponder;
use actix_web::FutureResponse;
use actix_web::HttpRequest;
use actix_web::HttpResponse;
use actix_web::Json;
use actix_web::State;
use actix_web::web::{Data, Payload};
use actix_web::{error, get, post, HttpRequest, HttpResponse};
use forte_core::context;
use forte_core::context::GraphQLContext;
use forte_core::models::Schema;
use futures::Future;
use juniper::graphiql::graphiql_source;
use juniper::http::GraphQLRequest;
use std::sync::Arc;

mod errors {
error_chain! {
foreign_links {
R2d2(::r2d2::Error);
SerdeJson(::serde_json::Error);
}
}
}
use juniper_actix::{graphiql_handler, graphql_handler};

pub struct AppState {
pub executor: Addr<GraphQLExecutor>,
pub schema: Schema,
pub connection_pool: context::Pool,
pub transcoder: Addr<Transcoder>,
pub transcoder: Transcoder,
}

impl AppState {
pub fn new(
executor: Addr<GraphQLExecutor>,
transcoder: Addr<Transcoder>,
connection_pool: context::Pool,
) -> AppState {
AppState {
executor,
transcoder,
connection_pool,
}
}

pub fn build_context(&self) -> Result<GraphQLContext, ::r2d2::Error> {
pub fn build_context(&self) -> Result<GraphQLContext, r2d2::Error> {
let connection = self.connection_pool.get()?;
Ok(GraphQLContext::new(connection))
}
}

struct ResolveMessage {
request: GraphQLRequest,
context: GraphQLContext,
}

impl Message for ResolveMessage {
type Result = Result<(bool, String), errors::Error>;
}

pub struct GraphQLExecutor {
schema: Arc<Schema>,
}

impl GraphQLExecutor {
pub fn new(schema: Arc<Schema>) -> GraphQLExecutor {
GraphQLExecutor { schema }
}
}

impl Actor for GraphQLExecutor {
type Context = SyncContext<Self>;
}

impl Handler<ResolveMessage> for GraphQLExecutor {
type Result = Result<(bool, String), errors::Error>;

fn handle(&mut self, request: ResolveMessage, _ctx: &mut Self::Context) -> Self::Result {
let response = request.request.execute(&self.schema, &request.context);
let text = serde_json::to_string(&response)?;

Ok((response.is_ok(), text))
}
}

pub fn graphql(params: (State<AppState>, Json<GraphQLRequest>)) -> FutureResponse<HttpResponse> {
let state = params.0;
let request = params.1;

let context_future = futures::done(
state
.build_context()
.map_err(error::ErrorInternalServerError),
);

context_future
.and_then(move |context| {
state
.executor
.send(ResolveMessage {
request: request.0,
context,
})
.from_err()
})
.and_then(|resp| match resp {
Ok((true, body)) => Ok(HttpResponse::Ok()
.content_type("application/json")
.body(body)),
Ok((false, body)) => Ok(HttpResponse::BadRequest()
.content_type("application/json")
.body(body)),
Err(..) => Ok(HttpResponse::InternalServerError().into()),
})
.responder()
#[post("/graphql")]
pub async fn graphql(
request: HttpRequest,
payload: Payload,
state: Data<AppState>,
) -> actix_web::Result<HttpResponse> {
let context = state
.build_context()
.map_err(error::ErrorInternalServerError)?;
graphql_handler(&state.schema, &context, request, payload).await
}

pub fn graphiql<S: 'static>(_req: &HttpRequest<S>) -> HttpResponse {
let html = graphiql_source("/graphql");

HttpResponse::Ok()
.content_type("text/html; charset=utf-8")
.body(html)
#[get("/graphiql")]
pub async fn graphiql() -> actix_web::Result<HttpResponse> {
graphiql_handler("/graphql", None).await
}
92 changes: 39 additions & 53 deletions src/bin/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,78 +1,64 @@
mod files;
mod graphql;
mod stream;
mod streaming;
pub mod temp;
mod transcoder;
mod transcoding;

#[cfg(feature = "embed_web")]
mod web;
mod web_interface;

use crate::server::graphql::{graphiql, graphql, AppState, GraphQLExecutor};
use crate::server::graphql::{graphiql, graphql as graphql_handler, AppState};
use crate::server::temp::TemporaryFiles;
use crate::server::transcoder::{TranscodeTarget, Transcoder};
use crate::server::transcoding::TranscodedHandlerAppExt;
use actix::prelude::*;
use actix::System;
use actix_web::http;
use actix_web::server;
use actix_web::App;
use crate::server::transcoder::Transcoder;
use crate::server::transcoding::transcode_handler;
use actix_web::rt::System;
use actix_web::{web, App, HttpServer};
use forte_core::context;
use forte_core::models::{create_schema, Album, Song};
use lru_disk_cache::LruDiskCache;
use std::sync::Arc;

#[cfg(feature = "embed_web")]
use web_interface::register_web_interface_handler;

#[cfg(not(feature = "embed_web"))]
use actix_web::web::ServiceConfig;
/// Do nothing when the embedded web interface is not enabled
#[cfg(not(feature = "embed_web"))]
fn register_web_interface_handler(_config: &mut ServiceConfig) {}

pub fn serve(
pool: context::Pool,
host: &str,
transcode_cache: LruDiskCache,
temp_files: TemporaryFiles,
) {
let sys = System::new("forte");

let schema = Arc::new(create_schema());
let gql_executor = SyncArbiter::start(3, move || GraphQLExecutor::new(schema.clone()));

) -> std::io::Result<()> {
let mut sys = System::new("forte");
let transcoder = Transcoder::new(transcode_cache, temp_files);
let transcoder_addr: Addr<Transcoder> = transcoder.start();

server::new(move || {
App::with_state(AppState::new(
gql_executor.clone(),
transcoder_addr.clone(),
pool.clone(),
))
.resource("/graphql", |r| r.method(http::Method::POST).with(graphql))
.resource("/graphiql", |r| r.method(http::Method::GET).f(graphiql))
.register_transcode_handler(TranscodeTarget::MP3V0)
.register_transcode_handler(TranscodeTarget::AACV5)
.resource(&Song::get_raw_stream_url("{id}"), |r| {
r.method(http::Method::GET).with(streaming::song_handler)
})
.resource(&Album::get_artwork_url("{id}"), |r| {
r.method(http::Method::GET).with(streaming::artwork_handler)
})
.register_web_interface_handler()
let server = HttpServer::new(move || {
App::new()
.data(AppState {
schema: create_schema(),
transcoder: transcoder.clone(),
connection_pool: pool.clone(),
})
.service(graphql_handler)
.service(graphiql)
.route(
&Song::get_raw_stream_url("{id}"),
web::get().to(streaming::song_handler),
)
.route(
&Album::get_artwork_url("{id}"),
web::get().to(streaming::artwork_handler),
)
.service(transcode_handler)
.configure(register_web_interface_handler)
})
.bind(host)
.unwrap()
.start();

println!("Started Server on {}", host);
.unwrap();

let _ = sys.run();
}
println!("Starting Server on {}", host);

pub trait WebHandlerAppExt {
/// Register the web interface handlers, if the embedded web interface is enabled
fn register_web_interface_handler(self) -> Self;
}

/// Do nothing when the embedded web interface is not enabled
#[cfg(not(feature = "embed_web"))]
impl WebHandlerAppExt for App<AppState> {
fn register_web_interface_handler(self) -> Self {
self
}
sys.block_on(server.run())
}
Loading

0 comments on commit 23e7400

Please sign in to comment.