38
38
NONE => die (fn _ => " Cannot parse integer from \" -" ^ key ^ " " ^ s ^ " \" " )
39
39
| SOME x => x
40
40
41
- val spawnCost = Word32.fromInt (parseInt " sched-spawn-cost" 1 )
41
+ (* val spawnCost = Word32.fromInt (parseInt "sched-spawn-cost" 1) *)
42
+ val spawnCost = 0w1: Word32.word
42
43
43
44
type gcstate = MLton.Pointer.t
44
45
val gcstate = _prim " GC_state" : unit -> gcstate;
@@ -123,12 +124,13 @@ struct
123
124
else die (fn _ => " scheduler bug: " ^ msg ^ " : atomic " ^ Int.toString ass ^ " but expected " ^ Int.toString x)
124
125
end
125
126
126
- fun threadSwitchEndAtomic t =
127
+ (* fun threadSwitchEndAtomic t =
127
128
( if Thread.atomicState () <> 0w0 then ()
128
129
else die (fn _ => "scheduler bug: threadSwitchEndAtomic while non-atomic")
129
130
; Thread.switchTo t
130
- )
131
+ ) *)
131
132
133
+ val threadSwitchEndAtomic = Thread.switchTo
132
134
133
135
(*
134
136
fun doPromoteNow () =
@@ -990,6 +992,21 @@ struct
990
992
end
991
993
992
994
995
+ val sched_package_data = ref
996
+ { syncEndAtomic = syncEndAtomic maybeParClearSuspectsAtDepth
997
+ , maybeSpawnFunc = maybeSpawnFunc
998
+ , setQueueDepth = setQueueDepth
999
+ , returnToSchedEndAtomic = returnToSchedEndAtomic
1000
+ , tryConsumeSpareHeartbeats = tryConsumeSpareHeartbeats
1001
+ , addEagerSpawns = addEagerSpawns
1002
+ , assertAtomic = assertAtomic
1003
+ , error = (fn s => die (fn _ => s)) : string -> unit
1004
+ }
1005
+
1006
+ fun sched_package () = !sched_package_data
1007
+
1008
+ exception SchedulerError
1009
+
993
1010
(* ===================================================================
994
1011
* fork definition
995
1012
*)
@@ -1009,31 +1026,29 @@ struct
1009
1026
let
1010
1027
val _ = dbgmsg'' (fn _ => " hello from left-side par continuation" )
1011
1028
val _ = Thread.atomicBegin ()
1012
- val _ = assertAtomic " leftSideParCont" 1
1029
+ val _ = # assertAtomic (sched_package ()) " leftSideParCont" 1
1013
1030
val jp = primGetData ()
1014
- val gres =
1015
- syncEndAtomic maybeParClearSuspectsAtDepth jp (inject o g)
1031
+ val gres = #syncEndAtomic (sched_package ()) jp (inject o g)
1016
1032
in
1017
1033
(Result.extractResult fres,
1018
1034
case project (Result.extractResult gres) of
1019
1035
SOME gres => gres
1020
- | _ => die ( fn _ => " scheduler bug: leftSideParCont: failed project right-side result" ))
1036
+ | _ => (#error (sched_package ()) " scheduler bug: leftSideParCont: failed project right-side result" ; raise SchedulerError ))
1021
1037
end
1022
1038
1023
1039
fun rightSide () =
1024
1040
let
1025
- val _ = assertAtomic " pcallFork rightside begin" 1
1026
- val J {leftSideThread, rightSideThread, rightSideResult, tidRight, incounter, spareHeartbeatsGiven, ...} =
1027
- primGetData ()
1028
- val () = DE.decheckSetTid tidRight
1041
+ val _ = #assertAtomic (sched_package ()) " pcallFork rightside begin" 1
1042
+ val J jp = primGetData ()
1043
+ val () = DE.decheckSetTid (#tidRight jp)
1029
1044
1030
1045
val thread = Thread.current ()
1031
1046
val depth = HH.getDepth thread
1032
1047
val _ = dbgmsg'' (fn _ => " rightside begin at depth " ^ Int.toString depth)
1033
1048
1034
1049
val _ = HH.forceLeftHeap(myWorkerId(), thread)
1035
- val _ = addSpareHeartbeats spareHeartbeatsGiven
1036
- val _ = assertAtomic " pcallfork rightSide before execute" 1
1050
+ val _ = addSpareHeartbeats (# spareHeartbeatsGiven jp)
1051
+ val _ = # assertAtomic (sched_package ()) " pcallfork rightSide before execute" 1
1037
1052
val _ = Thread.atomicEnd()
1038
1053
1039
1054
val gr = Result.result (inject o g)
@@ -1042,18 +1057,17 @@ struct
1042
1057
val depth' = HH.getDepth (Thread.current ())
1043
1058
val _ =
1044
1059
if depth = depth' then ()
1045
- else die (fn _ => " scheduler bug: depth mismatch: rightside began at depth " ^ Int.toString depth ^ " and ended at " ^ Int.toString depth')
1046
-
1060
+ else #error (sched_package ()) " scheduler bug: rightide depth mismatch"
1047
1061
val _ = dbgmsg'' (fn _ => " rightside done! at depth " ^ Int.toString depth')
1048
- val _ = assertAtomic " pcallFork rightside begin synchronize" 1
1062
+ val _ = # assertAtomic (sched_package ()) " pcallFork rightside begin synchronize" 1
1049
1063
in
1050
- rightSideThread := SOME thread;
1051
- rightSideResult := SOME gr;
1064
+ # rightSideThread jp := SOME thread;
1065
+ # rightSideResult jp := SOME gr;
1052
1066
1053
- if decrementHitsZero incounter then
1067
+ if decrementHitsZero (# incounter jp) then
1054
1068
( ()
1055
1069
; dbgmsg'' (fn _ => " rightside synchronize: become left" )
1056
- ; setQueueDepth (myWorkerId ()) depth
1070
+ ; # setQueueDepth (sched_package ()) (myWorkerId ()) depth
1057
1071
(* * Atomic 1 *)
1058
1072
; Thread.atomicBegin ()
1059
1073
@@ -1063,13 +1077,13 @@ struct
1063
1077
* Switching threads is implicit atomicEnd(), so we need
1064
1078
* to be at atomic2
1065
1079
*)
1066
- ; assertAtomic " pcallFork rightside switch-to-left" 2
1067
- ; threadSwitchEndAtomic leftSideThread
1080
+ ; # assertAtomic (sched_package ()) " pcallFork rightside switch-to-left" 2
1081
+ ; threadSwitchEndAtomic (# leftSideThread jp)
1068
1082
)
1069
1083
else
1070
1084
( dbgmsg'' (fn _ => " rightside synchronize: back to sched" )
1071
- ; assertAtomic " pcallFork rightside before returnToSched" 1
1072
- ; returnToSchedEndAtomic ()
1085
+ ; # assertAtomic (sched_package ()) " pcallFork rightside before returnToSched" 1
1086
+ ; # returnToSchedEndAtomic (sched_package ()) ()
1073
1087
)
1074
1088
end
1075
1089
in
@@ -1088,18 +1102,25 @@ struct
1088
1102
if currentSpareHeartbeatTokens () < spawnCost then
1089
1103
pcallFork (f, g)
1090
1104
else
1091
- case maybeSpawnFunc {allowCGC = true } g of
1092
- NONE => (f (), g ())
1093
- | SOME gj =>
1094
- let
1095
- val _ = tryConsumeSpareHeartbeats spawnCost
1096
- val _ = addEagerSpawns 1
1097
- val fr = Result.result f
1098
- val _ = Thread.atomicBegin ()
1099
- val gr = syncEndAtomic maybeParClearSuspectsAtDepth gj g
1100
- in
1101
- (Result.extractResult fr, Result.extractResult gr)
1102
- end
1105
+ let
1106
+ val (inject, project) = Universal.embed ()
1107
+ in
1108
+ case #maybeSpawnFunc (sched_package ()) {allowCGC = true } (inject o g) of
1109
+ NONE => (f (), g ())
1110
+ | SOME gj =>
1111
+ let
1112
+ val _ = #tryConsumeSpareHeartbeats (sched_package ()) spawnCost
1113
+ val _ = #addEagerSpawns (sched_package ()) 1
1114
+ val fr = Result.result f
1115
+ val _ = Thread.atomicBegin ()
1116
+ val gr = #syncEndAtomic (sched_package ()) gj (inject o g)
1117
+ in
1118
+ (Result.extractResult fr,
1119
+ case project (Result.extractResult gr) of
1120
+ SOME gr => gr
1121
+ | _ => (#error (sched_package ()) " scheduler bug: greedyWorkAmortizedFork: failed project right-side result" ; raise SchedulerError))
1122
+ end
1123
+ end
1103
1124
1104
1125
end
1105
1126
0 commit comments