Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ WORKDIR /parseable
COPY --from=builder /bin/sh /bin/sh
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nitisht any specific reason for adding shell. Is it because image is distroless, i have been using Dockerfile.debug, so just want to confirm ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, because distroless doesn't have any other shell.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, but do we need a shell ? I mean wasn't shell for ${hostname}, container and app PID should be same.
I'lll check once if needed. If you see CMD will trigger the command itself.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, we can remove in that case

COPY --from=builder /parseable/target/release/parseable /usr/bin/parseable

CMD ["parseable"]
CMD ["/usr/bin/parseable"]
2 changes: 1 addition & 1 deletion Dockerfile.debug
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ WORKDIR /parseable

COPY --from=builder /parseable/target/debug/parseable /usr/bin/parseable

CMD ["parseable"]
CMD ["/usr/bin/parseable"]
1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ anyhow = { version = "1.0", features = ["backtrace"] }
argon2 = "0.5.0"
async-trait = "0.1"
base64 = "0.22.0"
lazy_static = "1.4"
bytes = "1.4"
byteorder = "1.4.3"
bzip2 = { version = "*", features = ["static"] }
Expand Down
84 changes: 62 additions & 22 deletions server/src/handlers/http/health_check.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,74 @@
/*
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use crate::option::CONFIG;
use actix_web::http::StatusCode;
use actix_web::HttpResponse;
use lazy_static::lazy_static;
use std::sync::Arc;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::{oneshot, Mutex};
use tokio::time::{sleep, Duration};

use crate::option::CONFIG;
// Create a global variable to store signal status
lazy_static! {
static ref SIGNAL_RECEIVED: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
}

pub async fn liveness() -> HttpResponse {
HttpResponse::new(StatusCode::OK)
}

pub async fn handle_signals(shutdown_signal: Arc<Mutex<Option<oneshot::Sender<()>>>>) {
let signal_received = SIGNAL_RECEIVED.clone();

let mut sigterm =
signal(SignalKind::terminate()).expect("Failed to set up SIGTERM signal handler");
log::info!("Signal handler task started");

// Block until SIGTERM is received
match sigterm.recv().await {
Some(_) => {
log::info!("Received SIGTERM signal at Readiness Probe Handler");

// Set the shutdown flag to true
let mut shutdown_flag = signal_received.lock().await;
*shutdown_flag = true;

// Trigger graceful shutdown
if let Some(shutdown_sender) = shutdown_signal.lock().await.take() {
let _ = shutdown_sender.send(());
}

// Delay to allow readiness probe to return SERVICE_UNAVAILABLE
let _ = sleep(Duration::from_secs(20)).await;

// Sync to local
crate::event::STREAM_WRITERS.unset_all();

// Sync to S3
if let Err(e) = CONFIG.storage().get_object_store().sync().await {
log::warn!("Failed to sync local data with object store. {:?}", e);
}

log::info!("Local and S3 Sync done, handler SIGTERM completed.");
}
None => {
println!("Signal handler received None, indicating an error or end of stream");
}
}

eprintln!("Signal handler task completed");
}

pub async fn readiness() -> HttpResponse {
if CONFIG.storage().get_object_store().check().await.is_ok() {
return HttpResponse::new(StatusCode::OK);
// Check if the application has received a shutdown signal
let shutdown_flag = SIGNAL_RECEIVED.lock().await;
if *shutdown_flag {
return HttpResponse::new(StatusCode::SERVICE_UNAVAILABLE);
}

HttpResponse::new(StatusCode::SERVICE_UNAVAILABLE)
// Check the object store connection
if CONFIG.storage().get_object_store().check().await.is_ok() {
HttpResponse::new(StatusCode::OK)
} else {
HttpResponse::new(StatusCode::SERVICE_UNAVAILABLE)
}
}
46 changes: 40 additions & 6 deletions server/src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use crate::analytics;
use crate::banner;
use crate::handlers::airplane;
use crate::handlers::http::health_check;
use crate::handlers::http::ingest;
use crate::handlers::http::logstream;
use crate::handlers::http::middleware::RouteExt;
Expand All @@ -35,6 +36,8 @@ use crate::storage::ObjectStorageError;
use crate::storage::PARSEABLE_ROOT_DIRECTORY;
use crate::sync;

use std::sync::Arc;

use super::server::Server;
use super::ssl_acceptor::get_ssl_acceptor;
use super::IngestorMetadata;
Expand All @@ -56,6 +59,7 @@ use bytes::Bytes;
use once_cell::sync::Lazy;
use relative_path::RelativePathBuf;
use serde_json::Value;
use tokio::sync::{oneshot, Mutex};

/// ! have to use a guard before using it
pub static INGESTOR_META: Lazy<IngestorMetadata> =
Expand Down Expand Up @@ -91,17 +95,47 @@ impl ParseableServer for IngestServer {
.wrap(cross_origin_config())
};

// concurrent workers equal to number of logical cores
let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get());
// Create a channel to trigger server shutdown
let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>();
let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger)));

// Clone the shutdown signal for the signal handler
let shutdown_signal = server_shutdown_signal.clone();

// Spawn the signal handler task
tokio::spawn(async move {
health_check::handle_signals(shutdown_signal).await;
println!("Received shutdown signal, notifying server to shut down...");
});

if let Some(config) = ssl {
// Create the HTTP server
let http_server = HttpServer::new(create_app_fn)
.workers(num_cpus::get())
.shutdown_timeout(60);

// Start the server with or without TLS
let srv = if let Some(config) = ssl {
http_server
.bind_rustls_0_22(&CONFIG.parseable.address, config)?
.run()
.await?;
} else {
http_server.bind(&CONFIG.parseable.address)?.run().await?;
}
http_server.bind(&CONFIG.parseable.address)?.run()
};

// Graceful shutdown handling
let srv_handle = srv.handle();

tokio::spawn(async move {
// Wait for the shutdown signal
shutdown_rx.await.ok();

// Initiate graceful shutdown
log::info!("Graceful shutdown of HTTP server triggered");
srv_handle.stop(true).await;
});

// Await the server to run and handle shutdown
srv.await?;

Ok(())
}
Expand Down
45 changes: 39 additions & 6 deletions server/src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

use crate::handlers::airplane;
use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
use crate::handlers::http::health_check;
use crate::handlers::http::logstream::create_internal_stream_if_not_exists;
use crate::handlers::http::middleware::RouteExt;
use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};
Expand All @@ -32,6 +33,7 @@ use actix_web::web::ServiceConfig;
use actix_web::{App, HttpServer};
use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::{oneshot, Mutex};

use crate::option::CONFIG;

Expand Down Expand Up @@ -74,16 +76,47 @@ impl ParseableServer for QueryServer {
.wrap(cross_origin_config())
};

// concurrent workers equal to number of cores on the cpu
let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get());
if let Some(config) = ssl {
// Create a channel to trigger server shutdown
let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>();
let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger)));

// Clone the shutdown signal for the signal handler
let shutdown_signal = server_shutdown_signal.clone();

// Spawn the signal handler task
tokio::spawn(async move {
health_check::handle_signals(shutdown_signal).await;
println!("Received shutdown signal, notifying server to shut down...");
});

// Create the HTTP server
let http_server = HttpServer::new(create_app_fn)
.workers(num_cpus::get())
.shutdown_timeout(60);

// Start the server with or without TLS
let srv = if let Some(config) = ssl {
http_server
.bind_rustls_0_22(&CONFIG.parseable.address, config)?
.run()
.await?;
} else {
http_server.bind(&CONFIG.parseable.address)?.run().await?;
}
http_server.bind(&CONFIG.parseable.address)?.run()
};

// Graceful shutdown handling
let srv_handle = srv.handle();

tokio::spawn(async move {
// Wait for the shutdown signal
shutdown_rx.await.ok();

// Initiate graceful shutdown
log::info!("Graceful shutdown of HTTP server triggered");
srv_handle.stop(true).await;
});

// Await the server to run and handle shutdown
srv.await?;

Ok(())
}
Expand Down
46 changes: 40 additions & 6 deletions server/src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::sync;
use crate::users::dashboards::DASHBOARDS;
use crate::users::filters::FILTERS;
use std::sync::Arc;
use tokio::sync::{oneshot, Mutex};

use actix_web::web::resource;
use actix_web::Resource;
Expand Down Expand Up @@ -96,16 +97,47 @@ impl ParseableServer for Server {
&CONFIG.parseable.tls_key_path,
)?;

// concurrent workers equal to number of cores on the cpu
let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get());
if let Some(config) = ssl {
// Create a channel to trigger server shutdown
let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>();
let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger)));

// Clone the shutdown signal for the signal handler
let shutdown_signal = server_shutdown_signal.clone();

// Spawn the signal handler task
tokio::spawn(async move {
health_check::handle_signals(shutdown_signal).await;
println!("Received shutdown signal, notifying server to shut down...");
});

// Create the HTTP server
let http_server = HttpServer::new(create_app_fn)
.workers(num_cpus::get())
.shutdown_timeout(60);

// Start the server with or without TLS
let srv = if let Some(config) = ssl {
http_server
.bind_rustls_0_22(&CONFIG.parseable.address, config)?
.run()
.await?;
} else {
http_server.bind(&CONFIG.parseable.address)?.run().await?;
}
http_server.bind(&CONFIG.parseable.address)?.run()
};

// Graceful shutdown handling
let srv_handle = srv.handle();

tokio::spawn(async move {
// Wait for the shutdown signal
shutdown_rx.await.ok();

// Initiate graceful shutdown
log::info!("Graceful shutdown of HTTP server triggered");
srv_handle.stop(true).await;
});

// Await the server to run and handle shutdown
srv.await?;

Ok(())
}
Expand Down Expand Up @@ -548,6 +580,7 @@ impl Server {
let app = self.start(prometheus, CONFIG.parseable.openid.clone());

tokio::pin!(app);

loop {
tokio::select! {
e = &mut app => {
Expand All @@ -574,6 +607,7 @@ impl Server {
}
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
}

};
}
}
Expand Down