Skip to content

Commit

Permalink
Move back to Elasticache
Browse files Browse the repository at this point in the history
This time we are using a cluster instead of serverless. Did not managed
to get rid of the massive (800ms) delay that the serverless cache had
when trying to initiate a connection.
  • Loading branch information
jyrihogman committed Sep 24, 2024
1 parent af35296 commit 1f43a1d
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 54 deletions.
97 changes: 97 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace]
exclude = ["worker-infra", "server-infra"]
exclude = ["worker-infra", "server-infra", "target"]
members = ["server", "wh-core", "worker", "message-handler"]
resolver = "2"

Expand Down
8 changes: 8 additions & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,12 @@ openssl = { version = "0.10.66", features = ["vendored"] }

wh-core = { path = "../wh-core" }

deadpool-redis = { version = "0.18.0", features = ["rt_tokio_1"] }
redis = { version = "0.27.2", default-features = false, features = [
"tls",
"tokio-native-tls-comp",
] }

tracing = "0.1.40"
deadpool = { version = "0.12.1", features = ["managed"] }
lazy_static = "1.5.0"
38 changes: 33 additions & 5 deletions server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,58 @@
use std::env::set_var;
use std::env::{self, set_var};
use std::sync::Arc;

use axum::Router;
use lambda_http::tower::ServiceBuilder;
use lambda_http::{run, Error};

use deadpool::managed::{PoolConfig, QueueMode, Timeouts};
use deadpool_redis::{Config, Pool, Runtime};

use lazy_static::lazy_static;
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;

use crate::rate_limit::RateLimit;
use crate::v2::handler as waterheater_calc;
use crate::v2::router::v2_routes;

mod common;
mod http;
mod middleware;
mod rate_limiter;
mod rate_limit;
mod tests;
mod v2;

lazy_static! {
static ref REDIS_POOL: Arc<Pool> = Arc::new(create_redis_pool());
}

#[derive(Clone)]
struct AppState {
pub redis_pool: Arc<Pool>,
dynamo_client: aws_sdk_dynamodb::Client,
}

fn create_redis_pool() -> Pool {
let redis_endpoint = env::var("REDIS_ENDPOINT").unwrap_or("http://localhost".into());
let redis_url = format!("redis://{}", redis_endpoint);

let cfg = Config {
connection: None,
url: Some(redis_url),
pool: Some(PoolConfig {
max_size: 10,
timeouts: Timeouts::default(),
queue_mode: QueueMode::Fifo,
}),
};

cfg.create_pool(Some(Runtime::Tokio1)).unwrap()
}

#[tokio::main]
async fn main() -> Result<(), Error> {
set_var("AWS_LAMBDA_HTTP_IGNORE_STAGE_IN_PATH", "true");
tracing_subscriber::fmt()
.json()
.with_max_level(tracing::Level::INFO)
Expand All @@ -37,11 +66,10 @@ async fn main() -> Result<(), Error> {
let client = aws_sdk_dynamodb::Client::new(&config);

let state = AppState {
redis_pool: REDIS_POOL.clone(),
dynamo_client: client,
};

set_var("AWS_LAMBDA_HTTP_IGNORE_STAGE_IN_PATH", "true");

#[derive(OpenApi)]
#[openapi(
paths(waterheater_calc::handle_enable_water_heater),
Expand All @@ -65,7 +93,7 @@ async fn main() -> Result<(), Error> {
.layer(axum::middleware::from_fn(middleware::inject_connect_info))
.layer(axum::middleware::from_fn_with_state(
state.clone(),
rate_limiter::rate_limit,
RateLimit::rate_limit,
)),
)
.with_state(state);
Expand Down
68 changes: 36 additions & 32 deletions server/src/rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use axum::{
middleware::Next,
response::Response,
};

use redis::aio::Connection;
use deadpool_redis::Connection;
use redis::RedisError;

use std::env;
use tracing::{error, info};

Expand All @@ -31,33 +31,29 @@ impl RateLimit {
let client_ip = Self::extract_client_ip(&request).unwrap_or("unknown");

let capacity = 20; // Maximum 20 requests allowed
let refill_rate_per_millisecond = 20.0 / 60_000.0; // Tokens per millisecond
let refill_rate = 20.0 / 60.0; // Refill rate: 20 tokens per minute

let current_time = chrono::Utc::now().timestamp_millis() as f64 / 1000.0;

let start_time = std::time::Instant::now();
let mut conn = match state.redis_pool.get_async_connection().await {
let mut conn = match state.redis_pool.get().await {
Ok(c) => c,
Err(e) => {
error!("Failed to connect to Redis: {}", e);
// Allow the request if Redis is unavailable
return Ok(next.run(request).await);
}
};
let conn_acquisition_time = start_time.elapsed();
info!(
"Redis connection acquisition time: {:?}",
conn_acquisition_time
);

let allowed = Self::check_rate_limit(
&mut conn,
client_ip,
capacity,
refill_rate_per_millisecond,
current_time,
)
.await
.unwrap_or(true);

let allowed =
match Self::check_rate_limit(&mut conn, client_ip, capacity, refill_rate, current_time)
.await
{
Ok(val) => val,
Err(e) => {
error!("Rate limit check failed: {}", e);
true
}
};

if allowed {
Ok(next.run(request).await)
Expand Down Expand Up @@ -89,25 +85,33 @@ impl RateLimit {
let script = r#"
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_time = tonumber(ARGV[2])
local refill_rate = tonumber(ARGV[2])
local current_time = tonumber(ARGV[3])
local data = redis.call("GET", key)
local tokens = tonumber(data)
local data = redis.call("HMGET", key, "tokens", "last_refill")
local tokens = tonumber(data[1])
local last_refill = tonumber(data[2])
if tokens == nil then
tokens = capacity - 1
redis.call("SETEX", key, refill_time, tokens)
return 1
tokens = capacity
last_refill = current_time
end
if tokens > 0 then
local delta = current_time - last_refill
local tokens_to_add = delta * refill_rate
tokens = math.min(tokens + tokens_to_add, capacity)
last_refill = current_time
local allowed = 0
if tokens >= 1 then
allowed = 1
tokens = tokens - 1
redis.call("SET", key, tokens)
redis.call("EXPIRE", key, refill_time)
return 1
else
return 0
end
redis.call("HMSET", key, "tokens", tokens, "last_refill", last_refill)
redis.call("EXPIRE", key, 3600)
return allowed
"#;

let allowed: i32 = redis::cmd("EVAL")
Expand Down
Loading

0 comments on commit 1f43a1d

Please sign in to comment.