Skip to content

Commit 9dc6f07

Browse files
committed
Block in dlopen until all threads have loaded the module
Fixes: #18345
1 parent abb289b commit 9dc6f07

File tree

12 files changed

+281
-104
lines changed

12 files changed

+281
-104
lines changed

emcc.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1634,7 +1634,12 @@ def setup_pthreads(target):
16341634
]
16351635

16361636
if settings.MAIN_MODULE:
1637-
settings.REQUIRED_EXPORTS += ['_emscripten_thread_sync_code', '__dl_seterr']
1637+
settings.REQUIRED_EXPORTS += [
1638+
'_emscripten_thread_sync_code',
1639+
'_emscripten_thread_sync_code_async',
1640+
'_emscripten_proxy_sync_code',
1641+
'__dl_seterr',
1642+
]
16381643

16391644
settings.DEFAULT_LIBRARY_FUNCS_TO_INCLUDE += [
16401645
'$exitOnMainThread',

src/library_pthread.js

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1069,6 +1069,83 @@ var LibraryPThread = {
10691069
#endif
10701070
},
10711071

1072+
#if MAIN_MODULE
1073+
$promiseMap: "new Map();",
1074+
$nextPromiseId: 1,
1075+
1076+
// Create a new promise that can be resolved or rejected by passing
1077+
// a unique ID to emscripten_promise_resolve/emscripten_promise_reject
1078+
$newNativePromise__deps: ['$promiseMap', '$nextPromiseId'],
1079+
$newNativePromise: function(func, args) {
1080+
return new Promise((resolve, reject) => {
1081+
var promiseId = nextPromiseId;
1082+
nextPromiseId += 1;
1083+
promiseMap.set(promiseId, {resolve, reject});
1084+
// Native promise function take promise ID as last argument
1085+
args.push(promiseId);
1086+
func.apply(null, args);
1087+
});
1088+
},
1089+
1090+
emscripten_promise_resolve__deps: ['$promiseMap'],
1091+
emscripten_promise_resolve__sig: 'vip',
1092+
emscripten_promise_resolve: function(id, value) {
1093+
#if RUNTIME_DEBUG
1094+
err('emscripten_resolve_promise: ' + id);
1095+
#endif
1096+
assert(promiseMap.has(id));
1097+
promiseMap.get(id).resolve(value);
1098+
promiseMap.delete(id);
1099+
},
1100+
1101+
emscripten_promise_reject__deps: ['$promiseMap'],
1102+
emscripten_promise_reject__sig: 'vip',
1103+
emscripten_promise_reject: function(id) {
1104+
#if RUNTIME_DEBUG
1105+
dbg('emscripten_promise_reject: ' + id);
1106+
#endif
1107+
assert(promiseMap.has(id));
1108+
promiseMap.get(id).reject();
1109+
},
1110+
1111+
// Called on the main thread to syncronize the code loaded on all threads.
1112+
// This work happens asyncronously. The `callback` is called once this work
1113+
// is completely, passing the ctx.
1114+
_emscripten_sync_all_threads__sig: 'viii',
1115+
_emscripten_sync_all_threads__deps: ['_emscripten_proxy_sync_code', '$newNativePromise'],
1116+
_emscripten_sync_all_threads: function(caller, callback, ctx) {
1117+
#if PTHREADS_DEBUG
1118+
dbg("_emscripten_sync_all_threads caller=" + ptrToString(caller));
1119+
#endif
1120+
#if ASSERTIONS
1121+
assert(!ENVIRONMENT_IS_PTHREAD, 'Internal Error! _emscripten_sync_all_threads() can only ever be called from main thread');
1122+
#endif
1123+
1124+
let promises = [];
1125+
1126+
// This first promise resolves once the main thread has loaded all module
1127+
promises.push(newNativePromise(__emscripten_thread_sync_code_async, [0]));
1128+
1129+
// We then create a sequence of promises, one per thread, that resolve once
1130+
// each thread has performed its sync using _emscripten_proxy_sync_code.
1131+
for (const ptr of Object.keys(PThread.pthreads)) {
1132+
const pthread_ptr = Number(ptr);
1133+
if (pthread_ptr !== caller) {
1134+
promises.push(newNativePromise(__emscripten_proxy_sync_code, [pthread_ptr]));
1135+
}
1136+
}
1137+
1138+
// Once all promises are resolved then we we know all threads are in
1139+
// sync and we can call the callback.
1140+
Promise.all(promises).then(() => {
1141+
#if PTHREADS_DEBUG
1142+
dbg("_emscripten_sync_all_threads done: calling callback");
1143+
#endif
1144+
{{{ makeDynCall('vp', 'callback') }}}(ctx);
1145+
});
1146+
},
1147+
#endif
1148+
10721149
$executeNotifiedProxyingQueue: function(queue) {
10731150
// Set the notification state to processing.
10741151
Atomics.store(HEAP32, queue >> 2, {{{ cDefine('NOTIFICATION_RECEIVED') }}});

system/include/emscripten/threading.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -286,9 +286,6 @@ int emscripten_pthread_attr_settransferredcanvases(pthread_attr_t *a, const char
286286
// blocking is not enabled, see ALLOW_BLOCKING_ON_MAIN_THREAD.
287287
void emscripten_check_blocking_allowed(void);
288288

289-
// Experimental API for syncing loaded code between pthreads.
290-
void _emscripten_thread_sync_code();
291-
292289
void _emscripten_yield();
293290

294291
#ifdef __cplusplus

system/lib/libc/dynlink.c

Lines changed: 161 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
#include <dynlink.h>
2222

2323
#include <emscripten/console.h>
24+
#include <emscripten/threading.h>
25+
#include <emscripten/proxying.h>
2426

2527
//#define DYLINK_DEBUG
2628

@@ -29,15 +31,15 @@ struct async_data {
2931
em_arg_callback_func onerror;
3032
void* user_data;
3133
};
32-
typedef void (*dlopen_callback_func)(struct dso*, struct async_data* user_data);
34+
typedef void (*dlopen_callback_func)(struct dso*, void* user_data);
3335

3436
void _dlinit(struct dso* main_dso_handle);
3537
void* _dlopen_js(struct dso* handle);
3638
void* _dlsym_js(struct dso* handle, const char* symbol);
3739
void _emscripten_dlopen_js(struct dso* handle,
3840
dlopen_callback_func onsuccess,
3941
dlopen_callback_func onerror,
40-
struct async_data* user_data);
42+
void* user_data);
4143
void __dl_vseterr(const char*, va_list);
4244

4345
static struct dso * _Atomic head, * _Atomic tail;
@@ -105,7 +107,8 @@ static struct dso* load_library_start(const char* name, int flags) {
105107
return p;
106108
}
107109

108-
static void dlopen_js_onsuccess(struct dso* dso, struct async_data* data) {
110+
static void dlopen_onsuccess(struct dso* dso, void* user_data) {
111+
struct async_data* data = (struct async_data*)user_data;
109112
#ifdef DYLINK_DEBUG
110113
_emscripten_errf("%p: dlopen_js_onsuccess: dso=%p mem_addr=%p mem_size=%zu",
111114
pthread_self(),
@@ -119,7 +122,8 @@ static void dlopen_js_onsuccess(struct dso* dso, struct async_data* data) {
119122
free(data);
120123
}
121124

122-
static void dlopen_js_onerror(struct dso* dso, struct async_data* data) {
125+
static void dlopen_onerror(struct dso* dso, void* user_data) {
126+
struct async_data* data = (struct async_data*)user_data;
123127
#ifdef DYLINK_DEBUG
124128
_emscripten_errf("%p: dlopen_js_onerror: dso=%p", pthread_self(), dso);
125129
#endif
@@ -148,6 +152,153 @@ static void ensure_init() {
148152
pthread_rwlock_unlock(&lock);
149153
}
150154

155+
#ifdef _REENTRANT
156+
// These functions are defined in JS.
157+
158+
// Signal the completion of a given promise that was passed into
159+
// emscripten_proxy_sync_code.
160+
void emscripten_promise_resolve(int promise_id, void* value);
161+
void emscripten_promise_reject(int promise_id);
162+
163+
void _emscripten_thread_sync_code_async(struct dso* dso, int promise_id);
164+
165+
static void sync_one_onsuccess(struct dso* dso, void* user_data) {
166+
int promise_id = (intptr_t)user_data;
167+
// Load the next dso in the list
168+
thread_local_tail = dso;
169+
_emscripten_thread_sync_code_async(dso->next, promise_id);
170+
}
171+
172+
static void sync_one_onerror(struct dso* dso, void* user_data) {
173+
int promise_id = (intptr_t)user_data;
174+
pthread_rwlock_unlock(&lock);
175+
emscripten_promise_reject(promise_id);
176+
}
177+
178+
void _emscripten_thread_sync_code_async(struct dso* dso, int promise_id) {
179+
_emscripten_errf("_emscripten_thread_sync_code_async %p promise_id=%d", dso, promise_id);
180+
ensure_init();
181+
pthread_rwlock_rdlock(&lock);
182+
if (!thread_local_tail) {
183+
thread_local_tail = head;
184+
}
185+
if (!dso) {
186+
dso = thread_local_tail->next;
187+
}
188+
if (!dso) {
189+
pthread_rwlock_unlock(&lock);
190+
emscripten_promise_resolve(promise_id, NULL);
191+
return;
192+
}
193+
194+
// Unlock happens once all DSO have been loaded, or one of them fails
195+
// with sync_one_onerror.
196+
_emscripten_dlopen_js(
197+
dso, sync_one_onsuccess, sync_one_onerror, (void*)promise_id);
198+
}
199+
200+
void _emscripten_thread_sync_code() {
201+
// Should only ever be called from a background thread.
202+
assert(!emscripten_is_main_runtime_thread());
203+
ensure_init();
204+
if (thread_local_tail == tail) {
205+
#ifdef DYLINK_DEBUG
206+
_emscripten_errf("%p: emscripten_thread_sync_code: already in sync", pthread_self());
207+
#endif
208+
return;
209+
}
210+
pthread_rwlock_rdlock(&lock);
211+
if (!thread_local_tail) {
212+
thread_local_tail = head;
213+
}
214+
while (thread_local_tail->next) {
215+
struct dso* p = thread_local_tail->next;
216+
#ifdef DYLINK_DEBUG
217+
fprintf(stderr,
218+
"%p: emscripten_thread_sync_code: %s mem_addr=%p mem_size=%zu "
219+
"table_addr=%p table_size=%d",
220+
pthread_self(),
221+
p->name,
222+
p->mem_addr,
223+
p->mem_size,
224+
p->table_addr,
225+
p->table_size);
226+
#endif
227+
void* success = _dlopen_js(p);
228+
if (!success) {
229+
// TODO(sbc): Do a better job of handling errors here.
230+
_emscripten_errf("emscripten_thread_sync_code failed: %s", dlerror());
231+
abort();
232+
}
233+
thread_local_tail = p;
234+
}
235+
pthread_rwlock_unlock(&lock);
236+
#ifdef DYLINK_DEBUG
237+
_emscripten_errf("%p: emscripten_thread_sync_code done", pthread_self());
238+
#endif
239+
}
240+
241+
static void do_thread_sync(void* arg) {
242+
int promise_id = (intptr_t)arg;
243+
#ifdef DYLINK_DEBUG
244+
_emscripten_errf("%p: do_thread_sync: %d", pthread_self(), promise_id);
245+
#endif
246+
_emscripten_thread_sync_code();
247+
}
248+
249+
// Called once _emscripten_proxy_sync_code completes
250+
static void done_thread_sync(void* arg) {
251+
int promise_id = (intptr_t)arg;
252+
#ifdef DYLINK_DEBUG
253+
_emscripten_errf("%p: done_thread_sync: promise_id=%d", pthread_self(), promise_id);
254+
#endif
255+
emscripten_promise_resolve(promise_id, NULL);
256+
}
257+
258+
int _emscripten_proxy_sync_code(pthread_t target_thread, int promise_id) {
259+
em_proxying_queue* q = emscripten_proxy_get_system_queue();
260+
return emscripten_proxy_async_with_callback(
261+
q, target_thread, do_thread_sync, NULL, done_thread_sync, (void*)promise_id);
262+
}
263+
264+
static void done_sync_all(em_proxying_ctx* ctx) {
265+
#ifdef DYLINK_DEBUG
266+
_emscripten_errf("%p: done_sync_all", pthread_self());
267+
#endif
268+
emscripten_proxy_finish(ctx);
269+
}
270+
271+
// This function is defined in JS. It is an async function that runs
272+
// emscripten_thread_sync_code of each of threads that are running at the
273+
// time of the call. Once this is done the callback is called with the given
274+
// em_proxying_ctx.
275+
void _emscripten_sync_all_threads(pthread_t calling_thread,
276+
void (*callback)(em_proxying_ctx*),
277+
em_proxying_ctx* ctx);
278+
279+
static void main_thread_sync_all(em_proxying_ctx* ctx, void* arg) {
280+
pthread_t calling_thread = (pthread_t)arg;
281+
#ifdef DYLINK_DEBUG
282+
_emscripten_errf("%p: main_thread_sync_all calling=%p", pthread_self(), calling_thread);
283+
#endif
284+
_emscripten_sync_all_threads(calling_thread, done_sync_all, ctx);
285+
}
286+
287+
static void sync_code_all_threads() {
288+
// Call `main_thread_sync_all` on the main thread and block until its
289+
// complete. This gets called after a shared library is loaded by a worker.
290+
pthread_t main_thread = emscripten_main_browser_thread_id();
291+
#ifdef DYLINK_DEBUG
292+
_emscripten_errf("%p: sync_code_all_threads main=%p", pthread_self(), main_thread);
293+
#endif
294+
assert(pthread_self() != main_thread);
295+
em_proxying_queue* q = emscripten_proxy_get_system_queue();
296+
int success = emscripten_proxy_sync_with_ctx(
297+
q, main_thread, main_thread_sync_all, pthread_self());
298+
assert(success);
299+
}
300+
#endif // _REENTRANT
301+
151302
void* dlopen(const char* file, int flags) {
152303
ensure_init();
153304
if (!file) {
@@ -191,6 +342,10 @@ void* dlopen(const char* file, int flags) {
191342
load_library_done(p);
192343
end:
193344
pthread_rwlock_unlock(&lock);
345+
#ifdef _REENTRANT
346+
// Block until all other threads have loaded this module.
347+
sync_code_all_threads();
348+
#endif
194349
pthread_setcancelstate(cs, 0);
195350
return p;
196351
}
@@ -219,8 +374,8 @@ void emscripten_dlopen(const char* filename, int flags, void* user_data,
219374
#ifdef DYLINK_DEBUG
220375
_emscripten_errf("%p: calling emscripten_dlopen_js %p", pthread_self(), p);
221376
#endif
222-
// Unlock happens in dlopen_js_onsuccess/dlopen_js_onerror
223-
_emscripten_dlopen_js(p, dlopen_js_onsuccess, dlopen_js_onerror, d);
377+
// Unlock happens in dlopen_onsuccess/dlopen_onerror
378+
_emscripten_dlopen_js(p, dlopen_onsuccess, dlopen_onerror, d);
224379
}
225380

226381
void* __dlsym(void* restrict p, const char* restrict s, void* restrict ra) {
@@ -246,57 +401,3 @@ int dladdr(const void* addr, Dl_info* info) {
246401
info->dli_saddr = NULL;
247402
return 1;
248403
}
249-
250-
#ifdef _REENTRANT
251-
void _emscripten_thread_sync_code() {
252-
// This function is called from emscripten_yeild which itself is called
253-
// whenever we block on a futex. We need to check to avoid infinite
254-
// recursion when taking the lock below.
255-
static thread_local bool syncing = false;
256-
if (syncing) {
257-
return;
258-
}
259-
syncing = true;
260-
ensure_init();
261-
if (thread_local_tail == tail) {
262-
#ifdef DYLINK_DEBUG
263-
_emscripten_errf("%p: emscripten_thread_sync_code: already in sync", pthread_self());
264-
#endif
265-
goto done;
266-
}
267-
pthread_rwlock_rdlock(&lock);
268-
if (!thread_local_tail) {
269-
thread_local_tail = head;
270-
}
271-
while (thread_local_tail->next) {
272-
struct dso* p = thread_local_tail->next;
273-
#ifdef DYLINK_DEBUG
274-
_emscripten_errf("%p: emscripten_thread_sync_code: %s mem_addr=%p "
275-
"mem_size=%zu table_addr=%p table_size=%zu",
276-
pthread_self(),
277-
p->name,
278-
p->mem_addr,
279-
p->mem_size,
280-
p->table_addr,
281-
p->table_size);
282-
#endif
283-
void* success = _dlopen_js(p);
284-
if (!success) {
285-
// If any on the libraries fails to load here then we give up.
286-
// TODO(sbc): Ideally this would never happen and we could/should
287-
// abort, but on the main thread (where we don't have sync xhr) its
288-
// often not possible to syncronously load side module.
289-
_emscripten_errf("emscripten_thread_sync_code failed: %s", dlerror());
290-
break;
291-
}
292-
thread_local_tail = p;
293-
}
294-
pthread_rwlock_unlock(&lock);
295-
#ifdef DYLINK_DEBUG
296-
_emscripten_errf("%p: emscripten_thread_sync_code done", pthread_self());
297-
#endif
298-
299-
done:
300-
syncing = false;
301-
}
302-
#endif

0 commit comments

Comments
 (0)