Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions erts/emulator/beam/copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,7 @@ do { \
EQUE_DEF_QUEUE(q), /* back */ \
1, /* possibly_empty */ \
EQUE_DEF_QUEUE(q) + DEF_EQUEUE_SIZE, /* end */ \
EQUE_DEF_QUEUE(q), /* default */ \
ERTS_ALC_T_ESTACK /* alloc_type */ \
}

Expand All @@ -1024,6 +1025,7 @@ do { \
info->queue_start, /* back */ \
1, /* possibly_empty */ \
info->queue_end, /* end */ \
info->queue_default, /* default */ \
info->queue_alloc_type /* alloc_type */ \
}

Expand Down
120 changes: 58 additions & 62 deletions erts/emulator/beam/erl_process.c
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ static erts_atomic32_t function_calls;
static erts_atomic32_t doing_sys_schedule;
#endif
static erts_atomic32_t no_empty_run_queues;
static erts_atomic32_t no_waiting_scheds;
long erts_runq_supervision_interval = 0;
static ethr_event runq_supervision_event;
static erts_tid_t runq_supervisor_tid;
Expand Down Expand Up @@ -2934,6 +2935,7 @@ sched_waiting(Uint no, ErtsRunQueue *rq)
ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq));
(void) ERTS_RUNQ_FLGS_SET(rq, (ERTS_RUNQ_FLG_OUT_OF_WORK
| ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK));
erts_atomic32_inc_nob(&no_waiting_scheds);
rq->waiting++;
rq->woken = 0;
if (!ERTS_RUNQ_IX_IS_DIRTY(rq->ix) && erts_system_profile_flags.scheduler)
Expand All @@ -2944,6 +2946,7 @@ static ERTS_INLINE void
sched_active(Uint no, ErtsRunQueue *rq)
{
ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq));
erts_atomic32_dec_nob(&no_waiting_scheds);
rq->waiting--;
if (!ERTS_RUNQ_IX_IS_DIRTY(rq->ix) && erts_system_profile_flags.scheduler)
profile_scheduler(make_small(no), am_active);
Expand Down Expand Up @@ -4617,24 +4620,19 @@ try_steal_task_from_victim(ErtsRunQueue *rq, ErtsRunQueue *vrq, Uint32 flags, Pr
/* Expects rq to be unlocked
rq is locked on return iff the return value is non-zero */
static ERTS_INLINE int
check_possible_steal_victim(ErtsRunQueue *rq, int vix, Process **result_proc, ErtsWStack* contended_runqueues)
check_possible_steal_victim(ErtsRunQueue *rq, int vix, Process **result_proc, ErtsEQueue* contended_runqueues)
{
ErtsRunQueue *vrq = ERTS_RUNQ_IX(vix);
Uint32 flags = ERTS_RUNQ_FLGS_GET(vrq);

if (!runq_got_work_to_execute_flags(flags))
return 0;

if (contended_runqueues) {
if (erts_mtx_trylock(&vrq->mtx) == EBUSY) {
WSTACK_PUSH((*contended_runqueues), vix);
return 0;
}
goto lock_taken;
if (erts_mtx_trylock(&vrq->mtx) == EBUSY) {
EQUEUE_PUT((*contended_runqueues), ((Eterm) vix));
return 0;
}

erts_mtx_lock(&vrq->mtx);
lock_taken:
return try_steal_task_from_victim(rq, vrq, flags, result_proc);
}

Expand All @@ -4643,74 +4641,75 @@ try_steal_task(ErtsRunQueue *rq, Process **result_proc)
{
int res, vix, active_rqs, blnc_rqs;
Uint32 flags;
DECLARE_WSTACK(contended_runqueues);
DECLARE_EQUEUE(contended_runqueues);

flags = empty_runq_get_old_flags(rq);
if (flags & ERTS_RUNQ_FLG_SUSPENDED)
return 0; /* go suspend instead... */
return 0; /* go suspend instead... */

ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq));
erts_runq_unlock(rq);

get_no_runqs(&active_rqs, &blnc_rqs);

if (active_rqs > blnc_rqs)
active_rqs = blnc_rqs;
active_rqs = blnc_rqs;

if (erts_atomic32_read_acqb(&no_empty_run_queues) >= blnc_rqs)
goto end_try_steal_task;

if (rq->ix < active_rqs) {
/* First try to steal from an inactive run queue... */
if (active_rqs < blnc_rqs) {
int no = blnc_rqs - active_rqs;
int stop_ix = vix = active_rqs + rq->ix % no;
while (1) {
res = check_possible_steal_victim(rq, vix, result_proc, &contended_runqueues);
if (res) {
DESTROY_WSTACK(contended_runqueues);
return res;
}
vix++;
if (vix >= blnc_rqs)
vix = active_rqs;
if (vix == stop_ix)
break;
}
}

vix = rq->ix;

/* ... then try to steal a job from another active queue... */
while (1) {
vix++;
if (vix >= active_rqs)
vix = 0;
if (vix == rq->ix)
break;

res = check_possible_steal_victim(rq, vix, result_proc, &contended_runqueues);
if (res) {
DESTROY_WSTACK(contended_runqueues);
return res;
}
}
if (rq->ix >= active_rqs)
goto end_try_steal_task;

/* ... and finally re-try stealing from the queues that were skipped because contended.
We recheck the number of empty runqueues in each iteration, as taking the runqueue lock in check_possible_steal_victim can take quite a while. */
while (!WSTACK_ISEMPTY(contended_runqueues)
&& (erts_atomic32_read_acqb(&no_empty_run_queues) < blnc_rqs)) {
vix = WSTACK_POP(contended_runqueues);
res = check_possible_steal_victim(rq, vix, result_proc, NULL);
/* First try to steal from an inactive run queue... */
if (active_rqs < blnc_rqs) {
int no = blnc_rqs - active_rqs;
int stop_ix = vix = active_rqs + rq->ix % no;
while (1) {
res = check_possible_steal_victim(rq, vix, result_proc, &contended_runqueues);
if (res) {
DESTROY_WSTACK(contended_runqueues);
DESTROY_EQUEUE(contended_runqueues);
return res;
}
vix++;
if (vix >= blnc_rqs)
vix = active_rqs;
if (vix == stop_ix)
break;
}
}

vix = rq->ix;

/* ... then try to steal a job from another active queue... */
while (1) {
vix++;
if (vix >= active_rqs)
vix = 0;
if (vix == rq->ix)
break;

res = check_possible_steal_victim(rq, vix, result_proc, &contended_runqueues);
if (res) {
DESTROY_EQUEUE(contended_runqueues);
return res;
}
}

/* ... and finally re-try stealing from the queues that were skipped because contended.
We recheck the number of empty runqueues in each iteration, as taking the runqueue lock in check_possible_steal_victim can take quite a while. */
while (!EQUEUE_ISEMPTY(contended_runqueues)
&& (erts_atomic32_read_acqb(&no_empty_run_queues) < blnc_rqs)) {
vix = (int) EQUEUE_GET(contended_runqueues);
res = check_possible_steal_victim(rq, vix, result_proc, &contended_runqueues);
if (res) {
DESTROY_EQUEUE(contended_runqueues);
return res;
}
}

end_try_steal_task:
DESTROY_WSTACK(contended_runqueues);
DESTROY_EQUEUE(contended_runqueues);
erts_runq_lock(rq);
return runq_got_work_to_execute(rq);
}
Expand Down Expand Up @@ -5630,11 +5629,9 @@ wakeup_other_check(ErtsRunQueue *rq, Uint32 flags)
if (rq->waiting) {
wake_dirty_scheduler(rq);
}
} else
{
int empty_rqs =
erts_atomic32_read_acqb(&no_empty_run_queues);
if (empty_rqs != 0)
}
else {
if (erts_atomic32_read_nob(&no_waiting_scheds))
wake_scheduler_on_empty_runq(rq);
rq->wakeup_other = 0;
}
Expand Down Expand Up @@ -6132,6 +6129,7 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online, int no_poll_th
erts_atomic32_init_nob(&function_calls, 0);
#endif
erts_atomic32_init_nob(&no_empty_run_queues, 0);
erts_atomic32_init_nob(&no_waiting_scheds, 0);

erts_no_run_queues = n;

Expand Down Expand Up @@ -9826,7 +9824,6 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
continue_check_activities_to_run:
flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
continue_check_activities_to_run_known_flags:
ASSERT(!is_normal_sched || (flags & ERTS_RUNQ_FLG_NONEMPTY));

if (!is_normal_sched) {
if (erts_atomic32_read_acqb(&esdp->ssi->flags)
Expand Down Expand Up @@ -9908,7 +9905,6 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
p = NULL;
if (try_steal_task(rq, &p)) {
if (p) {
non_empty_runq(rq);
state = erts_atomic32_read_acqb(&p->state);
goto execute_process;
}
Expand Down
6 changes: 4 additions & 2 deletions erts/emulator/beam/global.h
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,7 @@ typedef struct {
Eterm* back;
int possibly_empty;
Eterm* end;
Eterm* default_equeue;
ErtsAlcType_t alloc_type;
} ErtsEQueue;

Expand All @@ -848,12 +849,13 @@ void erl_grow_equeue(ErtsEQueue*, Eterm* def_queue);
EQUE_DEF_QUEUE(q), /* back */ \
1, /* possibly_empty */ \
EQUE_DEF_QUEUE(q) + DEF_EQUEUE_SIZE, /* end */ \
EQUE_DEF_QUEUE(q), /* default_equeue */ \
ERTS_ALC_T_ESTACK /* alloc_type */ \
}

#define DESTROY_EQUEUE(q) \
do { \
if (q.start != EQUE_DEF_QUEUE(q)) { \
if (q.start != q.default_equeue) { \
erts_free(q.alloc_type, q.start); \
} \
} while(0)
Expand All @@ -870,7 +872,7 @@ do { \
#define EQUEUE_PUT(q, x) \
do { \
if (q.back == q.front && !q.possibly_empty) { \
erl_grow_equeue(&q, EQUE_DEF_QUEUE(q)); \
erl_grow_equeue(&q, q.default_equeue); \
} \
EQUEUE_PUT_UNCHECKED(q, x); \
} while(0)
Expand Down
Loading