Skip to content

Commit b7629b4

Browse files
authored
Merge pull request #193 from MPLLang/scheduler-closure-optimization
optimize `par` fast path by packaging all scheduler data into single closure
2 parents 81d21d7 + efa352f commit b7629b4

File tree

1 file changed

+57
-36
lines changed

1 file changed

+57
-36
lines changed

basis-library/schedulers/par-pcall/MkScheduler.sml

+57-36
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ struct
3838
NONE => die (fn _ => "Cannot parse integer from \"-" ^ key ^ " " ^ s ^ "\"")
3939
| SOME x => x
4040

41-
val spawnCost = Word32.fromInt (parseInt "sched-spawn-cost" 1)
41+
(* val spawnCost = Word32.fromInt (parseInt "sched-spawn-cost" 1) *)
42+
val spawnCost = 0w1: Word32.word
4243

4344
type gcstate = MLton.Pointer.t
4445
val gcstate = _prim "GC_state": unit -> gcstate;
@@ -123,12 +124,13 @@ struct
123124
else die (fn _ => "scheduler bug: " ^ msg ^ ": atomic " ^ Int.toString ass ^ " but expected " ^ Int.toString x)
124125
end
125126

126-
fun threadSwitchEndAtomic t =
127+
(* fun threadSwitchEndAtomic t =
127128
( if Thread.atomicState () <> 0w0 then ()
128129
else die (fn _ => "scheduler bug: threadSwitchEndAtomic while non-atomic")
129130
; Thread.switchTo t
130-
)
131+
) *)
131132

133+
val threadSwitchEndAtomic = Thread.switchTo
132134

133135
(*
134136
fun doPromoteNow () =
@@ -990,6 +992,21 @@ struct
990992
end
991993

992994

995+
val sched_package_data = ref
996+
{ syncEndAtomic = syncEndAtomic maybeParClearSuspectsAtDepth
997+
, maybeSpawnFunc = maybeSpawnFunc
998+
, setQueueDepth = setQueueDepth
999+
, returnToSchedEndAtomic = returnToSchedEndAtomic
1000+
, tryConsumeSpareHeartbeats = tryConsumeSpareHeartbeats
1001+
, addEagerSpawns = addEagerSpawns
1002+
, assertAtomic = assertAtomic
1003+
, error = (fn s => die (fn _ => s)) : string -> unit
1004+
}
1005+
1006+
fun sched_package () = !sched_package_data
1007+
1008+
exception SchedulerError
1009+
9931010
(* ===================================================================
9941011
* fork definition
9951012
*)
@@ -1009,31 +1026,29 @@ struct
10091026
let
10101027
val _ = dbgmsg'' (fn _ => "hello from left-side par continuation")
10111028
val _ = Thread.atomicBegin ()
1012-
val _ = assertAtomic "leftSideParCont" 1
1029+
val _ = #assertAtomic (sched_package ()) "leftSideParCont" 1
10131030
val jp = primGetData ()
1014-
val gres =
1015-
syncEndAtomic maybeParClearSuspectsAtDepth jp (inject o g)
1031+
val gres = #syncEndAtomic (sched_package ()) jp (inject o g)
10161032
in
10171033
(Result.extractResult fres,
10181034
case project (Result.extractResult gres) of
10191035
SOME gres => gres
1020-
| _ => die (fn _ => "scheduler bug: leftSideParCont: failed project right-side result"))
1036+
| _ => (#error (sched_package ()) "scheduler bug: leftSideParCont: failed project right-side result"; raise SchedulerError))
10211037
end
10221038

10231039
fun rightSide () =
10241040
let
1025-
val _ = assertAtomic "pcallFork rightside begin" 1
1026-
val J {leftSideThread, rightSideThread, rightSideResult, tidRight, incounter, spareHeartbeatsGiven, ...} =
1027-
primGetData ()
1028-
val () = DE.decheckSetTid tidRight
1041+
val _ = #assertAtomic (sched_package ()) "pcallFork rightside begin" 1
1042+
val J jp = primGetData ()
1043+
val () = DE.decheckSetTid (#tidRight jp)
10291044

10301045
val thread = Thread.current ()
10311046
val depth = HH.getDepth thread
10321047
val _ = dbgmsg'' (fn _ => "rightside begin at depth " ^ Int.toString depth)
10331048

10341049
val _ = HH.forceLeftHeap(myWorkerId(), thread)
1035-
val _ = addSpareHeartbeats spareHeartbeatsGiven
1036-
val _ = assertAtomic "pcallfork rightSide before execute" 1
1050+
val _ = addSpareHeartbeats (#spareHeartbeatsGiven jp)
1051+
val _ = #assertAtomic (sched_package ()) "pcallfork rightSide before execute" 1
10371052
val _ = Thread.atomicEnd()
10381053

10391054
val gr = Result.result (inject o g)
@@ -1042,18 +1057,17 @@ struct
10421057
val depth' = HH.getDepth (Thread.current ())
10431058
val _ =
10441059
if depth = depth' then ()
1045-
else die (fn _ => "scheduler bug: depth mismatch: rightside began at depth " ^ Int.toString depth ^ " and ended at " ^ Int.toString depth')
1046-
1060+
else #error (sched_package ()) "scheduler bug: rightide depth mismatch"
10471061
val _ = dbgmsg'' (fn _ => "rightside done! at depth " ^ Int.toString depth')
1048-
val _ = assertAtomic "pcallFork rightside begin synchronize" 1
1062+
val _ = #assertAtomic (sched_package ()) "pcallFork rightside begin synchronize" 1
10491063
in
1050-
rightSideThread := SOME thread;
1051-
rightSideResult := SOME gr;
1064+
#rightSideThread jp := SOME thread;
1065+
#rightSideResult jp := SOME gr;
10521066

1053-
if decrementHitsZero incounter then
1067+
if decrementHitsZero (#incounter jp) then
10541068
( ()
10551069
; dbgmsg'' (fn _ => "rightside synchronize: become left")
1056-
; setQueueDepth (myWorkerId ()) depth
1070+
; #setQueueDepth (sched_package ()) (myWorkerId ()) depth
10571071
(** Atomic 1 *)
10581072
; Thread.atomicBegin ()
10591073

@@ -1063,13 +1077,13 @@ struct
10631077
* Switching threads is implicit atomicEnd(), so we need
10641078
* to be at atomic2
10651079
*)
1066-
; assertAtomic "pcallFork rightside switch-to-left" 2
1067-
; threadSwitchEndAtomic leftSideThread
1080+
; #assertAtomic (sched_package ()) "pcallFork rightside switch-to-left" 2
1081+
; threadSwitchEndAtomic (#leftSideThread jp)
10681082
)
10691083
else
10701084
( dbgmsg'' (fn _ => "rightside synchronize: back to sched")
1071-
; assertAtomic "pcallFork rightside before returnToSched" 1
1072-
; returnToSchedEndAtomic ()
1085+
; #assertAtomic (sched_package ()) "pcallFork rightside before returnToSched" 1
1086+
; #returnToSchedEndAtomic (sched_package ()) ()
10731087
)
10741088
end
10751089
in
@@ -1088,18 +1102,25 @@ struct
10881102
if currentSpareHeartbeatTokens () < spawnCost then
10891103
pcallFork (f, g)
10901104
else
1091-
case maybeSpawnFunc {allowCGC = true} g of
1092-
NONE => (f (), g ())
1093-
| SOME gj =>
1094-
let
1095-
val _ = tryConsumeSpareHeartbeats spawnCost
1096-
val _ = addEagerSpawns 1
1097-
val fr = Result.result f
1098-
val _ = Thread.atomicBegin ()
1099-
val gr = syncEndAtomic maybeParClearSuspectsAtDepth gj g
1100-
in
1101-
(Result.extractResult fr, Result.extractResult gr)
1102-
end
1105+
let
1106+
val (inject, project) = Universal.embed ()
1107+
in
1108+
case #maybeSpawnFunc (sched_package ()) {allowCGC = true} (inject o g) of
1109+
NONE => (f (), g ())
1110+
| SOME gj =>
1111+
let
1112+
val _ = #tryConsumeSpareHeartbeats (sched_package ()) spawnCost
1113+
val _ = #addEagerSpawns (sched_package ()) 1
1114+
val fr = Result.result f
1115+
val _ = Thread.atomicBegin ()
1116+
val gr = #syncEndAtomic (sched_package ()) gj (inject o g)
1117+
in
1118+
(Result.extractResult fr,
1119+
case project (Result.extractResult gr) of
1120+
SOME gr => gr
1121+
| _ => (#error (sched_package ()) "scheduler bug: greedyWorkAmortizedFork: failed project right-side result"; raise SchedulerError))
1122+
end
1123+
end
11031124

11041125
end
11051126

0 commit comments

Comments
 (0)