diff --git a/deltachat-ffi/src/lib.rs b/deltachat-ffi/src/lib.rs index 5cbdbbc39d..5d65e75d7e 100644 --- a/deltachat-ffi/src/lib.rs +++ b/deltachat-ffi/src/lib.rs @@ -189,7 +189,7 @@ pub unsafe extern "C" fn dc_perform_imap_jobs(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_perform_imap_jobs(context) + job::perform_imap_jobs(context) } #[no_mangle] @@ -197,7 +197,7 @@ pub unsafe extern "C" fn dc_perform_imap_fetch(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_perform_imap_fetch(context) + job::perform_imap_fetch(context) } #[no_mangle] @@ -205,7 +205,7 @@ pub unsafe extern "C" fn dc_perform_imap_idle(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_perform_imap_idle(context) + job::perform_imap_idle(context) } #[no_mangle] @@ -213,7 +213,7 @@ pub unsafe extern "C" fn dc_interrupt_imap_idle(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_interrupt_imap_idle(context) + job::interrupt_imap_idle(context) } #[no_mangle] @@ -221,7 +221,7 @@ pub unsafe extern "C" fn dc_perform_mvbox_fetch(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_perform_mvbox_fetch(context) + job::perform_mvbox_fetch(context) } #[no_mangle] @@ -229,7 +229,7 @@ pub unsafe extern "C" fn dc_perform_mvbox_idle(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_perform_mvbox_idle(context) + job::perform_mvbox_idle(context) } #[no_mangle] @@ -237,7 +237,7 @@ pub unsafe extern "C" fn dc_interrupt_mvbox_idle(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_interrupt_mvbox_idle(context) + job::interrupt_mvbox_idle(context) } #[no_mangle] @@ -245,7 +245,7 @@ pub unsafe extern "C" fn dc_perform_sentbox_fetch(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_perform_sentbox_fetch(context) + job::perform_sentbox_fetch(context) } #[no_mangle] @@ -253,7 +253,7 @@ pub unsafe extern "C" fn dc_perform_sentbox_idle(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_perform_sentbox_idle(context) + job::perform_sentbox_idle(context) } #[no_mangle] @@ -261,7 +261,7 @@ pub unsafe extern "C" fn dc_interrupt_sentbox_idle(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_interrupt_sentbox_idle(context) + job::interrupt_sentbox_idle(context) } #[no_mangle] @@ -269,7 +269,7 @@ pub unsafe extern "C" fn dc_perform_smtp_jobs(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_perform_smtp_jobs(context) + job::perform_smtp_jobs(context) } #[no_mangle] @@ -277,7 +277,7 @@ pub unsafe extern "C" fn dc_perform_smtp_idle(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_perform_smtp_idle(context) + job::perform_smtp_idle(context) } #[no_mangle] @@ -285,7 +285,7 @@ pub unsafe extern "C" fn dc_interrupt_smtp_idle(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_interrupt_smtp_idle(context) + job::interrupt_smtp_idle(context) } #[no_mangle] @@ -293,7 +293,7 @@ pub unsafe extern "C" fn dc_maybe_network(context: *mut dc_context_t) { assert!(!context.is_null()); let context = &*context; - dc_job::dc_maybe_network(context) + job::maybe_network(context) } #[no_mangle] @@ -1034,7 +1034,7 @@ pub unsafe extern "C" fn dc_send_locations_to_chat( assert!(!context.is_null()); let context = &*context; - dc_location::dc_send_locations_to_chat(context, chat_id, seconds) + dc_location::dc_send_locations_to_chat(context, chat_id, seconds as i64) } #[no_mangle] diff --git a/examples/repl/cmdline.rs b/examples/repl/cmdline.rs index 3eaaf163e9..d1454e0e39 100644 --- a/examples/repl/cmdline.rs +++ b/examples/repl/cmdline.rs @@ -9,11 +9,11 @@ use deltachat::contact::*; use deltachat::context::*; use deltachat::dc_configure::*; use deltachat::dc_imex::*; -use deltachat::dc_job::*; use deltachat::dc_location::*; use deltachat::dc_msg::*; use deltachat::dc_receive_imf::*; use deltachat::dc_tools::*; +use deltachat::job::*; use deltachat::lot::LotState; use deltachat::peerstate::*; use deltachat::qr::*; @@ -581,7 +581,7 @@ pub unsafe fn dc_cmdline(context: &Context, line: &str) -> Result<(), failure::E println!("{}", to_string(dc_get_info(context))); } "maybenetwork" => { - dc_maybe_network(context); + maybe_network(context); } "housekeeping" => { sql::housekeeping(context); diff --git a/examples/repl/main.rs b/examples/repl/main.rs index 59648cf72c..358ffeaf48 100644 --- a/examples/repl/main.rs +++ b/examples/repl/main.rs @@ -21,9 +21,9 @@ use deltachat::config; use deltachat::constants::*; use deltachat::context::*; use deltachat::dc_configure::*; -use deltachat::dc_job::*; use deltachat::dc_securejoin::*; use deltachat::dc_tools::*; +use deltachat::job::*; use deltachat::oauth2::*; use deltachat::types::*; use deltachat::x::*; @@ -172,13 +172,11 @@ fn start_threads(c: Arc>) { let ctx = c.clone(); let handle_imap = std::thread::spawn(move || loop { while_running!({ - unsafe { - dc_perform_imap_jobs(&ctx.read().unwrap()); - dc_perform_imap_fetch(&ctx.read().unwrap()); - } + perform_imap_jobs(&ctx.read().unwrap()); + perform_imap_fetch(&ctx.read().unwrap()); while_running!({ let context = ctx.read().unwrap(); - dc_perform_imap_idle(&context); + perform_imap_idle(&context); }); }); }); @@ -186,9 +184,9 @@ fn start_threads(c: Arc>) { let ctx = c.clone(); let handle_mvbox = std::thread::spawn(move || loop { while_running!({ - unsafe { dc_perform_mvbox_fetch(&ctx.read().unwrap()) }; + perform_mvbox_fetch(&ctx.read().unwrap()); while_running!({ - unsafe { dc_perform_mvbox_idle(&ctx.read().unwrap()) }; + perform_mvbox_idle(&ctx.read().unwrap()); }); }); }); @@ -196,9 +194,9 @@ fn start_threads(c: Arc>) { let ctx = c.clone(); let handle_sentbox = std::thread::spawn(move || loop { while_running!({ - unsafe { dc_perform_sentbox_fetch(&ctx.read().unwrap()) }; + perform_sentbox_fetch(&ctx.read().unwrap()); while_running!({ - unsafe { dc_perform_sentbox_idle(&ctx.read().unwrap()) }; + perform_sentbox_idle(&ctx.read().unwrap()); }); }); }); @@ -206,9 +204,9 @@ fn start_threads(c: Arc>) { let ctx = c; let handle_smtp = std::thread::spawn(move || loop { while_running!({ - unsafe { dc_perform_smtp_jobs(&ctx.read().unwrap()) }; + perform_smtp_jobs(&ctx.read().unwrap()); while_running!({ - unsafe { dc_perform_smtp_idle(&ctx.read().unwrap()) }; + perform_smtp_idle(&ctx.read().unwrap()); }); }); }); @@ -226,12 +224,10 @@ fn stop_threads(context: &Context) { println!("Stopping threads"); IS_RUNNING.store(false, Ordering::Relaxed); - unsafe { - dc_interrupt_imap_idle(context); - dc_interrupt_mvbox_idle(context); - dc_interrupt_sentbox_idle(context); - dc_interrupt_smtp_idle(context); - } + interrupt_imap_idle(context); + interrupt_mvbox_idle(context); + interrupt_sentbox_idle(context); + interrupt_smtp_idle(context); handle.handle_imap.take().unwrap().join().unwrap(); handle.handle_mvbox.take().unwrap().join().unwrap(); @@ -487,14 +483,14 @@ unsafe fn handle_cmd(line: &str, ctx: Arc>) -> Result { if HANDLE.clone().lock().unwrap().is_some() { println!("imap-jobs are already running in a thread."); } else { - dc_perform_imap_jobs(&ctx.read().unwrap()); + perform_imap_jobs(&ctx.read().unwrap()); } } "configure" => { diff --git a/examples/simple.rs b/examples/simple.rs index 9b39fbfdf1..9db186533b 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -12,9 +12,8 @@ use deltachat::constants::Event; use deltachat::contact::*; use deltachat::context::*; use deltachat::dc_configure::*; -use deltachat::dc_job::{ - dc_perform_imap_fetch, dc_perform_imap_idle, dc_perform_imap_jobs, dc_perform_smtp_idle, - dc_perform_smtp_jobs, +use deltachat::job::{ + perform_imap_fetch, perform_imap_idle, perform_imap_jobs, perform_smtp_idle, perform_smtp_jobs, }; extern "C" fn cb(_ctx: &Context, event: Event, data1: usize, data2: usize) -> usize { @@ -52,12 +51,12 @@ fn main() { let r1 = running.clone(); let t1 = thread::spawn(move || { while *r1.read().unwrap() { - dc_perform_imap_jobs(&ctx1); + perform_imap_jobs(&ctx1); if *r1.read().unwrap() { - dc_perform_imap_fetch(&ctx1); + perform_imap_fetch(&ctx1); if *r1.read().unwrap() { - dc_perform_imap_idle(&ctx1); + perform_imap_idle(&ctx1); } } } @@ -67,9 +66,9 @@ fn main() { let r1 = running.clone(); let t2 = thread::spawn(move || { while *r1.read().unwrap() { - dc_perform_smtp_jobs(&ctx1); + perform_smtp_jobs(&ctx1); if *r1.read().unwrap() { - dc_perform_smtp_idle(&ctx1); + perform_smtp_idle(&ctx1); } } }); @@ -123,8 +122,8 @@ fn main() { println!("stopping threads"); *running.clone().write().unwrap() = false; - deltachat::dc_job::dc_interrupt_imap_idle(&ctx); - deltachat::dc_job::dc_interrupt_smtp_idle(&ctx); + deltachat::job::interrupt_imap_idle(&ctx); + deltachat::job::interrupt_smtp_idle(&ctx); println!("joining"); t1.join().unwrap(); diff --git a/src/chat.rs b/src/chat.rs index 61d85aa750..a86d988cd5 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -5,10 +5,10 @@ use crate::chatlist::*; use crate::constants::*; use crate::contact::*; use crate::context::Context; -use crate::dc_job::*; use crate::dc_msg::*; use crate::dc_tools::*; use crate::error::Error; +use crate::job::*; use crate::param::*; use crate::sql::{self, Sql}; use crate::stock::StockMessage; @@ -898,7 +898,7 @@ pub unsafe fn send_msg<'a>( } ensure!( - dc_job_send_msg(context, (*msg).id) != 0, + job_send_msg(context, (*msg).id) != 0, "Failed to initiate send job" ); @@ -1339,8 +1339,8 @@ pub fn delete(context: &Context, chat_id: u32) -> Result<(), Error> { context.call_cb(Event::MSGS_CHANGED, 0 as uintptr_t, 0 as uintptr_t); - dc_job_kill_action(context, 105); - unsafe { dc_job_add(context, 105, 0, Params::new(), 10) }; + job_kill_action(context, Action::Housekeeping); + job_add(context, Action::Housekeeping, 0, Params::new(), 10); Ok(()) } @@ -1927,7 +1927,7 @@ pub unsafe fn forward_msgs( new_msg_id = chat .prepare_msg_raw(context, msg, fresh10) .unwrap_or_default(); - dc_job_send_msg(context, new_msg_id); + job_send_msg(context, new_msg_id); } created_db_entries.push(chat_id); created_db_entries.push(new_msg_id); diff --git a/src/config.rs b/src/config.rs index 5219f9e546..0d23f11015 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,9 +3,9 @@ use strum_macros::{AsRefStr, Display, EnumIter, EnumProperty, EnumString}; use crate::constants::DC_VERSION_STR; use crate::context::Context; -use crate::dc_job::*; use crate::dc_tools::*; use crate::error::Error; +use crate::job::*; use crate::stock::StockMessage; /// The available configuration keys. @@ -102,17 +102,17 @@ impl Context { } Config::InboxWatch => { let ret = self.sql.set_config(self, key, value); - unsafe { dc_interrupt_imap_idle(self) }; + interrupt_imap_idle(self); ret } Config::SentboxWatch => { let ret = self.sql.set_config(self, key, value); - unsafe { dc_interrupt_sentbox_idle(self) }; + interrupt_sentbox_idle(self); ret } Config::MvboxWatch => { let ret = self.sql.set_config(self, key, value); - unsafe { dc_interrupt_mvbox_idle(self) }; + interrupt_mvbox_idle(self); ret } Config::Selfstatus => { diff --git a/src/contact.rs b/src/contact.rs index 8bb9dee957..ff1a399010 100644 --- a/src/contact.rs +++ b/src/contact.rs @@ -911,7 +911,6 @@ fn get_first_name<'a>(full_name: &'a str) -> &'a str { /// Returns false if addr is an invalid address, otherwise true. pub fn may_be_valid_addr(addr: &str) -> bool { let res = addr.parse::(); - println!("{:?}", res); res.is_ok() } diff --git a/src/context.rs b/src/context.rs index 60212464f5..c18e972b4c 100644 --- a/src/context.rs +++ b/src/context.rs @@ -3,14 +3,14 @@ use std::sync::{Arc, Condvar, Mutex, RwLock}; use crate::chat::*; use crate::constants::*; use crate::contact::*; -use crate::dc_job::*; -use crate::dc_jobthread::*; use crate::dc_loginparam::*; use crate::dc_move::*; use crate::dc_msg::*; use crate::dc_receive_imf::*; use crate::dc_tools::*; use crate::imap::*; +use crate::job::*; +use crate::job_thread::JobThread; use crate::key::*; use crate::lot::Lot; use crate::param::Params; @@ -30,8 +30,8 @@ pub struct Context { pub inbox: Arc>, pub perform_inbox_jobs_needed: Arc>, pub probe_imap_network: Arc>, - pub sentbox_thread: Arc>, - pub mvbox_thread: Arc>, + pub sentbox_thread: Arc>, + pub mvbox_thread: Arc>, pub smtp: Arc>, pub smtp_state: Arc<(Mutex, Condvar)>, pub oauth2_critical: Arc>, @@ -143,7 +143,7 @@ pub fn dc_context_new( bob: Arc::new(RwLock::new(Default::default())), last_smeared_timestamp: Arc::new(RwLock::new(0)), cmdline_sel_chat_id: Arc::new(RwLock::new(0)), - sentbox_thread: Arc::new(RwLock::new(dc_jobthread_init( + sentbox_thread: Arc::new(RwLock::new(JobThread::new( "SENTBOX", "configured_sentbox_folder", Imap::new( @@ -153,7 +153,7 @@ pub fn dc_context_new( cb_receive_imf, ), ))), - mvbox_thread: Arc::new(RwLock::new(dc_jobthread_init( + mvbox_thread: Arc::new(RwLock::new(JobThread::new( "MVBOX", "configured_mvbox_folder", Imap::new( @@ -230,7 +230,13 @@ unsafe fn cb_precheck_imf( } dc_do_heuristics_moves(context, server_folder, msg_id); if 0 != mark_seen { - dc_job_add(context, 130, msg_id as libc::c_int, Params::new(), 0); + job_add( + context, + Action::MarkseenMsgOnImap, + msg_id as libc::c_int, + Params::new(), + 0, + ); } } free(old_server_folder as *mut libc::c_void); diff --git a/src/dc_configure.rs b/src/dc_configure.rs index 2a94b48689..ef78600920 100644 --- a/src/dc_configure.rs +++ b/src/dc_configure.rs @@ -5,10 +5,10 @@ use quick_xml::events::{BytesEnd, BytesStart, BytesText}; use crate::constants::Event; use crate::context::Context; use crate::dc_e2ee::*; -use crate::dc_job::*; use crate::dc_loginparam::*; use crate::dc_tools::*; use crate::imap::*; +use crate::job::*; use crate::oauth2::*; use crate::param::Params; use crate::types::*; @@ -77,8 +77,8 @@ pub unsafe fn dc_configure(context: &Context) { ); return; } - dc_job_kill_action(context, 900); - dc_job_add(context, 900, 0, Params::new(), 0); + job_kill_action(context, Action::ConfigureImap); + job_add(context, Action::ConfigureImap, 0, Params::new(), 0); } unsafe fn dc_has_ongoing(context: &Context) -> libc::c_int { @@ -118,7 +118,7 @@ pub fn dc_stop_ongoing_process(context: &Context) { // the other dc_job_do_DC_JOB_*() functions are declared static in the c-file #[allow(non_snake_case, unused_must_use)] -pub unsafe fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context, _job: *mut dc_job_t) { +pub unsafe fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context, _job: &Job) { let flags: libc::c_int; let mut success = false; let mut imap_connected_here = false; diff --git a/src/dc_imex.rs b/src/dc_imex.rs index f15cf2ec9d..55858e759c 100644 --- a/src/dc_imex.rs +++ b/src/dc_imex.rs @@ -12,10 +12,10 @@ use crate::constants::*; use crate::context::Context; use crate::dc_configure::*; use crate::dc_e2ee::*; -use crate::dc_job::*; use crate::dc_msg::*; use crate::dc_tools::*; use crate::error::*; +use crate::job::*; use crate::key::*; use crate::param::*; use crate::pgp::*; @@ -44,8 +44,8 @@ pub unsafe fn dc_imex( param.set(Param::Arg2, as_str(param2)); } - dc_job_kill_action(context, 910); - dc_job_add(context, 910, 0, param, 0); + job_kill_action(context, Action::ImexImap); + job_add(context, Action::ImexImap, 0, param, 0); } /// Returns the filename of the backup if found, nullptr otherwise. @@ -506,7 +506,7 @@ pub unsafe fn dc_normalize_setup_code( } #[allow(non_snake_case)] -pub unsafe fn dc_job_do_DC_JOB_IMEX_IMAP(context: &Context, job: *mut dc_job_t) { +pub unsafe fn dc_job_do_DC_JOB_IMEX_IMAP(context: &Context, job: &Job) { let mut ok_to_continue = true; let mut success: libc::c_int = 0; let mut ongoing_allocated_here: libc::c_int = 0; @@ -514,10 +514,10 @@ pub unsafe fn dc_job_do_DC_JOB_IMEX_IMAP(context: &Context, job: *mut dc_job_t) if !(0 == dc_alloc_ongoing(context)) { ongoing_allocated_here = 1; - what = (*job).param.get_int(Param::Cmd).unwrap_or_default(); - let param1_s = (*job).param.get(Param::Arg).unwrap_or_default(); + what = job.param.get_int(Param::Cmd).unwrap_or_default(); + let param1_s = job.param.get(Param::Arg).unwrap_or_default(); let param1 = CString::yolo(param1_s); - let _param2 = CString::yolo((*job).param.get(Param::Arg2).unwrap_or_default()); + let _param2 = CString::yolo(job.param.get(Param::Arg2).unwrap_or_default()); if strlen(param1.as_ptr()) == 0 { error!(context, 0, "No Import/export dir/file given.",); diff --git a/src/dc_job.rs b/src/dc_job.rs deleted file mode 100644 index b26dc51ded..0000000000 --- a/src/dc_job.rs +++ /dev/null @@ -1,1131 +0,0 @@ -use mmime::mmapstring::*; - -use std::ffi::CStr; -use std::ptr; -use std::time::Duration; - -use rand::{thread_rng, Rng}; - -use crate::chat; -use crate::constants::*; -use crate::context::Context; -use crate::dc_configure::*; -use crate::dc_imex::*; -use crate::dc_jobthread::*; -use crate::dc_location::*; -use crate::dc_loginparam::*; -use crate::dc_mimefactory::*; -use crate::dc_msg::*; -use crate::dc_tools::*; -use crate::imap::*; -use crate::param::*; -use crate::sql; -use crate::types::*; -use crate::x::*; - -const DC_IMAP_THREAD: libc::c_int = 100; -const DC_SMTP_THREAD: libc::c_int = 5000; - -// thread IDs -// jobs in the INBOX-thread, range from DC_IMAP_THREAD..DC_IMAP_THREAD+999 -// low priority ... -// ... high priority -// jobs in the SMTP-thread, range from DC_SMTP_THREAD..DC_SMTP_THREAD+999 -// low priority ... -// ... high priority -// timeouts until actions are aborted. -// this may also affects IDLE to return, so a re-connect may take this time. -// mailcore2 uses 30 seconds, k-9 uses 10 seconds -#[derive(Clone)] -#[repr(C)] -pub struct dc_job_t { - pub job_id: uint32_t, - pub action: libc::c_int, - pub foreign_id: uint32_t, - pub desired_timestamp: i64, - pub added_timestamp: i64, - pub tries: libc::c_int, - pub param: Params, - pub try_again: libc::c_int, - pub pending_error: *mut libc::c_char, -} - -pub unsafe fn dc_perform_imap_jobs(context: &Context) { - info!(context, 0, "dc_perform_imap_jobs starting.",); - - let probe_imap_network = *context.probe_imap_network.clone().read().unwrap(); - *context.probe_imap_network.write().unwrap() = false; - *context.perform_inbox_jobs_needed.write().unwrap() = false; - - dc_job_perform(context, DC_IMAP_THREAD, probe_imap_network); - info!(context, 0, "dc_perform_imap_jobs ended.",); -} - -unsafe fn dc_job_perform(context: &Context, thread: libc::c_int, probe_network: bool) { - let query = if !probe_network { - // processing for first-try and after backoff-timeouts: - // process jobs in the order they were added. - "SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \ - FROM jobs WHERE thread=? AND desired_timestamp<=? ORDER BY action DESC, added_timestamp;" - } else { - // processing after call to dc_maybe_network(): - // process _all_ pending jobs that failed before - // in the order of their backoff-times. - "SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \ - FROM jobs WHERE thread=? AND tries>0 ORDER BY desired_timestamp, action DESC;" - }; - - let params_no_probe = params![thread as i64, time()]; - let params_probe = params![thread as i64]; - let params: &[&dyn rusqlite::ToSql] = if !probe_network { - params_no_probe - } else { - params_probe - }; - - let jobs: Result, _> = context.sql.query_map( - query, - params, - |row| { - let job = dc_job_t { - job_id: row.get(0)?, - action: row.get(1)?, - foreign_id: row.get(2)?, - desired_timestamp: row.get(5)?, - added_timestamp: row.get(4)?, - tries: row.get(6)?, - param: row.get::<_, String>(3)?.parse().unwrap_or_default(), - try_again: 0, - pending_error: 0 as *mut libc::c_char, - }; - - Ok(job) - }, - |jobs| { - jobs.collect::, _>>() - .map_err(Into::into) - }, - ); - match jobs { - Ok(ref _res) => {} - Err(ref err) => { - info!(context, 0, "query failed: {:?}", err); - } - } - for mut job in jobs.unwrap_or_default() { - info!( - context, - 0, - "{}-job #{}, action {} started...", - if thread == DC_IMAP_THREAD { - "INBOX" - } else { - "SMTP" - }, - job.job_id, - job.action, - ); - - // some configuration jobs are "exclusive": - // - they are always executed in the imap-thread and the smtp-thread is suspended during execution - // - they may change the database handle change the database handle; we do not keep old pointers therefore - // - they can be re-executed one time AT_ONCE, but they are not save in the database for later execution - if 900 == job.action || 910 == job.action { - dc_job_kill_action(context, job.action); - dc_jobthread_suspend(context, &context.sentbox_thread.clone().read().unwrap(), 1); - dc_jobthread_suspend(context, &context.mvbox_thread.clone().read().unwrap(), 1); - dc_suspend_smtp_thread(context, true); - } - - let mut tries = 0; - while tries <= 1 { - // this can be modified by a job using dc_job_try_again_later() - job.try_again = 0; - - match job.action { - 5901 => dc_job_do_DC_JOB_SEND(context, &mut job), - 110 => dc_job_do_DC_JOB_DELETE_MSG_ON_IMAP(context, &mut job), - 130 => dc_job_do_DC_JOB_MARKSEEN_MSG_ON_IMAP(context, &mut job), - 120 => dc_job_do_DC_JOB_MARKSEEN_MDN_ON_IMAP(context, &mut job), - 200 => dc_job_do_DC_JOB_MOVE_MSG(context, &mut job), - 5011 => dc_job_do_DC_JOB_SEND(context, &mut job), - 900 => dc_job_do_DC_JOB_CONFIGURE_IMAP(context, &mut job), - 910 => dc_job_do_DC_JOB_IMEX_IMAP(context, &mut job), - 5005 => dc_job_do_DC_JOB_MAYBE_SEND_LOCATIONS(context, &mut job), - 5007 => dc_job_do_DC_JOB_MAYBE_SEND_LOC_ENDED(context, &mut job), - 105 => sql::housekeeping(context), - _ => {} - } - if job.try_again != -1 { - break; - } - tries += 1 - } - if 900 == job.action || 910 == job.action { - dc_jobthread_suspend( - context, - &mut context.sentbox_thread.clone().read().unwrap(), - 0, - ); - dc_jobthread_suspend( - context, - &mut context.mvbox_thread.clone().read().unwrap(), - 0, - ); - dc_suspend_smtp_thread(context, false); - break; - } else if job.try_again == 2 { - // just try over next loop unconditionally, the ui typically interrupts idle when the file (video) is ready - info!( - context, - 0, - "{}-job #{} not yet ready and will be delayed.", - if thread == DC_IMAP_THREAD { - "INBOX" - } else { - "SMTP" - }, - job.job_id - ); - } else if job.try_again == -1 || job.try_again == 3 { - let tries = job.tries + 1; - if tries < 17 { - job.tries = tries; - let time_offset = get_backoff_time_offset(tries); - job.desired_timestamp = job.added_timestamp + time_offset; - dc_job_update(context, &mut job); - info!( - context, - 0, - "{}-job #{} not succeeded on try #{}, retry in ADD_TIME+{} (in {} seconds).", - if thread == DC_IMAP_THREAD { - "INBOX" - } else { - "SMTP" - }, - job.job_id as libc::c_int, - tries, - time_offset, - job.added_timestamp + time_offset - time() - ); - if thread == DC_SMTP_THREAD && tries < 17 - 1 { - context - .smtp_state - .clone() - .0 - .lock() - .unwrap() - .perform_jobs_needed = 2; - } - } else { - if job.action == 5901 { - dc_set_msg_failed(context, job.foreign_id, job.pending_error); - } - dc_job_delete(context, &mut job); - } - if !probe_network { - continue; - } - // on dc_maybe_network() we stop trying here; - // these jobs are already tried once. - // otherwise, we just continue with the next job - // to give other jobs a chance being tried at least once. - break; - } else { - dc_job_delete(context, &mut job); - } - free(job.pending_error as *mut libc::c_void); - } -} - -fn dc_job_delete(context: &Context, job: &dc_job_t) -> bool { - context - .sql - .execute("DELETE FROM jobs WHERE id=?;", params![job.job_id as i32]) - .is_ok() -} - -/* ****************************************************************************** - * Tools - ******************************************************************************/ -#[allow(non_snake_case)] -fn get_backoff_time_offset(c_tries: libc::c_int) -> i64 { - // results in ~3 weeks for the last backoff timespan - let mut N = 2_i32.pow((c_tries - 1) as u32); - N = N * 60; - let mut rng = thread_rng(); - let n: i32 = rng.gen(); - let mut seconds = n % (N + 1); - if seconds < 1 { - seconds = 1; - } - seconds as i64 -} - -fn dc_job_update(context: &Context, job: &dc_job_t) -> bool { - sql::execute( - context, - &context.sql, - "UPDATE jobs SET desired_timestamp=?, tries=?, param=? WHERE id=?;", - params![ - job.desired_timestamp, - job.tries as i64, - job.param.to_string(), - job.job_id as i32, - ], - ) - .is_ok() -} - -unsafe fn dc_suspend_smtp_thread(context: &Context, suspend: bool) { - context.smtp_state.0.lock().unwrap().suspended = suspend; - if suspend { - loop { - if !context.smtp_state.0.lock().unwrap().doing_jobs { - return; - } - std::thread::sleep(std::time::Duration::from_micros(300 * 1000)); - } - } -} - -#[allow(non_snake_case)] -unsafe fn dc_job_do_DC_JOB_SEND(context: &Context, job: &mut dc_job_t) { - let ok_to_continue; - let mut filename: *mut libc::c_char = 0 as *mut libc::c_char; - let mut buf: *mut libc::c_void = ptr::null_mut(); - let mut buf_bytes: size_t = 0i32 as size_t; - - /* connect to SMTP server, if not yet done */ - if !context.smtp.lock().unwrap().is_connected() { - let loginparam = dc_loginparam_read(context, &context.sql, "configured_"); - let connected = context.smtp.lock().unwrap().connect(context, &loginparam); - - if !connected { - dc_job_try_again_later(job, 3i32, ptr::null()); - ok_to_continue = false; - } else { - ok_to_continue = true; - } - } else { - ok_to_continue = true; - } - if ok_to_continue { - let filename_s = job.param.get(Param::File).unwrap_or_default(); - filename = filename_s.strdup(); - if strlen(filename) == 0 { - warn!(context, 0, "Missing file name for job {}", job.job_id,); - } else if !(0 == dc_read_file(context, filename, &mut buf, &mut buf_bytes)) { - let recipients = job.param.get(Param::Recipients); - if recipients.is_none() { - warn!(context, 0, "Missing recipients for job {}", job.job_id,); - } else { - let recipients_list = recipients - .unwrap() - .split("\x1e") - .filter_map(|addr| match lettre::EmailAddress::new(addr.to_string()) { - Ok(addr) => Some(addr), - Err(err) => { - eprintln!("WARNING: invalid recipient: {} {:?}", addr, err); - None - } - }) - .collect::>(); - /* if there is a msg-id and it does not exist in the db, cancel sending. - this happends if dc_delete_msgs() was called - before the generated mime was sent out */ - let ok_to_continue1; - if 0 != job.foreign_id { - if 0 == dc_msg_exists(context, job.foreign_id) { - warn!( - context, - 0, "Message {} for job {} does not exist", job.foreign_id, job.job_id, - ); - ok_to_continue1 = false; - } else { - ok_to_continue1 = true; - } - } else { - ok_to_continue1 = true; - } - if ok_to_continue1 { - /* send message */ - let body = std::slice::from_raw_parts(buf as *const u8, buf_bytes).to_vec(); - - // hold the smtp lock during sending of a job and - // its ok/error response processing. Note that if a message - // was sent we need to mark it in the database as we - // otherwise might send it twice. - let mut sock = context.smtp.lock().unwrap(); - if 0 == sock.send(context, recipients_list, body) { - sock.disconnect(); - dc_job_try_again_later(job, -1i32, sock.error); - } else { - dc_delete_file(context, filename_s); - if 0 != job.foreign_id { - dc_update_msg_state( - context, - job.foreign_id, - MessageState::OutDelivered, - ); - let chat_id: i32 = context - .sql - .query_row_col( - context, - "SELECT chat_id FROM msgs WHERE id=?", - params![job.foreign_id as i32], - 0, - ) - .unwrap_or_default(); - context.call_cb( - Event::MSG_DELIVERED, - chat_id as uintptr_t, - job.foreign_id as uintptr_t, - ); - } - } - } - } - } - } - free(buf); - free(filename as *mut libc::c_void); -} - -// this value does not increase the number of tries -unsafe fn dc_job_try_again_later( - job: &mut dc_job_t, - try_again: libc::c_int, - pending_error: *const libc::c_char, -) { - job.try_again = try_again; - free(job.pending_error as *mut libc::c_void); - job.pending_error = dc_strdup_keep_null(pending_error); -} - -#[allow(non_snake_case)] -unsafe fn dc_job_do_DC_JOB_MOVE_MSG(context: &Context, job: &mut dc_job_t) { - let ok_to_continue; - let msg = dc_msg_new_untyped(context); - let mut dest_uid: uint32_t = 0i32 as uint32_t; - - let inbox = context.inbox.read().unwrap(); - - if !inbox.is_connected() { - connect_to_inbox(context, &inbox); - if !inbox.is_connected() { - dc_job_try_again_later(job, 3i32, ptr::null()); - ok_to_continue = false; - } else { - ok_to_continue = true; - } - } else { - ok_to_continue = true; - } - if ok_to_continue { - if dc_msg_load_from_db(msg, context, job.foreign_id) { - if context - .sql - .get_config_int(context, "folders_configured") - .unwrap_or_default() - < 3 - { - inbox.configure_folders(context, 0x1i32); - } - let dest_folder = context.sql.get_config(context, "configured_mvbox_folder"); - - if let Some(dest_folder) = dest_folder { - let server_folder = (*msg).server_folder.as_ref().unwrap(); - - match inbox.mv( - context, - server_folder, - (*msg).server_uid, - &dest_folder, - &mut dest_uid, - ) as libc::c_uint - { - 1 => { - dc_job_try_again_later(job, 3i32, ptr::null()); - } - 3 => { - dc_update_server_uid(context, (*msg).rfc724_mid, &dest_folder, dest_uid); - } - 0 | 2 | _ => {} - } - } - } - } - - dc_msg_unref(msg); -} - -/* ****************************************************************************** - * IMAP-jobs - ******************************************************************************/ -fn connect_to_inbox(context: &Context, inbox: &Imap) -> libc::c_int { - let ret_connected = dc_connect_to_configured_imap(context, inbox); - if 0 != ret_connected { - inbox.set_watch_folder("INBOX".into()); - } - ret_connected -} - -#[allow(non_snake_case)] -unsafe fn dc_job_do_DC_JOB_MARKSEEN_MDN_ON_IMAP(context: &Context, job: &mut dc_job_t) { - let ok_to_continue; - let folder = job - .param - .get(Param::ServerFolder) - .unwrap_or_default() - .to_string(); - let uid = job.param.get_int(Param::ServerUid).unwrap_or_default() as u32; - let mut dest_uid = 0; - let inbox = context.inbox.read().unwrap(); - - if !inbox.is_connected() { - connect_to_inbox(context, &inbox); - if !inbox.is_connected() { - dc_job_try_again_later(job, 3, ptr::null()); - ok_to_continue = false; - } else { - ok_to_continue = true; - } - } else { - ok_to_continue = true; - } - if ok_to_continue { - if inbox.set_seen(context, &folder, uid) == 0 { - dc_job_try_again_later(job, 3i32, ptr::null()); - } - if 0 != job.param.get_int(Param::AlsoMove).unwrap_or_default() { - if context - .sql - .get_config_int(context, "folders_configured") - .unwrap_or_default() - < 3 - { - inbox.configure_folders(context, 0x1i32); - } - let dest_folder = context.sql.get_config(context, "configured_mvbox_folder"); - if let Some(dest_folder) = dest_folder { - if 1 == inbox.mv(context, folder, uid, dest_folder, &mut dest_uid) as libc::c_uint { - dc_job_try_again_later(job, 3, 0 as *const libc::c_char); - } - } - } - } -} - -#[allow(non_snake_case)] -unsafe fn dc_job_do_DC_JOB_MARKSEEN_MSG_ON_IMAP(context: &Context, job: &mut dc_job_t) { - let ok_to_continue; - let msg: *mut dc_msg_t = dc_msg_new_untyped(context); - let inbox = context.inbox.read().unwrap(); - - if !inbox.is_connected() { - connect_to_inbox(context, &inbox); - if !inbox.is_connected() { - dc_job_try_again_later(job, 3i32, ptr::null()); - ok_to_continue = false; - } else { - ok_to_continue = true; - } - } else { - ok_to_continue = true; - } - if ok_to_continue { - if dc_msg_load_from_db(msg, context, job.foreign_id) { - let server_folder = (*msg).server_folder.as_ref().unwrap(); - match inbox.set_seen(context, server_folder, (*msg).server_uid) as libc::c_uint { - 0 => {} - 1 => { - dc_job_try_again_later(job, 3i32, ptr::null()); - } - _ => { - if 0 != (*msg).param.get_int(Param::WantsMdn).unwrap_or_default() - && 0 != context - .sql - .get_config_int(context, "mdns_enabled") - .unwrap_or_else(|| 1) - { - let folder = (*msg).server_folder.as_ref().unwrap(); - - match inbox.set_mdnsent(context, folder, (*msg).server_uid) as libc::c_uint - { - 1 => { - dc_job_try_again_later(job, 3i32, 0 as *const libc::c_char); - } - 3 => { - dc_send_mdn(context, (*msg).id); - } - 0 | 2 | _ => {} - } - } - } - } - } - } - dc_msg_unref(msg); -} -unsafe fn dc_send_mdn(context: &Context, msg_id: uint32_t) { - let mut mimefactory = dc_mimefactory_t { - from_addr: ptr::null_mut(), - from_displayname: ptr::null_mut(), - selfstatus: ptr::null_mut(), - recipients_names: ptr::null_mut(), - recipients_addr: ptr::null_mut(), - timestamp: 0, - rfc724_mid: ptr::null_mut(), - loaded: DC_MF_NOTHING_LOADED, - msg: ptr::null_mut(), - chat: None, - increation: 0, - in_reply_to: ptr::null_mut(), - references: ptr::null_mut(), - req_mdn: 0, - out: ptr::null_mut(), - out_encrypted: 0, - out_gossiped: 0, - out_last_added_location_id: 0, - error: ptr::null_mut(), - context, - }; - - if !(0 == dc_mimefactory_load_mdn(&mut mimefactory, msg_id) - || 0 == dc_mimefactory_render(&mut mimefactory)) - { - dc_add_smtp_job(context, 5011i32, &mut mimefactory); - } - dc_mimefactory_empty(&mut mimefactory); -} -/* ****************************************************************************** - * SMTP-jobs - ******************************************************************************/ -/* * - * Store the MIME message in a file and send it later with a new SMTP job. - * - * @param context The context object as created by dc_context_new() - * @param action One of the DC_JOB_SEND_ constants - * @param mimefactory An instance of dc_mimefactory_t with a loaded and rendered message or MDN - * @return 1=success, 0=error - */ -#[allow(non_snake_case)] -unsafe fn dc_add_smtp_job( - context: &Context, - action: libc::c_int, - mimefactory: *mut dc_mimefactory_t, -) -> libc::c_int { - let pathNfilename: *mut libc::c_char; - let mut success: libc::c_int = 0i32; - let mut recipients: *mut libc::c_char = 0 as *mut libc::c_char; - let mut param = Params::new(); - pathNfilename = dc_get_fine_pathNfilename( - context, - b"$BLOBDIR\x00" as *const u8 as *const libc::c_char, - (*mimefactory).rfc724_mid, - ); - if pathNfilename.is_null() { - error!( - context, - 0, - "Could not find free file name for message with ID <{}>.", - to_string((*mimefactory).rfc724_mid), - ); - } else if 0 - == dc_write_file( - context, - pathNfilename, - (*(*mimefactory).out).str_0 as *const libc::c_void, - (*(*mimefactory).out).len, - ) - { - error!( - context, - 0, - "Could not write message <{}> to \"{}\".", - to_string((*mimefactory).rfc724_mid), - as_str(pathNfilename), - ); - } else { - recipients = dc_str_from_clist( - (*mimefactory).recipients_addr, - b"\x1e\x00" as *const u8 as *const libc::c_char, - ); - param.set(Param::File, as_str(pathNfilename)); - param.set(Param::Recipients, as_str(recipients)); - dc_job_add( - context, - action, - (if (*mimefactory).loaded as libc::c_uint - == DC_MF_MSG_LOADED as libc::c_int as libc::c_uint - { - (*(*mimefactory).msg).id - } else { - 0 - }) as libc::c_int, - param, - 0, - ); - success = 1i32 - } - free(recipients as *mut libc::c_void); - free(pathNfilename as *mut libc::c_void); - success -} - -pub unsafe fn dc_job_add( - context: &Context, - action: libc::c_int, - foreign_id: libc::c_int, - param: Params, - delay_seconds: libc::c_int, -) { - let timestamp = time(); - let thread = if action >= DC_IMAP_THREAD && action < DC_IMAP_THREAD + 1000 { - DC_IMAP_THREAD - } else if action >= DC_SMTP_THREAD && action < DC_SMTP_THREAD + 1000 { - DC_SMTP_THREAD - } else { - return; - }; - - sql::execute( - context, - &context.sql, - "INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);", - params![ - timestamp, - thread, - action, - foreign_id, - param.to_string(), - (timestamp + delay_seconds as i64) - ] - ).ok(); - - if thread == DC_IMAP_THREAD { - dc_interrupt_imap_idle(context); - } else { - dc_interrupt_smtp_idle(context); - } -} - -pub unsafe fn dc_interrupt_smtp_idle(context: &Context) { - info!(context, 0, "Interrupting SMTP-idle...",); - - let &(ref lock, ref cvar) = &*context.smtp_state.clone(); - let mut state = lock.lock().unwrap(); - - state.perform_jobs_needed = 1; - state.idle = true; - cvar.notify_one(); -} - -pub unsafe fn dc_interrupt_imap_idle(context: &Context) { - info!(context, 0, "Interrupting IMAP-IDLE...",); - - *context.perform_inbox_jobs_needed.write().unwrap() = true; - context.inbox.read().unwrap().interrupt_idle(); -} - -#[allow(non_snake_case)] -unsafe fn dc_job_do_DC_JOB_DELETE_MSG_ON_IMAP(context: &Context, job: &mut dc_job_t) { - let mut delete_from_server: libc::c_int = 1i32; - let msg: *mut dc_msg_t = dc_msg_new_untyped(context); - let inbox = context.inbox.read().unwrap(); - - if !(!dc_msg_load_from_db(msg, context, job.foreign_id) - || (*msg).rfc724_mid.is_null() - || *(*msg).rfc724_mid.offset(0isize) as libc::c_int == 0i32) - { - let ok_to_continue1; - /* eg. device messages have no Message-ID */ - if dc_rfc724_mid_cnt(context, (*msg).rfc724_mid) != 1i32 { - info!( - context, - 0, "The message is deleted from the server when all parts are deleted.", - ); - delete_from_server = 0i32 - } - /* if this is the last existing part of the message, we delete the message from the server */ - if 0 != delete_from_server { - let ok_to_continue; - if !inbox.is_connected() { - connect_to_inbox(context, &inbox); - if !inbox.is_connected() { - dc_job_try_again_later(job, 3i32, 0 as *const libc::c_char); - ok_to_continue = false; - } else { - ok_to_continue = true; - } - } else { - ok_to_continue = true; - } - if ok_to_continue { - let mid = CStr::from_ptr((*msg).rfc724_mid).to_str().unwrap(); - let server_folder = (*msg).server_folder.as_ref().unwrap(); - if 0 == inbox.delete_msg(context, mid, server_folder, &mut (*msg).server_uid) { - dc_job_try_again_later(job, -1i32, 0 as *const libc::c_char); - ok_to_continue1 = false; - } else { - ok_to_continue1 = true; - } - } else { - ok_to_continue1 = false; - } - } else { - ok_to_continue1 = true; - } - if ok_to_continue1 { - dc_delete_msg_from_db(context, (*msg).id); - } - } - dc_msg_unref(msg); -} - -/* delete all pending jobs with the given action */ -pub fn dc_job_kill_action(context: &Context, action: libc::c_int) -> bool { - sql::execute( - context, - &context.sql, - "DELETE FROM jobs WHERE action=?;", - params![action], - ) - .is_ok() -} - -pub unsafe fn dc_perform_imap_fetch(context: &Context) { - let inbox = context.inbox.read().unwrap(); - let start = clock(); - - if 0 == connect_to_inbox(context, &inbox) { - return; - } - if context - .sql - .get_config_int(context, "inbox_watch") - .unwrap_or_else(|| 1) - == 0 - { - info!(context, 0, "INBOX-watch disabled.",); - return; - } - info!(context, 0, "INBOX-fetch started...",); - inbox.fetch(context); - if inbox.should_reconnect() { - info!(context, 0, "INBOX-fetch aborted, starting over...",); - inbox.fetch(context); - } - info!( - context, - 0, - "INBOX-fetch done in {:.4} ms.", - clock().wrapping_sub(start) as libc::c_double * 1000.0f64 / 1000000 as libc::c_double, - ); -} - -pub fn dc_perform_imap_idle(context: &Context) { - let inbox = context.inbox.read().unwrap(); - - connect_to_inbox(context, &inbox); - - if *context.perform_inbox_jobs_needed.clone().read().unwrap() { - info!( - context, - 0, "INBOX-IDLE will not be started because of waiting jobs." - ); - return; - } - info!(context, 0, "INBOX-IDLE started..."); - inbox.idle(context); - info!(context, 0, "INBOX-IDLE ended."); -} - -pub unsafe fn dc_perform_mvbox_fetch(context: &Context) { - let use_network = context - .sql - .get_config_int(context, "mvbox_watch") - .unwrap_or_else(|| 1); - dc_jobthread_fetch( - context, - &mut context.mvbox_thread.clone().write().unwrap(), - use_network, - ); -} - -pub unsafe fn dc_perform_mvbox_idle(context: &Context) { - let use_network = context - .sql - .get_config_int(context, "mvbox_watch") - .unwrap_or_else(|| 1); - - dc_jobthread_idle( - context, - &context.mvbox_thread.clone().read().unwrap(), - use_network, - ); -} - -pub unsafe fn dc_interrupt_mvbox_idle(context: &Context) { - dc_jobthread_interrupt_idle(context, &context.mvbox_thread.clone().read().unwrap()); -} - -pub unsafe fn dc_perform_sentbox_fetch(context: &Context) { - let use_network = context - .sql - .get_config_int(context, "sentbox_watch") - .unwrap_or_else(|| 1); - dc_jobthread_fetch( - context, - &mut context.sentbox_thread.clone().write().unwrap(), - use_network, - ); -} - -pub unsafe fn dc_perform_sentbox_idle(context: &Context) { - let use_network = context - .sql - .get_config_int(context, "sentbox_watch") - .unwrap_or_else(|| 1); - dc_jobthread_idle( - context, - &context.sentbox_thread.clone().read().unwrap(), - use_network, - ); -} - -pub unsafe fn dc_interrupt_sentbox_idle(context: &Context) { - dc_jobthread_interrupt_idle(context, &context.sentbox_thread.clone().read().unwrap()); -} - -pub unsafe fn dc_perform_smtp_jobs(context: &Context) { - let probe_smtp_network = { - let &(ref lock, _) = &*context.smtp_state.clone(); - let mut state = lock.lock().unwrap(); - - let probe_smtp_network = state.probe_network; - state.probe_network = false; - state.perform_jobs_needed = 0; - - if state.suspended { - info!(context, 0, "SMTP-jobs suspended.",); - return; - } - state.doing_jobs = true; - probe_smtp_network - }; - - info!(context, 0, "SMTP-jobs started...",); - dc_job_perform(context, DC_SMTP_THREAD, probe_smtp_network); - info!(context, 0, "SMTP-jobs ended."); - - { - let &(ref lock, _) = &*context.smtp_state.clone(); - let mut state = lock.lock().unwrap(); - - state.doing_jobs = false; - } -} - -pub unsafe fn dc_perform_smtp_idle(context: &Context) { - info!(context, 0, "SMTP-idle started...",); - { - let &(ref lock, ref cvar) = &*context.smtp_state.clone(); - let mut state = lock.lock().unwrap(); - - if state.perform_jobs_needed == 1 { - info!( - context, - 0, "SMTP-idle will not be started because of waiting jobs.", - ); - } else { - let dur = get_next_wakeup_time(context, DC_SMTP_THREAD); - - loop { - let res = cvar.wait_timeout(state, dur).unwrap(); - state = res.0; - - if state.idle == true || res.1.timed_out() { - // We received the notification and the value has been updated, we can leave. - break; - } - } - state.idle = false; - } - } - - info!(context, 0, "SMTP-idle ended.",); -} - -unsafe fn get_next_wakeup_time(context: &Context, thread: libc::c_int) -> Duration { - let t: i64 = context - .sql - .query_row_col( - context, - "SELECT MIN(desired_timestamp) FROM jobs WHERE thread=?;", - params![thread], - 0, - ) - .unwrap_or_default(); - - let mut wakeup_time = Duration::new(10 * 60, 0); - let now = time(); - if t > 0 { - if t > now { - wakeup_time = Duration::new((t - now) as u64, 0); - } else { - wakeup_time = Duration::new(0, 0); - } - } - - wakeup_time -} - -pub unsafe fn dc_maybe_network(context: &Context) { - { - let &(ref lock, _) = &*context.smtp_state.clone(); - let mut state = lock.lock().unwrap(); - state.probe_network = true; - - *context.probe_imap_network.write().unwrap() = true; - } - - dc_interrupt_smtp_idle(context); - dc_interrupt_imap_idle(context); - dc_interrupt_mvbox_idle(context); - dc_interrupt_sentbox_idle(context); -} - -pub fn dc_job_action_exists(context: &Context, action: libc::c_int) -> bool { - context - .sql - .exists("SELECT id FROM jobs WHERE action=?;", params![action]) - .unwrap_or_default() -} - -/* special case for DC_JOB_SEND_MSG_TO_SMTP */ -#[allow(non_snake_case)] -pub unsafe fn dc_job_send_msg(context: &Context, msg_id: uint32_t) -> libc::c_int { - let mut success = 0; - let mut mimefactory = dc_mimefactory_t { - from_addr: 0 as *mut libc::c_char, - from_displayname: 0 as *mut libc::c_char, - selfstatus: 0 as *mut libc::c_char, - recipients_names: 0 as *mut clist, - recipients_addr: 0 as *mut clist, - timestamp: 0, - rfc724_mid: 0 as *mut libc::c_char, - loaded: DC_MF_NOTHING_LOADED, - msg: 0 as *mut dc_msg_t, - chat: None, - increation: 0, - in_reply_to: 0 as *mut libc::c_char, - references: 0 as *mut libc::c_char, - req_mdn: 0, - out: 0 as *mut MMAPString, - out_encrypted: 0, - out_gossiped: 0, - out_last_added_location_id: 0, - error: 0 as *mut libc::c_char, - context, - }; - - /* load message data */ - if 0 == dc_mimefactory_load_msg(&mut mimefactory, msg_id) || mimefactory.from_addr.is_null() { - warn!( - context, - 0, "Cannot load data to send, maybe the message is deleted in between.", - ); - } else { - // no redo, no IMAP. moreover, as the data does not exist, there is no need in calling dc_set_msg_failed() - if chat::msgtype_has_file((*mimefactory.msg).type_0) { - if let Some(pathNfilename) = (*mimefactory.msg).param.get(Param::File) { - if ((*mimefactory.msg).type_0 == Viewtype::Image - || (*mimefactory.msg).type_0 == Viewtype::Gif) - && !(*mimefactory.msg).param.exists(Param::Width) - { - (*mimefactory.msg).param.set_int(Param::Width, 0); - (*mimefactory.msg).param.set_int(Param::Height, 0); - - if let Some(buf) = dc_read_file_safe(context, pathNfilename) { - if let Ok((width, height)) = dc_get_filemeta(&buf) { - (*mimefactory.msg).param.set_int(Param::Width, width as i32); - (*mimefactory.msg) - .param - .set_int(Param::Height, height as i32); - } - } - dc_msg_save_param_to_disk(mimefactory.msg); - } - } - } - /* create message */ - if 0 == dc_mimefactory_render(&mut mimefactory) { - dc_set_msg_failed(context, msg_id, mimefactory.error); - } else if 0 - != (*mimefactory.msg) - .param - .get_int(Param::GuranteeE2ee) - .unwrap_or_default() - && 0 == mimefactory.out_encrypted - { - warn!( - context, - 0, - "e2e encryption unavailable {} - {:?}", - msg_id, - (*mimefactory.msg).param.get_int(Param::GuranteeE2ee), - ); - dc_set_msg_failed( - context, - msg_id, - b"End-to-end-encryption unavailable unexpectedly.\x00" as *const u8 - as *const libc::c_char, - ); - } else { - /* unrecoverable */ - if clist_search_string_nocase(mimefactory.recipients_addr, mimefactory.from_addr) - == 0i32 - { - clist_insert_after( - mimefactory.recipients_names, - (*mimefactory.recipients_names).last, - 0 as *mut libc::c_void, - ); - clist_insert_after( - mimefactory.recipients_addr, - (*mimefactory.recipients_addr).last, - dc_strdup(mimefactory.from_addr) as *mut libc::c_void, - ); - } - if 0 != mimefactory.out_gossiped { - chat::set_gossiped_timestamp(context, (*mimefactory.msg).chat_id, time()); - } - if 0 != mimefactory.out_last_added_location_id { - dc_set_kml_sent_timestamp(context, (*mimefactory.msg).chat_id, time()); - if 0 == (*mimefactory.msg).hidden { - dc_set_msg_location_id( - context, - (*mimefactory.msg).id, - mimefactory.out_last_added_location_id, - ); - } - } - if 0 != mimefactory.out_encrypted - && (*mimefactory.msg) - .param - .get_int(Param::GuranteeE2ee) - .unwrap_or_default() - == 0 - { - (*mimefactory.msg).param.set_int(Param::GuranteeE2ee, 1); - dc_msg_save_param_to_disk(mimefactory.msg); - } - success = dc_add_smtp_job(context, 5901i32, &mut mimefactory); - } - } - dc_mimefactory_empty(&mut mimefactory); - - success -} diff --git a/src/dc_jobthread.rs b/src/dc_jobthread.rs deleted file mode 100644 index c37394f4bb..0000000000 --- a/src/dc_jobthread.rs +++ /dev/null @@ -1,209 +0,0 @@ -use std::sync::{Arc, Condvar, Mutex}; - -use crate::context::Context; -use crate::dc_configure::*; -use crate::imap::Imap; -use crate::x::*; - -#[repr(C)] -pub struct dc_jobthread_t { - pub name: &'static str, - pub folder_config_name: &'static str, - pub imap: Imap, - pub state: Arc<(Mutex, Condvar)>, -} - -pub fn dc_jobthread_init( - name: &'static str, - folder_config_name: &'static str, - imap: Imap, -) -> dc_jobthread_t { - dc_jobthread_t { - name, - folder_config_name, - imap, - state: Arc::new((Mutex::new(Default::default()), Condvar::new())), - } -} - -#[derive(Debug, Default)] -pub struct JobState { - idle: bool, - jobs_needed: i32, - suspended: i32, - using_handle: i32, -} - -pub unsafe fn dc_jobthread_suspend( - context: &Context, - jobthread: &dc_jobthread_t, - suspend: libc::c_int, -) { - if 0 != suspend { - info!(context, 0, "Suspending {}-thread.", jobthread.name,); - { - jobthread.state.0.lock().unwrap().suspended = 1; - } - dc_jobthread_interrupt_idle(context, jobthread); - loop { - let using_handle = jobthread.state.0.lock().unwrap().using_handle; - if using_handle == 0 { - return; - } - std::thread::sleep(std::time::Duration::from_micros(300 * 1000)); - } - } else { - info!(context, 0, "Unsuspending {}-thread.", jobthread.name); - - let &(ref lock, ref cvar) = &*jobthread.state.clone(); - let mut state = lock.lock().unwrap(); - - state.suspended = 0; - state.idle = true; - cvar.notify_one(); - } -} - -pub unsafe fn dc_jobthread_interrupt_idle(context: &Context, jobthread: &dc_jobthread_t) { - { - jobthread.state.0.lock().unwrap().jobs_needed = 1; - } - - info!(context, 0, "Interrupting {}-IDLE...", jobthread.name); - - jobthread.imap.interrupt_idle(); - - let &(ref lock, ref cvar) = &*jobthread.state.clone(); - let mut state = lock.lock().unwrap(); - - state.idle = true; - cvar.notify_one(); -} - -pub unsafe fn dc_jobthread_fetch( - context: &Context, - jobthread: &mut dc_jobthread_t, - use_network: libc::c_int, -) { - let start; - - { - let &(ref lock, _) = &*jobthread.state.clone(); - let mut state = lock.lock().unwrap(); - - if 0 != state.suspended { - return; - } - - state.using_handle = 1; - } - - if 0 != use_network { - start = clock(); - if !(0 == connect_to_imap(context, jobthread)) { - info!(context, 0, "{}-fetch started...", jobthread.name); - jobthread.imap.fetch(context); - - if jobthread.imap.should_reconnect() { - info!( - context, - 0, "{}-fetch aborted, starting over...", jobthread.name, - ); - jobthread.imap.fetch(context); - } - info!( - context, - 0, - "{}-fetch done in {:.3} ms.", - jobthread.name, - clock().wrapping_sub(start) as f64 / 1000.0, - ); - } - } - - jobthread.state.0.lock().unwrap().using_handle = 0; -} - -/* ****************************************************************************** - * the typical fetch, idle, interrupt-idle - ******************************************************************************/ - -unsafe fn connect_to_imap(context: &Context, jobthread: &dc_jobthread_t) -> libc::c_int { - if jobthread.imap.is_connected() { - return 1; - } - - let mut ret_connected = dc_connect_to_configured_imap(context, &jobthread.imap); - - if !(0 == ret_connected) { - if context - .sql - .get_config_int(context, "folders_configured") - .unwrap_or_default() - < 3 - { - jobthread.imap.configure_folders(context, 0x1); - } - - if let Some(mvbox_name) = context - .sql - .get_config(context, jobthread.folder_config_name) - { - jobthread.imap.set_watch_folder(mvbox_name); - } else { - jobthread.imap.disconnect(context); - ret_connected = 0; - } - } - - ret_connected -} - -pub unsafe fn dc_jobthread_idle( - context: &Context, - jobthread: &dc_jobthread_t, - use_network: libc::c_int, -) { - { - let &(ref lock, ref cvar) = &*jobthread.state.clone(); - let mut state = lock.lock().unwrap(); - - if 0 != state.jobs_needed { - info!( - context, - 0, - "{}-IDLE will not be started as it was interrupted while not ideling.", - jobthread.name, - ); - state.jobs_needed = 0; - return; - } - - if 0 != state.suspended { - while !state.idle { - state = cvar.wait(state).unwrap(); - } - state.idle = false; - return; - } - - state.using_handle = 1; - - if 0 == use_network { - state.using_handle = 0; - - while !state.idle { - state = cvar.wait(state).unwrap(); - } - state.idle = false; - return; - } - } - - connect_to_imap(context, jobthread); - info!(context, 0, "{}-IDLE started...", jobthread.name,); - jobthread.imap.idle(context); - info!(context, 0, "{}-IDLE ended.", jobthread.name); - - jobthread.state.0.lock().unwrap().using_handle = 0; -} diff --git a/src/dc_location.rs b/src/dc_location.rs index 86a23908ed..eb37447622 100644 --- a/src/dc_location.rs +++ b/src/dc_location.rs @@ -5,9 +5,9 @@ use crate::chat; use crate::constants::Event; use crate::constants::*; use crate::context::*; -use crate::dc_job::*; use crate::dc_msg::*; use crate::dc_tools::*; +use crate::job::*; use crate::param::*; use crate::sql; use crate::stock::StockMessage; @@ -68,15 +68,11 @@ impl dc_kml_t { } // location streaming -pub unsafe fn dc_send_locations_to_chat( - context: &Context, - chat_id: uint32_t, - seconds: libc::c_int, -) { +pub unsafe fn dc_send_locations_to_chat(context: &Context, chat_id: uint32_t, seconds: i64) { let now = time(); let mut msg: *mut dc_msg_t = 0 as *mut dc_msg_t; let is_sending_locations_before: bool; - if !(seconds < 0i32 || chat_id <= 9i32 as libc::c_uint) { + if !(seconds < 0 || chat_id <= 9i32 as libc::c_uint) { is_sending_locations_before = dc_is_sending_locations_to_chat(context, chat_id); if sql::execute( context, @@ -87,11 +83,7 @@ pub unsafe fn dc_send_locations_to_chat( WHERE id=?", params![ if 0 != seconds { now } else { 0 }, - if 0 != seconds { - now + seconds as i64 - } else { - 0 - }, + if 0 != seconds { now + seconds } else { 0 }, chat_id as i32, ], ) @@ -115,12 +107,12 @@ pub unsafe fn dc_send_locations_to_chat( ); if 0 != seconds { schedule_MAYBE_SEND_LOCATIONS(context, 0i32); - dc_job_add( + job_add( context, - 5007i32, + Action::MaybeSendLocationsEnded, chat_id as libc::c_int, Params::new(), - seconds + 1i32, + seconds + 1, ); } } @@ -133,8 +125,8 @@ pub unsafe fn dc_send_locations_to_chat( ******************************************************************************/ #[allow(non_snake_case)] unsafe fn schedule_MAYBE_SEND_LOCATIONS(context: &Context, flags: libc::c_int) { - if 0 != flags & 0x1 || !dc_job_action_exists(context, 5005) { - dc_job_add(context, 5005, 0, Params::new(), 60); + if 0 != flags & 0x1 || !job_action_exists(context, Action::MaybeSendLocations) { + job_add(context, Action::MaybeSendLocations, 0, Params::new(), 60); }; } @@ -625,7 +617,7 @@ pub unsafe fn dc_kml_unref(kml: &mut dc_kml_t) { } #[allow(non_snake_case)] -pub unsafe fn dc_job_do_DC_JOB_MAYBE_SEND_LOCATIONS(context: &Context, _job: *mut dc_job_t) { +pub unsafe fn dc_job_do_DC_JOB_MAYBE_SEND_LOCATIONS(context: &Context, _job: &Job) { let now = time(); let mut continue_streaming: libc::c_int = 1; info!( @@ -707,7 +699,7 @@ pub unsafe fn dc_job_do_DC_JOB_MAYBE_SEND_LOCATIONS(context: &Context, _job: *mu } #[allow(non_snake_case)] -pub unsafe fn dc_job_do_DC_JOB_MAYBE_SEND_LOC_ENDED(context: &Context, job: &mut dc_job_t) { +pub unsafe fn dc_job_do_DC_JOB_MAYBE_SEND_LOC_ENDED(context: &Context, job: &mut Job) { // this function is called when location-streaming _might_ have ended for a chat. // the function checks, if location-streaming is really ended; // if so, a device-message is added if not yet done. diff --git a/src/dc_move.rs b/src/dc_move.rs index 9c937db61f..3767db86a1 100644 --- a/src/dc_move.rs +++ b/src/dc_move.rs @@ -1,7 +1,7 @@ use crate::constants::*; use crate::context::*; -use crate::dc_job::*; use crate::dc_msg::*; +use crate::job::*; use crate::param::Params; pub unsafe fn dc_do_heuristics_moves(context: &Context, folder: &str, msg_id: u32) { @@ -32,7 +32,13 @@ pub unsafe fn dc_do_heuristics_moves(context: &Context, folder: &str, msg_id: u3 // 1 = dc message, 2 = reply to dc message if 0 != (*msg).is_dc_message { - dc_job_add(context, 200, (*msg).id as libc::c_int, Params::new(), 0); + job_add( + context, + Action::MoveMsg, + (*msg).id as libc::c_int, + Params::new(), + 0, + ); dc_update_msg_move_state(context, (*msg).rfc724_mid, MoveState::Moving); } diff --git a/src/dc_msg.rs b/src/dc_msg.rs index 4f374f92d1..5bd7394971 100644 --- a/src/dc_msg.rs +++ b/src/dc_msg.rs @@ -9,8 +9,8 @@ use crate::chat::{self, Chat}; use crate::constants::*; use crate::contact::*; use crate::context::*; -use crate::dc_job::*; use crate::dc_tools::*; +use crate::job::*; use crate::lot::{Lot, LotState, Meaning}; use crate::param::*; use crate::pgp::*; @@ -591,9 +591,9 @@ pub unsafe fn dc_delete_msgs(context: &Context, msg_ids: *const uint32_t, msg_cn let mut i: libc::c_int = 0i32; while i < msg_cnt { dc_update_msg_chat_id(context, *msg_ids.offset(i as isize), 3i32 as uint32_t); - dc_job_add( + job_add( context, - 110, + Action::DeleteMsgOnImap, *msg_ids.offset(i as isize) as libc::c_int, Params::new(), 0, @@ -603,8 +603,8 @@ pub unsafe fn dc_delete_msgs(context: &Context, msg_ids: *const uint32_t, msg_cn if 0 != msg_cnt { context.call_cb(Event::MSGS_CHANGED, 0 as uintptr_t, 0 as uintptr_t); - dc_job_kill_action(context, 105); - dc_job_add(context, 105, 0, Params::new(), 10); + job_kill_action(context, Action::Housekeeping); + job_add(context, Action::Housekeeping, 0, Params::new(), 10); }; } @@ -654,7 +654,13 @@ pub fn dc_markseen_msgs(context: &Context, msg_ids: *const u32, msg_cnt: usize) dc_update_msg_state(context, id, MessageState::InSeen); info!(context, 0, "Seen message #{}.", id); - unsafe { dc_job_add(context, 130, id as i32, Params::new(), 0) }; + job_add( + context, + Action::MarkseenMsgOnImap, + id as i32, + Params::new(), + 0, + ); send_event = true; } } else if curr_state == MessageState::InFresh { @@ -1150,16 +1156,17 @@ pub unsafe fn dc_msg_latefiling_mediasize( }; } -pub unsafe fn dc_msg_save_param_to_disk(msg: *mut dc_msg_t) -> bool { +pub fn dc_msg_save_param_to_disk(msg: *mut dc_msg_t) -> bool { if msg.is_null() { return false; } + let msg = unsafe { &*msg }; sql::execute( - (*msg).context, - &(*msg).context.sql, + msg.context, + &msg.context.sql, "UPDATE msgs SET param=? WHERE id=?;", - params![(*msg).param.to_string(), (*msg).id as i32], + params![msg.param.to_string(), msg.id as i32], ) .is_ok() } @@ -1235,16 +1242,16 @@ pub fn dc_update_msg_move_state( .is_ok() } -pub unsafe fn dc_set_msg_failed(context: &Context, msg_id: uint32_t, error: *const libc::c_char) { +pub unsafe fn dc_set_msg_failed(context: &Context, msg_id: u32, error: Option>) { let mut msg = dc_msg_new_untyped(context); if dc_msg_load_from_db(msg, context, msg_id) { if (*msg).state.can_fail() { (*msg).state = MessageState::OutFailed; } - if !error.is_null() { - (*msg).param.set(Param::Error, as_str(error)); - error!(context, 0, "{}", as_str(error),); + if let Some(error) = error { + (*msg).param.set(Param::Error, error.as_ref()); + error!(context, 0, "{}", error.as_ref()); } if sql::execute( diff --git a/src/dc_receive_imf.rs b/src/dc_receive_imf.rs index 95dc879bb2..c810530010 100644 --- a/src/dc_receive_imf.rs +++ b/src/dc_receive_imf.rs @@ -14,7 +14,6 @@ use crate::chat::{self, Chat}; use crate::constants::*; use crate::contact::*; use crate::context::Context; -use crate::dc_job::*; use crate::dc_location::*; use crate::dc_mimeparser::*; use crate::dc_move::*; @@ -23,6 +22,7 @@ use crate::dc_securejoin::*; use crate::dc_strencode::*; use crate::dc_tools::*; use crate::error::Result; +use crate::job::*; use crate::param::*; use crate::peerstate::*; use crate::sql; @@ -232,9 +232,9 @@ pub unsafe fn dc_receive_imf( } if 0 != add_delete_job && !created_db_entries.is_empty() { - dc_job_add( + job_add( context, - DC_JOB_DELETE_MSG_ON_IMAP, + Action::DeleteMsgOnImap, created_db_entries[0].1 as i32, Params::new(), 0, @@ -920,7 +920,7 @@ unsafe fn handle_reports( { param.set_int(Param::AlsoMove, 1); } - dc_job_add(context, 120, 0, param, 0); + job_add(context, Action::MarkseenMdnOnImap, 0, param, 0); } } } diff --git a/src/dc_tools.rs b/src/dc_tools.rs index 5c2d680cbf..b500fcc557 100644 --- a/src/dc_tools.rs +++ b/src/dc_tools.rs @@ -1284,6 +1284,14 @@ pub fn as_str<'a>(s: *const libc::c_char) -> &'a str { as_str_safe(s).unwrap_or_else(|err| panic!("{}", err)) } +/// Converts a C string to either a Rust `&str` or `None` if it is a null pointer. +pub fn as_opt_str<'a>(s: *const libc::c_char) -> Option<&'a str> { + if s.is_null() { + return None; + } + Some(as_str(s)) +} + fn as_str_safe<'a>(s: *const libc::c_char) -> Result<&'a str, Error> { assert!(!s.is_null(), "cannot be used on null pointers"); diff --git a/src/imap.rs b/src/imap.rs index a477205c81..a9d8147871 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -25,7 +25,6 @@ const PREFETCH_FLAGS: &str = "(UID ENVELOPE)"; const BODY_FLAGS: &str = "(FLAGS BODY.PEEK[])"; const FETCH_FLAGS: &str = "(FLAGS)"; -#[repr(C)] pub struct Imap { config: Arc>, watch: Arc<(Mutex, Condvar)>, diff --git a/src/job.rs b/src/job.rs new file mode 100644 index 0000000000..ea7faa5a78 --- /dev/null +++ b/src/job.rs @@ -0,0 +1,1181 @@ +use std::ffi::CStr; +use std::ptr; +use std::time::Duration; + +use deltachat_derive::{FromSql, ToSql}; +use rand::{thread_rng, Rng}; + +use crate::chat; +use crate::constants::*; +use crate::context::Context; +use crate::dc_configure::*; +use crate::dc_imex::*; +use crate::dc_location::*; +use crate::dc_loginparam::*; +use crate::dc_mimefactory::*; +use crate::dc_msg::*; +use crate::dc_tools::*; +use crate::imap::*; +use crate::param::*; +use crate::sql; +use crate::types::*; +use crate::x::*; + +/// Thread IDs +#[derive(Debug, Display, Copy, Clone, PartialEq, Eq, FromPrimitive, ToPrimitive, FromSql, ToSql)] +#[repr(i32)] +enum Thread { + Imap = 100, + Smtp = 5000, +} + +#[derive(Debug, Display, Copy, Clone, PartialEq, Eq, FromPrimitive, ToPrimitive, FromSql, ToSql)] +#[repr(i32)] +pub enum Action { + // Jobs in the INBOX-thread, range from DC_IMAP_THREAD..DC_IMAP_THREAD+999 + Housekeeping = 105, // low priority ... + DeleteMsgOnImap = 110, + MarkseenMdnOnImap = 120, + MarkseenMsgOnImap = 130, + MoveMsg = 200, + ConfigureImap = 900, + ImexImap = 910, // ... high priority + + // Jobs in the SMTP-thread, range from DC_SMTP_THREAD..DC_SMTP_THREAD+999 + MaybeSendLocations = 5005, // low priority ... + MaybeSendLocationsEnded = 5007, + SendMdnOld = 5010, + SendMdn = 5011, + SendMsgToSmtpOld = 5900, + SendMsgToSmtp = 5901, // ... high priority +} + +impl From for Thread { + fn from(action: Action) -> Thread { + use Action::*; + + match action { + Housekeeping => Thread::Imap, + DeleteMsgOnImap => Thread::Imap, + MarkseenMdnOnImap => Thread::Imap, + MarkseenMsgOnImap => Thread::Imap, + MoveMsg => Thread::Imap, + ConfigureImap => Thread::Imap, + ImexImap => Thread::Imap, + + MaybeSendLocations => Thread::Smtp, + MaybeSendLocationsEnded => Thread::Smtp, + SendMdnOld => Thread::Smtp, + SendMdn => Thread::Smtp, + SendMsgToSmtpOld => Thread::Smtp, + SendMsgToSmtp => Thread::Smtp, + } + } +} + +#[derive(Debug, Clone)] +pub struct Job { + pub job_id: u32, + pub action: Action, + pub foreign_id: u32, + pub desired_timestamp: i64, + pub added_timestamp: i64, + pub tries: i32, + pub param: Params, + pub try_again: i32, + pub pending_error: Option, +} + +impl Job { + fn delete(&self, context: &Context) -> bool { + context + .sql + .execute("DELETE FROM jobs WHERE id=?;", params![self.job_id as i32]) + .is_ok() + } + + fn update(&self, context: &Context) -> bool { + sql::execute( + context, + &context.sql, + "UPDATE jobs SET desired_timestamp=?, tries=?, param=? WHERE id=?;", + params![ + self.desired_timestamp, + self.tries as i64, + self.param.to_string(), + self.job_id as i32, + ], + ) + .is_ok() + } + + #[allow(non_snake_case)] + fn do_DC_JOB_SEND(&mut self, context: &Context) { + let ok_to_continue; + let mut filename = ptr::null_mut(); + let mut buf = ptr::null_mut(); + let mut buf_bytes = 0; + + /* connect to SMTP server, if not yet done */ + if !context.smtp.lock().unwrap().is_connected() { + let loginparam = dc_loginparam_read(context, &context.sql, "configured_"); + let connected = context.smtp.lock().unwrap().connect(context, &loginparam); + + if !connected { + self.try_again_later(3i32, None); + ok_to_continue = false; + } else { + ok_to_continue = true; + } + } else { + ok_to_continue = true; + } + if ok_to_continue { + let filename_s = self.param.get(Param::File).unwrap_or_default(); + filename = unsafe { filename_s.strdup() }; + if unsafe { strlen(filename) } == 0 { + warn!(context, 0, "Missing file name for job {}", self.job_id,); + } else if 0 != unsafe { dc_read_file(context, filename, &mut buf, &mut buf_bytes) } { + let recipients = self.param.get(Param::Recipients); + if recipients.is_none() { + warn!(context, 0, "Missing recipients for job {}", self.job_id,); + } else { + let recipients_list = recipients + .unwrap() + .split("\x1e") + .filter_map(|addr| match lettre::EmailAddress::new(addr.to_string()) { + Ok(addr) => Some(addr), + Err(err) => { + eprintln!("WARNING: invalid recipient: {} {:?}", addr, err); + None + } + }) + .collect::>(); + /* if there is a msg-id and it does not exist in the db, cancel sending. + this happends if dc_delete_msgs() was called + before the generated mime was sent out */ + let ok_to_continue1; + if 0 != self.foreign_id { + if 0 == unsafe { dc_msg_exists(context, self.foreign_id) } { + warn!( + context, + 0, + "Message {} for job {} does not exist", + self.foreign_id, + self.job_id, + ); + ok_to_continue1 = false; + } else { + ok_to_continue1 = true; + } + } else { + ok_to_continue1 = true; + } + if ok_to_continue1 { + /* send message */ + let body = unsafe { + std::slice::from_raw_parts(buf as *const u8, buf_bytes).to_vec() + }; + + // hold the smtp lock during sending of a job and + // its ok/error response processing. Note that if a message + // was sent we need to mark it in the database as we + // otherwise might send it twice. + let mut sock = context.smtp.lock().unwrap(); + if 0 == sock.send(context, recipients_list, body) { + sock.disconnect(); + self.try_again_later(-1i32, Some(as_str(sock.error))); + } else { + dc_delete_file(context, filename_s); + if 0 != self.foreign_id { + dc_update_msg_state( + context, + self.foreign_id, + MessageState::OutDelivered, + ); + let chat_id: i32 = context + .sql + .query_row_col( + context, + "SELECT chat_id FROM msgs WHERE id=?", + params![self.foreign_id as i32], + 0, + ) + .unwrap_or_default(); + context.call_cb( + Event::MSG_DELIVERED, + chat_id as uintptr_t, + self.foreign_id as uintptr_t, + ); + } + } + } + } + } + } + unsafe { free(buf) }; + unsafe { free(filename.cast()) }; + } + + // this value does not increase the number of tries + fn try_again_later(&mut self, try_again: libc::c_int, pending_error: Option<&str>) { + self.try_again = try_again; + self.pending_error = pending_error.map(|s| s.to_string()); + } + + #[allow(non_snake_case)] + fn do_DC_JOB_MOVE_MSG(&mut self, context: &Context) { + let ok_to_continue; + let msg = unsafe { dc_msg_new_untyped(context) }; + let mut dest_uid = 0; + + let inbox = context.inbox.read().unwrap(); + + if !inbox.is_connected() { + connect_to_inbox(context, &inbox); + if !inbox.is_connected() { + self.try_again_later(3, None); + ok_to_continue = false; + } else { + ok_to_continue = true; + } + } else { + ok_to_continue = true; + } + if ok_to_continue { + if dc_msg_load_from_db(msg, context, self.foreign_id) { + if context + .sql + .get_config_int(context, "folders_configured") + .unwrap_or_default() + < 3 + { + inbox.configure_folders(context, 0x1i32); + } + let dest_folder = context.sql.get_config(context, "configured_mvbox_folder"); + + let msg = unsafe { &mut *msg }; + + if let Some(dest_folder) = dest_folder { + let server_folder = msg.server_folder.as_ref().unwrap(); + + match inbox.mv( + context, + server_folder, + msg.server_uid, + &dest_folder, + &mut dest_uid, + ) as libc::c_uint + { + 1 => { + self.try_again_later(3i32, None); + } + 3 => { + dc_update_server_uid(context, msg.rfc724_mid, &dest_folder, dest_uid); + } + 0 | 2 | _ => {} + } + } + } + } + + unsafe { dc_msg_unref(msg) }; + } + + #[allow(non_snake_case)] + fn do_DC_JOB_DELETE_MSG_ON_IMAP(&mut self, context: &Context) { + let mut delete_from_server = 1; + let msg = unsafe { dc_msg_new_untyped(context) }; + let inbox = context.inbox.read().unwrap(); + + if !(!dc_msg_load_from_db(msg, context, self.foreign_id) + || unsafe { (*msg).rfc724_mid.is_null() } + || unsafe { *(*msg).rfc724_mid.offset(0isize) as libc::c_int == 0 }) + { + let ok_to_continue1; + /* eg. device messages have no Message-ID */ + if dc_rfc724_mid_cnt(context, unsafe { (*msg).rfc724_mid }) != 1 { + info!( + context, + 0, "The message is deleted from the server when all parts are deleted.", + ); + delete_from_server = 0i32 + } + /* if this is the last existing part of the message, we delete the message from the server */ + if 0 != delete_from_server { + let ok_to_continue; + if !inbox.is_connected() { + connect_to_inbox(context, &inbox); + if !inbox.is_connected() { + self.try_again_later(3i32, None); + ok_to_continue = false; + } else { + ok_to_continue = true; + } + } else { + ok_to_continue = true; + } + if ok_to_continue { + let mid = unsafe { CStr::from_ptr((*msg).rfc724_mid).to_str().unwrap() }; + let server_folder = unsafe { (*msg).server_folder.as_ref().unwrap() }; + if 0 == inbox.delete_msg(context, mid, server_folder, unsafe { + &mut (*msg).server_uid + }) { + self.try_again_later(-1i32, None); + ok_to_continue1 = false; + } else { + ok_to_continue1 = true; + } + } else { + ok_to_continue1 = false; + } + } else { + ok_to_continue1 = true; + } + if ok_to_continue1 { + unsafe { dc_delete_msg_from_db(context, (*msg).id) }; + } + } + unsafe { dc_msg_unref(msg) } + } + + #[allow(non_snake_case)] + fn do_DC_JOB_MARKSEEN_MSG_ON_IMAP(&mut self, context: &Context) { + let ok_to_continue; + let msg = unsafe { dc_msg_new_untyped(context) }; + let inbox = context.inbox.read().unwrap(); + + if !inbox.is_connected() { + connect_to_inbox(context, &inbox); + if !inbox.is_connected() { + self.try_again_later(3i32, None); + ok_to_continue = false; + } else { + ok_to_continue = true; + } + } else { + ok_to_continue = true; + } + if ok_to_continue { + if dc_msg_load_from_db(msg, context, self.foreign_id) { + let server_folder = unsafe { (*msg).server_folder.as_ref().unwrap() }; + match inbox.set_seen(context, server_folder, unsafe { (*msg).server_uid }) + as libc::c_uint + { + 0 => {} + 1 => { + self.try_again_later(3i32, None); + } + _ => { + if 0 != unsafe { (*msg).param.get_int(Param::WantsMdn).unwrap_or_default() } + && 0 != context + .sql + .get_config_int(context, "mdns_enabled") + .unwrap_or_else(|| 1) + { + let folder = unsafe { (*msg).server_folder.as_ref().unwrap() }; + + match inbox.set_mdnsent(context, folder, unsafe { (*msg).server_uid }) + as libc::c_uint + { + 1 => { + self.try_again_later(3i32, None); + } + 3 => { + send_mdn(context, unsafe { (*msg).id }); + } + 0 | 2 | _ => {} + } + } + } + } + } + } + unsafe { dc_msg_unref(msg) }; + } + + #[allow(non_snake_case)] + fn do_DC_JOB_MARKSEEN_MDN_ON_IMAP(&mut self, context: &Context) { + let ok_to_continue; + let folder = self + .param + .get(Param::ServerFolder) + .unwrap_or_default() + .to_string(); + let uid = self.param.get_int(Param::ServerUid).unwrap_or_default() as u32; + let mut dest_uid = 0; + let inbox = context.inbox.read().unwrap(); + + if !inbox.is_connected() { + connect_to_inbox(context, &inbox); + if !inbox.is_connected() { + self.try_again_later(3, None); + ok_to_continue = false; + } else { + ok_to_continue = true; + } + } else { + ok_to_continue = true; + } + if ok_to_continue { + if inbox.set_seen(context, &folder, uid) == 0 { + self.try_again_later(3i32, None); + } + if 0 != self.param.get_int(Param::AlsoMove).unwrap_or_default() { + if context + .sql + .get_config_int(context, "folders_configured") + .unwrap_or_default() + < 3 + { + inbox.configure_folders(context, 0x1i32); + } + let dest_folder = context.sql.get_config(context, "configured_mvbox_folder"); + if let Some(dest_folder) = dest_folder { + if 1 == inbox.mv(context, folder, uid, dest_folder, &mut dest_uid) + as libc::c_uint + { + self.try_again_later(3, None); + } + } + } + } + } +} + +/* delete all pending jobs with the given action */ +pub fn job_kill_action(context: &Context, action: Action) -> bool { + sql::execute( + context, + &context.sql, + "DELETE FROM jobs WHERE action=?;", + params![action], + ) + .is_ok() +} + +pub fn perform_imap_fetch(context: &Context) { + let inbox = context.inbox.read().unwrap(); + let start = std::time::Instant::now(); + + if 0 == connect_to_inbox(context, &inbox) { + return; + } + if context + .sql + .get_config_int(context, "inbox_watch") + .unwrap_or_else(|| 1) + == 0 + { + info!(context, 0, "INBOX-watch disabled.",); + return; + } + info!(context, 0, "INBOX-fetch started...",); + inbox.fetch(context); + if inbox.should_reconnect() { + info!(context, 0, "INBOX-fetch aborted, starting over...",); + inbox.fetch(context); + } + info!( + context, + 0, + "INBOX-fetch done in {:.4} ms.", + start.elapsed().as_nanos() as f64 / 1000.0, + ); +} + +pub fn perform_imap_idle(context: &Context) { + let inbox = context.inbox.read().unwrap(); + + connect_to_inbox(context, &inbox); + + if *context.perform_inbox_jobs_needed.clone().read().unwrap() { + info!( + context, + 0, "INBOX-IDLE will not be started because of waiting jobs." + ); + return; + } + info!(context, 0, "INBOX-IDLE started..."); + inbox.idle(context); + info!(context, 0, "INBOX-IDLE ended."); +} + +pub fn perform_mvbox_fetch(context: &Context) { + let use_network = context + .sql + .get_config_int(context, "mvbox_watch") + .unwrap_or_else(|| 1); + + context + .mvbox_thread + .write() + .unwrap() + .fetch(context, use_network == 1); +} + +pub fn perform_mvbox_idle(context: &Context) { + let use_network = context + .sql + .get_config_int(context, "mvbox_watch") + .unwrap_or_else(|| 1); + + context + .mvbox_thread + .read() + .unwrap() + .idle(context, use_network == 1); +} + +pub fn interrupt_mvbox_idle(context: &Context) { + context.mvbox_thread.read().unwrap().interrupt_idle(context); +} + +pub fn perform_sentbox_fetch(context: &Context) { + let use_network = context + .sql + .get_config_int(context, "sentbox_watch") + .unwrap_or_else(|| 1); + + context + .sentbox_thread + .write() + .unwrap() + .fetch(context, use_network == 1); +} + +pub fn perform_sentbox_idle(context: &Context) { + let use_network = context + .sql + .get_config_int(context, "sentbox_watch") + .unwrap_or_else(|| 1); + + context + .sentbox_thread + .read() + .unwrap() + .idle(context, use_network == 1); +} + +pub fn interrupt_sentbox_idle(context: &Context) { + context + .sentbox_thread + .read() + .unwrap() + .interrupt_idle(context); +} + +pub fn perform_smtp_jobs(context: &Context) { + let probe_smtp_network = { + let &(ref lock, _) = &*context.smtp_state.clone(); + let mut state = lock.lock().unwrap(); + + let probe_smtp_network = state.probe_network; + state.probe_network = false; + state.perform_jobs_needed = 0; + + if state.suspended { + info!(context, 0, "SMTP-jobs suspended.",); + return; + } + state.doing_jobs = true; + probe_smtp_network + }; + + info!(context, 0, "SMTP-jobs started...",); + job_perform(context, Thread::Smtp, probe_smtp_network); + info!(context, 0, "SMTP-jobs ended."); + + { + let &(ref lock, _) = &*context.smtp_state.clone(); + let mut state = lock.lock().unwrap(); + + state.doing_jobs = false; + } +} + +pub fn perform_smtp_idle(context: &Context) { + info!(context, 0, "SMTP-idle started...",); + { + let &(ref lock, ref cvar) = &*context.smtp_state.clone(); + let mut state = lock.lock().unwrap(); + + if state.perform_jobs_needed == 1 { + info!( + context, + 0, "SMTP-idle will not be started because of waiting jobs.", + ); + } else { + let dur = get_next_wakeup_time(context, Thread::Smtp); + + loop { + let res = cvar.wait_timeout(state, dur).unwrap(); + state = res.0; + + if state.idle == true || res.1.timed_out() { + // We received the notification and the value has been updated, we can leave. + break; + } + } + state.idle = false; + } + } + + info!(context, 0, "SMTP-idle ended.",); +} + +fn get_next_wakeup_time(context: &Context, thread: Thread) -> Duration { + let t: i64 = context + .sql + .query_row_col( + context, + "SELECT MIN(desired_timestamp) FROM jobs WHERE thread=?;", + params![thread], + 0, + ) + .unwrap_or_default(); + + let mut wakeup_time = Duration::new(10 * 60, 0); + let now = time(); + if t > 0 { + if t > now { + wakeup_time = Duration::new((t - now) as u64, 0); + } else { + wakeup_time = Duration::new(0, 0); + } + } + + wakeup_time +} + +pub fn maybe_network(context: &Context) { + { + let &(ref lock, _) = &*context.smtp_state.clone(); + let mut state = lock.lock().unwrap(); + state.probe_network = true; + + *context.probe_imap_network.write().unwrap() = true; + } + + interrupt_smtp_idle(context); + interrupt_imap_idle(context); + interrupt_mvbox_idle(context); + interrupt_sentbox_idle(context); +} + +pub fn job_action_exists(context: &Context, action: Action) -> bool { + context + .sql + .exists("SELECT id FROM jobs WHERE action=?;", params![action]) + .unwrap_or_default() +} + +/* special case for DC_JOB_SEND_MSG_TO_SMTP */ +#[allow(non_snake_case)] +pub unsafe fn job_send_msg(context: &Context, msg_id: uint32_t) -> libc::c_int { + let mut success = 0; + let mut mimefactory = dc_mimefactory_t { + from_addr: ptr::null_mut(), + from_displayname: ptr::null_mut(), + selfstatus: ptr::null_mut(), + recipients_names: ptr::null_mut(), + recipients_addr: ptr::null_mut(), + timestamp: 0, + rfc724_mid: ptr::null_mut(), + loaded: DC_MF_NOTHING_LOADED, + msg: ptr::null_mut(), + chat: None, + increation: 0, + in_reply_to: ptr::null_mut(), + references: ptr::null_mut(), + req_mdn: 0, + out: ptr::null_mut(), + out_encrypted: 0, + out_gossiped: 0, + out_last_added_location_id: 0, + error: ptr::null_mut(), + context, + }; + + /* load message data */ + if 0 == dc_mimefactory_load_msg(&mut mimefactory, msg_id) || mimefactory.from_addr.is_null() { + warn!( + context, + 0, "Cannot load data to send, maybe the message is deleted in between.", + ); + } else { + // no redo, no IMAP. moreover, as the data does not exist, there is no need in calling dc_set_msg_failed() + if chat::msgtype_has_file((*mimefactory.msg).type_0) { + if let Some(pathNfilename) = (*mimefactory.msg).param.get(Param::File) { + if ((*mimefactory.msg).type_0 == Viewtype::Image + || (*mimefactory.msg).type_0 == Viewtype::Gif) + && !(*mimefactory.msg).param.exists(Param::Width) + { + (*mimefactory.msg).param.set_int(Param::Width, 0); + (*mimefactory.msg).param.set_int(Param::Height, 0); + + if let Some(buf) = dc_read_file_safe(context, pathNfilename) { + if let Ok((width, height)) = dc_get_filemeta(&buf) { + (*mimefactory.msg).param.set_int(Param::Width, width as i32); + (*mimefactory.msg) + .param + .set_int(Param::Height, height as i32); + } + } + dc_msg_save_param_to_disk(mimefactory.msg); + } + } + } + /* create message */ + if 0 == dc_mimefactory_render(&mut mimefactory) { + dc_set_msg_failed(context, msg_id, as_opt_str(mimefactory.error)); + } else if 0 + != (*mimefactory.msg) + .param + .get_int(Param::GuranteeE2ee) + .unwrap_or_default() + && 0 == mimefactory.out_encrypted + { + warn!( + context, + 0, + "e2e encryption unavailable {} - {:?}", + msg_id, + (*mimefactory.msg).param.get_int(Param::GuranteeE2ee), + ); + dc_set_msg_failed( + context, + msg_id, + Some("End-to-end-encryption unavailable unexpectedly."), + ); + } else { + /* unrecoverable */ + if clist_search_string_nocase(mimefactory.recipients_addr, mimefactory.from_addr) + == 0i32 + { + clist_insert_after( + mimefactory.recipients_names, + (*mimefactory.recipients_names).last, + 0 as *mut libc::c_void, + ); + clist_insert_after( + mimefactory.recipients_addr, + (*mimefactory.recipients_addr).last, + dc_strdup(mimefactory.from_addr) as *mut libc::c_void, + ); + } + if 0 != mimefactory.out_gossiped { + chat::set_gossiped_timestamp(context, (*mimefactory.msg).chat_id, time()); + } + if 0 != mimefactory.out_last_added_location_id { + dc_set_kml_sent_timestamp(context, (*mimefactory.msg).chat_id, time()); + if 0 == (*mimefactory.msg).hidden { + dc_set_msg_location_id( + context, + (*mimefactory.msg).id, + mimefactory.out_last_added_location_id, + ); + } + } + if 0 != mimefactory.out_encrypted + && (*mimefactory.msg) + .param + .get_int(Param::GuranteeE2ee) + .unwrap_or_default() + == 0 + { + (*mimefactory.msg).param.set_int(Param::GuranteeE2ee, 1); + dc_msg_save_param_to_disk(mimefactory.msg); + } + success = add_smtp_job(context, Action::SendMsgToSmtp, &mut mimefactory); + } + } + dc_mimefactory_empty(&mut mimefactory); + + success +} + +pub fn perform_imap_jobs(context: &Context) { + info!(context, 0, "dc_perform_imap_jobs starting.",); + + let probe_imap_network = *context.probe_imap_network.clone().read().unwrap(); + *context.probe_imap_network.write().unwrap() = false; + *context.perform_inbox_jobs_needed.write().unwrap() = false; + + job_perform(context, Thread::Imap, probe_imap_network); + info!(context, 0, "dc_perform_imap_jobs ended.",); +} + +fn job_perform(context: &Context, thread: Thread, probe_network: bool) { + let query = if !probe_network { + // processing for first-try and after backoff-timeouts: + // process jobs in the order they were added. + "SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \ + FROM jobs WHERE thread=? AND desired_timestamp<=? ORDER BY action DESC, added_timestamp;" + } else { + // processing after call to dc_maybe_network(): + // process _all_ pending jobs that failed before + // in the order of their backoff-times. + "SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \ + FROM jobs WHERE thread=? AND tries>0 ORDER BY desired_timestamp, action DESC;" + }; + + let params_no_probe = params![thread as i64, time()]; + let params_probe = params![thread as i64]; + let params: &[&dyn rusqlite::ToSql] = if !probe_network { + params_no_probe + } else { + params_probe + }; + + let jobs: Result, _> = context.sql.query_map( + query, + params, + |row| { + let job = Job { + job_id: row.get(0)?, + action: row.get(1)?, + foreign_id: row.get(2)?, + desired_timestamp: row.get(5)?, + added_timestamp: row.get(4)?, + tries: row.get(6)?, + param: row.get::<_, String>(3)?.parse().unwrap_or_default(), + try_again: 0, + pending_error: None, + }; + + Ok(job) + }, + |jobs| jobs.collect::, _>>().map_err(Into::into), + ); + match jobs { + Ok(ref _res) => {} + Err(ref err) => { + info!(context, 0, "query failed: {:?}", err); + } + } + + for mut job in jobs.unwrap_or_default() { + info!( + context, + 0, + "{}-job #{}, action {} started...", + if thread == Thread::Imap { + "INBOX" + } else { + "SMTP" + }, + job.job_id, + job.action, + ); + + // some configuration jobs are "exclusive": + // - they are always executed in the imap-thread and the smtp-thread is suspended during execution + // - they may change the database handle change the database handle; we do not keep old pointers therefore + // - they can be re-executed one time AT_ONCE, but they are not save in the database for later execution + if Action::ConfigureImap == job.action || Action::ImexImap == job.action { + job_kill_action(context, job.action); + &context + .sentbox_thread + .clone() + .read() + .unwrap() + .suspend(context); + &context + .mvbox_thread + .clone() + .read() + .unwrap() + .suspend(context); + suspend_smtp_thread(context, true); + } + + let mut tries = 0; + while tries <= 1 { + // this can be modified by a job using dc_job_try_again_later() + job.try_again = 0; + + match job.action { + Action::SendMsgToSmtp => job.do_DC_JOB_SEND(context), + Action::DeleteMsgOnImap => job.do_DC_JOB_DELETE_MSG_ON_IMAP(context), + Action::MarkseenMsgOnImap => job.do_DC_JOB_MARKSEEN_MSG_ON_IMAP(context), + Action::MarkseenMdnOnImap => job.do_DC_JOB_MARKSEEN_MDN_ON_IMAP(context), + Action::MoveMsg => job.do_DC_JOB_MOVE_MSG(context), + Action::SendMdn => job.do_DC_JOB_SEND(context), + Action::ConfigureImap => unsafe { dc_job_do_DC_JOB_CONFIGURE_IMAP(context, &job) }, + Action::ImexImap => unsafe { dc_job_do_DC_JOB_IMEX_IMAP(context, &job) }, + Action::MaybeSendLocations => unsafe { + dc_job_do_DC_JOB_MAYBE_SEND_LOCATIONS(context, &job) + }, + Action::MaybeSendLocationsEnded => unsafe { + dc_job_do_DC_JOB_MAYBE_SEND_LOC_ENDED(context, &mut job) + }, + Action::Housekeeping => sql::housekeeping(context), + Action::SendMdnOld => {} + Action::SendMsgToSmtpOld => {} + } + if job.try_again != -1 { + break; + } + tries += 1 + } + if Action::ConfigureImap == job.action || Action::ImexImap == job.action { + context + .sentbox_thread + .clone() + .read() + .unwrap() + .unsuspend(context); + context + .mvbox_thread + .clone() + .read() + .unwrap() + .unsuspend(context); + suspend_smtp_thread(context, false); + break; + } else if job.try_again == 2 { + // just try over next loop unconditionally, the ui typically interrupts idle when the file (video) is ready + info!( + context, + 0, + "{}-job #{} not yet ready and will be delayed.", + if thread == Thread::Imap { + "INBOX" + } else { + "SMTP" + }, + job.job_id + ); + } else if job.try_again == -1 || job.try_again == 3 { + let tries = job.tries + 1; + if tries < 17 { + job.tries = tries; + let time_offset = get_backoff_time_offset(tries); + job.desired_timestamp = job.added_timestamp + time_offset; + job.update(context); + info!( + context, + 0, + "{}-job #{} not succeeded on try #{}, retry in ADD_TIME+{} (in {} seconds).", + if thread == Thread::Imap { + "INBOX" + } else { + "SMTP" + }, + job.job_id as u32, + tries, + time_offset, + job.added_timestamp + time_offset - time() + ); + if thread == Thread::Smtp && tries < 17 - 1 { + context + .smtp_state + .clone() + .0 + .lock() + .unwrap() + .perform_jobs_needed = 2; + } + } else { + if job.action == Action::SendMsgToSmtp { + unsafe { + dc_set_msg_failed(context, job.foreign_id, job.pending_error.as_ref()) + }; + } + job.delete(context); + } + if !probe_network { + continue; + } + // on dc_maybe_network() we stop trying here; + // these jobs are already tried once. + // otherwise, we just continue with the next job + // to give other jobs a chance being tried at least once. + break; + } else { + job.delete(context); + } + } +} + +#[allow(non_snake_case)] +fn get_backoff_time_offset(c_tries: libc::c_int) -> i64 { + // results in ~3 weeks for the last backoff timespan + let mut N = 2_i32.pow((c_tries - 1) as u32); + N = N * 60; + let mut rng = thread_rng(); + let n: i32 = rng.gen(); + let mut seconds = n % (N + 1); + if seconds < 1 { + seconds = 1; + } + seconds as i64 +} + +fn suspend_smtp_thread(context: &Context, suspend: bool) { + context.smtp_state.0.lock().unwrap().suspended = suspend; + if suspend { + loop { + if !context.smtp_state.0.lock().unwrap().doing_jobs { + return; + } + std::thread::sleep(std::time::Duration::from_micros(300 * 1000)); + } + } +} + +fn connect_to_inbox(context: &Context, inbox: &Imap) -> libc::c_int { + let ret_connected = dc_connect_to_configured_imap(context, inbox); + if 0 != ret_connected { + inbox.set_watch_folder("INBOX".into()); + } + ret_connected +} + +fn send_mdn(context: &Context, msg_id: uint32_t) { + let mut mimefactory = dc_mimefactory_t { + from_addr: ptr::null_mut(), + from_displayname: ptr::null_mut(), + selfstatus: ptr::null_mut(), + recipients_names: ptr::null_mut(), + recipients_addr: ptr::null_mut(), + timestamp: 0, + rfc724_mid: ptr::null_mut(), + loaded: DC_MF_NOTHING_LOADED, + msg: ptr::null_mut(), + chat: None, + increation: 0, + in_reply_to: ptr::null_mut(), + references: ptr::null_mut(), + req_mdn: 0, + out: ptr::null_mut(), + out_encrypted: 0, + out_gossiped: 0, + out_last_added_location_id: 0, + error: ptr::null_mut(), + context, + }; + + if !(0 == unsafe { dc_mimefactory_load_mdn(&mut mimefactory, msg_id) } + || 0 == unsafe { dc_mimefactory_render(&mut mimefactory) }) + { + add_smtp_job(context, Action::SendMdn, &mut mimefactory); + } +} + +#[allow(non_snake_case)] +fn add_smtp_job(context: &Context, action: Action, mimefactory: &dc_mimefactory_t) -> libc::c_int { + let pathNfilename: *mut libc::c_char; + let mut success: libc::c_int = 0i32; + let mut recipients: *mut libc::c_char = 0 as *mut libc::c_char; + let mut param = Params::new(); + pathNfilename = unsafe { + dc_get_fine_pathNfilename( + context, + b"$BLOBDIR\x00" as *const u8 as *const libc::c_char, + mimefactory.rfc724_mid, + ) + }; + if pathNfilename.is_null() { + error!( + context, + 0, + "Could not find free file name for message with ID <{}>.", + to_string(mimefactory.rfc724_mid), + ); + } else if 0 + == unsafe { + dc_write_file( + context, + pathNfilename, + (*mimefactory.out).str_0 as *const libc::c_void, + (*mimefactory.out).len, + ) + } + { + error!( + context, + 0, + "Could not write message <{}> to \"{}\".", + to_string(mimefactory.rfc724_mid), + as_str(pathNfilename), + ); + } else { + recipients = unsafe { + dc_str_from_clist( + mimefactory.recipients_addr, + b"\x1e\x00" as *const u8 as *const libc::c_char, + ) + }; + param.set(Param::File, as_str(pathNfilename)); + param.set(Param::Recipients, as_str(recipients)); + job_add( + context, + action, + (if mimefactory.loaded as libc::c_uint + == DC_MF_MSG_LOADED as libc::c_int as libc::c_uint + { + unsafe { (*mimefactory.msg).id } + } else { + 0 + }) as libc::c_int, + param, + 0, + ); + success = 1; + } + unsafe { + free(recipients.cast()); + free(pathNfilename.cast()); + } + success +} + +pub fn job_add( + context: &Context, + action: Action, + foreign_id: libc::c_int, + param: Params, + delay_seconds: i64, +) { + let timestamp = time(); + let thread: Thread = action.into(); + + sql::execute( + context, + &context.sql, + "INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);", + params![ + timestamp, + thread, + action, + foreign_id, + param.to_string(), + (timestamp + delay_seconds as i64) + ] + ).ok(); + + match thread { + Thread::Imap => interrupt_imap_idle(context), + Thread::Smtp => interrupt_smtp_idle(context), + } +} + +pub fn interrupt_smtp_idle(context: &Context) { + info!(context, 0, "Interrupting SMTP-idle...",); + + let &(ref lock, ref cvar) = &*context.smtp_state.clone(); + let mut state = lock.lock().unwrap(); + + state.perform_jobs_needed = 1; + state.idle = true; + cvar.notify_one(); +} + +pub fn interrupt_imap_idle(context: &Context) { + info!(context, 0, "Interrupting IMAP-IDLE...",); + + *context.perform_inbox_jobs_needed.write().unwrap() = true; + context.inbox.read().unwrap().interrupt_idle(); +} diff --git a/src/job_thread.rs b/src/job_thread.rs new file mode 100644 index 0000000000..eb62de8bad --- /dev/null +++ b/src/job_thread.rs @@ -0,0 +1,181 @@ +use std::sync::{Arc, Condvar, Mutex}; + +use crate::context::Context; +use crate::dc_configure::*; +use crate::imap::Imap; + +pub struct JobThread { + pub name: &'static str, + pub folder_config_name: &'static str, + pub imap: Imap, + pub state: Arc<(Mutex, Condvar)>, +} + +#[derive(Clone, Debug, Default)] +pub struct JobState { + idle: bool, + jobs_needed: i32, + suspended: bool, + using_handle: bool, +} + +impl JobThread { + pub fn new(name: &'static str, folder_config_name: &'static str, imap: Imap) -> Self { + JobThread { + name, + folder_config_name, + imap, + state: Arc::new((Mutex::new(Default::default()), Condvar::new())), + } + } + + pub fn suspend(&self, context: &Context) { + info!(context, 0, "Suspending {}-thread.", self.name,); + { + self.state.0.lock().unwrap().suspended = true; + } + self.interrupt_idle(context); + loop { + let using_handle = self.state.0.lock().unwrap().using_handle; + if !using_handle { + return; + } + std::thread::sleep(std::time::Duration::from_micros(300 * 1000)); + } + } + + pub fn unsuspend(&self, context: &Context) { + info!(context, 0, "Unsuspending {}-thread.", self.name); + + let &(ref lock, ref cvar) = &*self.state.clone(); + let mut state = lock.lock().unwrap(); + + state.suspended = false; + state.idle = true; + cvar.notify_one(); + } + + pub fn interrupt_idle(&self, context: &Context) { + { + self.state.0.lock().unwrap().jobs_needed = 1; + } + + info!(context, 0, "Interrupting {}-IDLE...", self.name); + + self.imap.interrupt_idle(); + + let &(ref lock, ref cvar) = &*self.state.clone(); + let mut state = lock.lock().unwrap(); + + state.idle = true; + cvar.notify_one(); + } + + pub fn fetch(&mut self, context: &Context, use_network: bool) { + { + let &(ref lock, _) = &*self.state.clone(); + let mut state = lock.lock().unwrap(); + + if state.suspended { + return; + } + + state.using_handle = true; + } + + if use_network { + let start = std::time::Instant::now(); + if self.connect_to_imap(context) { + info!(context, 0, "{}-fetch started...", self.name); + self.imap.fetch(context); + + if self.imap.should_reconnect() { + info!(context, 0, "{}-fetch aborted, starting over...", self.name,); + self.imap.fetch(context); + } + info!( + context, + 0, + "{}-fetch done in {:.3} ms.", + self.name, + start.elapsed().as_millis(), + ); + } + } + + self.state.0.lock().unwrap().using_handle = false; + } + + fn connect_to_imap(&self, context: &Context) -> bool { + if self.imap.is_connected() { + return true; + } + + let mut ret_connected = dc_connect_to_configured_imap(context, &self.imap) != 0; + + if ret_connected { + if context + .sql + .get_config_int(context, "folders_configured") + .unwrap_or_default() + < 3 + { + self.imap.configure_folders(context, 0x1); + } + + if let Some(mvbox_name) = context.sql.get_config(context, self.folder_config_name) { + self.imap.set_watch_folder(mvbox_name); + } else { + self.imap.disconnect(context); + ret_connected = false; + } + } + + ret_connected + } + + pub fn idle(&self, context: &Context, use_network: bool) { + { + let &(ref lock, ref cvar) = &*self.state.clone(); + let mut state = lock.lock().unwrap(); + + if 0 != state.jobs_needed { + info!( + context, + 0, + "{}-IDLE will not be started as it was interrupted while not ideling.", + self.name, + ); + state.jobs_needed = 0; + return; + } + + if state.suspended { + while !state.idle { + state = cvar.wait(state).unwrap(); + } + state.idle = false; + return; + } + + state.using_handle = true; + + if !use_network { + state.using_handle = false; + + while !state.idle { + state = cvar.wait(state).unwrap(); + } + state.idle = false; + return; + } + } + + self.connect_to_imap(context); + info!(context, 0, "{}-IDLE started...", self.name,); + self.imap.idle(context); + info!(context, 0, "{}-IDLE ended.", self.name); + + self.state.0.lock().unwrap().using_handle = false; + } +} diff --git a/src/lib.rs b/src/lib.rs index aab5535efd..bdebfb8510 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,6 +30,8 @@ pub mod constants; pub mod contact; pub mod context; mod imap; +pub mod job; +mod job_thread; pub mod key; pub mod keyring; pub mod lot; @@ -49,8 +51,6 @@ pub mod dc_configure; mod dc_dehtml; mod dc_e2ee; pub mod dc_imex; -pub mod dc_job; -mod dc_jobthread; pub mod dc_location; mod dc_loginparam; mod dc_mimefactory;