feat(cache): re-work job results cache (#4898)
uael authored Dec 11, 2024
1 parent 343cd02 commit af5cca1
Showing 8 changed files with 178 additions and 180 deletions.
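At a glance, the rework below derives a stable result-cache key by folding the job kind, the script hash (or, when absent, the script path, raw code, raw lock and raw flow) and the job arguments in sorted key order into a single Sha256 digest, and it puts a bounded in-memory cache (CACHED_RESULTS) in front of the DB-backed cache resource. The following is a minimal, standalone sketch of the key-derivation idea only, with simplified types — it is not the actual cached_result_path from this commit and assumes the sha2, hex and itertools crates:

use itertools::Itertools;
use sha2::{Digest, Sha256};
use std::collections::HashMap;

// Illustrative stand-in for cached_result_path/hash_args: job kind, script
// identity and sorted arguments are all folded into one Sha256 digest.
fn illustrative_cache_path(
    job_kind: u8,
    script_hash: Option<i64>,
    raw_code: Option<&str>,
    args: &HashMap<String, String>,
) -> String {
    let mut hasher = Sha256::new();
    hasher.update(&[job_kind]);
    if let Some(hash) = script_hash {
        hasher.update(&hash.to_le_bytes());
    } else if let Some(code) = raw_code {
        hasher.update(code.as_bytes());
    }
    // Sort argument keys so the same inputs always map to the same path.
    for key in args.keys().sorted() {
        hasher.update(key.as_bytes());
        hasher.update(args[key].as_bytes());
    }
    format!("g/results/{}", hex::encode(hasher.finalize()))
}

fn main() {
    let args = HashMap::from([("n".to_string(), "42".to_string())]);
    println!("{}", illustrative_cache_path(0, Some(12345), None, &args));
}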

2 changes: 1 addition & 1 deletion backend/windmill-common/src/s3_helpers.rs
@@ -122,7 +122,7 @@ pub struct S3Object {

#[cfg(feature = "parquet")]
pub async fn get_etag_or_empty(
object_store_resource: &mut ObjectStoreResource,
object_store_resource: &ObjectStoreResource,
s3_object: S3Object,
) -> Option<String> {
let object_store_client = build_object_store_client(object_store_resource).await;
189 changes: 117 additions & 72 deletions backend/windmill-worker/src/common.rs
@@ -1,11 +1,12 @@
use async_recursion::async_recursion;

use itertools::Itertools;

use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use serde_json::{json, Value};
use sha2::Digest;
use sqlx::types::Json;
use sqlx::{Pool, Postgres};
use tokio::process::Command;
@@ -21,19 +22,16 @@ use windmill_common::worker::{
to_raw_value, write_file, CLOUD_HOSTED, ROOT_CACHE_DIR, WORKER_CONFIG,
};
use windmill_common::{
cache::Cache,
error::{self, Error},
jobs::QueuedJob,
scripts::ScriptHash,
variables::ContextualVariable,
};

use anyhow::{anyhow, Result};

use std::{
collections::{hash_map::DefaultHasher, HashMap},
hash::{Hash, Hasher},
time::Duration,
};

use std::{collections::HashMap, sync::Arc, time::Duration};
use uuid::Uuid;
use windmill_common::{variables, DB};

@@ -615,40 +613,59 @@ pub async fn resolve_job_timeout(
}
}

pub async fn hash_args(
async fn hash_args(
_db: &DB,
_client: &AuthedClient,
_workspace_id: &str,
_job_id: &Uuid,
v: &Option<sqlx::types::Json<HashMap<String, Box<RawValue>>>>,
) -> String {
if let Some(vs) = v {
let mut dh = DefaultHasher::new();
let hm = &vs.0;
v: &Option<Json<HashMap<String, Box<RawValue>>>>,
hasher: &mut sha2::Sha256,
) {
if let Some(Json(hm)) = v {
for k in hm.keys().sorted() {
k.hash(&mut dh);
hasher.update(k.as_bytes());
let arg_value = hm.get(k).unwrap();
#[cfg(feature = "parquet")]
let (_, arg_additions) =
arg_value_hash_additions(_db, _client, _workspace_id, hm.get(k).unwrap()).await;
arg_value.get().hash(&mut dh);
hasher.update(arg_value.get().as_bytes());
#[cfg(feature = "parquet")]
for (_, arg_addition) in arg_additions {
arg_addition.hash(&mut dh);
hasher.update(arg_addition.as_bytes());
}
}
hex::encode(dh.finish().to_be_bytes())
} else {
"empty_args".to_string()
}
}

pub async fn cached_result_path(
db: &DB,
client: &AuthedClient,
job: &QueuedJob,
raw_code: Option<&String>,
raw_lock: Option<&String>,
raw_flow: Option<&Json<Box<RawValue>>>,
) -> String {
let mut hasher = sha2::Sha256::new();
hasher.update(&[job.job_kind as u8]);
if let Some(ScriptHash(hash)) = job.script_hash {
hasher.update(&hash.to_le_bytes())
} else if let None = job.script_hash {
job.script_path
.as_ref()
.inspect(|x| hasher.update(x.as_bytes()));
raw_code.inspect(|x| hasher.update(x));
raw_lock.inspect(|x| hasher.update(x));
raw_flow.inspect(|x| hasher.update(x.get()));
}
hash_args(db, client, &job.workspace_id, &job.args, &mut hasher).await;
format!("g/results/{:032x}", hasher.finalize())
}

#[cfg(feature = "parquet")]
async fn get_workspace_s3_resource_path(
db: &DB,
client: &AuthedClient,
workspace_id: &str,
storage: Option<String>,
storage: Option<&String>,
) -> windmill_common::error::Result<Option<ObjectStoreResource>> {
use windmill_common::{
job_s3_helpers_ee::get_s3_resource_internal, s3_helpers::StorageResourceType,
@@ -732,7 +749,7 @@ async fn arg_value_hash_additions(
let mut storage = None;
if let Ok(s3_object) = parsed_value {
let s3_resource_opt =
get_workspace_s3_resource_path(db, client, workspace_id, s3_object.storage.clone())
get_workspace_s3_resource_path(db, client, workspace_id, s3_object.storage.as_ref())
.await;
storage = s3_object.storage.clone();

@@ -751,82 +768,107 @@ struct CachedResource {
expire: i64,
#[serde(skip_serializing_if = "Option::is_none")]
s3_etags: Option<HashMap<String, String>>,
value: Box<RawValue>,
value: Arc<Box<RawValue>>,
storage: Option<String>,
}

impl CachedResource {
fn expired(&self) -> bool {
self.expire <= chrono::Utc::now().timestamp()
}
}

lazy_static! {
/// In-memory cache for resources.
static ref CACHED_RESULTS: Cache<String, Arc<CachedResource>> = Cache::new(1000);
}

pub async fn get_cached_resource_value_if_valid(
_db: &DB,
client: &AuthedClient,
_job_id: &Uuid,
_workspace_id: &str,
cached_res_path: &str,
) -> Option<Box<RawValue>> {
let resource_opt = client
.get_resource_value::<CachedResource>(cached_res_path)
) -> Option<Arc<Box<RawValue>>> {
let resource = match CACHED_RESULTS
.get_value_or_guard_async(cached_res_path)
.await
.ok();
if let Some(cached_resource) = resource_opt {
if cached_resource.expire <= chrono::Utc::now().timestamp() {
// cache expired
{
// check for cache expiration.
Ok(resource) if resource.expired() => {
let _ = CACHED_RESULTS.remove(cached_res_path);
return None;
}
#[cfg(feature = "parquet")]
// resource is in cache and not expired, return it.
Ok(resource) => resource,
// resource is not in cache, fetch it from the database.
Err(entry) => match client
.get_resource_value::<CachedResource>(cached_res_path)
.await
.ok()
{
let s3_etags = cached_resource.s3_etags.unwrap_or_default();
let object_store_resource_opt: Option<ObjectStoreResource> = if s3_etags.is_empty() {
None
} else {
get_workspace_s3_resource_path(
_db,
&client,
_workspace_id,
cached_resource.storage.clone(),
)
None => return None,
// check for cache expiration.
Some(resource) if resource.expired() => return None,
Some(resource) => {
let resource = Arc::new(resource);
let _ = entry.insert(resource.clone());
resource
}
},
};

#[cfg(feature = "parquet")]
{
let empty_etags = HashMap::new();
let s3_etags = resource.s3_etags.as_ref().unwrap_or(&empty_etags);
let object_store_resource_opt: Option<ObjectStoreResource> = if s3_etags.is_empty() {
None
} else {
get_workspace_s3_resource_path(_db, &client, _workspace_id, resource.storage.as_ref())
.await
.ok()
.flatten()
};
};

if !s3_etags.is_empty() && object_store_resource_opt.is_none() {
tracing::warn!("Cached result references s3 files that are not retrievable anymore because the workspace S3 resource can't be fetched. Cache will be invalidated");
return None;
}
for (s3_file_key, s3_file_etag) in s3_etags {
if let Some(mut object_store_resource) = object_store_resource_opt.clone() {
let etag = get_etag_or_empty(
&mut object_store_resource,
S3Object {
s3: s3_file_key.clone(),
storage: cached_resource.storage.clone(),
filename: None,
},
)
.await;
if etag.is_none() || etag.clone().unwrap() != s3_file_etag {
tracing::warn!("S3 file etag for '{}' has changed. Value from cache is {:?} while current value from S3 is {:?}. Cache will be invalidated", s3_file_key.clone(), s3_file_etag, etag);
return None;
}
if !s3_etags.is_empty() && object_store_resource_opt.is_none() {
tracing::warn!("Cached result references s3 files that are not retrievable anymore because the workspace S3 resource can't be fetched. Cache will be invalidated");
return None;
}
for (s3_file_key, s3_file_etag) in s3_etags {
if let Some(object_store_resource) = object_store_resource_opt.as_ref() {
let etag = get_etag_or_empty(
object_store_resource,
S3Object {
s3: s3_file_key.clone(),
storage: resource.storage.clone(),
filename: None,
},
)
.await;
if etag.as_ref() != Some(s3_file_etag) {
tracing::warn!("S3 file etag for '{}' has changed. Value from cache is {:?} while current value from S3 is {:?}. Cache will be invalidated", s3_file_key.clone(), s3_file_etag, etag);
return None;
}
}
}
return Some(cached_resource.value);
}
return None;

Some(resource.value.clone())
}

pub async fn save_in_cache(
db: &Pool<Postgres>,
_client: &AuthedClient,
job: &QueuedJob,
cached_path: String,
r: &Box<RawValue>,
r: Arc<Box<RawValue>>,
) {
let expire = chrono::Utc::now().timestamp() + job.cache_ttl.unwrap() as i64;

#[cfg(feature = "parquet")]
let (storage, s3_etags) =
arg_value_hash_additions(db, _client, job.workspace_id.as_str(), r).await;
arg_value_hash_additions(db, _client, job.workspace_id.as_str(), &r).await;

#[cfg(feature = "parquet")]
let s3_etags = if s3_etags.is_empty() {
@@ -838,17 +880,17 @@ pub async fn save_in_cache(
#[cfg(not(feature = "parquet"))]
let (storage, s3_etags) = (None, None);

let store_cache_resource = CachedResource { expire, s3_etags, value: r.clone(), storage };
let raw_json = sqlx::types::Json(store_cache_resource);
let store_cache_resource = CachedResource { expire, s3_etags, value: r, storage };
let raw_json = Json(&store_cache_resource);

if let Err(e) = sqlx::query!(
"INSERT INTO resource
(workspace_id, path, value, resource_type, created_by, edited_at)
VALUES ($1, $2, $3, $4, $5, now()) ON CONFLICT (workspace_id, path)
DO UPDATE SET value = $3, edited_at = now()",
(workspace_id, path, value, resource_type, created_by, edited_at)
VALUES ($1, $2, $3, $4, $5, now()) ON CONFLICT (workspace_id, path)
DO UPDATE SET value = $3, edited_at = now()",
job.workspace_id,
cached_path,
raw_json as sqlx::types::Json<CachedResource>,
&cached_path,
raw_json as Json<&CachedResource>,
"cache",
job.created_by
)
@@ -857,6 +899,9 @@
{
tracing::error!("Error creating cache resource {e:#}")
}

// Cache result in-memory.
CACHED_RESULTS.insert(cached_path, Arc::new(store_cache_resource));
}

fn tentatively_improve_error(err: Error, executable: &str) -> Error {
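The in-memory layer added above (CACHED_RESULTS) is filled through a guard-style get-or-load call, get_value_or_guard_async, so concurrent lookups of the same key wait for one loader instead of each hitting the database. Below is a minimal, standalone sketch of that pattern; it uses the quick_cache crate directly, on the assumption that windmill_common::cache::Cache exposes the same guard API, and the key/value types and load_from_db stub are purely illustrative (needs tokio and quick_cache):

use quick_cache::sync::Cache;
use std::sync::Arc;

// Stand-in for the database lookup performed on a cache miss.
async fn load_from_db(key: &str) -> Option<Arc<String>> {
    Some(Arc::new(format!("result for {key}")))
}

async fn get_or_load(cache: &Cache<String, Arc<String>>, key: &str) -> Option<Arc<String>> {
    match cache.get_value_or_guard_async(key).await {
        // Hit: some task already populated this key.
        Ok(value) => Some(value),
        // Miss: we hold a placeholder guard; other tasks asking for the same
        // key wait on it rather than issuing duplicate loads.
        Err(guard) => {
            let value = load_from_db(key).await?;
            let _ = guard.insert(value.clone());
            Some(value)
        }
    }
}

#[tokio::main]
async fn main() {
    let cache = Cache::new(1000);
    println!("{:?}", get_or_load(&cache, "g/results/abc").await);
}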
7 changes: 2 additions & 5 deletions backend/windmill-worker/src/job_logger.rs
@@ -15,7 +15,6 @@ use crate::job_logger_ee::default_disk_log_storage;
#[cfg(all(feature = "enterprise", feature = "parquet"))]
use crate::job_logger_ee::s3_storage;


pub enum CompactLogs {
#[cfg(not(all(feature = "enterprise", feature = "parquet")))]
NotEE,
@@ -25,8 +24,6 @@ pub enum CompactLogs {
S3,
}



pub(crate) async fn append_job_logs(
job_id: Uuid,
w_id: String,
@@ -38,7 +35,7 @@ pub(crate) async fn append_job_logs(
) -> () {
if must_compact_logs {
#[cfg(all(feature = "enterprise", feature = "parquet"))]
s3_storage(job_id, &w_id, &db, logs, total_size, &worker_name).await;
s3_storage(job_id, &w_id, &db, &logs, &total_size, &worker_name).await;

#[cfg(not(all(feature = "enterprise", feature = "parquet")))]
{
@@ -70,7 +67,7 @@ pub fn append_with_limit(dst: &mut String, src: &str, limit: &mut usize) {
if *NO_LOGS_AT_ALL {
return;
}

let src_str;
let src = {
src_str = RE_00.replace_all(src, "");
2 changes: 1 addition & 1 deletion backend/windmill-worker/src/job_logger_ee.rs
@@ -16,7 +16,7 @@ pub(crate) async fn s3_storage(
_total_size: &Arc<AtomicU32>,
_worker_name: &String,
) {
tracing::info!("Logs length of {job_id} has exceeded a threshold. Implementation to store excess on s3 in not OSS");
tracing::info!("Logs length of {_job_id} has exceeded a threshold. Implementation to store excess on s3 in not OSS");
}

pub(crate) async fn default_disk_log_storage(
4 changes: 2 additions & 2 deletions backend/windmill-worker/src/result_processor.rs
@@ -446,11 +446,11 @@ pub async fn process_completed_job(
worker_name: &str,
job_completed_tx: Sender<SendResult>,
#[cfg(feature = "benchmark")] bench: &mut BenchmarkIter,
) -> windmill_common::error::Result<Option<Arc<QueuedJob>>> {
) -> error::Result<Option<Arc<QueuedJob>>> {
if success {
// println!("bef completed job{:?}", SystemTime::now());
if let Some(cached_path) = cached_res_path {
save_in_cache(db, client, &job, cached_path.to_string(), &result).await;
save_in_cache(db, client, &job, cached_path, result.clone()).await;
}

let is_flow_step = job.is_flow_step;
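Since save_in_cache now takes the result as Arc<Box<RawValue>> and stores the Arc as-is, the result.clone() at this call site is a reference-count bump rather than a copy of the serialized JSON. A tiny standalone illustration of that sharing (values made up; needs serde_json with the raw_value feature):

use serde_json::value::RawValue;
use std::sync::Arc;

fn main() {
    let raw: Box<RawValue> = RawValue::from_string(r#"{"answer":42}"#.to_string()).unwrap();
    let shared: Arc<Box<RawValue>> = Arc::new(raw);
    // Cloning the Arc copies a pointer; both handles refer to the same
    // underlying JSON text.
    let for_cache = Arc::clone(&shared);
    assert_eq!(shared.get(), for_cache.get());
    println!("{}", for_cache.get());
}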