Skip to content

Commit

Permalink
182 eldritch sleep blocks imix agent (#208)
Browse files Browse the repository at this point in the history
* Couple tests.

* Resolve async errors.
  • Loading branch information
hulto authored Jun 15, 2023
1 parent c945c0a commit b890003
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 42 deletions.
2 changes: 1 addition & 1 deletion implants/golem/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ fn main() -> anyhow::Result<()> {
tome_files_and_content.push( (tome_path, tome_contents) )
}

let runtime = tokio::runtime::Builder::new_current_thread()
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
Expand Down
138 changes: 97 additions & 41 deletions implants/imix/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,12 @@ async fn handle_exec_tome(task: GraphQLTask, print_channel_sender: Sender<String

let print_handler = EldritchPrintHandler{ sender: print_channel_sender };

println!("{:?}",task_job.parameters);
// Execute a tome script
let res = match thread::spawn(move || { eldritch_run(tome_name, tome_contents, task_job.parameters, &print_handler) }).join() {
Ok(local_thread_res) => local_thread_res,
Err(_) => todo!(),
};

// let res = eldritch_run(tome_name, tome_contents, task_job.parameters, &print_handler);
match res {
Ok(tome_output) => Ok((tome_output, "".to_string())),
Err(tome_error) => Ok(("".to_string(), tome_error.to_string())),
Expand All @@ -69,7 +68,6 @@ async fn handle_exec_timeout_and_response(task: graphql::GraphQLTask, print_chan

// Define a future for our execution task
let exec_future = handle_exec_tome(task.clone(), print_channel_sender.clone());

// Execute that future with a timeout defined by the timeout argument.
let tome_result = match tokio::time::timeout(timeout_duration, exec_future).await {
Ok(res) => {
Expand All @@ -81,6 +79,10 @@ async fn handle_exec_timeout_and_response(task: graphql::GraphQLTask, print_chan
Err(timer_elapsed) => ("".to_string(), format!("Time elapsed task {} has been running for {} seconds", task.id, timer_elapsed.to_string())),
};

// let tome_result = tokio::task::spawn(exec_future).await??;
// let tome_result = tokio::spawn(exec_future).await??;


print_channel_sender.clone().send(format!("---[RESULT]----\n{}\n---------",tome_result.0))?;
print_channel_sender.clone().send(format!("---[ERROR]----\n{}\n--------",tome_result.1))?;
Ok(())
Expand Down Expand Up @@ -130,7 +132,7 @@ fn get_primary_ip() -> Result<String> {
}
},
Err(e) => {
println!("Error getting primary ip address:\n{e}");
eprintln!("Error getting primary ip address:\n{e}");
"DANGER-UNKNOWN".to_string()
},
};
Expand Down Expand Up @@ -166,7 +168,7 @@ fn get_os_pretty_name() -> Result<String> {

// Async handler for port scanning.
async fn main_loop(config_path: String, run_once: bool) -> Result<()> {
let debug = true;
let debug = false;
let version_string = "v0.1.0";
let config_file = File::open(config_path)?;
let imix_config: imix::Config = serde_json::from_reader(config_file)?;
Expand Down Expand Up @@ -234,71 +236,74 @@ async fn main_loop(config_path: String, run_once: bool) -> Result<()> {
};

loop {
let start_time = Utc::now().time();
// 0. Get loop start time
let loop_start_time = Instant::now();
if debug { println!("Get new tasks"); }
// 1. Pull down new tasks
// 1a) calculate callback uri
let cur_callback_uri = imix_config.callback_config.c2_configs[0].uri.clone();

if debug { println!("[{}]: collecting tasks", (Utc::now().time() - start_time).num_milliseconds()) }
// 1b) Collect new tasks
let new_tasks = match graphql::gql_claim_tasks(cur_callback_uri.clone(), claim_tasks_input.clone()).await {
Ok(tasks) => tasks,
Err(error) => {
if debug {
println!("main_loop: error claiming task\n{:?}", error)
}
if debug { println!("main_loop: error claiming task\n{:?}", error) }
let empty_vec = vec![];
empty_vec
},
};

if debug { println!("Starting {} new tasks", new_tasks.len()); }
if debug { println!("[{}]: Starting {} new tasks", (Utc::now().time() - start_time).num_milliseconds(), new_tasks.len()); }
// 2. Start new tasks
for task in new_tasks {
if debug { println!("Launching:\n{:?}", task.clone().job.unwrap().tome.eldritch); }

let (sender, receiver) = channel::<String>();
let exec_with_timeout = handle_exec_timeout_and_response(task.clone(), sender.clone());
if debug { println!("[{}]: Queueing task {}", (Utc::now().time() - start_time).num_milliseconds(), task.clone().id); }
match all_exec_futures.insert(task.clone().id, ExecTask{
future_join_handle: task::spawn(exec_with_timeout),
start_time: Utc::now(),
graphql_task: task.clone(),
print_reciever: receiver,
}) {
Some(_old_task) => {
if debug {
println!("main_loop: error adding new task. Non-unique taskID\n");
}
if debug {println!("main_loop: error adding new task. Non-unique taskID\n");}
},
None => {}, // Task queued successfully
None => {
if debug {println!("main_loop: Task queued successfully\n");}
}, // Task queued successfully
}
if debug { println!("[{}]: Queued task {}", (Utc::now().time() - start_time).num_milliseconds(), task.clone().id); }
}

if debug { println!("Sleeping"); }
// 3. Sleep till callback time
// time_to_wait - time_elapsed
let time_to_sleep = imix_config.callback_config.interval - loop_start_time.elapsed().as_secs() ;
tokio::time::sleep(std::time::Duration::new(time_to_sleep, 24601)).await;

let time_to_sleep = imix_config.callback_config.interval - loop_start_time.elapsed().as_secs();
if debug { println!("[{}]: Sleeping seconds {}", (Utc::now().time() - start_time).num_milliseconds(), time_to_sleep); }
// tokio::time::sleep(std::time::Duration::new(time_to_sleep, 24601)).await; // This seems to wait for other threads to finish.
std::thread::sleep(std::time::Duration::new(time_to_sleep, 24601)); // This just sleeps our thread.

// :clap: :clap: make new map!
let mut running_exec_futures: HashMap<String, ExecTask> = HashMap::new();

if debug { println!("Checking status"); }
if debug { println!("[{}]: Checking task status", (Utc::now().time() - start_time).num_milliseconds()); }
// Check status & send response
for exec_future in all_exec_futures.into_iter() {
if debug {
println!("{}: {:?}", exec_future.0, exec_future.1.future_join_handle.is_finished());
}
if debug { println!("[{}]: Task # {} is_finished? {}", (Utc::now().time() - start_time).num_milliseconds(), exec_future.0, exec_future.1.future_join_handle.is_finished()); }
let mut res: Vec<String> = vec![];
// Loop over each line of output from the task.
loop {
if debug { println!("Reciveing output"); }
if debug { println!("[{}]: Task # {} recieving output", (Utc::now().time() - start_time).num_milliseconds(), exec_future.0); }
let new_res_line = match exec_future.1.print_reciever.recv_timeout(Duration::from_millis(100)) {
Ok(local_res_string) => local_res_string,
Ok(local_res_string) => {
local_res_string
},
Err(local_err) => {
match local_err.to_string().as_str() {
"channel is empty and sending half is closed" => { break; },
"timed out waiting on channel" => { break; },
_ => eprint!("Error: {}", local_err),
}
break;
Expand Down Expand Up @@ -328,16 +333,14 @@ async fn main_loop(config_path: String, run_once: bool) -> Result<()> {
}
},
};
if debug {
println!("{}", task_response.output);
}
if debug { println!("[{}]: Task {} output: {}", (Utc::now().time() - start_time).num_milliseconds(), exec_future.0, task_response.output); }
let submit_task_result = graphql::gql_post_task_result(cur_callback_uri.clone(), task_response).await;
let _ = match submit_task_result {
Ok(_) => Ok(()), // Currently no reason to save the task since it's the task we just answered.
Err(error) => Err(error),
};

// Only re-insert the runnine exec futures
// Only re-insert the runnine exec futures
if !exec_future.1.future_join_handle.is_finished() {
running_exec_futures.insert(exec_future.0, exec_future.1);
}
Expand All @@ -349,7 +352,6 @@ async fn main_loop(config_path: String, run_once: bool) -> Result<()> {
}
}


pub fn main() -> Result<(), imix::Error> {
let matches = Command::new("imix")
.arg(
Expand All @@ -371,7 +373,7 @@ pub fn main() -> Result<(), imix::Error> {
.get_matches();


let runtime = tokio::runtime::Builder::new_current_thread()
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
Expand All @@ -393,7 +395,7 @@ pub fn main() -> Result<(), imix::Error> {
if let Some(config_path) = matches.value_of("config") {
match runtime.block_on(main_loop(config_path.to_string(), false)) {
Ok(_) => {},
Err(error) => println!("Imix mail_loop exited unexpectedly with config: {}\n{}", config_path.to_string(), error),
Err(error) => eprintln!("Imix main_loop exited unexpectedly with config: {}\n{}", config_path.to_string(), error),
}
}
Ok(())
Expand All @@ -414,7 +416,6 @@ mod tests {
let primary_ip_address = match get_primary_ip() {
Ok(local_primary_ip) => local_primary_ip,
Err(local_error) => {
println!("An error occured during testing default_ip:{local_error}");
assert_eq!(false,true);
"DANGER-UNKNOWN".to_string()
},
Expand All @@ -425,7 +426,6 @@ mod tests {
#[test]
fn imix_test_get_os_pretty_name() {
let res = get_os_pretty_name().unwrap();
println!("{res}");
assert!(!res.contains("UNKNOWN"));
}

Expand Down Expand Up @@ -453,7 +453,7 @@ sys.shell(input_params["cmd"])
};


let runtime = tokio::runtime::Builder::new_current_thread()
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
Expand All @@ -467,7 +467,6 @@ sys.shell(input_params["cmd"])
let stdout = receiver.recv_timeout(Duration::from_millis(500)).unwrap();
assert_eq!(stdout, "custom_print_handler_test".to_string());

println!("{:?}", result.clone());
let mut bool_res = false;

if cfg!(target_os = "linux") ||
Expand All @@ -487,6 +486,65 @@ sys.shell(input_params["cmd"])

}


#[test]
fn imix_test_main_loop_sleep_twice_short() -> Result<()> {
// Response expectations are poped in reverse order.
let server = Server::run();
server.expect(
Expectation::matching(all_of![
request::method_path("POST", "/graphql"),
request::body(matches(".*ImixPostResult.*main_loop_test_success.*"))
])
.times(2)
.respond_with(status_code(200)
.body(r#"{"data":{"submitTaskResult":{"id":"17179869185"}}}"#)),
);
server.expect(
Expectation::matching(all_of![
request::method_path("POST", "/graphql"),
request::body(matches(".*claimTasks.*"))
])
.times(1)
.respond_with(status_code(200)
.body(r#"{"data":{"claimTasks":[{"id":"17179869185","job":{"id":"4294967297","name":"Sleep1","parameters":"{}","tome":{"id":"21474836482","name":"sleep","description":"sleep stuff","paramDefs":"{}","eldritch":"def test():\n if sys.is_macos():\n sys.shell(\"sleep 3\")\n if sys.is_linux():\n sys.shell(\"sleep 3\")\n if sys.is_windows():\n sys.shell(\"timeout 3\")\ntest()\nprint(\"main_loop_test_success\")","files":[]},"bundle":null}},{"id":"17179869186","job":{"id":"4294967298","name":"Sleep1","parameters":"{}","tome":{"id":"21474836483","name":"sleep","description":"sleep stuff","paramDefs":"{}","eldritch":"def test():\n if sys.is_macos():\n sys.shell(\"sleep 3\")\n if sys.is_linux():\n sys.shell(\"sleep 3\")\n if sys.is_windows():\n sys.shell(\"timeout 3\")\ntest()\nprint(\"main_loop_test_success\")","files":[]},"bundle":null}}]}}"#)),
);

let tmp_file_new = NamedTempFile::new()?;
let path_new = String::from(tmp_file_new.path().to_str().unwrap()).clone();
let url = server.url("/graphql").to_string();
let _ = std::fs::write(path_new.clone(),format!(r#"{{
"service_configs": [],
"target_forward_connect_ip": "127.0.0.1",
"target_name": "test1234",
"callback_config": {{
"interval": 4,
"jitter": 0,
"timeout": 4,
"c2_configs": [
{{
"priority": 1,
"uri": "{url}"
}}
]
}}
}}"#));

let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();

// Define a future for our execution task
let start_time = Utc::now().time();
let exec_future = main_loop(path_new, true);
let _result = runtime.block_on(exec_future).unwrap();
let end_time = Utc::now().time();
let diff = (end_time - start_time).num_milliseconds();
assert!(diff < 4500);
Ok(())
}

#[test]
fn imix_test_main_loop_run_once() -> Result<()> {

Expand Down Expand Up @@ -519,7 +577,7 @@ sys.shell(input_params["cmd"])
"target_forward_connect_ip": "127.0.0.1",
"target_name": "test1234",
"callback_config": {{
"interval": 8,
"interval": 4,
"jitter": 1,
"timeout": 4,
"c2_configs": [
Expand All @@ -531,20 +589,18 @@ sys.shell(input_params["cmd"])
}}
}}"#));

let runtime = tokio::runtime::Builder::new_current_thread()
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();

// let (sender, receiver) = channel::<String>();

// // Define a future for our execution task
// let exec_future = handle_exec_tome(test_tome_input, sender.clone())
let exec_future = main_loop(path_new, true);
let _result = runtime.block_on(exec_future).unwrap();

assert!(true);
Ok(())
}


}

0 comments on commit b890003

Please sign in to comment.