diff --git a/backend/.sqlx/query-8bd028c8b5f8a4d566f89eebc2e63fd04beaf2b0b49e07c7df42ecddd70737f3.json b/backend/.sqlx/query-8bd028c8b5f8a4d566f89eebc2e63fd04beaf2b0b49e07c7df42ecddd70737f3.json
new file mode 100644
index 0000000000000..f1f7a0b56c153
--- /dev/null
+++ b/backend/.sqlx/query-8bd028c8b5f8a4d566f89eebc2e63fd04beaf2b0b49e07c7df42ecddd70737f3.json
@@ -0,0 +1,18 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "INSERT INTO resource\n            (workspace_id, path, value, resource_type, created_by, edited_at)\n            VALUES ($1, $2, $3, $4, $5, now()) ON CONFLICT (workspace_id, path)\n            DO UPDATE SET value = $3, edited_at = now()",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "Varchar",
+        "Varchar",
+        "Jsonb",
+        "Varchar",
+        "Varchar"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "8bd028c8b5f8a4d566f89eebc2e63fd04beaf2b0b49e07c7df42ecddd70737f3"
+}
diff --git a/backend/windmill-common/src/s3_helpers.rs b/backend/windmill-common/src/s3_helpers.rs
index a1c5d03888440..0274d0f38d362 100644
--- a/backend/windmill-common/src/s3_helpers.rs
+++ b/backend/windmill-common/src/s3_helpers.rs
@@ -122,7 +122,7 @@ pub struct S3Object {

 #[cfg(feature = "parquet")]
 pub async fn get_etag_or_empty(
-    object_store_resource: &mut ObjectStoreResource,
+    object_store_resource: &ObjectStoreResource,
     s3_object: S3Object,
 ) -> Option<String> {
     let object_store_client = build_object_store_client(object_store_resource).await;
diff --git a/backend/windmill-worker/src/common.rs b/backend/windmill-worker/src/common.rs
index f36954819dfa4..147ba1bdae0c0 100644
--- a/backend/windmill-worker/src/common.rs
+++ b/backend/windmill-worker/src/common.rs
@@ -1,11 +1,12 @@
 use async_recursion::async_recursion;
 use itertools::Itertools;
-
+use lazy_static::lazy_static;

 use regex::Regex;
 use serde::{Deserialize, Serialize};
 use serde_json::value::RawValue;
 use serde_json::{json, Value};
+use sha2::Digest;
 use sqlx::types::Json;
 use sqlx::{Pool, Postgres};
 use tokio::process::Command;
@@ -21,19 +22,16 @@ use windmill_common::worker::{
     to_raw_value, write_file, CLOUD_HOSTED, ROOT_CACHE_DIR, WORKER_CONFIG,
 };
 use windmill_common::{
+    cache::Cache,
     error::{self, Error},
     jobs::QueuedJob,
+    scripts::ScriptHash,
     variables::ContextualVariable,
 };

 use anyhow::{anyhow, Result};

-use std::{
-    collections::{hash_map::DefaultHasher, HashMap},
-    hash::{Hash, Hasher},
-    time::Duration,
-};
-
+use std::{collections::HashMap, sync::Arc, time::Duration};
 use uuid::Uuid;
 use windmill_common::{variables, DB};

@@ -615,40 +613,59 @@ pub async fn resolve_job_timeout(
     }
 }

-pub async fn hash_args(
+async fn hash_args(
     _db: &DB,
     _client: &AuthedClient,
     _workspace_id: &str,
-    _job_id: &Uuid,
-    v: &Option<Json<HashMap<String, Box<RawValue>>>>,
-) -> String {
-    if let Some(vs) = v {
-        let mut dh = DefaultHasher::new();
-        let hm = &vs.0;
+    v: &Option<Json<HashMap<String, Box<RawValue>>>>,
+    hasher: &mut sha2::Sha256,
+) {
+    if let Some(Json(hm)) = v {
         for k in hm.keys().sorted() {
-            k.hash(&mut dh);
+            hasher.update(k.as_bytes());
            let arg_value = hm.get(k).unwrap();
            #[cfg(feature = "parquet")]
            let (_, arg_additions) =
                arg_value_hash_additions(_db, _client, _workspace_id, hm.get(k).unwrap()).await;
-            arg_value.get().hash(&mut dh);
+            hasher.update(arg_value.get().as_bytes());
            #[cfg(feature = "parquet")]
            for (_, arg_addition) in arg_additions {
-                arg_addition.hash(&mut dh);
+                hasher.update(arg_addition.as_bytes());
            }
        }
-        hex::encode(dh.finish().to_be_bytes())
-    } else {
-        "empty_args".to_string()
    }
 }

+pub async fn cached_result_path(
+    db: &DB,
+    client: &AuthedClient,
+    job: &QueuedJob,
+    raw_code: Option<&String>,
+    raw_lock: Option<&String>,
+    raw_flow: Option<&Json<Box<RawValue>>>,
+) -> String {
+    let mut hasher = sha2::Sha256::new();
+    hasher.update(&[job.job_kind as u8]);
+    if let Some(ScriptHash(hash)) = job.script_hash {
+        hasher.update(&hash.to_le_bytes())
+    } else if let None = job.script_hash {
+        job.script_path
+            .as_ref()
+            .inspect(|x| hasher.update(x.as_bytes()));
+        raw_code.inspect(|x| hasher.update(x));
+        raw_lock.inspect(|x| hasher.update(x));
+        raw_flow.inspect(|x| hasher.update(x.get()));
+    }
+    hash_args(db, client, &job.workspace_id, &job.args, &mut hasher).await;
+    format!("g/results/{:032x}", hasher.finalize())
+}
+
 #[cfg(feature = "parquet")]
 async fn get_workspace_s3_resource_path(
     db: &DB,
     client: &AuthedClient,
     workspace_id: &str,
-    storage: Option<String>,
+    storage: Option<&String>,
 ) -> windmill_common::error::Result<Option<ObjectStoreResource>> {
     use windmill_common::{
         job_s3_helpers_ee::get_s3_resource_internal, s3_helpers::StorageResourceType,
@@ -732,7 +749,7 @@ async fn arg_value_hash_additions(
     let mut storage = None;
     if let Ok(s3_object) = parsed_value {
         let s3_resource_opt =
-            get_workspace_s3_resource_path(db, client, workspace_id, s3_object.storage.clone())
+            get_workspace_s3_resource_path(db, client, workspace_id, s3_object.storage.as_ref())
                 .await;
         storage = s3_object.storage.clone();

@@ -751,68 +768,93 @@ struct CachedResource {
     expire: i64,
     #[serde(skip_serializing_if = "Option::is_none")]
     s3_etags: Option<HashMap<String, String>>,
-    value: Box<RawValue>,
+    value: Arc<Box<RawValue>>,
     storage: Option<String>,
 }

+impl CachedResource {
+    fn expired(&self) -> bool {
+        self.expire <= chrono::Utc::now().timestamp()
+    }
+}
+
+lazy_static! {
+    /// In-memory cache for resources.
+    static ref CACHED_RESULTS: Cache<String, Arc<CachedResource>> = Cache::new(1000);
+}
+
 pub async fn get_cached_resource_value_if_valid(
     _db: &DB,
     client: &AuthedClient,
     _job_id: &Uuid,
     _workspace_id: &str,
     cached_res_path: &str,
-) -> Option<Box<RawValue>> {
-    let resource_opt = client
-        .get_resource_value::<CachedResource>(cached_res_path)
+) -> Option<Arc<Box<RawValue>>> {
+    let resource = match CACHED_RESULTS
+        .get_value_or_guard_async(cached_res_path)
         .await
-        .ok();
-    if let Some(cached_resource) = resource_opt {
-        if cached_resource.expire <= chrono::Utc::now().timestamp() {
-            // cache expired
+    {
+        // check for cache expiration.
+        Ok(resource) if resource.expired() => {
+            let _ = CACHED_RESULTS.remove(cached_res_path);
             return None;
         }
-        #[cfg(feature = "parquet")]
+        // resource is in cache and not expired, return it.
+        Ok(resource) => resource,
+        // resource is not in cache, fetch it from the database.
+        Err(entry) => match client
+            .get_resource_value::<CachedResource>(cached_res_path)
+            .await
+            .ok()
         {
-            let s3_etags = cached_resource.s3_etags.unwrap_or_default();
-            let object_store_resource_opt: Option<ObjectStoreResource> = if s3_etags.is_empty() {
-                None
-            } else {
-                get_workspace_s3_resource_path(
-                    _db,
-                    &client,
-                    _workspace_id,
-                    cached_resource.storage.clone(),
-                )
+            None => return None,
+            // check for cache expiration.
+            Some(resource) if resource.expired() => return None,
+            Some(resource) => {
+                let resource = Arc::new(resource);
+                let _ = entry.insert(resource.clone());
+                resource
+            }
+        },
+    };
+
+    #[cfg(feature = "parquet")]
+    {
+        let empty_etags = HashMap::new();
+        let s3_etags = resource.s3_etags.as_ref().unwrap_or(&empty_etags);
+        let object_store_resource_opt: Option<ObjectStoreResource> = if s3_etags.is_empty() {
+            None
+        } else {
+            get_workspace_s3_resource_path(_db, &client, _workspace_id, resource.storage.as_ref())
                .await
                .ok()
                .flatten()
-            };
+        };

-            if !s3_etags.is_empty() && object_store_resource_opt.is_none() {
-                tracing::warn!("Cached result references s3 files that are not retrievable anymore because the workspace S3 resource can't be fetched. Cache will be invalidated");
-                return None;
-            }
-            for (s3_file_key, s3_file_etag) in s3_etags {
-                if let Some(mut object_store_resource) = object_store_resource_opt.clone() {
-                    let etag = get_etag_or_empty(
-                        &mut object_store_resource,
-                        S3Object {
-                            s3: s3_file_key.clone(),
-                            storage: cached_resource.storage.clone(),
-                            filename: None,
-                        },
-                    )
-                    .await;
-                    if etag.is_none() || etag.clone().unwrap() != s3_file_etag {
-                        tracing::warn!("S3 file etag for '{}' has changed. Value from cache is {:?} while current value from S3 is {:?}. Cache will be invalidated", s3_file_key.clone(), s3_file_etag, etag);
-                        return None;
-                    }
+        if !s3_etags.is_empty() && object_store_resource_opt.is_none() {
+            tracing::warn!("Cached result references s3 files that are not retrievable anymore because the workspace S3 resource can't be fetched. Cache will be invalidated");
+            return None;
+        }
+        for (s3_file_key, s3_file_etag) in s3_etags {
+            if let Some(object_store_resource) = object_store_resource_opt.as_ref() {
+                let etag = get_etag_or_empty(
+                    object_store_resource,
+                    S3Object {
+                        s3: s3_file_key.clone(),
+                        storage: resource.storage.clone(),
+                        filename: None,
+                    },
+                )
+                .await;
+                if etag.as_ref() != Some(s3_file_etag) {
+                    tracing::warn!("S3 file etag for '{}' has changed. Value from cache is {:?} while current value from S3 is {:?}. Cache will be invalidated", s3_file_key.clone(), s3_file_etag, etag);
Cache will be invalidated", s3_file_key.clone(), s3_file_etag, etag); + return None; } } } - return Some(cached_resource.value); } - return None; + + Some(resource.value.clone()) } pub async fn save_in_cache( @@ -820,13 +862,13 @@ pub async fn save_in_cache( _client: &AuthedClient, job: &QueuedJob, cached_path: String, - r: &Box, + r: Arc>, ) { let expire = chrono::Utc::now().timestamp() + job.cache_ttl.unwrap() as i64; #[cfg(feature = "parquet")] let (storage, s3_etags) = - arg_value_hash_additions(db, _client, job.workspace_id.as_str(), r).await; + arg_value_hash_additions(db, _client, job.workspace_id.as_str(), &r).await; #[cfg(feature = "parquet")] let s3_etags = if s3_etags.is_empty() { @@ -838,17 +880,17 @@ pub async fn save_in_cache( #[cfg(not(feature = "parquet"))] let (storage, s3_etags) = (None, None); - let store_cache_resource = CachedResource { expire, s3_etags, value: r.clone(), storage }; - let raw_json = sqlx::types::Json(store_cache_resource); + let store_cache_resource = CachedResource { expire, s3_etags, value: r, storage }; + let raw_json = Json(&store_cache_resource); if let Err(e) = sqlx::query!( "INSERT INTO resource - (workspace_id, path, value, resource_type, created_by, edited_at) - VALUES ($1, $2, $3, $4, $5, now()) ON CONFLICT (workspace_id, path) - DO UPDATE SET value = $3, edited_at = now()", + (workspace_id, path, value, resource_type, created_by, edited_at) + VALUES ($1, $2, $3, $4, $5, now()) ON CONFLICT (workspace_id, path) + DO UPDATE SET value = $3, edited_at = now()", job.workspace_id, - cached_path, - raw_json as sqlx::types::Json, + &cached_path, + raw_json as Json<&CachedResource>, "cache", job.created_by ) @@ -857,6 +899,9 @@ pub async fn save_in_cache( { tracing::error!("Error creating cache resource {e:#}") } + + // Cache result in-memory. + CACHED_RESULTS.insert(cached_path, Arc::new(store_cache_resource)); } fn tentatively_improve_error(err: Error, executable: &str) -> Error { diff --git a/backend/windmill-worker/src/job_logger.rs b/backend/windmill-worker/src/job_logger.rs index f92ee165368e2..5b3ef57bea7a7 100644 --- a/backend/windmill-worker/src/job_logger.rs +++ b/backend/windmill-worker/src/job_logger.rs @@ -15,7 +15,6 @@ use crate::job_logger_ee::default_disk_log_storage; #[cfg(all(feature = "enterprise", feature = "parquet"))] use crate::job_logger_ee::s3_storage; - pub enum CompactLogs { #[cfg(not(all(feature = "enterprise", feature = "parquet")))] NotEE, @@ -25,8 +24,6 @@ pub enum CompactLogs { S3, } - - pub(crate) async fn append_job_logs( job_id: Uuid, w_id: String, @@ -38,7 +35,7 @@ pub(crate) async fn append_job_logs( ) -> () { if must_compact_logs { #[cfg(all(feature = "enterprise", feature = "parquet"))] - s3_storage(job_id, &w_id, &db, logs, total_size, &worker_name).await; + s3_storage(job_id, &w_id, &db, &logs, &total_size, &worker_name).await; #[cfg(not(all(feature = "enterprise", feature = "parquet")))] { @@ -70,7 +67,7 @@ pub fn append_with_limit(dst: &mut String, src: &str, limit: &mut usize) { if *NO_LOGS_AT_ALL { return; } - + let src_str; let src = { src_str = RE_00.replace_all(src, ""); diff --git a/backend/windmill-worker/src/job_logger_ee.rs b/backend/windmill-worker/src/job_logger_ee.rs index 310419e3d10b8..f09faf53b5926 100644 --- a/backend/windmill-worker/src/job_logger_ee.rs +++ b/backend/windmill-worker/src/job_logger_ee.rs @@ -16,7 +16,7 @@ pub(crate) async fn s3_storage( _total_size: &Arc, _worker_name: &String, ) { - tracing::info!("Logs length of {job_id} has exceeded a threshold. 
+    tracing::info!("Logs length of {_job_id} has exceeded a threshold. Implementation to store excess on s3 in not OSS");
 }

 pub(crate) async fn default_disk_log_storage(
diff --git a/backend/windmill-worker/src/result_processor.rs b/backend/windmill-worker/src/result_processor.rs
index 69684eb271633..031cff0544ea4 100644
--- a/backend/windmill-worker/src/result_processor.rs
+++ b/backend/windmill-worker/src/result_processor.rs
@@ -446,11 +446,11 @@ pub async fn process_completed_job(
     worker_name: &str,
     job_completed_tx: Sender,
     #[cfg(feature = "benchmark")] bench: &mut BenchmarkIter,
-) -> windmill_common::error::Result>> {
+) -> error::Result>> {
     if success {
         // println!("bef completed job{:?}",  SystemTime::now());
         if let Some(cached_path) = cached_res_path {
-            save_in_cache(db, client, &job, cached_path.to_string(), &result).await;
+            save_in_cache(db, client, &job, cached_path, result.clone()).await;
         }

         let is_flow_step = job.is_flow_step;
diff --git a/backend/windmill-worker/src/worker.rs b/backend/windmill-worker/src/worker.rs
index 1d6dcc37d4335..3f51ae540f110 100644
--- a/backend/windmill-worker/src/worker.rs
+++ b/backend/windmill-worker/src/worker.rs
@@ -38,9 +38,8 @@ use reqwest::Response;
 use serde::{de::DeserializeOwned, Deserialize, Serialize};
 use sqlx::{types::Json, Pool, Postgres};
 use std::{
-    collections::{hash_map::DefaultHasher, HashMap},
+    collections::HashMap,
     fs::DirBuilder,
-    hash::Hash,
     sync::{
         atomic::{AtomicBool, AtomicU16, Ordering},
         Arc,
@@ -95,8 +94,8 @@ use crate::{
     bash_executor::{handle_bash_job, handle_powershell_job},
     bun_executor::handle_bun_job,
     common::{
-        build_args_map, get_cached_resource_value_if_valid, get_reserved_variables, hash_args,
-        update_worker_ping_for_failed_init_script, OccupancyMetrics,
+        build_args_map, cached_result_path, get_cached_resource_value_if_valid,
+        get_reserved_variables, update_worker_ping_for_failed_init_script, OccupancyMetrics,
     },
     deno_executor::handle_deno_job,
     go_executor::handle_go_job,
@@ -111,7 +110,7 @@
     python_executor::handle_python_job,
     result_processor::{process_result, start_background_processor},
     rust_executor::handle_rust_job,
-    worker_flow::{handle_flow, update_flow_status_in_progress, Step},
+    worker_flow::{handle_flow, update_flow_status_in_progress},
     worker_lockfiles::{
         handle_app_dependency_job, handle_dependency_job, handle_flow_dependency_job,
     },
@@ -1923,8 +1922,8 @@
         }
     }

-    let step = if job.is_flow_step {
-        let r = update_flow_status_in_progress(
+    if job.is_flow_step {
+        let _ = update_flow_status_in_progress(
            db,
            &job.workspace_id,
            job.parent_job
@@ -1933,24 +1932,19 @@
        )
        .warn_after_seconds(5)
        .await?;
-
-        Some(r)
-    } else {
-        if let Some(parent_job) = job.parent_job {
-            if let Err(e) = sqlx::query_scalar!(
-                "UPDATE queue SET flow_status = jsonb_set(jsonb_set(COALESCE(flow_status, '{}'::jsonb), array[$1], COALESCE(flow_status->$1, '{}'::jsonb)), array[$1, 'started_at'], to_jsonb(now()::text)) WHERE id = $2 AND workspace_id = $3",
-                &job.id.to_string(),
-                parent_job,
-                &job.workspace_id
-            )
-            .execute(db)
-            .warn_after_seconds(5)
-            .await {
-                tracing::error!("Could not update parent job started_at flow_status: {}", e);
-            }
+    } else if let Some(parent_job) = job.parent_job {
+        if let Err(e) = sqlx::query_scalar!(
+            "UPDATE queue SET flow_status = jsonb_set(jsonb_set(COALESCE(flow_status, '{}'::jsonb), array[$1], COALESCE(flow_status->$1, '{}'::jsonb)), array[$1, 'started_at'], to_jsonb(now()::text)) WHERE id = $2 AND workspace_id = $3",
+            &job.id.to_string(),
+            parent_job,
+            &job.workspace_id
+        )
+        .execute(db)
+        .warn_after_seconds(5)
+        .await {
+            tracing::error!("Could not update parent job started_at flow_status: {}", e);
        }
-        None
-    };
+    }

    let started = Instant::now();
    let (raw_code, raw_lock, raw_flow) = match (raw_code, raw_lock, raw_flow) {
@@ -1969,68 +1963,25 @@
    };

    let cached_res_path = if job.cache_ttl.is_some() {
-        let version_hash = if let Some(h) = job.script_hash {
-            if matches!(job.job_kind, JobKind::FlowScript) {
-                format!("flowscript_{}", h.to_string())
-            } else {
-                format!("script_{}", h.to_string())
-            }
-        } else if let Some(rc) = raw_code.as_ref() {
-            use std::hash::Hasher;
-            let mut s = DefaultHasher::new();
-            rc.hash(&mut s);
-            format!("inline_{}", hex::encode(s.finish().to_be_bytes()))
-        } else if let Some(sqlx::types::Json(rc)) = raw_flow.as_ref() {
-            use std::hash::Hasher;
-            let mut s = DefaultHasher::new();
-            rc.get().hash(&mut s);
-            format!("flow_{}", hex::encode(s.finish().to_be_bytes()))
-        } else {
-            "none".to_string()
-        };
-        let args_hash = hash_args(
-            db,
-            &client.get_authed().await,
-            &job.workspace_id,
-            &job.id,
-            &job.args,
-        )
-        .await;
-        if job.is_flow_step && !matches!(job.job_kind, JobKind::Flow) {
-            let flow_path = sqlx::query_scalar!(
-                "SELECT script_path FROM queue WHERE id = $1",
-                &job.parent_job.unwrap()
-            )
-            .fetch_one(db)
-            .warn_after_seconds(5)
-            .await
-            .map_err(|e| {
-                Error::InternalErr(format!(
-                    "Fetching script path from queue for caching purposes: {e:#}"
-                ))
-            })?
-            .ok_or_else(|| Error::InternalErr(format!("Expected script_path")))?;
-            let step = match step.unwrap() {
-                Step::Step(i) => i.to_string(),
-                Step::PreprocessorStep => "preprocessor".to_string(),
-                Step::FailureStep => "failure".to_string(),
-            };
-            Some(format!(
-                "{flow_path}/cache/{version_hash}/{step}/{args_hash}"
-            ))
-        } else if let Some(script_path) = &job.script_path {
-            Some(format!("{script_path}/cache/{version_hash}/{args_hash}"))
-        } else {
-            None
-        }
+        Some(
+            cached_result_path(
+                db,
+                &client.get_authed().await,
+                &job,
+                raw_code.as_ref(),
+                raw_lock.as_ref(),
+                raw_flow.as_ref(),
+            )
+            .await,
+        )
    } else {
        None
    };

-    if let Some(cached_res_path) = cached_res_path.clone() {
+    if let Some(cached_res_path) = cached_res_path.as_ref() {
        let authed_client = client.get_authed().await;

-        let cached_resource_value_maybe = get_cached_resource_value_if_valid(
+        let cached_result_maybe = get_cached_resource_value_if_valid(
            db,
            &authed_client,
            &job.id,
@@ -2039,7 +1990,7 @@
        )
        .warn_after_seconds(5)
        .await;
-        if let Some(cached_resource_value) = cached_resource_value_maybe {
+        if let Some(result) = cached_result_maybe {
            {
                let logs =
                    "Job skipped because args & path found in cache and not expired".to_string();
@@ -2047,8 +1998,8 @@
            }
            job_completed_tx
                .send(JobCompleted {
-                    job: job,
-                    result: Arc::new(cached_resource_value),
+                    job,
+                    result,
                    mem_peak: 0,
                    canceled_by: None,
                    success: true,
diff --git a/backend/windmill-worker/src/worker_flow.rs b/backend/windmill-worker/src/worker_flow.rs
index d7bdaed7fd5a6..17c1e58859dc1 100644
--- a/backend/windmill-worker/src/worker_flow.rs
+++ b/backend/windmill-worker/src/worker_flow.rs
@@ -6,13 +6,12 @@
 * LICENSE-AGPL for a copy of the license.
 */

-use std::collections::hash_map::DefaultHasher;
 use std::collections::HashMap;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::time::Duration;

-use crate::common::{hash_args, save_in_cache};
+use crate::common::{cached_result_path, save_in_cache};
 use crate::js_eval::{eval_timeout, IdContext};
 use crate::{
     AuthedClient, PreviousResult, SameWorkerPayload, SameWorkerSender, SendResult, JOB_TOKEN,
@@ -1067,22 +1066,10 @@ pub async fn update_flow_status_after_job_completion_internal(
        .await?;
    } else {
        if flow_job.cache_ttl.is_some() && success {
-            let cached_res_path = {
-                let args_hash =
-                    hash_args(db, client, w_id, job_id_for_status, &flow_job.args).await;
-                let flow_path = flow_job.script_path();
-                let version_hash = if let Some(sqlx::types::Json(s)) = raw_flow.as_ref() {
-                    use std::hash::{Hash, Hasher};
-                    let mut h = DefaultHasher::new();
-                    s.get().hash(&mut h);
-                    format!("flow_{}", hex::encode(h.finish().to_be_bytes()))
-                } else {
-                    "flow_unknown".to_string()
-                };
-                format!("{flow_path}/cache/{version_hash}/{args_hash}")
-            };
+            let cached_res_path =
+                cached_result_path(db, client, &flow_job, None, None, raw_flow.as_ref()).await;
-            save_in_cache(db, client, &flow_job, cached_res_path, &nresult).await;
+            save_in_cache(db, client, &flow_job, cached_res_path, nresult.clone()).await;
        }

        fn result_has_recover_true(nresult: Arc<Box<RawValue>>) -> bool {
            let recover = serde_json::from_str::(nresult.get());
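Side note on the new in-memory layer introduced in common.rs above: the read-through flow is hit-and-fresh returns, hit-but-expired evicts and reports a miss, and a miss lets exactly one caller fetch from the database and populate the entry. The sketch below is a minimal standalone illustration of that pattern, assuming windmill_common::cache::Cache is a thin wrapper over quick_cache::sync::Cache (which is what the get_value_or_guard_async / guard insert calls suggest); fetch_from_db and the String payload are placeholders for illustration, not Windmill APIs.

use std::sync::Arc;

use quick_cache::sync::Cache;

#[derive(Clone)]
struct CachedEntry {
    expire: i64,        // unix timestamp, mirrors CachedResource::expire
    value: Arc<String>, // stand-in for Arc<Box<RawValue>>
}

impl CachedEntry {
    fn expired(&self) -> bool {
        self.expire <= chrono::Utc::now().timestamp()
    }
}

// Placeholder for client.get_resource_value::<CachedResource>(path).
async fn fetch_from_db(_path: &str) -> Option<CachedEntry> {
    None
}

async fn get_or_load(
    cache: &Cache<String, Arc<CachedEntry>>,
    path: &str,
) -> Option<Arc<CachedEntry>> {
    match cache.get_value_or_guard_async(path).await {
        // Hit, but stale: drop the entry and report a miss.
        Ok(hit) if hit.expired() => {
            let _ = cache.remove(path);
            None
        }
        // Fresh hit.
        Ok(hit) => Some(hit),
        // Miss: the guard lets a single concurrent caller populate the entry,
        // so the database is queried once even under contention.
        Err(guard) => {
            let fetched = Arc::new(fetch_from_db(path).await?);
            let _ = guard.insert(fetched.clone());
            Some(fetched)
        }
    }
}

#[tokio::main]
async fn main() {
    let cache: Cache<String, Arc<CachedEntry>> = Cache::new(1000);
    assert!(get_or_load(&cache, "g/results/abc").await.is_none());
}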