Skip to content

Commit

Permalink
make sure to start targets as early as possible when all dependencies…
Browse files Browse the repository at this point in the history
… are fullfilled
  • Loading branch information
matthid committed May 17, 2018
1 parent baec147 commit 0b735eb
Showing 1 changed file with 109 additions and 3 deletions.
112 changes: 109 additions & 3 deletions src/app/Fake.Core.Target/Target.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ open System
open System.Collections.Generic
open Fake.Core
open Fake.Core.CommandLineParsing
open System.Threading.Tasks

module internal TargetCli =
let targetCli =
Expand Down Expand Up @@ -484,6 +485,110 @@ module Target =
else
{ context with PreviousTargets = context.PreviousTargets @ [{ Error = None; Time = TimeSpan.Zero; Target = target; WasSkipped = true }] }

module internal ParallelRunner =
let internal mergeContext (ctx1:TargetContext) (ctx2:TargetContext) =
let known =
ctx1.PreviousTargets
|> Seq.map (fun tres -> tres.Target.Name, tres)
|> dict
let filterKnown targets =
targets
|> List.filter (fun tres -> not (known.ContainsKey tres.Target.Name))
{ ctx1 with
PreviousTargets =
ctx1.PreviousTargets @ filterKnown ctx2.PreviousTargets
}
// Centralized handling of target context and next target logic...
type RunnerHelper =
| GetNextTarget of TargetContext * AsyncReplyChannel<TargetContext * Async<Target option>>
type IRunnerHelper =
abstract GetNextTarget : TargetContext -> Async<TargetContext * Async<Target option>>
let createCtxMgr (order:Target[] list) (ctx:TargetContext) =
let body (inbox:MailboxProcessor<RunnerHelper>) = async {
let targetCount =
order |> Seq.sumBy (fun t -> t.Length)
let mutable ctx = ctx
let mutable waitList = []
let mutable runningTasks = []
//let mutable remainingOrders = order
while true do
let! msg = inbox.Receive()
match msg with
| GetNextTarget (newCtx, reply) ->
// semantic is:
// - We never return a target twice!
// - we fill up the waitlist first
ctx <- mergeContext ctx newCtx
let known =
ctx.PreviousTargets
|> Seq.map (fun tres -> tres.Target.Name, tres)
|> dict
runningTasks <-
runningTasks
|> List.filter (fun t -> not(known.ContainsKey t.Name))
if known.Count = targetCount then
for (w:System.Threading.Tasks.TaskCompletionSource<Target option>) in waitList do
w.SetResult None
waitList <- []
reply.Reply (ctx, async.Return None)
else
let isRunnable (t:Target) =
not (known.ContainsKey t.Name) && // not already finised
not (runningTasks |> Seq.exists (fun r -> r.Name = t.Name)) && // not already running
t.Dependencies // all dependencies finished
|> Seq.forall (fun d -> known.ContainsKey d)
let runnable =
order
|> Seq.concat
|> Seq.filter isRunnable
|> Seq.toList

let rec getNextFreeRunableTarget (r) =
match r with
| t :: rest ->
match waitList with
| h :: restwait ->
h.SetResult (Some t)
waitList <- restwait
getNextFreeRunableTarget rest
| [] -> Some t
| [] -> None
match getNextFreeRunableTarget runnable with
| Some free ->
reply.Reply (ctx, async.Return(Some free))
| None ->
// queue work
let tcs = new TaskCompletionSource<Target option>()
waitList <- waitList @ [ tcs ]
reply.Reply (ctx, tcs.Task |> Async.AwaitTask)
}

let mbox = MailboxProcessor.Start(body)
{ new IRunnerHelper with
member __.GetNextTarget (ctx) = mbox.PostAndAsyncReply(fun reply -> GetNextTarget(ctx, reply))
}

let runOptimal workerNum (order:Target[] list) targetContext =
let mgr = createCtxMgr order targetContext
let targetRunner () =
async {
let! (tctx, att) = mgr.GetNextTarget(targetContext)
let! tt = att
let mutable ctx = tctx
let mutable nextTarget = tt
while nextTarget.IsSome do
let newCtx = runSingleTarget nextTarget.Value ctx
let! (tctx, att) = mgr.GetNextTarget(newCtx)
let! tt = att
ctx <- tctx
nextTarget <- tt
return ctx
} |> Async.StartAsTask
Array.init workerNum (fun _ -> targetRunner())
|> Task.WhenAll
|> Async.AwaitTask
|> Async.RunSynchronously
|> Seq.reduce mergeContext

/// Runs the given array of targets in parallel using count tasks
let internal runTargetsParallel (count : int) (targets : Target[]) context =
Expand Down Expand Up @@ -530,9 +635,10 @@ module Target =
if parallelJobs > 1 && not singleTarget then
Trace.tracefn "Running parallel build with %d workers" parallelJobs

// run every level in parallel
order
|> Seq.fold (fun context par -> runTargetsParallel parallelJobs par context) context
// always try to keep "parallelJobs" runners busy
ParallelRunner.runOptimal parallelJobs order context
//order
// |> Seq.fold (fun context par -> runTargetsParallel parallelJobs par context) context
else
let targets = order |> Seq.collect id |> Seq.toArray
let lastTarget = targets |> Array.last
Expand Down

0 comments on commit 0b735eb

Please sign in to comment.