Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make sure to start targets as early as possible when all dependencies… #1934

Merged
merged 3 commits into from
May 17, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 127 additions & 4 deletions src/app/Fake.Core.Target/Target.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ open System
open System.Collections.Generic
open Fake.Core
open Fake.Core.CommandLineParsing
open System.Threading.Tasks

module internal TargetCli =
let targetCli =
Expand Down Expand Up @@ -480,10 +481,131 @@ module Target =
let internal runSingleTarget (target : Target) (context:TargetContext) =
if not context.HasError then
use t = Trace.traceTarget target.Name (match target.Description with Some d -> d | _ -> "NoDescription") (dependencyString target)
runSimpleContextInternal target context
let res = runSimpleContextInternal target context
if res.HasError
then t.MarkFailed()
else t.MarkSuccess()
res
else
{ context with PreviousTargets = context.PreviousTargets @ [{ Error = None; Time = TimeSpan.Zero; Target = target; WasSkipped = true }] }

module internal ParallelRunner =
let internal mergeContext (ctx1:TargetContext) (ctx2:TargetContext) =
let known =
ctx1.PreviousTargets
|> Seq.map (fun tres -> tres.Target.Name, tres)
|> dict
let filterKnown targets =
targets
|> List.filter (fun tres -> not (known.ContainsKey tres.Target.Name))
{ ctx1 with
PreviousTargets =
ctx1.PreviousTargets @ filterKnown ctx2.PreviousTargets
}
// Centralized handling of target context and next target logic...
type RunnerHelper =
| GetNextTarget of TargetContext * AsyncReplyChannel<TargetContext * Async<Target option>>
type IRunnerHelper =
abstract GetNextTarget : TargetContext -> Async<TargetContext * Async<Target option>>
let createCtxMgr (order:Target[] list) (ctx:TargetContext) =
let body (inbox:MailboxProcessor<RunnerHelper>) = async {
let targetCount =
order |> Seq.sumBy (fun t -> t.Length)
let resolution = Set.ofSeq(order |> Seq.concat |> Seq.map (fun t -> t.Name))
let inResolution (t:string) = resolution.Contains t
let mutable ctx = ctx
let mutable waitList = []
let mutable runningTasks = []
//let mutable remainingOrders = order
try
while true do
let! msg = inbox.Receive()
match msg with
| GetNextTarget (newCtx, reply) ->
// semantic is:
// - We never return a target twice!
// - we fill up the waitlist first
ctx <- mergeContext ctx newCtx
let known =
ctx.PreviousTargets
|> Seq.map (fun tres -> tres.Target.Name, tres)
|> dict
runningTasks <-
runningTasks
|> List.filter (fun t -> not(known.ContainsKey t.Name))
if known.Count = targetCount then
for (w:System.Threading.Tasks.TaskCompletionSource<Target option>) in waitList do
w.SetResult None
waitList <- []
reply.Reply (ctx, async.Return None)
else

let isRunnable (t:Target) =
not (known.ContainsKey t.Name) && // not already finised
not (runningTasks |> Seq.exists (fun r -> r.Name = t.Name)) && // not already running
t.Dependencies @ List.filter inResolution t.SoftDependencies // all dependencies finished
|> Seq.forall (fun d -> known.ContainsKey d)
let runnable =
order
|> Seq.concat
|> Seq.filter isRunnable
|> Seq.toList

let rec getNextFreeRunableTarget (r) =
match r with
| t :: rest ->
match waitList with
| h :: restwait ->
// fill some idle worker
runningTasks <- t :: runningTasks
h.SetResult (Some t)
waitList <- restwait
getNextFreeRunableTarget rest
| [] -> Some t
| [] -> None
match getNextFreeRunableTarget runnable with
| Some free ->
runningTasks <- free :: runningTasks
reply.Reply (ctx, async.Return(Some free))
| None ->
// queue work
let tcs = new TaskCompletionSource<Target option>()
waitList <- waitList @ [ tcs ]
reply.Reply (ctx, tcs.Task |> Async.AwaitTask)
with e ->
while true do
let! msg = inbox.Receive()
match msg with
| GetNextTarget (_, reply) ->
reply.Reply (ctx, async { return raise <| exn("mailbox failed", e) })
}

let mbox = MailboxProcessor.Start(body)
{ new IRunnerHelper with
member __.GetNextTarget (ctx) = mbox.PostAndAsyncReply(fun reply -> GetNextTarget(ctx, reply))
}

let runOptimal workerNum (order:Target[] list) targetContext =
let mgr = createCtxMgr order targetContext
let targetRunner () =
async {
let! (tctx, att) = mgr.GetNextTarget(targetContext)
let! tt = att
let mutable ctx = tctx
let mutable nextTarget = tt
while nextTarget.IsSome do
let newCtx = runSingleTarget nextTarget.Value ctx
let! (tctx, att) = mgr.GetNextTarget(newCtx)
let! tt = att
ctx <- tctx
nextTarget <- tt
return ctx
} |> Async.StartAsTask
Array.init workerNum (fun _ -> targetRunner())
|> Task.WhenAll
|> Async.AwaitTask
|> Async.RunSynchronously
|> Seq.reduce mergeContext

/// Runs the given array of targets in parallel using count tasks
let internal runTargetsParallel (count : int) (targets : Target[]) context =
Expand Down Expand Up @@ -530,9 +652,10 @@ module Target =
if parallelJobs > 1 && not singleTarget then
Trace.tracefn "Running parallel build with %d workers" parallelJobs

// run every level in parallel
order
|> Seq.fold (fun context par -> runTargetsParallel parallelJobs par context) context
// always try to keep "parallelJobs" runners busy
ParallelRunner.runOptimal parallelJobs order context
//order
// |> Seq.fold (fun context par -> runTargetsParallel parallelJobs par context) context
else
let targets = order |> Seq.collect id |> Seq.toArray
let lastTarget = targets |> Array.last
Expand Down