1 change: 1 addition & 0 deletions server/Cargo.toml
@@ -44,6 +44,7 @@ structopt = { version = "0.3.25" }
sysinfo = "0.20.5"
thiserror = "1"
tokio-stream = "0.1.8"
tokio = { version = "1.13.1", default-features = false, features=["sync", "macros"] }
clokwerk = "0.4.0-rc1"
actix-web-static-files = "4.0"
static-files = "0.2.1"
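The `tokio` dependency is added with `default-features = false` and only the `sync` and `macros` features: `sync` provides the `oneshot` channels the new `main.rs` code uses to signal shutdown to, and detect failure of, the sync threads, while `macros` enables `tokio::select!`. A minimal sketch of that inbox/outbox channel pairing, with a hypothetical `spawn_worker` standing in for the real `run_local_sync`/`s3_sync` below:

```rust
use std::thread;
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;

// Hypothetical helper mirroring the pattern in this PR: the caller keeps
// `inbox_tx` to request shutdown and `outbox_rx` to learn the worker stopped.
fn spawn_worker() -> (thread::JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
    let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
    let (inbox_tx, mut inbox_rx) = oneshot::channel::<()>();

    let handle = thread::spawn(move || {
        loop {
            // one unit of periodic work would run here
            thread::sleep(Duration::from_millis(50));

            match inbox_rx.try_recv() {
                // shutdown was requested, or the caller went away
                Ok(_) | Err(TryRecvError::Closed) => break,
                // nothing yet: keep working
                Err(TryRecvError::Empty) => continue,
            }
        }
        // Dropping `outbox_tx` closes the channel, which wakes any task
        // awaiting `outbox_rx` (e.g. in a `select!`).
        drop(outbox_tx);
    });

    (handle, outbox_rx, inbox_tx)
}
```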
125 changes: 103 additions & 22 deletions server/src/main.rs
@@ -22,13 +22,17 @@ use actix_web::{middleware, web, App, HttpServer};
use actix_web_httpauth::extractors::basic::BasicAuth;
use actix_web_httpauth::middleware::HttpAuthentication;
use actix_web_static_files::ResourceFiles;
use clokwerk::{AsyncScheduler, TimeUnits};
use clokwerk::{AsyncScheduler, Scheduler, TimeUnits};
use log::warn;
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};

include!(concat!(env!("OUT_DIR"), "/generated.rs"));

use std::thread;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;

mod banner;
mod error;
@@ -63,33 +67,110 @@ async fn main() -> anyhow::Result<()> {
if let Err(e) = metadata::STREAM_INFO.load(&storage).await {
warn!("could not populate local metadata. {:?}", e);
}
thread::spawn(sync);
run_http().await?;

Ok(())
}
let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync();
let (mut s3sync_handler, mut s3sync_outbox, mut s3sync_inbox) = s3_sync();

#[actix_web::main]
async fn sync() {
let mut scheduler = AsyncScheduler::new();
scheduler
.every((storage::LOCAL_SYNC_INTERVAL as u32).seconds())
.run(|| async {
if let Err(e) = S3::new().local_sync().await {
warn!("failed to sync local data. {:?}", e);
let app = run_http();
tokio::pin!(app);
loop {
tokio::select! {
e = &mut app => {
// actix server finished .. stop other threads and stop the server
s3sync_inbox.send(()).unwrap_or(());
localsync_inbox.send(()).unwrap_or(());
localsync_handler.join().unwrap_or(());
s3sync_handler.join().unwrap_or(());
return e
},
_ = &mut localsync_outbox => {
// crash the server if localsync fails for any reason
// panic!("Local Sync thread died. Server will fail now!")
return Err(anyhow::Error::msg("Local Sync failed"))
},
_ = &mut s3sync_outbox => {
// s3sync failed, this is recoverable by just starting s3sync thread again
s3sync_handler.join().unwrap_or(());
(s3sync_handler, s3sync_outbox, s3sync_inbox) = s3_sync();
}
};
}
}

fn s3_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
let mut inbox_rx = AssertUnwindSafe(inbox_rx);
let handle = thread::spawn(move || {
let res = catch_unwind(move || {
let rt = actix_web::rt::System::new();
rt.block_on(async {
let mut scheduler = AsyncScheduler::new();
scheduler
.every((CONFIG.parseable.upload_interval as u32).seconds())
.run(|| async {
if let Err(e) = S3::new().s3_sync().await {
warn!("failed to sync local data with object store. {:?}", e);
}
});

loop {
scheduler.run_pending().await;
match AssertUnwindSafe(|| inbox_rx.try_recv())() {
Ok(_) => break,
Err(TryRecvError::Empty) => continue,
Err(TryRecvError::Closed) => {
// should be unreachable but breaking anyways
break;
}
}
}
})
});
scheduler
.every((CONFIG.parseable.upload_interval as u32).seconds())
.run(|| async {
if let Err(e) = S3::new().s3_sync().await {
warn!("failed to sync local data with object store. {:?}", e);

if res.is_err() {
outbox_tx.send(()).unwrap();
}
});

(handle, outbox_rx, inbox_tx)
}

fn run_local_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
let mut inbox_rx = AssertUnwindSafe(inbox_rx);
let handle = thread::spawn(move || {
let res = catch_unwind(move || {
let mut scheduler = Scheduler::new();
scheduler
.every((storage::LOCAL_SYNC_INTERVAL as u32).seconds())
.run(move || {
if let Err(e) = S3::new().local_sync() {
warn!("failed to sync local data. {:?}", e);
}
});

loop {
thread::sleep(Duration::from_millis(50));
Contributor Author: Is it okay to thread sleep here for a small amount of time?

Member: Why is sleep needed here?

scheduler.run_pending();
match AssertUnwindSafe(|| inbox_rx.try_recv())() {
Ok(_) => break,
Err(TryRecvError::Empty) => continue,
Err(TryRecvError::Closed) => {
// should be unreachable but breaking anyways
break;
}
}
}
});

loop {
scheduler.run_pending().await;
}
if res.is_err() {
outbox_tx.send(()).unwrap();
}
});

(handle, outbox_rx, inbox_tx)
}

async fn validator(
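On the main-task side, `main` now pins the `run_http()` future and `select!`s over it together with the two outbox receivers: if the server exits it signals both sync threads to stop, if the local-sync outbox fires the server bails out with an error, and if the S3-sync outbox fires that thread is simply restarted. A minimal sketch of that supervision loop, reusing the hypothetical `spawn_worker` from the earlier sketch and a stand-in `run_server()` future:

```rust
use anyhow::Result;

// Stand-in for the real HTTP server future (never completes here).
async fn run_server() -> Result<()> {
    std::future::pending::<()>().await;
    Ok(())
}

// Supervision loop sketch: stop the worker when the server exits, and restart
// the worker whenever its outbox fires (its thread stopped or panicked).
// `spawn_worker` is the hypothetical helper sketched after the Cargo.toml hunk.
async fn supervise() -> Result<()> {
    let (mut handle, mut outbox, mut inbox) = spawn_worker();

    let server = run_server();
    tokio::pin!(server);

    loop {
        tokio::select! {
            res = &mut server => {
                inbox.send(()).ok();   // ask the worker to stop
                handle.join().ok();    // wait for its thread to finish
                return res;
            },
            _ = &mut outbox => {
                handle.join().ok();    // reap the stopped thread
                (handle, outbox, inbox) = spawn_worker();
            },
        }
    }
}
```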
2 changes: 1 addition & 1 deletion server/src/storage.rs
@@ -63,7 +63,7 @@ pub trait ObjectStorage: Sync + 'static {
query: &Query,
results: &mut Vec<RecordBatch>,
) -> Result<(), ObjectStorageError>;
async fn local_sync(&self) -> io::Result<()> {
fn local_sync(&self) -> io::Result<()> {
// If the local data path doesn't exist yet, return early.
// This method will be called again after next ticker interval
if !Path::new(&CONFIG.parseable.local_disk_path).exists() {
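The `storage.rs` change turns `local_sync` from an `async fn` into a plain blocking `fn`, which is what lets `run_local_sync` drive it from clokwerk's blocking `Scheduler` on a dedicated thread instead of spinning up an async runtime. A rough sketch of that blocking setup, with a hypothetical `do_local_sync` and interval standing in for `ObjectStorage::local_sync` and `LOCAL_SYNC_INTERVAL`:

```rust
use clokwerk::{Scheduler, TimeUnits};
use std::io;
use std::thread;
use std::time::Duration;

// Assumed interval, stand-in for storage::LOCAL_SYNC_INTERVAL.
const SYNC_INTERVAL_SECS: u32 = 60;

// Hypothetical stand-in for ObjectStorage::local_sync: a plain blocking call,
// so no async runtime is needed on this thread.
fn do_local_sync() -> io::Result<()> {
    Ok(())
}

fn spawn_blocking_sync() -> thread::JoinHandle<()> {
    thread::spawn(|| {
        let mut scheduler = Scheduler::new();
        scheduler
            .every(SYNC_INTERVAL_SECS.seconds())
            .run(|| {
                if let Err(e) = do_local_sync() {
                    eprintln!("failed to sync local data. {:?}", e);
                }
            });

        loop {
            // run_pending() only runs jobs that are due and then returns,
            // so sleep briefly between polls (mirrors the sleep in the diff).
            scheduler.run_pending();
            thread::sleep(Duration::from_millis(50));
        }
    })
}
```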