@@ -31,6 +31,7 @@ var LibraryPThread = {
3131 $PThread__deps : [ '_emscripten_thread_init' ,
3232 '$killThread' ,
3333 '$cancelThread' , '$cleanupThread' , '$zeroMemory' ,
34+ '$markAsZombie' ,
3435 '$spawnThread' ,
3536 '_emscripten_thread_free_data' ,
3637 'exit' ,
@@ -55,6 +56,7 @@ var LibraryPThread = {
5556 // the reverse mapping, each worker has a `pthread_ptr` when its running a
5657 // pthread.
5758 pthreads : { } ,
59+ zombieThreads : { } ,
5860#if PTHREADS_DEBUG
5961 nextWorkerID : 1 ,
6062 debugInit : function ( ) {
@@ -267,6 +269,8 @@ var LibraryPThread = {
267269 spawnThread ( d ) ;
268270 } else if ( cmd === 'cleanupThread' ) {
269271 cleanupThread ( d [ 'thread' ] ) ;
272+ } else if ( cmd === 'markAsZombie' ) {
273+ markAsZombie ( d [ 'thread' ] ) ;
270274 } else if ( cmd === 'killThread' ) {
271275 killThread ( d [ 'thread' ] ) ;
272276 } else if ( cmd === 'cancelThread' ) {
@@ -527,6 +531,18 @@ var LibraryPThread = {
527531 worker . pthread_ptr = 0 ;
528532 } ,
529533
534+ __emscripten_thread_exit_joinable : function ( thread ) {
535+ // Called when a thread exits and is joinable. This puts the thread
536+ // into zombie state where it can't run anymore work but cannot yet
537+ // be cleaned up.
538+ if ( ! ENVIRONMENT_IS_PTHREAD ) markAsZombie ( thread ) ;
539+ else postMessage ( { 'cmd' : 'markAsZombie' , 'thread' : thread } ) ;
540+ } ,
541+
542+ $markAsZombie : function ( pthread_ptr ) {
543+ PThread . zombieThreads [ pthread_ptr ] = true ;
544+ } ,
545+
530546 __emscripten_thread_cleanup : function ( thread ) {
531547 // Called when a thread needs to be cleaned up so it can be reused.
532548 // A thread is considered reusable when it either returns from its
@@ -543,6 +559,7 @@ var LibraryPThread = {
543559 assert ( pthread_ptr , 'Internal Error! Null pthread_ptr in cleanupThread!' ) ;
544560#endif
545561 var worker = PThread . pthreads [ pthread_ptr ] ;
562+ delete PThread . zombieThreads [ pthread_ptr ] ;
546563 assert ( worker ) ;
547564 PThread . returnWorkerToPool ( worker ) ;
548565 } ,
@@ -1069,6 +1086,97 @@ var LibraryPThread = {
10691086#endif
10701087 } ,
10711088
1089+ #if MAIN_MODULE
1090+ $promiseMap : "new Map ( ) ; ",
1091+ $nextPromiseId : 1 ,
1092+
1093+ // Create a new promise that can be resolved or rejected by passing
1094+ // a unique ID to emscripten_promise_resolve/emscripten_promise_reject
1095+ // TODO(sbc): Should we factor this out and make this API public?
1096+ $newNativePromise__deps : [ '$promiseMap' , '$nextPromiseId' ] ,
1097+ $newNativePromise : function ( func , args ) {
1098+ return new Promise ( ( resolve , reject ) => {
1099+ var promiseId = nextPromiseId ;
1100+ nextPromiseId += 1 ;
1101+ #if RUNTIME_DEBUG
1102+ dbg ( 'newNativePromise: ' + promiseId ) ;
1103+ #endif
1104+ promiseMap . set ( promiseId , { resolve, reject} ) ;
1105+ // Native promise function take promise ID as last argument
1106+ args . push ( promiseId ) ;
1107+ func . apply ( null , args ) ;
1108+ } ) ;
1109+ } ,
1110+
1111+ _emscripten_promise_resolve__deps : [ '$promiseMap' ] ,
1112+ _emscripten_promise_resolve__sig : 'vip' ,
1113+ _emscripten_promise_resolve : function ( id , value ) {
1114+ #if RUNTIME_DEBUG
1115+ err ( 'emscripten_resolve_promise: ' + id ) ;
1116+ #endif
1117+ assert ( promiseMap . has ( id ) ) ;
1118+ promiseMap . get ( id ) . resolve ( value ) ;
1119+ promiseMap . delete ( id ) ;
1120+ } ,
1121+
1122+ _emscripten_promise_reject__deps : [ '$promiseMap' ] ,
1123+ _emscripten_promise_reject__sig : 'vip' ,
1124+ _emscripten_promise_reject : function ( id ) {
1125+ #if RUNTIME_DEBUG
1126+ dbg ( 'emscripten_promise_reject: ' + id ) ;
1127+ #endif
1128+ assert ( promiseMap . has ( id ) ) ;
1129+ promiseMap . get ( id ) . reject ( ) ;
1130+ promiseMap . delete ( id ) ;
1131+ } ,
1132+
1133+ // Called on the main thread to synchronize the code loaded on all threads.
1134+ // This work happens asynchronously. The `callback` is called once this work
1135+ // is completed, passing the ctx.
1136+ // TODO(sbc): Should we make a new form of __proxy attribute for JS library
1137+ // function that run asynchronously like but blocks the caller until they are
1138+ // done. Perhaps "sync_with_ctx"?
1139+ _emscripten_sync_all_threads__sig : 'viii' ,
1140+ _emscripten_sync_all_threads__deps : [ '_emscripten_proxy_sync_code' , '$newNativePromise' ] ,
1141+ _emscripten_sync_all_threads : function ( caller , callback , ctx ) {
1142+ #if PTHREADS_DEBUG
1143+ dbg ( "_emscripten_sync_all_threads caller=" + ptrToString ( caller ) ) ;
1144+ #endif
1145+ #if ASSERTIONS
1146+ assert ( ! ENVIRONMENT_IS_PTHREAD , 'Internal Error! _emscripten_sync_all_threads() can only ever be called from main thread' ) ;
1147+ #endif
1148+
1149+ let promises = [ ] ;
1150+
1151+ // This first promise resolves once the main thread has loaded all modules.
1152+ promises . push ( newNativePromise ( __emscripten_thread_sync_code_async , [ ] ) ) ;
1153+
1154+ // We then create a sequence of promises, one per thread, that resolve once
1155+ // each thread has performed its sync using _emscripten_proxy_sync_code.
1156+ // Any new threads that are created after this call will automaticaly be
1157+ // in sync because we call `__emscripten_thread_sync_code` in
1158+ // invokeEntryPoint before the threads entry point is called.
1159+ for ( const ptr of Object . keys ( PThread . pthreads ) ) {
1160+ const pthread_ptr = Number ( ptr ) ;
1161+ if ( pthread_ptr !== caller && ! ( pthread_ptr in PThread . zombieThreads ) ) {
1162+ promises . push ( newNativePromise ( __emscripten_proxy_sync_code , [ pthread_ptr ] ) ) ;
1163+ }
1164+ }
1165+
1166+ #if PTHREADS_DEBUG
1167+ dbg ( "_emscripten_sync_all_threads: waiting on " + promises . length + " promises" ) ;
1168+ #endif
1169+ // Once all promises are resolved then we know all threads are in sync and
1170+ // we can call the callback.
1171+ Promise . all ( promises ) . then ( ( ) => {
1172+ #if PTHREADS_DEBUG
1173+ dbg ( "_emscripten_sync_all_threads done: calling callback" ) ;
1174+ #endif
1175+ { { { makeDynCall ( 'vp' , 'callback' ) } } } ( ctx ) ;
1176+ } ) ;
1177+ } ,
1178+ #endif
1179+
10721180 $executeNotifiedProxyingQueue : function ( queue ) {
10731181 // Set the notification state to processing.
10741182 Atomics . store ( HEAP32 , queue >> 2 , { { { cDefine ( 'NOTIFICATION_RECEIVED' ) } } } ) ;
0 commit comments