Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clear candidates (suspects) in parallel: entanglement management perf improvement (and other fixes) #168

Merged
merged 5 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions basis-library/mlton/thread.sig
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ signature MLTON_THREAD =
structure HierarchicalHeap :
sig
type thread = Basic.t
type clear_set
type finished_clear_set_grain

(* The level (depth) of a thread's heap in the hierarchy. *)
val getDepth : thread -> int
Expand Down Expand Up @@ -69,6 +71,14 @@ signature MLTON_THREAD =
(* Move all chunks at the current depth up one level. *)
val promoteChunks : thread -> unit

val clearSuspectsAtDepth: thread * int -> unit
val numSuspectsAtDepth: thread * int -> int
val takeClearSetAtDepth: thread * int -> clear_set
val numChunksInClearSet: clear_set -> int
val processClearSetGrain: clear_set * int * int -> finished_clear_set_grain
val commitFinishedClearSetGrain: thread * finished_clear_set_grain -> unit
val deleteClearSet: clear_set -> unit

(* "put a new thread in the hierarchy *)
val moveNewThreadToDepth : thread * int -> unit

Expand Down
24 changes: 24 additions & 0 deletions basis-library/mlton/thread.sml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ struct
type thread = Basic.t
type t = MLtonPointer.t

type clear_set = MLtonPointer.t
type finished_clear_set_grain = MLtonPointer.t

fun forceLeftHeap (myId, t) = Prim.forceLeftHeap(Word32.fromInt myId, t)
fun forceNewChunk () = Prim.forceNewChunk (gcState ())
fun registerCont (kl, kr, k, t) = Prim.registerCont(kl, kr, k, t)
Expand All @@ -90,6 +93,27 @@ struct
Prim.moveNewThreadToDepth (t, Word32.fromInt d)
fun checkFinishedCCReadyToJoin () =
Prim.checkFinishedCCReadyToJoin (gcState ())

fun clearSuspectsAtDepth (t, d) =
Prim.clearSuspectsAtDepth (gcState (), t, Word32.fromInt d)

fun numSuspectsAtDepth (t, d) =
Word64.toInt (Prim.numSuspectsAtDepth (gcState (), t, Word32.fromInt d))

fun takeClearSetAtDepth (t, d) =
Prim.takeClearSetAtDepth (gcState (), t, Word32.fromInt d)

fun numChunksInClearSet c =
Word64.toInt (Prim.numChunksInClearSet (gcState (), c))

fun processClearSetGrain (c, start, stop) =
Prim.processClearSetGrain (gcState (), c, Word64.fromInt start, Word64.fromInt stop)

fun commitFinishedClearSetGrain (t, fcsg) =
Prim.commitFinishedClearSetGrain (gcState (), t, fcsg)

fun deleteClearSet c =
Prim.deleteClearSet (gcState (), c)
end

structure Disentanglement =
Expand Down
14 changes: 14 additions & 0 deletions basis-library/primitive/prim-mlton.sml
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,20 @@ structure Thread =
val setMinLocalCollectionDepth = _import "GC_HH_setMinLocalCollectionDepth" runtime private: thread * Word32.word -> unit;
val mergeThreads = _import "GC_HH_mergeThreads" runtime private: thread * thread -> unit;
val promoteChunks = _import "GC_HH_promoteChunks" runtime private: thread -> unit;
val clearSuspectsAtDepth = _import "GC_HH_clearSuspectsAtDepth" runtime private:
GCState.t * thread * Word32.word -> unit;
val numSuspectsAtDepth = _import "GC_HH_numSuspectsAtDepth" runtime private:
GCState.t * thread * Word32.word -> Word64.word;
val takeClearSetAtDepth = _import "GC_HH_takeClearSetAtDepth" runtime private:
GCState.t * thread * Word32.word -> Pointer.t;
val numChunksInClearSet = _import "GC_HH_numChunksInClearSet" runtime private:
GCState.t * Pointer.t -> Word64.word;
val processClearSetGrain = _import "GC_HH_processClearSetGrain" runtime private:
GCState.t * Pointer.t * Word64.word * Word64.word -> Pointer.t;
val commitFinishedClearSetGrain = _import "GC_HH_commitFinishedClearSetGrain" runtime private:
GCState.t * thread * Pointer.t -> unit;
val deleteClearSet = _import "GC_HH_deleteClearSet" runtime private:
GCState.t * Pointer.t -> unit;

val decheckFork = _import "GC_HH_decheckFork" runtime private:
GCState.t * Word64.word ref * Word64.word ref -> unit;
Expand Down
130 changes: 128 additions & 2 deletions basis-library/schedulers/shh/Scheduler.sml
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ struct
( HH.promoteChunks thread
; HH.setDepth (thread, depth)
; DE.decheckJoin (tidLeft, tidRight)
; maybeParClearSuspectsAtDepth (thread, depth)
(* ; dbgmsg' (fn _ => "join fast at depth " ^ Int.toString depth) *)
(* ; HH.forceNewChunk () *)
; let
Expand All @@ -362,6 +363,7 @@ struct
HH.setDepth (thread, depth);
DE.decheckJoin (tidLeft, tidRight);
setQueueDepth (myWorkerId ()) depth;
maybeParClearSuspectsAtDepth (thread, depth);
(* dbgmsg' (fn _ => "join slow at depth " ^ Int.toString depth); *)
case HM.refDerefNoBarrier rightSideResult of
NONE => die (fn _ => "scheduler bug: join failed: missing result")
Expand All @@ -374,8 +376,83 @@ struct
(extractResult fr, extractResult gr)
end


and simpleParFork thread depth (f: unit -> unit, g: unit -> unit) : unit =
let
val rightSideThread = ref (NONE: Thread.t option)
val rightSideResult = ref (NONE: unit result option)
val incounter = ref 2

fun forkGC thread depth (f : unit -> 'a, g : unit -> 'b) =
val (tidLeft, tidRight) = DE.decheckFork ()

fun g' () =
let
val () = DE.copySyncDepthsFromThread (thread, depth+1)
val () = DE.decheckSetTid tidRight
val gr = result g
val t = Thread.current ()
in
rightSideThread := SOME t;
rightSideResult := SOME gr;
if decrementHitsZero incounter then
( setQueueDepth (myWorkerId ()) (depth+1)
; threadSwitch thread
)
else
returnToSched ()
end
val _ = push (NormalTask g')
val _ = HH.setDepth (thread, depth + 1)
(* NOTE: off-by-one on purpose. Runtime depths start at 1. *)
val _ = recordForkDepth depth

val _ = DE.decheckSetTid tidLeft
val fr = result f
val tidLeft = DE.decheckGetTid thread

val gr =
if popDiscard () then
( HH.promoteChunks thread
; HH.setDepth (thread, depth)
; DE.decheckJoin (tidLeft, tidRight)
; maybeParClearSuspectsAtDepth (thread, depth)
(* ; dbgmsg' (fn _ => "join fast at depth " ^ Int.toString depth) *)
(* ; HH.forceNewChunk () *)
; let
val gr = result g
in
(* (gr, DE.decheckGetTid thread) *)
gr
end
)
else
( clear () (* this should be safe after popDiscard fails? *)
; if decrementHitsZero incounter then () else returnToSched ()
; case HM.refDerefNoBarrier rightSideThread of
NONE => die (fn _ => "scheduler bug: join failed")
| SOME t =>
let
val tidRight = DE.decheckGetTid t
in
HH.mergeThreads (thread, t);
HH.promoteChunks thread;
HH.setDepth (thread, depth);
DE.decheckJoin (tidLeft, tidRight);
setQueueDepth (myWorkerId ()) depth;
maybeParClearSuspectsAtDepth (thread, depth);
(* dbgmsg' (fn _ => "join slow at depth " ^ Int.toString depth); *)
case HM.refDerefNoBarrier rightSideResult of
NONE => die (fn _ => "scheduler bug: join failed: missing result")
| SOME gr => gr
end
)
in
(extractResult fr, extractResult gr);
()
end


and forkGC thread depth (f : unit -> 'a, g : unit -> 'b) =
let
val heapId = ref (HH.getRoot thread)
val gcTaskTuple = (thread, heapId)
Expand Down Expand Up @@ -416,6 +493,7 @@ struct

val _ = HH.promoteChunks thread
val _ = HH.setDepth (thread, depth)
val _ = maybeParClearSuspectsAtDepth (thread, depth)
(* val _ = dbgmsg' (fn _ => "join CC at depth " ^ Int.toString depth) *)
in
result
Expand All @@ -437,7 +515,55 @@ struct
(f (), g ())
end

fun fork (f, g) = fork' {ccOkayAtThisDepth=true} (f, g)
and fork (f, g) = fork' {ccOkayAtThisDepth=true} (f, g)

and simpleFork (f, g) =
let
val thread = Thread.current ()
val depth = HH.getDepth thread
in
(* if ccOkayAtThisDepth andalso depth = 1 then *)
if depth < Queue.capacity andalso depthOkayForDECheck depth then
simpleParFork thread depth (f, g)
else
(* don't let us hit an error, just sequentialize instead *)
(f (); g ())
end

and maybeParClearSuspectsAtDepth (t, d) =
if HH.numSuspectsAtDepth (t, d) <= 10000 then
HH.clearSuspectsAtDepth (t, d)
else
let
val cs = HH.takeClearSetAtDepth (t, d)
val count = HH.numChunksInClearSet cs
val grainSize = 20
val numGrains = 1 + (count-1) div grainSize
val results = ArrayExtra.alloc numGrains
fun start i = i*grainSize
fun stop i = Int.min (grainSize + start i, count)

fun processLoop i j =
if j-i = 1 then
Array.update (results, i, HH.processClearSetGrain (cs, start i, stop i))
else
let
val mid = i + (j-i) div 2
in
simpleFork (fn _ => processLoop i mid, fn _ => processLoop mid j)
end

fun commitLoop i =
if i >= numGrains then () else
( HH.commitFinishedClearSetGrain (t, Array.sub (results, i))
; commitLoop (i+1)
)
in
processLoop 0 numGrains;
commitLoop 0;
HH.deleteClearSet cs;
maybeParClearSuspectsAtDepth (t, d) (* need to go again, just in case *)
end
end

(* ========================================================================
Expand Down
8 changes: 4 additions & 4 deletions runtime/gc/assign.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ objptr Assignable_decheckObjptr(objptr dst, objptr src)
return src;
}

HM_EBR_leaveQuiescentState(s);
// HM_EBR_leaveQuiescentState(s);
if (!decheck(s, src))
{
assert (isMutable(s, dstp));
new_src = manage_entangled(s, src, getThreadCurrent(s)->decheckState);
assert (isPinned(new_src));
}
HM_EBR_enterQuiescentState(s);
// HM_EBR_enterQuiescentState(s);
assert (!hasFwdPtr(objptrToPointer(new_src, NULL)));
return new_src;
}
Expand All @@ -48,7 +48,7 @@ objptr Assignable_readBarrier(
{
return ptr;
}
HM_EBR_leaveQuiescentState(s);
// HM_EBR_leaveQuiescentState(s);
if (!decheck(s, ptr))
{
assert (ES_contains(NULL, obj));
Expand All @@ -64,7 +64,7 @@ objptr Assignable_readBarrier(
// }
ptr = manage_entangled(s, ptr, getThreadCurrent(s)->decheckState);
}
HM_EBR_enterQuiescentState(s);
// HM_EBR_enterQuiescentState(s);
assert (!hasFwdPtr(objptrToPointer(ptr, NULL)));

return ptr;
Expand Down
10 changes: 4 additions & 6 deletions runtime/gc/decheck.c
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ bool decheckIsOrdered(GC_thread thread, decheck_tid_t t1) {

#ifdef DETECT_ENTANGLEMENT

#ifdef ASSERT
#if ASSERT
void traverseAndCheck(
GC_state s,
__attribute__((unused)) objptr *opp,
Expand All @@ -340,14 +340,12 @@ void traverseAndCheck(
}
}
#else
void traverseAndCheck(
GC_state s,
void inline traverseAndCheck(
__attribute__((unused)) GC_state s,
__attribute__((unused)) objptr *opp,
objptr op,
__attribute__((unused)) objptr op,
__attribute__((unused)) void *rawArgs)
{
(void)s;
(void)op;
return;
}
#endif
Expand Down
6 changes: 1 addition & 5 deletions runtime/gc/decheck.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,7 @@ bool decheckIsOrdered(GC_thread thread, decheck_tid_t t1);
int lcaHeapDepth(decheck_tid_t t1, decheck_tid_t t2);
bool disentangleObject(GC_state s, objptr op, uint32_t opDepth);
objptr manage_entangled(GC_state s, objptr ptr, decheck_tid_t reader);
void traverseAndCheck(
GC_state s,
__attribute__((unused)) objptr *opp,
objptr op,
__attribute__((unused)) void *rawArgs);
void traverseAndCheck(GC_state s, objptr *opp ,objptr op, void *rawArgs);
#endif /* (defined (MLTON_GC_INTERNAL_FUNCS)) */

#endif /* _DECHECK_H_ */
Loading