From 0b735eb99132cddce327c5a82d1d96d2d0295187 Mon Sep 17 00:00:00 2001 From: Matthias Dittrich Date: Thu, 17 May 2018 12:27:28 +0200 Subject: [PATCH] make sure to start targets as early as possible when all dependencies are fullfilled --- src/app/Fake.Core.Target/Target.fs | 112 ++++++++++++++++++++++++++++- 1 file changed, 109 insertions(+), 3 deletions(-) diff --git a/src/app/Fake.Core.Target/Target.fs b/src/app/Fake.Core.Target/Target.fs index 6ce5d6c2f77..cd03741950b 100644 --- a/src/app/Fake.Core.Target/Target.fs +++ b/src/app/Fake.Core.Target/Target.fs @@ -4,6 +4,7 @@ open System open System.Collections.Generic open Fake.Core open Fake.Core.CommandLineParsing +open System.Threading.Tasks module internal TargetCli = let targetCli = @@ -484,6 +485,110 @@ module Target = else { context with PreviousTargets = context.PreviousTargets @ [{ Error = None; Time = TimeSpan.Zero; Target = target; WasSkipped = true }] } + module internal ParallelRunner = + let internal mergeContext (ctx1:TargetContext) (ctx2:TargetContext) = + let known = + ctx1.PreviousTargets + |> Seq.map (fun tres -> tres.Target.Name, tres) + |> dict + let filterKnown targets = + targets + |> List.filter (fun tres -> not (known.ContainsKey tres.Target.Name)) + { ctx1 with + PreviousTargets = + ctx1.PreviousTargets @ filterKnown ctx2.PreviousTargets + } + // Centralized handling of target context and next target logic... + type RunnerHelper = + | GetNextTarget of TargetContext * AsyncReplyChannel> + type IRunnerHelper = + abstract GetNextTarget : TargetContext -> Async> + let createCtxMgr (order:Target[] list) (ctx:TargetContext) = + let body (inbox:MailboxProcessor) = async { + let targetCount = + order |> Seq.sumBy (fun t -> t.Length) + let mutable ctx = ctx + let mutable waitList = [] + let mutable runningTasks = [] + //let mutable remainingOrders = order + while true do + let! msg = inbox.Receive() + match msg with + | GetNextTarget (newCtx, reply) -> + // semantic is: + // - We never return a target twice! + // - we fill up the waitlist first + ctx <- mergeContext ctx newCtx + let known = + ctx.PreviousTargets + |> Seq.map (fun tres -> tres.Target.Name, tres) + |> dict + runningTasks <- + runningTasks + |> List.filter (fun t -> not(known.ContainsKey t.Name)) + if known.Count = targetCount then + for (w:System.Threading.Tasks.TaskCompletionSource) in waitList do + w.SetResult None + waitList <- [] + reply.Reply (ctx, async.Return None) + else + let isRunnable (t:Target) = + not (known.ContainsKey t.Name) && // not already finised + not (runningTasks |> Seq.exists (fun r -> r.Name = t.Name)) && // not already running + t.Dependencies // all dependencies finished + |> Seq.forall (fun d -> known.ContainsKey d) + let runnable = + order + |> Seq.concat + |> Seq.filter isRunnable + |> Seq.toList + + let rec getNextFreeRunableTarget (r) = + match r with + | t :: rest -> + match waitList with + | h :: restwait -> + h.SetResult (Some t) + waitList <- restwait + getNextFreeRunableTarget rest + | [] -> Some t + | [] -> None + match getNextFreeRunableTarget runnable with + | Some free -> + reply.Reply (ctx, async.Return(Some free)) + | None -> + // queue work + let tcs = new TaskCompletionSource() + waitList <- waitList @ [ tcs ] + reply.Reply (ctx, tcs.Task |> Async.AwaitTask) + } + + let mbox = MailboxProcessor.Start(body) + { new IRunnerHelper with + member __.GetNextTarget (ctx) = mbox.PostAndAsyncReply(fun reply -> GetNextTarget(ctx, reply)) + } + + let runOptimal workerNum (order:Target[] list) targetContext = + let mgr = createCtxMgr order targetContext + let targetRunner () = + async { + let! (tctx, att) = mgr.GetNextTarget(targetContext) + let! tt = att + let mutable ctx = tctx + let mutable nextTarget = tt + while nextTarget.IsSome do + let newCtx = runSingleTarget nextTarget.Value ctx + let! (tctx, att) = mgr.GetNextTarget(newCtx) + let! tt = att + ctx <- tctx + nextTarget <- tt + return ctx + } |> Async.StartAsTask + Array.init workerNum (fun _ -> targetRunner()) + |> Task.WhenAll + |> Async.AwaitTask + |> Async.RunSynchronously + |> Seq.reduce mergeContext /// Runs the given array of targets in parallel using count tasks let internal runTargetsParallel (count : int) (targets : Target[]) context = @@ -530,9 +635,10 @@ module Target = if parallelJobs > 1 && not singleTarget then Trace.tracefn "Running parallel build with %d workers" parallelJobs - // run every level in parallel - order - |> Seq.fold (fun context par -> runTargetsParallel parallelJobs par context) context + // always try to keep "parallelJobs" runners busy + ParallelRunner.runOptimal parallelJobs order context + //order + // |> Seq.fold (fun context par -> runTargetsParallel parallelJobs par context) context else let targets = order |> Seq.collect id |> Seq.toArray let lastTarget = targets |> Array.last