Skip to content

Commit

Permalink
bump to 0.3.0: panic-tolerant, thread join support
Browse files Browse the repository at this point in the history
  • Loading branch information
uuhan committed Jan 20, 2024
1 parent d4a611c commit cbb4666
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 38 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "terminate-thread"
version = "0.2.1-alpha.0"
version = "0.3.0"
edition = "2021"
repository = "https://github.com/uuhan/terminate-thread"
description = "A simple terminatable thread implemented with pthread"
Expand Down
44 changes: 32 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,47 @@ do it with the standard `std::thread` without putting into some `Sync` thing.

```toml
[dependencies]
terminate-thread = "0.2"
terminate-thread = "0.3"
```

### 1. Terminate an infinite loop
### Spawn your thread

```rust
use terminate_thread::Thread;
Thread::spawn(|| {}).join(); // ← spawn & join (>= 0.3.0) your thread
```

### Manually terminate your thread

```rust
use terminate_thread::Thread;
let thr = Thread::spawn(|| loop {
// infinite loop in this thread
println!("loop run");
std::thread::sleep(std::time::Duration::from_secs(1));
});

std::thread::sleep(std::time::Duration::from_secs(1));
thr.terminate() // ← the thread is terminated manually!
```

### Auto terminate your thread

```rust
use terminate_thread::Thread;
{
let _thread = Thread::spawn(|| loop {}); // ← the thread will be terminated when thread is dropped
}
```

### Panic tolerant

// Just terminate it
thr.terminate()
```rust
use terminate_thread::Thread;
Thread::spawn(|| panic!()); // ← this is fine
let thread = Thread::spawn(|| panic!("your message")).join(); // ← thread stores the panic info
assert!(thread.over() && thread.panics()); // ← it's over and panics
let info = thread.panic_info().lock().unwrap().take().unwrap(); // ← take out the panic info
assert_eq!(info.downcast_ref::<&str>().unwrap(), &"your message"); // ← get your panic info
```

## Not a good idea!
Expand All @@ -51,17 +74,14 @@ but the real world is sophisticated to make any promise.

## To-do

- [ ] Terminate the job which panics. 🚧
- [x] Terminate the job which panics. >= v0.3.0

```rust
use terminate_thread::Thread;
Thread::spawn(|| panic!());

let thread = Thread::spawn(|| panic!());

// thread.terminate();
Thread::spawn(|| panic!()); // ← this is fine

std::thread::sleep(std::time::Duration::from_millis(500));
let thread = Thread::spawn(|| panic!()).join(); // ← thread stores the panic info
assert!(thread.over() && thread.panics()); // ← it's over and panics
```

## Issue
Expand Down
19 changes: 17 additions & 2 deletions examples/loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,29 @@ use std::{thread::sleep, time::Duration};
use terminate_thread::Thread;

fn main() {
static PANIC_MSG: &'static str = "the loop in thread panic...";

let thread = Thread::spawn(|| loop {
println!("infinite loop");
sleep(Duration::from_secs(1));
panic!("{}", PANIC_MSG);
});

sleep(Duration::from_secs(5));
sleep(Duration::from_secs(3));
thread.terminate();

println!("\nIs thread panic? {}", thread.panics());

let info = thread.panic_info();

if let Some(info) = info.lock().unwrap().take() {
if let Some(msg) = info.downcast_ref::<String>() {
println!("Panic message is: {}", msg);
} else {
println!("{:?}", info);
}
}

sleep(Duration::from_secs(1));
println!("exit");
println!("\nexit");
}
95 changes: 73 additions & 22 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
#![allow(non_camel_case_types)]
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Condvar, Mutex};

#[allow(dead_code)]
mod api {
include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
}
mod ext;

pub type ThreadPanicInfo = Arc<Mutex<Option<Box<dyn std::any::Any>>>>;
pub type ThreadGuard = Arc<(Condvar, Mutex<bool>)>;

#[doc = include_str!("../README.md")]
#[derive(Clone)]
pub struct Thread(pub(crate) Arc<ThreadInner>);

struct ThreadInner {
/// hold the Thread instance
inner: *mut api::terminate_thread_t,
/// the call is over
over: Arc<AtomicBool>,
/// thread safty
guard: Mutex<()>,
/// condvar to wait for call finish
guard: ThreadGuard,
/// store the panic info
panic_info: ThreadPanicInfo,
}

unsafe impl Send for Thread {}
Expand All @@ -31,38 +33,60 @@ impl Thread {
{
// Trampoile Function For Fn
unsafe extern "C" fn trampoile(xarg: *mut std::os::raw::c_void) {
let pair: Box<(Box<dyn FnOnce() + Send + 'static>, Arc<AtomicBool>)> =
Box::from_raw(xarg as _);
let pair: Box<(
Box<dyn FnOnce() + Send + 'static>,
ThreadPanicInfo,
ThreadGuard,
)> = Box::from_raw(xarg as _);

let (call, panic_info, guard) = *pair;

let (call, over) = *pair;
let call = std::panic::AssertUnwindSafe(call);

call();
// Store the panic info
if let Err(info) = std::panic::catch_unwind(call) {
panic_info.lock().unwrap().replace(info);
}

over.swap(true, Relaxed);
let mut over = guard.1.lock().unwrap();
*over = true;
guard.0.notify_all();

// NB: xarg pointer freed after
}

let over = Arc::new(AtomicBool::new(false));
let panic_info = Arc::new(Mutex::new(None));
let guard = Arc::new((Condvar::new(), Mutex::new(false)));

let cbox: Box<(Box<dyn FnOnce() + Send + 'static>, Arc<AtomicBool>)> =
Box::new((Box::new(start), over.clone()));
let cbox: Box<(
Box<dyn FnOnce() + Send + 'static>,
ThreadPanicInfo,
ThreadGuard,
)> = Box::new((Box::new(start), panic_info.clone(), guard.clone()));
let xarg = Box::into_raw(cbox);

unsafe {
// create a thread here
let inner = api::terminate_thread_create(Some(trampoile), xarg as _);

let guard = Mutex::new(());
Self(Arc::new(ThreadInner { inner, over, guard }))
Self(Arc::new(ThreadInner {
inner,
guard,
panic_info,
}))
}
}

/// The thread guard
fn guard(&self) -> ThreadGuard {
self.0.guard.clone()
}

/// Stop The Spawned Thread
pub fn terminate(&self) {
let _guard = self.0.guard.lock().unwrap();
let over = self.0.guard.1.lock().unwrap();

if self.0.over.load(Relaxed) {
if *over {
// Call is already over
return;
}
Expand All @@ -72,18 +96,45 @@ impl Thread {
api::terminate_thread_terminate(self.0.inner);
}
}

/// The call finishes
pub fn over(&self) -> bool {
*self.guard().1.lock().unwrap()
}

/// The call panics
pub fn panics(&self) -> bool {
// if panic occurs
self.0.panic_info.lock().unwrap().is_some()
}

/// Get the thread panic info
pub fn panic_info(&self) -> ThreadPanicInfo {
self.0.panic_info.clone()
}

/// Wait the call finishes
pub fn join(&self) -> Self {
let (cdv, over_mtx) = &*self.0.guard;
let mut over = over_mtx.lock().unwrap();
while !*over {
over = cdv.wait(over).unwrap();
}

self.clone()
}
}

impl Drop for Thread {
impl Drop for ThreadInner {
fn drop(&mut self) {
unsafe {
if !self.0.over.load(Relaxed) {
if !*self.guard.1.lock().unwrap() {
// Call is not over, terminate it
api::terminate_thread_terminate(self.0.inner);
api::terminate_thread_terminate(self.inner);
return;
}

api::terminate_thread_drop(self.0.inner);
api::terminate_thread_drop(self.inner);
}
}
}
Expand Down

0 comments on commit cbb4666

Please sign in to comment.