Skip to content

Commit 259976a

Browse files
committed
package all scheduler data into single closure -- significant performance improvement
1 parent 81d21d7 commit 259976a

File tree

1 file changed

+58
-36
lines changed

1 file changed

+58
-36
lines changed

basis-library/schedulers/par-pcall/MkScheduler.sml

+58-36
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ struct
3838
NONE => die (fn _ => "Cannot parse integer from \"-" ^ key ^ " " ^ s ^ "\"")
3939
| SOME x => x
4040

41-
val spawnCost = Word32.fromInt (parseInt "sched-spawn-cost" 1)
41+
(* val spawnCost = Word32.fromInt (parseInt "sched-spawn-cost" 1) *)
42+
val spawnCost = 0w1: Word32.word
4243

4344
type gcstate = MLton.Pointer.t
4445
val gcstate = _prim "GC_state": unit -> gcstate;
@@ -123,11 +124,13 @@ struct
123124
else die (fn _ => "scheduler bug: " ^ msg ^ ": atomic " ^ Int.toString ass ^ " but expected " ^ Int.toString x)
124125
end
125126

126-
fun threadSwitchEndAtomic t =
127+
(* fun threadSwitchEndAtomic t =
127128
( if Thread.atomicState () <> 0w0 then ()
128129
else die (fn _ => "scheduler bug: threadSwitchEndAtomic while non-atomic")
129130
; Thread.switchTo t
130-
)
131+
) *)
132+
133+
fun threadSwitchEndAtomic t = Thread.switchTo t
131134

132135

133136
(*
@@ -990,6 +993,21 @@ struct
990993
end
991994

992995

996+
val sched_package_data = ref
997+
{ syncEndAtomic = syncEndAtomic maybeParClearSuspectsAtDepth
998+
, maybeSpawnFunc = maybeSpawnFunc
999+
, setQueueDepth = setQueueDepth
1000+
, returnToSchedEndAtomic = returnToSchedEndAtomic
1001+
, tryConsumeSpareHeartbeats = tryConsumeSpareHeartbeats
1002+
, addEagerSpawns = addEagerSpawns
1003+
, assertAtomic = assertAtomic
1004+
, error = (fn s => die (fn _ => s)) : string -> unit
1005+
}
1006+
1007+
fun sched_package () = !sched_package_data
1008+
1009+
exception SchedulerError
1010+
9931011
(* ===================================================================
9941012
* fork definition
9951013
*)
@@ -1009,31 +1027,29 @@ struct
10091027
let
10101028
val _ = dbgmsg'' (fn _ => "hello from left-side par continuation")
10111029
val _ = Thread.atomicBegin ()
1012-
val _ = assertAtomic "leftSideParCont" 1
1030+
val _ = #assertAtomic (sched_package ()) "leftSideParCont" 1
10131031
val jp = primGetData ()
1014-
val gres =
1015-
syncEndAtomic maybeParClearSuspectsAtDepth jp (inject o g)
1032+
val gres = #syncEndAtomic (sched_package ()) jp (inject o g)
10161033
in
10171034
(Result.extractResult fres,
10181035
case project (Result.extractResult gres) of
10191036
SOME gres => gres
1020-
| _ => die (fn _ => "scheduler bug: leftSideParCont: failed project right-side result"))
1037+
| _ => (#error (sched_package ()) "scheduler bug: leftSideParCont: failed project right-side result"; raise SchedulerError))
10211038
end
10221039

10231040
fun rightSide () =
10241041
let
1025-
val _ = assertAtomic "pcallFork rightside begin" 1
1026-
val J {leftSideThread, rightSideThread, rightSideResult, tidRight, incounter, spareHeartbeatsGiven, ...} =
1027-
primGetData ()
1028-
val () = DE.decheckSetTid tidRight
1042+
val _ = #assertAtomic (sched_package ()) "pcallFork rightside begin" 1
1043+
val J jp = primGetData ()
1044+
val () = DE.decheckSetTid (#tidRight jp)
10291045

10301046
val thread = Thread.current ()
10311047
val depth = HH.getDepth thread
10321048
val _ = dbgmsg'' (fn _ => "rightside begin at depth " ^ Int.toString depth)
10331049

10341050
val _ = HH.forceLeftHeap(myWorkerId(), thread)
1035-
val _ = addSpareHeartbeats spareHeartbeatsGiven
1036-
val _ = assertAtomic "pcallfork rightSide before execute" 1
1051+
val _ = addSpareHeartbeats (#spareHeartbeatsGiven jp)
1052+
val _ = #assertAtomic (sched_package ()) "pcallfork rightSide before execute" 1
10371053
val _ = Thread.atomicEnd()
10381054

10391055
val gr = Result.result (inject o g)
@@ -1042,18 +1058,17 @@ struct
10421058
val depth' = HH.getDepth (Thread.current ())
10431059
val _ =
10441060
if depth = depth' then ()
1045-
else die (fn _ => "scheduler bug: depth mismatch: rightside began at depth " ^ Int.toString depth ^ " and ended at " ^ Int.toString depth')
1046-
1061+
else #error (sched_package ()) "scheduler bug: rightide depth mismatch"
10471062
val _ = dbgmsg'' (fn _ => "rightside done! at depth " ^ Int.toString depth')
1048-
val _ = assertAtomic "pcallFork rightside begin synchronize" 1
1063+
val _ = #assertAtomic (sched_package ()) "pcallFork rightside begin synchronize" 1
10491064
in
1050-
rightSideThread := SOME thread;
1051-
rightSideResult := SOME gr;
1065+
#rightSideThread jp := SOME thread;
1066+
#rightSideResult jp := SOME gr;
10521067

1053-
if decrementHitsZero incounter then
1068+
if decrementHitsZero (#incounter jp) then
10541069
( ()
10551070
; dbgmsg'' (fn _ => "rightside synchronize: become left")
1056-
; setQueueDepth (myWorkerId ()) depth
1071+
; #setQueueDepth (sched_package ()) (myWorkerId ()) depth
10571072
(** Atomic 1 *)
10581073
; Thread.atomicBegin ()
10591074

@@ -1063,13 +1078,13 @@ struct
10631078
* Switching threads is implicit atomicEnd(), so we need
10641079
* to be at atomic2
10651080
*)
1066-
; assertAtomic "pcallFork rightside switch-to-left" 2
1067-
; threadSwitchEndAtomic leftSideThread
1081+
; #assertAtomic (sched_package ()) "pcallFork rightside switch-to-left" 2
1082+
; threadSwitchEndAtomic (#leftSideThread jp)
10681083
)
10691084
else
10701085
( dbgmsg'' (fn _ => "rightside synchronize: back to sched")
1071-
; assertAtomic "pcallFork rightside before returnToSched" 1
1072-
; returnToSchedEndAtomic ()
1086+
; #assertAtomic (sched_package ()) "pcallFork rightside before returnToSched" 1
1087+
; #returnToSchedEndAtomic (sched_package ()) ()
10731088
)
10741089
end
10751090
in
@@ -1088,18 +1103,25 @@ struct
10881103
if currentSpareHeartbeatTokens () < spawnCost then
10891104
pcallFork (f, g)
10901105
else
1091-
case maybeSpawnFunc {allowCGC = true} g of
1092-
NONE => (f (), g ())
1093-
| SOME gj =>
1094-
let
1095-
val _ = tryConsumeSpareHeartbeats spawnCost
1096-
val _ = addEagerSpawns 1
1097-
val fr = Result.result f
1098-
val _ = Thread.atomicBegin ()
1099-
val gr = syncEndAtomic maybeParClearSuspectsAtDepth gj g
1100-
in
1101-
(Result.extractResult fr, Result.extractResult gr)
1102-
end
1106+
let
1107+
val (inject, project) = Universal.embed ()
1108+
in
1109+
case #maybeSpawnFunc (sched_package ()) {allowCGC = true} (inject o g) of
1110+
NONE => (f (), g ())
1111+
| SOME gj =>
1112+
let
1113+
val _ = #tryConsumeSpareHeartbeats (sched_package ()) spawnCost
1114+
val _ = #addEagerSpawns (sched_package ()) 1
1115+
val fr = Result.result f
1116+
val _ = Thread.atomicBegin ()
1117+
val gr = #syncEndAtomic (sched_package ()) gj (inject o g)
1118+
in
1119+
(Result.extractResult fr,
1120+
case project (Result.extractResult gr) of
1121+
SOME gr => gr
1122+
| _ => (#error (sched_package ()) "scheduler bug: greedyWorkAmortizedFork: failed project right-side result"; raise SchedulerError))
1123+
end
1124+
end
11031125

11041126
end
11051127

0 commit comments

Comments
 (0)