@@ -53,7 +53,6 @@ use crate::{ffi, FromPyPointer, IntoPy, Py, PyObject, PyTypeCheck, PyTypeInfo};
5353use std:: ffi:: { CStr , CString } ;
5454use std:: marker:: PhantomData ;
5555use std:: os:: raw:: c_int;
56- use std:: thread;
5756
5857/// A marker token that represents holding the GIL.
5958///
@@ -316,16 +315,102 @@ impl<'py> Python<'py> {
316315 F : Send + FnOnce ( ) -> T ,
317316 T : Send ,
318317 {
318+ use std:: mem:: transmute;
319+ use std:: panic:: { catch_unwind, resume_unwind, AssertUnwindSafe } ;
320+ use std:: sync:: mpsc:: { sync_channel, SendError , SyncSender } ;
321+ use std:: thread:: { spawn, Result } ;
322+ use std:: time:: Duration ;
323+
324+ use parking_lot:: { const_mutex, Mutex } ;
325+
326+ use crate :: impl_:: panic:: PanicTrap ;
327+
319328 // Use a guard pattern to handle reacquiring the GIL,
320329 // so that the GIL will be reacquired even if `f` panics.
321330 // The `Send` bound on the closure prevents the user from
322331 // transferring the `Python` token into the closure.
323332 let _guard = unsafe { SuspendGIL :: new ( ) } ;
324333
325334 // To close soundness loopholes w.r.t. `send_wrapper` or `scoped-tls`,
326- // we run the closure on a newly created thread so that it cannot
335+ // we run the closure on a separate thread so that it cannot
327336 // access thread-local storage from the current thread.
328- thread:: scope ( |s| s. spawn ( f) . join ( ) . unwrap ( ) )
337+
338+ // 1. Construct a task
339+ struct Task ( * mut dyn FnMut ( ) ) ;
340+ unsafe impl Send for Task { }
341+
342+ let ( result_sender, result_receiver) = sync_channel :: < Result < T > > ( 0 ) ;
343+
344+ let mut f = Some ( f) ;
345+
346+ let mut task = || {
347+ let f = f. take ( ) . unwrap ( ) ;
348+
349+ let result = catch_unwind ( AssertUnwindSafe ( f) ) ;
350+
351+ result_sender. send ( result) . unwrap ( ) ;
352+ } ;
353+
354+ // SAFETY: the current thread will block until the closure has returned
355+ let task = Task ( unsafe { transmute ( & mut task as & mut dyn FnMut ( ) ) } ) ;
356+
357+ // 2. Enqueue task and spawn thread if necessary
358+ let trap = PanicTrap :: new ( "allow_threads unwound while stack data was potentially accessed by another thread which is a bug" ) ;
359+
360+ static THREADS : Mutex < Vec < SyncSender < Task > > > = const_mutex ( Vec :: new ( ) ) ;
361+
362+ enum State {
363+ Pending ( Task ) ,
364+ Dispatched ( SyncSender < Task > ) ,
365+ }
366+
367+ let mut state = State :: Pending ( task) ;
368+
369+ while let Some ( task_sender) = THREADS . lock ( ) . pop ( ) {
370+ match state {
371+ State :: Pending ( task) => match task_sender. send ( task) {
372+ Ok ( ( ) ) => {
373+ state = State :: Dispatched ( task_sender) ;
374+ break ;
375+ }
376+ Err ( SendError ( task) ) => {
377+ state = State :: Pending ( task) ;
378+ continue ;
379+ }
380+ } ,
381+ State :: Dispatched ( _sender) => unreachable ! ( ) ,
382+ }
383+ }
384+
385+ let task_sender = match state {
386+ State :: Pending ( task) => {
387+ let ( task_sender, task_receiver) = sync_channel :: < Task > ( 0 ) ;
388+
389+ spawn ( move || {
390+ while let Ok ( task) = task_receiver. recv_timeout ( Duration :: from_secs ( 60 ) ) {
391+ // SAFETY: all data accessed by `task` will stay alive until it completes
392+ unsafe { ( * task. 0 ) ( ) } ;
393+ }
394+ } ) ;
395+
396+ task_sender. send ( task) . unwrap ( ) ;
397+
398+ task_sender
399+ }
400+ State :: Dispatched ( task_sender) => task_sender,
401+ } ;
402+
403+ // 3. Wait for completion and check result
404+ let result = result_receiver. recv ( ) . unwrap ( ) ;
405+
406+ trap. disarm ( ) ;
407+
408+ THREADS . lock ( ) . push ( task_sender) ;
409+
410+ match result {
411+ Ok ( result) => result,
412+ Err ( payload) => resume_unwind ( payload) ,
413+ }
329414 }
330415
331416 /// Evaluates a Python expression in the given context and returns the result.
0 commit comments