From 85b90b3f95c75615629aeea34254c47c0de643f8 Mon Sep 17 00:00:00 2001
From: Mamy Ratsimbazafy
Date: Tue, 7 Jan 2020 22:16:41 +0100
Subject: [PATCH] [WIP] Dataflow graph parallelism v2 (#94)

* Stash pledges, blocked by https://github.com/nim-lang/Nim/issues/13048
* Workaround https://github.com/nim-lang/Nim/issues/13048
* Pass the pledges implementation tests
* Prepare for supporting mixed single and iteration pledges
* move pledge access counter to the impl. + Move file to channels subfolder
* add iteration pledges
* Sanity checks for loop pledges
* create fulfill public API
* Prepare parallel_tasks for supporting pledges
* Add dataflow graph parallelism :boom: :fireworks:
* Add dataflow support to parallel for loops
* Fix atomics + assert templates visibility on some platforms
* remove anti overwrite check (not true as compilers don't always zero-init)
* Please the template early symbol resolution god
* Nestable GEMM reaching 2.87 TFlops for a speedup of 17.96 on an 18-core machine
* dependent loops
* Prepare for multiple pledge dependencies support
* Support joining dataflow graphs
* bindSym + augment TaskDataSize
* Awaitable for-loop now returns true if it was the last iteration to await
* Fix + test against vendor BLAS, still one non-nestable barrier :/ (but consistently 17.5x speedup)
* typos
* Allow failure on C++
---
 azure-pipelines.yml                           |  22 +-
 benchmarks/matmul_gemm_blas/all_gemm.nim      |  47 ++
 benchmarks/matmul_gemm_blas/all_gemm.nim.cfg  |   6 +
 .../matmul_gemm_blas/gemm_bench_common.nim    |  36 +-
 .../gemm_pure_nim/gemm_packing_weave.nim      |  16 +-
 .../gemm_pure_nim/gemm_weave.nim              |  38 +-
 .../matmul_gemm_blas/laser_omp_gemm.nim       |   4 +-
 benchmarks/matmul_gemm_blas/mkl_gemm.nim      |   2 +-
 benchmarks/matmul_gemm_blas/openblas_gemm.nim |   4 +-
 benchmarks/matmul_gemm_blas/weave_gemm.nim    |  11 +-
 weave.nim                                     |  10 +-
 weave.nimble                                  |   1 +
 weave/await_fsm.nim                           |  15 -
 .../channels_mpsc_unbounded_batch.nim         |  29 +-
 weave/channels/pledges.nim                    | 625 ++++++++++++++++++
 weave/contexts.nim                            |  33 +-
 weave/datatypes/sync_types.nim                |  11 +-
 weave/parallel_for.nim                        | 111 +++-
 weave/parallel_for_staged.nim                 |  19 +-
 weave/parallel_macros.nim                     | 109 ++-
 weave/parallel_reduce.nim                     |   1 +
 weave/parallel_tasks.nim                      | 147 +++-
 weave/victims.nim                             |   1 +
 23 files changed, 1149 insertions(+), 149 deletions(-)
 create mode 100644 benchmarks/matmul_gemm_blas/all_gemm.nim
 create mode 100644 benchmarks/matmul_gemm_blas/all_gemm.nim.cfg
 create mode 100644 weave/channels/pledges.nim

diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index 002f5f2..a190990 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -3,6 +3,8 @@ strategy:
   matrix:
     # Nim requires enforcing ARCH="x86" and ucpu
     # for 32-bit targets as it seems like Azure machines are 64-bit
+    # TODO: C++ is allowed to fail
+    # Nim doesn't compile pledges properly in C++ mode
    Windows_devel_32bit:
      VM: 'windows-latest'
      ARCH: x86
@@ -15,21 +17,21 @@ strategy:
      PLATFORM: x64
      CHANNEL: devel
      WEAVE_TEST_LANG: c
-    Windows_cpp_devel_64bit:
-      VM: 'windows-latest'
-      PLATFORM: x64
-      CHANNEL: devel
-      WEAVE_TEST_LANG: cpp
+    # Windows_cpp_devel_64bit:
+    #   VM: 'windows-latest'
+    #   PLATFORM: x64
+    #   CHANNEL: devel
+    #   WEAVE_TEST_LANG: cpp
    Linux_devel_64bit:
      VM: 'ubuntu-16.04'
      PLATFORM: x64
      CHANNEL: devel
      WEAVE_TEST_LANG: c
-    Linux_cpp_devel_64bit:
-      VM: 'ubuntu-16.04'
-      PLATFORM: x64
-      CHANNEL: devel
-      WEAVE_TEST_LANG: cpp
+    # Linux_cpp_devel_64bit:
+    #   VM: 'ubuntu-16.04'
+    #   PLATFORM: x64
+    #   CHANNEL: devel
+    #   WEAVE_TEST_LANG: cpp
    Linux_devel_32bit:
      VM: 'ubuntu-16.04'
      PLATFORM: x86
diff --git 
a/benchmarks/matmul_gemm_blas/all_gemm.nim b/benchmarks/matmul_gemm_blas/all_gemm.nim
new file mode 100644
index 0000000..dc4cb8b
--- /dev/null
+++ b/benchmarks/matmul_gemm_blas/all_gemm.nim
@@ -0,0 +1,47 @@
+# Weave
+# Copyright (c) 2019 Mamy André-Ratsimbazafy
+# Licensed and distributed under either of
+#   * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
+#   * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
+# at your option. This file may not be copied, modified, or distributed except according to those terms.
+
+import
+  ./laser_omp_gemm,
+  ./mkl_gemm, # OpenBLAS and MKL cannot be linked at the same time
+  ./weave_gemm,
+  ./gemm_bench_common,
+  ./gemm_bench_config,
+  ../../weave
+
+# This aggregates all benchmarks in one binary.
+# Warning: Bench results are not reliable; it seems like threads/calls
+# interfere with each other, even when only calling OpenMP-based code.
+
+when isMainModule:
+  import std/[random, sequtils]
+
+  randomize(42) # For reproducibility
+
+  let a = newSeqWith(M*K, float32 rand(-0.1..0.1))
+  let b = newSeqWith(K*N, float32 rand(-0.1..0.1))
+
+  warmup()
+  echo "Warning: The aggregate bench is unreliable, the libraries interfere with each other."
+
+  block:
+    reportConfig("Intel MKL + Laser OMP + Weave", float32, (M, K), (K, N))
+    let mkl = benchMKL(a, b, (M,K), (K,N), NbSamples)
+
+    # let laser = benchLaserGEMM(a, b, (M,K), (K,N), NbSamples)
+
+    init(Weave)
+    let weave = benchWeaveGEMM(a, b, (M,K), (K,N), NbSamples)
+    exit(Weave)
+
+    let weaveError = mean_relative_error(weave, mkl)
+    echo "Mean Relative Error of Weave vs reference: ", weaveError
+    doAssert weaveError <= 1e-5'f32, $weaveError
+
+    # let laserError = mean_relative_error(laser, mkl)
+    # echo "Mean Relative Error of Laser vs reference: ", laserError
+    # doAssert laserError <= 1e-5'f32, $laserError
diff --git a/benchmarks/matmul_gemm_blas/all_gemm.nim.cfg b/benchmarks/matmul_gemm_blas/all_gemm.nim.cfg
new file mode 100644
index 0000000..2836b1a
--- /dev/null
+++ b/benchmarks/matmul_gemm_blas/all_gemm.nim.cfg
@@ -0,0 +1,6 @@
+clibdir:"/opt/intel/mkl/lib/intel64"
+passl:"/opt/intel/mkl/lib/intel64/libmkl_intel_lp64.a"
+passl:"-lmkl_core"
+passl:"-lmkl_intel_thread"
+passl:"-liomp5"
+dynlibOverride:"mkl_intel_lp64"
diff --git a/benchmarks/matmul_gemm_blas/gemm_bench_common.nim b/benchmarks/matmul_gemm_blas/gemm_bench_common.nim
index 10abbb9..ffec0c0 100644
--- a/benchmarks/matmul_gemm_blas/gemm_bench_common.nim
+++ b/benchmarks/matmul_gemm_blas/gemm_bench_common.nim
@@ -1,5 +1,7 @@
-# Apache v2 License
-# Mamy Ratsimbazafy
+# Laser
+# Copyright (c) 2018-Present Mamy André-Ratsimbazafy
+# Distributed under the Apache v2 License (license terms are at http://www.apache.org/licenses/LICENSE-2.0).
+# This file may not be copied, modified, or distributed except according to those terms.

 import std/[sequtils, times, monotimes, stats, strformat, random]

@@ -87,3 +89,33 @@ template bench*(name: string, req_ops: int, initialisation, body: untyped) {.dir
   let global_stop = getMonoTime()
   let globalElapsed = inMilliseconds(global_stop - global_start)
   printStats(name, result)
+
+proc relative_error*[T: SomeFloat](y, y_true: T): T {.inline.} =
+  ## Relative error, |y_true - y|/max(|y_true|, |y|)
+  ## Normally the relative error is defined as |y_true - y| / |y_true|,
+  ## but here max is used to make it symmetric and to prevent dividing by zero,
+  ## guaranteed to return zero in the case when both values are zero.
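+  # For example (editor's note): relative_error(1e-8'f32, 0'f32) == 1 while
+  # relative_error(0'f32, 0'f32) == 0 - taking max(|y_true|, |y|) as the
+  # denominator keeps the metric symmetric and avoids division by zero.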
+ let denom = max(abs(y_true), abs(y)) + if denom == 0.T: + return 0.T + result = abs(y_true - y) / denom + +proc absolute_error*[T: SomeFloat](y, y_true: T): T {.inline.} = + ## Absolute error for a single value, |y_true - y| + result = abs(y_true - y) + +proc mean_relative_error*[T: SomeFloat](y, y_true: seq[T]): T {.inline.} = + doAssert y.len == y_true.len + + result = 0.T + for i in 0 ..< y.len: + result += relative_error(y[i], y_true[i]) + result = result / y.len.T + +proc mean_absolute_error*[T: SomeFloat](y, y_true: seq[T]): T {.inline.} = + doAssert y.len == y_true.len + + result = 0.T + for i in 0 ..< y.len: + result += absolute_error(y[i], y_true[i]) + result = result / y.len.T diff --git a/benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_packing_weave.nim b/benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_packing_weave.nim index ba2e12c..555d045 100644 --- a/benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_packing_weave.nim +++ b/benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_packing_weave.nim @@ -62,7 +62,7 @@ proc pack_A_mc_kc*[T; ukernel: static MicroKernel]( proc pack_B_kc_nc*[T; ukernel: static MicroKernel]( packedB: ptr UncheckedArray[T], kc, nc: int, - B: MatrixView[T]) = + B: MatrixView[T], kcTileReady: Pledge) = ## Packs panel [kc, nc] for ~B (half-L1 cache) ## Pads if needed ## @@ -76,10 +76,10 @@ proc pack_B_kc_nc*[T; ukernel: static MicroKernel]( let unroll_stop = nc.round_step_down(NR) # 1. Pack n matrices of size kc*nr, n = nc/nr - parallelForStrided j in 0 ..< unroll_stop, stride = NR: - captures: {kc, buffer, B} - parallelFor k in 0 ..< kc: - captures: {j, kc, buffer, B} + parallelFor k in 0 ..< kc: + awaitable: kcLoop + captures: {kc, buffer, B, unroll_stop} + for j in countup(0, unroll_stop-1, NR): for jj in 0 ..< NR: buffer[j*kc + k*NR + jj] = B[k, j+jj] @@ -93,4 +93,8 @@ proc pack_B_kc_nc*[T; ukernel: static MicroKernel]( for j in remainder ..< NR: # Pad with 0 if packing over the edge offBuf[k*NR + j] = 0.T - syncRoot(Weave) + # Note: the tail is processed in the calling thread + # so waiting there guarantees proper data dependencies + # provided the "k" loop is not nested (i.e. does real work instead of enqueueing tasks) + discard sync(kcLoop) + kcTileReady.fulfill() diff --git a/benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_weave.nim b/benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_weave.nim index f9af869..b6172f6 100644 --- a/benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_weave.nim +++ b/benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_weave.nim @@ -155,8 +155,10 @@ proc gemm_impl[T; ukernel: static MicroKernel]( prefetch(tiles.b, Write, LowTemporalLocality) let kc = min(K - pc, tiles.kc) # Deal with edges # A[0:M, pc:pc+kc] + let kcncTileReady = newPledge() let kcncB = vB.stride(pc, 0) # B[pc:pc+kc, jc:jc+nc] - pack_B_kc_nc[T, ukernel](tiles.b, kc, nc, kcncB) # PackB panel [kc, nc] (nc is large or unknown) + spawn pack_B_kc_nc[T, ukernel]( # PackB panel [kc, nc] (nc is large or unknown) + tiles.b, kc, nc, kcncB, kcncTileReady) # First time writing to C, we scale it, otherwise accumulate let beta = if pc == 0: beta else: 1.T @@ -164,7 +166,7 @@ proc gemm_impl[T; ukernel: static MicroKernel]( # #################################### # 3. 
for ic = 0,...,m−1 in steps of mc parallelFor icb in 0 ..< tiles.ic_num_tasks: - captures: {pc, tiles, nc, kc, alpha, beta, vA, vC, M} + captures: {kcncTileReady, pc, tiles, nc, kc, alpha, beta, vA, vC, M} let packA = tiles.a + icb * tiles.upanelA_size prefetch(packA, Write, LowTemporalLocality) @@ -174,12 +176,16 @@ proc gemm_impl[T; ukernel: static MicroKernel]( let mckcA = vA.stride(ic, pc) # A[ic:ic+mc, pc:pc+kc] pack_A_mc_kc[T, ukernel](packA, mc, kc, mckcA) # PackA block [mc, kc] - gebp_mkernel[T, ukernel]( # GEBP macrokernel: + spawnDelayed( + kcncTileReady, + gebp_mkernel[T, ukernel]( # GEBP macrokernel: mc, nc, kc, # C[ic:ic+mc, jc:jc+nc] = alpha, packA, tiles.b, # αA[ic:ic+mc, pc:pc+kc] * B[pc:pc+kc, jc:jc+nc] + beta, vC.stride(ic, 0) # βC[ic:ic+mc, jc:jc+nc] ) - syncRoot(Weave) + ) + # Only trigger the next phase when the last one is finished + syncRoot(Weave) # TODO: this is the last non-nestable barrier # ############################################################ # @@ -282,10 +288,9 @@ when isMainModule: b[0][0].unsafeAddr, 2, 1, 0.0, res_ab[0][0].addr, 2, 1 ) - + syncRoot(Weave) # echo "expected: ", ab # echo "result: ", res_ab - doAssert res_ab == ab, $res_ab echo "SUCCESS\n" @@ -309,10 +314,9 @@ when isMainModule: b[0][0].unsafeAddr, 2, 1, 0.0, res_ab[0][0].addr, 2, 1 ) - + syncRoot(Weave) # echo "expected: ", ab # echo "result: ", res_ab - doAssert res_ab == ab, $res_ab echo "SUCCESS\n" @@ -334,10 +338,9 @@ when isMainModule: b[0][0].unsafeAddr, 2, 1, 0.0, res_ab[0][0].addr, 2, 1 ) - + syncRoot(Weave) # echo "expected: ", ab # echo "result: ", res_ab - doAssert res_ab == ab, $res_ab echo "SUCCESS\n" @@ -360,10 +363,9 @@ when isMainModule: b[0][0].unsafeAddr, 4, 1, 0, res_ab[0][0].addr, 4, 1 ) - + syncRoot(Weave) # echo "expected: ", ab # echo "result: ", res_ab - doAssert res_ab == ab, $res_ab echo "SUCCESS\n" @@ -393,10 +395,9 @@ when isMainModule: b[0][0].unsafeAddr, 4, 1, 0, res_ab[0][0].addr, 4, 1 ) - + syncRoot(Weave) # echo "expected: ", ab # echo "result: ", res_ab - doAssert res_ab == ab, $res_ab echo "SUCCESS\n" @@ -424,10 +425,9 @@ when isMainModule: b[0][0].unsafeAddr, 2, 1, 0, res_ab[0][0].addr, 2, 1 ) - + syncRoot(Weave) # echo "expected: ", ab # echo "result: ", res_ab - doAssert res_ab == ab, $res_ab echo "SUCCESS\n" @@ -461,10 +461,9 @@ when isMainModule: b[0][0].unsafeAddr, 8, 1, 0, res_ab[0][0].addr, 8, 1 ) - + syncRoot(Weave) # echo "expected: ", ab # echo "result: ", res_ab - doAssert res_ab == ab, $res_ab echo "SUCCESS\n" @@ -507,10 +506,9 @@ when isMainModule: b[0][0].unsafeAddr, 8, 1, 0, res_ab[0][0].addr, 8, 1 ) - + syncRoot(Weave) # echo "expected: ", ab # echo "result: ", res_ab - doAssert res_ab == ab, $res_ab echo "SUCCESS\n" diff --git a/benchmarks/matmul_gemm_blas/laser_omp_gemm.nim b/benchmarks/matmul_gemm_blas/laser_omp_gemm.nim index c3cc5b7..b6d5fe9 100644 --- a/benchmarks/matmul_gemm_blas/laser_omp_gemm.nim +++ b/benchmarks/matmul_gemm_blas/laser_omp_gemm.nim @@ -14,7 +14,7 @@ when not defined(vcc): else: {.pragma: restrict, codegenDecl: "$# __restrict $#".} -proc benchLaserGEMM(a, b: seq[float32], ashape, bshape: MatrixShape, nb_samples: int): seq[float32] = +proc benchLaserGEMM*(a, b: seq[float32], ashape, bshape: MatrixShape, nb_samples: int): seq[float32] = let req_ops = gemm_required_ops(ashape, bshape) let out_shape = gemm_out_shape(ashape, bshape) let out_size = out_shape.M * out_shape.N @@ -48,4 +48,4 @@ when isMainModule: let a = newSeqWith(M*K, float32 rand(-0.1..0.1)) let b = newSeqWith(K*N, float32 rand(-0.1..0.1)) - let mkl 
= benchLaserGEMM(a, b, (M,K), (K,N), NbSamples) + let laser = benchLaserGEMM(a, b, (M,K), (K,N), NbSamples) diff --git a/benchmarks/matmul_gemm_blas/mkl_gemm.nim b/benchmarks/matmul_gemm_blas/mkl_gemm.nim index 1e721ba..7860a54 100644 --- a/benchmarks/matmul_gemm_blas/mkl_gemm.nim +++ b/benchmarks/matmul_gemm_blas/mkl_gemm.nim @@ -23,7 +23,7 @@ proc gemm*(ORDER: OrderType, TRANSA, TRANSB: TransposeType, M, N, K: int, ALPHA: A: ptr float64, LDA: int, B: ptr float64, LDB: int, BETA: float64, C: ptr float64, LDC: int) {. dynlib: blas, importc: "cblas_dgemm" .} -proc benchMKL(a, b: seq[float32], ashape, bshape: MatrixShape, nb_samples: int): seq[float32] = +proc benchMKL*(a, b: seq[float32], ashape, bshape: MatrixShape, nb_samples: int): seq[float32] = let req_ops = gemm_required_ops(ashape, bshape) let out_shape = gemm_out_shape(ashape, bshape) let out_size = out_shape.M * out_shape.N diff --git a/benchmarks/matmul_gemm_blas/openblas_gemm.nim b/benchmarks/matmul_gemm_blas/openblas_gemm.nim index 56dfea2..7b9911b 100644 --- a/benchmarks/matmul_gemm_blas/openblas_gemm.nim +++ b/benchmarks/matmul_gemm_blas/openblas_gemm.nim @@ -22,7 +22,7 @@ proc gemm*(ORDER: OrderType, TRANSA, TRANSB: TransposeType, M, N, K: int, ALPHA: A: ptr float64, LDA: int, B: ptr float64, LDB: int, BETA: float64, C: ptr float64, LDC: int) {. dynlib: blas, importc: "cblas_dgemm" .} -proc benchOpenBLAS(a, b: seq[float32], ashape, bshape: MatrixShape, nb_samples: int): seq[float32] = +proc benchOpenBLAS*(a, b: seq[float32], ashape, bshape: MatrixShape, nb_samples: int): seq[float32] = let req_ops = gemm_required_ops(ashape, bshape) let out_shape = gemm_out_shape(ashape, bshape) let out_size = out_shape.M * out_shape.N @@ -53,4 +53,4 @@ when isMainModule: let a = newSeqWith(M*K, float32 rand(-0.1..0.1)) let b = newSeqWith(K*N, float32 rand(-0.1..0.1)) - let mkl = benchOpenBLAS(a, b, (M,K), (K,N), NbSamples) + let openblas = benchOpenBLAS(a, b, (M,K), (K,N), NbSamples) diff --git a/benchmarks/matmul_gemm_blas/weave_gemm.nim b/benchmarks/matmul_gemm_blas/weave_gemm.nim index a857ca7..475e0cb 100644 --- a/benchmarks/matmul_gemm_blas/weave_gemm.nim +++ b/benchmarks/matmul_gemm_blas/weave_gemm.nim @@ -7,14 +7,15 @@ when not compileOption("threads"): import ./gemm_bench_common, ./gemm_bench_config, - ./gemm_pure_nim/gemm_weave + ./gemm_pure_nim/gemm_weave, + ../../weave when not defined(vcc): {.pragma: restrict, codegenDecl: "$# __restrict__ $#".} else: {.pragma: restrict, codegenDecl: "$# __restrict $#".} -proc benchWeaveGEMM(a, b: seq[float32], ashape, bshape: MatrixShape, nb_samples: int): seq[float32] = +proc benchWeaveGEMM*(a, b: seq[float32], ashape, bshape: MatrixShape, nb_samples: int): seq[float32] = let req_ops = gemm_required_ops(ashape, bshape) let out_shape = gemm_out_shape(ashape, bshape) let out_size = out_shape.M * out_shape.N @@ -35,13 +36,13 @@ proc benchWeaveGEMM(a, b: seq[float32], ashape, bshape: MatrixShape, nb_samples: b_ptr, N, 1, 0'f32, c_ptr, N, 1 ) + syncRoot(Weave) # Weave gemm is async and returns immediately # Bench when isMainModule: import std/[random, sequtils] - import ../../weave - randomize(42) # FOr reproducibility + randomize(42) # For reproducibility # warmup() reportConfig("Weave (Pure Nim)", float32, (M, K), (K, N)) @@ -50,5 +51,5 @@ when isMainModule: let b = newSeqWith(K*N, float32 rand(-0.1..0.1)) init(Weave) - let mkl = benchWeaveGEMM(a, b, (M,K), (K,N), NbSamples) + let weave = benchWeaveGEMM(a, b, (M,K), (K,N), NbSamples) exit(Weave) diff --git a/weave.nim b/weave.nim index 
c1c640f..05f2c9b 100644 --- a/weave.nim +++ b/weave.nim @@ -7,11 +7,12 @@ import weave/[parallel_tasks, parallel_for, parallel_for_staged, runtime, runtime_fsm, await_fsm], - weave/datatypes/flowvars + weave/datatypes/flowvars, + weave/channels/pledges export - Flowvar, Weave, - spawn, sync, syncRoot, + Flowvar, Weave, Pledge, + spawn, sync, syncRoot, spawnDelayed, parallelFor, parallelForStrided, parallelForStaged, parallelForStagedStrided, init, exit, loadBalance, @@ -22,4 +23,5 @@ export import weave/contexts export readyWith, forceFuture, - isRootTask + isRootTask, + fulfill, newPledge diff --git a/weave.nimble b/weave.nimble index b67e60b..9ff5cdb 100644 --- a/weave.nimble +++ b/weave.nimble @@ -29,6 +29,7 @@ task test, "Run Weave tests": test "", "weave/channels/channels_spsc_single.nim" test "", "weave/channels/channels_spsc_single_ptr.nim" test "", "weave/channels/channels_mpsc_unbounded_batch.nim" + test "", "weave/channels/pledges.nim" test "", "weave/datatypes/binary_worker_trees.nim" test "", "weave/datatypes/bounded_queues.nim" diff --git a/weave/await_fsm.nim b/weave/await_fsm.nim index 1e7dbc0..942fe09 100644 --- a/weave/await_fsm.nim +++ b/weave/await_fsm.nim @@ -197,23 +197,8 @@ LazyFV: # Public # ------------------------------------------- -type Dummy* = object - ## A dummy return type (Flowvar[Dummy]) - ## for waitable for-loops - # Do we add a dummy field to avoid a size of 0? - proc sync*[T](fv: FlowVar[T]): T {.inline.} = ## Blocks the current thread until the flowvar is available ## and returned. ## The thread is not idle and will complete pending tasks. fv.forceComplete(result) - -template sync*(fv: FlowVar[Dummy]) = - ## Blocks the current thread until the full loop task - ## associated with the dummy has finished - ## The thread is not idle and will complete pending tasks. - # This must be a template to avoid recursive dependency - # as forceFuture is in await_fsm and await_fsm depends - # on this module. 
-  var dummy: Dummy
-  forceComplete(fv, dummy)
diff --git a/weave/channels/channels_mpsc_unbounded_batch.nim b/weave/channels/channels_mpsc_unbounded_batch.nim
index f76077b..3d88aed 100644
--- a/weave/channels/channels_mpsc_unbounded_batch.nim
+++ b/weave/channels/channels_mpsc_unbounded_batch.nim
@@ -4,6 +4,33 @@ import
   ../primitives/compiler_optimization_hints, # for prefetch
   ../instrumentation/[contracts, loggers]

+# type dereference macro
+# ------------------------------------------------
+# This macro dereferences pointer types.
+# It works around:
+# - https://github.com/nim-lang/Nim/issues/12714
+# - https://github.com/nim-lang/Nim/issues/13048
+
+macro derefMPSC*(T: typedesc): typedesc =
+  # This somehow isn't bound properly when used in a type section
+  let instantiated = T.getTypeInst
+  instantiated.expectKind(nnkBracketExpr)
+  doAssert instantiated[0].eqIdent"typeDesc"
+
+  let ptrT = instantiated[1]
+  if ptrT.kind == nnkPtrTy:
+    return ptrT[0]
+
+  let ptrTImpl = instantiated[1].getImpl
+  ptrTImpl.expectKind(nnkTypeDef)
+  ptrTImpl[2].expectKind(nnkPtrTy)
+  ptrTImpl[2][0].expectKind({nnkObjectTy, nnkSym})
+
+  return ptrTImpl[2][0]
+
+# MPSC channel
+# ------------------------------------------------
+
 type
   Enqueueable = concept x, type T
     x is ptr
@@ -33,7 +60,7 @@ type
     # Producers and consumer slow-path
     back{.align: WV_CacheLinePadding.}: Atomic[pointer] # Workaround generic atomics bug: https://github.com/nim-lang/Nim/issues/12695
     # Consumer only - front is a dummy node
-    front{.align: WV_CacheLinePadding.}: typeof(default(T)[])
+    front{.align: WV_CacheLinePadding.}: derefMPSC(T)

 # Debugging
 # --------------------------------------------------------------
diff --git a/weave/channels/pledges.nim b/weave/channels/pledges.nim
new file mode 100644
index 0000000..ea8f0af
--- /dev/null
+++ b/weave/channels/pledges.nim
@@ -0,0 +1,625 @@
+# Weave
+# Copyright (c) 2020 Mamy André-Ratsimbazafy
+# Licensed and distributed under either of
+#   * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
+#   * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
+# at your option. This file may not be copied, modified, or distributed except according to those terms.
+
+import
+  # stdlib
+  std/[atomics, macros],
+  # Internals
+  ./channels_mpsc_unbounded_batch,
+  ../datatypes/sync_types,
+  ../memory/[allocs, memory_pools],
+  ../instrumentation/contracts,
+  ../config
+
+# Pledges
+# ----------------------------------------------------
+# Pledges are the counterpart to Flowvars.
+#
+# When a task depends on a pledge, it is delayed until the pledge is fulfilled.
+# This allows modeling precise dependencies between tasks,
+# beyond what traditional control-flow dependencies (function calls, barriers, locks) allow.
+#
+# Furthermore, control-flow dependencies like barriers and locks suffer from:
+# - a composability problem (barriers are incompatible with work stealing or nested parallelism).
+# - restricting the parallelism exposed.
+# - exposing the programmers to concurrency woes that could be avoided
+#   by specifying precede/follow relationships.
+#
+# This data-availability-based parallelism is also called:
+# - dataflow parallelism
+# - graph parallelism
+# - data-driven task parallelism
+# - pipeline parallelism
+# - stream parallelism
+#
+#
+# Details, use-cases, competing approaches provided at: https://github.com/mratsim/weave/issues/31
+#
+# Protocol (https://github.com/mratsim/weave/pull/92#issuecomment-570795718)
+# ----------------------------------------------------
+#
+# A pledge is an ownerless MPSC channel that holds tasks.
+# The number of tasks in the channel is bounded by the number of dependent tasks.
+# When a worker fulfills a pledge, it becomes the unique consumer of the MPSC channel.
+# It flags the pledge as fulfilled and drains the channel of all tasks.
+# When a task is dependent on a pledge, the worker that received the dependent task
+# checks the fulfilled flag.
+# Case 1: It is fulfilled, it schedules the task as a normal task.
+# Case 2: It is not fulfilled, it sends the task in the pledge MPSC channel.
+#
+# Tasks with multiple dependencies are represented by a list of pledges.
+# When a task is enqueued, it is sent to one of the unfulfilled pledge channels at random.
+# When that pledge is fulfilled, if all other pledges are fulfilled, it can be scheduled immediately.
+# Otherwise it is sent to one of the unfulfilled pledges at random.
+#
+# Memory management is done through atomic reference counting.
+# Pledges for loop iterations can use a single reference count for all iterations,
+# however each iteration should have its own pledge channel. This has a memory cost,
+# users should be encouraged to use tiling/blocking.
+#
+# Mutual exclusion
+# There is a race if a producer worker delivers on the pledge and a consumer
+# checks the pledge status.
+# In our case, the fulfilled flag is write-once, the producer worker only requires
+# a way to know if a data race could have occurred.
+#
+# The pledge keeps 2 monotonically increasing atomic counts of consumers in and consumers out.
+# When a consumer checks the pledge:
+# - it increments the consumer count "in"
+# - on exit it always increments the consumer count "out":
+#   - if it's fulfilled, it increments the consumer count "out",
+#     then exits and schedules the task itself
+#   - if it's not, it enqueues the task,
+#     then increments the count "out"
+# - The producer thread checks after draining all tasks that the consumer in/out counts are the same,
+#   otherwise it needs to drain again until it is sure that the consumer is out.
+# Keeping 2 counts avoids the ABA problem.
+# Pledges are allocated from memory pool blocks of size 2x WV_CacheLinePadding (256 bytes)
+# with an intrusive MPSC channel.
+#
+# Analysis:
+# This protocol avoids latency between when the data is ready and when the task is scheduled,
+# exposing the maximum amount of parallelism.
+# Proof: As soon as the pledge is fulfilled, any dependent tasks are scheduled,
+# or the task was not yet created. Tasks created late are scheduled by the creating worker.
+#
+# This protocol minimizes the number of messages sent. There is at most 1 per dependency, unlike
+# gossipsub, floodsub or episub approaches which send an exponential number of messages
+# and are sensitive to relayers' delays.
+#
+# This protocol avoids any polling. An alternative approach would be to have the
+# worker that creates the dependent tasks keep them in its queue
+# and then subscribe to a dependency readiness channel (pubsub).
+# It would need to poll regularly, which creates latency (it also has work to do)
+# and might require it to scan possibly long sequences of dependencies.
+#
+# This protocol avoids the need for multiple hash-tables or a graph library
+# to map Pledge=>seq[Task] and Task=>seq[Pledge] to quickly obtain
+# all tasks that can be scheduled from a resolved pledge and
+# to track the multiple dependencies a task can have.
+#
+# In particular this plays well with the custom memory pool of Weave, unlike Nim sequences or hash-tables.
+#
+# This protocol is cache friendly. The deferred task is co-located with the producer pledge.
+# When scheduled, it will use data hot in cache unless the task is stolen, but it can only be stolen if it's the
+# only task left, due to LIFO work and FIFO thefts.
+#
+# This protocol doesn't impose ordering on the producer and consumer (pledge fulfiller and pledge-dependent task).
+# Other approaches might lead to missed messages unless they introduce state/memory,
+# which is always complex in "distributed" long-lived computations due to memory reclamation (hazard pointers, epoch-based reclamation, ...)
+
+type
+  Pledge* = object
+    ## A pledge represents a contract between
+    ## a producer task that fulfills or delivers on the pledge
+    ## and a consumer dependent task that is deferred until the pledge is fulfilled.
+    ##
+    ## The difference with a Flowvar is that a Pledge represents
+    ## a delayed input while a Flowvar represents a delayed result.
+    ##
+    ## Pledges enable the parallelism paradigm known under the following names:
+    ## - dataflow parallelism
+    ## - graph parallelism
+    ## - pipeline parallelism
+    ## - data-driven task parallelism
+    ## - stream parallelism
+    ##
+    ## In particular, this is the only way to implement a "barrier" compatible
+    ## with a work-stealing scheduler that can be composed and nested in parallel regions
+    ## that an unknown number of workers will execute.
+    p: PledgePtr
+
+  TaskNode = ptr object
+    ## Task Metadata.
+    task*: Task
+    # Next task in the current pledge channel
+    next: Atomic[pointer]
+    # Next task dependency if it has multiple
+    nextDep*: TaskNode
+    pledge*: Pledge
+    bucketID*: int32
+
+  PledgeKind = enum
+    Single
+    Iteration
+
+  PledgePtr = ptr object
+    refCount: Atomic[int32]
+    case kind: PledgeKind
+    of Single:
+      impl: PledgeImpl
+    of Iteration:
+      numBuckets: int32
+      start, stop, stride: int32
+      impls: ptr UncheckedArray[PledgeImpl]
+
+  PledgeImpl = object
+    # Issue: https://github.com/mratsim/weave/issues/93
+    # TODO, the current MPSC channel always uses a "count" field.
+    #   Contrary to StealRequest and the remote freed memory in the memory pool,
+    #   this is not needed, and atomics are expensive.
+    #   It can be made optional with a useCount static bool.
+    # TODO, the current MPSC channel cannot use the memory pool due to extensive padding.
+    #   Contrary to StealRequest and the remote freed memory in the memory pool,
+    #   pledge channels are allocated on-demand and not once at init.
+    #   Allocation overhead may be prohibitive.
+    #   As a compromise instead of padding by 2x cachelines
+    #   we could have Consumer | count | Producer with only cache-line padding.
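+    # Layout (editor's note): `chan` holds the deferred TaskNodes,
+    # `deferredIn`/`deferredOut` are the monotonic consumer counters described
+    # in the protocol above, and `fulfilled` is the write-once flag.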
+ chan{.align: WV_CacheLinePadding.}: ptr ChannelMpscUnboundedBatch[TaskNode] + deferredIn: Atomic[int32] + deferredOut: Atomic[int32] + fulfilled: Atomic[bool] + +const NoIter* = -1 + +# Internal +# ---------------------------------------------------- +# Refcounting is started from 0 and we avoid fetchSub with release semantics +# in the common case of only one reference being live. + +proc `=destroy`*(pledge: var Pledge) = + if pledge.p.isNil: + return + + let count = pledge.p.refCount.load(moRelaxed) + fence(moAcquire) + if count == 0: + # We have the last reference + if not pledge.p.isNil: + if pledge.p.kind == Single: + wv_free(pledge.p.impl.chan) # TODO: mem-pool compat + else: + for i in 0 ..< pledge.p.numBuckets: + wv_free(pledge.p.impls[i].chan) + wv_free(pledge.p.impls) + # Return memory to the memory pool + recycle(pledge.p) + else: + discard fetchSub(pledge.p.refCount, 1, moRelease) + pledge.p = nil + +proc `=sink`*(dst: var Pledge, src: Pledge) {.inline.} = + # Don't pay for atomic refcounting when compiler can prove there is no ref change + # `=destroy`(dst) # it seems like we can have non properly init types? + # with the pointer not being nil, but invalid as well + system.`=sink`(dst.p, src.p) + +proc `=`*(dst: var Pledge, src: Pledge) {.inline.} = + discard fetchAdd(src.p.refCount, 1, moRelaxed) + dst.p = src.p + +# Multi-Dependencies pledges +# ---------------------------------------------------- + +proc delayedUntilSingle(taskNode: TaskNode, curTask: Task): bool = + ## Redelay a task that depends on multiple pledges + ## with 1 or more pledge fulfilled but still some unfulfilled. + ## field is a place holder for impl / impls[bucket] + preCondition: not taskNode.pledge.p.isNil + + if taskNode.pledge.p.impl.fulfilled.load(moRelaxed): + fence(moAcquire) + return false + + # Mutual exclusion / prevent races + discard taskNode.pledge.p.impl.deferredIn.fetchAdd(1, moRelaxed) + + if taskNode.pledge.p.impl.fulfilled.load(moRelaxed): + fence(moAcquire) + discard taskNode.pledge.p.impl.deferredOut.fetchAdd(1, moRelaxed) + return false + + # Send the task to the pledge fulfiller + taskNode.task = curTask + let pledge = taskNode.pledge + taskNode.pledge = default(Pledge) + discard pledge.p.impl.chan[].trySend(taskNode) + discard pledge.p.impl.deferredOut.fetchAdd(1, moRelaxed) + return true + +proc delayedUntilIter(taskNode: TaskNode, curTask: Task): bool = + ## Redelay a task that depends on multiple pledges + ## with 1 or more pledge fulfilled but still some unfulfilled. + ## field is a place holder for impl / impls[bucket] + preCondition: not taskNode.pledge.p.isNil + + if taskNode.pledge.p.impls[taskNode.bucketID].fulfilled.load(moRelaxed): + fence(moAcquire) + return false + + # Mutual exclusion / prevent races + discard taskNode.pledge.p.impls[taskNode.bucketID].deferredIn.fetchAdd(1, moRelaxed) + + if taskNode.pledge.p.impls[taskNode.bucketID].fulfilled.load(moRelaxed): + fence(moAcquire) + discard taskNode.pledge.p.impls[taskNode.bucketID].deferredOut.fetchAdd(1, moRelaxed) + return false + + # Send the task to the pledge fulfiller + taskNode.task = curTask + let pledge = taskNode.pledge + taskNode.pledge = default(Pledge) + discard pledge.p.impls[taskNode.bucketID].chan[].trySend(taskNode) + discard pledge.p.impls[taskNode.bucketID].deferredOut.fetchAdd(1, moRelaxed) + return true + +proc delayedUntil*(taskNode: TaskNode, curTask: Task): bool = + ## Redelay a task that depends on multiple pledges + ## with 1 or more pledge fulfilled but still some unfulfilled. 
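+  # Dispatch on the pledge kind: a Single pledge has one channel and flag,
+  # an Iteration pledge has one per bucket, selected via taskNode.bucketID.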
+  preCondition: not taskNode.pledge.p.isNil
+  if taskNode.pledge.p.kind == Single:
+    delayedUntilSingle(taskNode, curTask)
+  else:
+    delayedUntilIter(taskNode, curTask)
+
+# Public - single task pledge
+# ----------------------------------------------------
+
+proc initialize*(pledge: var Pledge, pool: var TLPoolAllocator) =
+  ## Initialize a pledge.
+  ## Tasks can depend on a pledge and in that case their scheduling
+  ## will be delayed until that pledge is fulfilled.
+  ## This allows modeling precise data dependencies.
+  preCondition: pledge.p.isNil
+  pledge.p = pool.borrow(deref(PledgePtr))
+  zeroMem(pledge.p, sizeof(deref(PledgePtr))) # We start the refCount at 0
+  # TODO: mempooled MPSC channel https://github.com/mratsim/weave/issues/93
+  pledge.p.kind = Single
+  pledge.p.impl.chan = wv_alloc(ChannelMpscUnboundedBatch[TaskNode])
+  pledge.p.impl.chan[].initialize()
+
+proc delayedUntil*(task: Task, pledge: Pledge, pool: var TLPoolAllocator): bool =
+  ## Defers a task until a pledge is fulfilled.
+  ## Returns true if the task has been delayed;
+  ## the task should not be accessed anymore.
+  ## Returns false if the task can be scheduled right away.
+  preCondition: not pledge.p.isNil
+  preCondition: pledge.p.kind == Single
+
+  # Optimization to avoid paying the cost of atomics
+  if pledge.p.impl.fulfilled.load(moRelaxed):
+    fence(moAcquire)
+    return false
+
+  # Mutual exclusion / prevent races
+  discard pledge.p.impl.deferredIn.fetchAdd(1, moRelaxed)
+
+  if pledge.p.impl.fulfilled.load(moRelaxed):
+    fence(moAcquire)
+    discard pledge.p.impl.deferredOut.fetchAdd(1, moRelaxed)
+    return false
+
+  # Send the task to the pledge fulfiller
+  let taskNode = pool.borrow(deref(TaskNode))
+  taskNode.task = task
+  taskNode.next.store(nil, moRelaxed)
+  taskNode.pledge = default(Pledge) # Don't need to store the pledge reference if there is only the current one
+  taskNode.bucketID = NoIter
+  discard pledge.p.impl.chan[].trySend(taskNode)
+  discard pledge.p.impl.deferredOut.fetchAdd(1, moRelaxed)
+  return true
+
+template fulfillImpl*(pledge: Pledge, queue, enqueue: typed) =
+  ## A producer thread fulfills a pledge.
+  ## A pledge can only be fulfilled once.
+  ## A producer will immediately schedule all tasks dependent on that pledge
+  ## unless they also depend on another unfulfilled pledge.
+  ## Dependent tasks scheduled at a later time will be scheduled immediately.
+  ##
+  ## `queue` is the data structure for ready tasks.
+  ## `enqueue` is the corresponding enqueueing proc.
+  ## This should be wrapped in a proc to avoid code-bloat as the template is big.
+  preCondition: not pledge.p.isNil
+  preCondition: pledge.p.kind == Single
+  preCondition: not load(pledge.p.impl.fulfilled, moRelaxed)

+  # Lock the pledge, new tasks should be scheduled right away
+  fence(moRelease)
+  store(pledge.p.impl.fulfilled, true, moRelaxed)
+
+  # TODO: some state machine here?
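+  # Drain loop (editor's note): receive every deferred TaskNode; for each one,
+  # walk its dependency list (nextDep) and re-delay the task on the first
+  # pledge that is still unfulfilled, otherwise enqueue it as ready. Then
+  # re-check deferredIn == deferredOut, as a racing consumer may still be
+  # in the middle of enqueueing.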
+  while true:
+    var task: Task
+    var taskNode: TaskNode
+    while pledge.p.impl.chan[].tryRecv(taskNode):
+      ascertain: taskNode.bucketID == NoIter
+      task = taskNode.task
+      var wasDelayed = false
+      while not taskNode.nextDep.isNil:
+        if delayedUntil(taskNode, task):
+          wasDelayed = true
+          break
+        let depNode = taskNode.nextDep
+        recycle(taskNode)
+        taskNode = depNode
+      if not wasDelayed:
+        enqueue(queue, task)
+        recycle(taskNode)
+
+    if load(pledge.p.impl.deferredOut, moAcquire) != load(pledge.p.impl.deferredIn, moAcquire):
+      cpuRelax()
+    else:
+      break
+
+# Public - iteration task pledge
+# ----------------------------------------------------
+
+proc initialize*(pledge: var Pledge, pool: var TLPoolAllocator, start, stop, stride: int32) =
+  ## Initialize a pledge for iteration tasks.
+
+  preCondition: stop > start
+  preCondition: stride > 0
+  preCondition: pledge.p.isNil
+
+  pledge.p = pool.borrow(deref(PledgePtr))
+  # We start refCount at 0 - and working around the case-object field change restriction is a huge pain
+  pledge.p[] = deref(PledgePtr)(
+    kind: Iteration,
+    numBuckets: (stop - start + stride-1) div stride,
+    start: start,
+    stop: stop,
+    stride: stride
+  )
+
+  pledge.p.impls = wv_alloc(PledgeImpl, pledge.p.numBuckets)
+  zeroMem(pledge.p.impls, pledge.p.numBuckets * sizeof(PledgeImpl))
+
+  for i in 0 ..< pledge.p.numBuckets:
+    pledge.p.impls[i].chan = wv_alloc(ChannelMpscUnboundedBatch[TaskNode])
+    pledge.p.impls[i].chan[].initialize()
+
+proc getBucket(pledge: Pledge, index: int32): int32 {.inline.} =
+  ## Convert a possibly offset and/or strided for-loop iteration index
+  ## to a pledge bucket in the range [0, numBuckets)
+  preCondition: index in pledge.p.start ..< pledge.p.stop
+  result = (index - pledge.p.start) div pledge.p.stride
+
+proc delayedUntil*(task: Task, pledge: Pledge, index: int32, pool: var TLPoolAllocator): bool =
+  ## Defers a task until pledge[index] is fulfilled.
+  ## Returns true if the task has been delayed;
+  ## the task should not be accessed anymore.
+  ## Returns false if the task can be scheduled right away.
+  preCondition: not pledge.p.isNil
+  preCondition: pledge.p.kind == Iteration
+
+  let bucket = pledge.getBucket(index)
+
+  # Optimization to avoid paying the cost of atomics
+  if pledge.p.impls[bucket].fulfilled.load(moRelaxed):
+    fence(moAcquire)
+    return false
+
+  # Mutual exclusion / prevent races
+  discard pledge.p.impls[bucket].deferredIn.fetchAdd(1, moRelaxed)
+
+  if pledge.p.impls[bucket].fulfilled.load(moRelaxed):
+    fence(moAcquire)
+    discard pledge.p.impls[bucket].deferredOut.fetchAdd(1, moRelaxed)
+    return false
+
+  # Send the task to the pledge fulfiller
+  let taskNode = pool.borrow(deref(TaskNode))
+  taskNode.task = task
+  taskNode.next.store(nil, moRelaxed)
+  taskNode.pledge = default(Pledge) # Don't need to store the pledge reference if there is only the current one
+  taskNode.bucketID = bucket
+  discard pledge.p.impls[bucket].chan[].trySend(taskNode)
+  discard pledge.p.impls[bucket].deferredOut.fetchAdd(1, moRelaxed)
+  return true
+
+template fulfillIterImpl*(pledge: Pledge, index: int32, queue, enqueue: typed) =
+  ## A producer thread fulfills an iteration pledge.
+  ## A pledge can only be fulfilled once.
+  ## A producer will immediately schedule all tasks dependent on that pledge
+  ## unless they also depend on another unfulfilled pledge.
+  ## Dependent tasks scheduled at a later time will be scheduled immediately.
+  ##
+  ## `queue` is the data structure for ready tasks.
+  ## `enqueue` is the corresponding enqueueing proc.
+  ## This should be wrapped in a proc to avoid code-bloat as the template is big.
+  preCondition: not pledge.p.isNil
+  preCondition: pledge.p.kind == Iteration
+
+  let bucket = getBucket(pledge, index)
+  preCondition: not load(pledge.p.impls[bucket].fulfilled, moRelaxed)
+
+  # Lock the pledge, new tasks should be scheduled right away
+  fence(moRelease)
+  store(pledge.p.impls[bucket].fulfilled, true, moRelaxed)
+
+  # TODO: some state machine here?
+  while true:
+    var task {.inject.}: Task
+    var taskNode: TaskNode
+    while pledge.p.impls[bucket].chan[].tryRecv(taskNode):
+      ascertain: taskNode.bucketID != NoIter
+      task = taskNode.task
+      var wasDelayed = false
+      while not taskNode.nextDep.isNil:
+        if delayedUntil(taskNode, task):
+          wasDelayed = true
+          break
+        let depNode = taskNode.nextDep
+        recycle(taskNode)
+        taskNode = depNode
+      if not wasDelayed:
+        enqueue(queue, task)
+        recycle(taskNode)
+
+    if load(pledge.p.impls[bucket].deferredOut, moAcquire) != load(pledge.p.impls[bucket].deferredIn, moAcquire):
+      cpuRelax()
+    else:
+      break
+
+# Multiple dependencies
+# ------------------------------------------------------------------------------
+
+macro delayedUntilMulti*(task: Task, pool: var TLPoolAllocator, pledges: varargs[untyped]): untyped =
+  ## Associate a task with multiple dependencies
+  result = newStmtList()
+
+  var taskNodesInitStmt = newStmtList()
+  var firstNode, prevNode: NimNode
+  for i in 0 ..< pledges.len:
+    var taskNode: NimNode
+    var taskNodeInit = newStmtList()
+    if pledges[i].kind == nnkPar:
+      let pledge = pledges[i][0]
+      taskNode = genSym(nskLet, "taskNode_" & $pledge & "_" & $pledges[i][1] & "_")
+      let bucket = newCall(bindSym"getBucket", pledge, pledges[i][1])
+      taskNodeInit.add quote do:
+        let `taskNode` = borrow(`pool`, deref(TaskNode))
+        `taskNode`.task = `task`
+        `taskNode`.pledge = `pledge`
+        `taskNode`.bucketID = `bucket`
+    else:
+      taskNode = genSym(nskLet, "taskNode_" & $pledges[i] & "_")
+      let pledge = pledges[i]
+      taskNodeInit.add quote do:
+        let `taskNode` = borrow(`pool`, deref(TaskNode))
+        `taskNode`.task = `task`
+        `taskNode`.pledge = `pledge`
+        `taskNode`.bucketID = NoIter
+    if i != 0:
+      taskNodeInit.add quote do:
+        `taskNode`.nextDep = `prevNode`
+    else:
+      firstNode = taskNode
+    prevNode = taskNode
+    taskNodesInitStmt.add taskNodeInit
+
+  result.add taskNodesInitStmt
+  result.add newCall(bindSym"delayedUntil", firstNode, task)
+
+# Sanity checks
+# ------------------------------------------------------------------------------
+
+when isMainModule:
+  type TaskStack = object
+    top: Task
+    count: int
+
+  proc add(stack: var TaskStack, task: sink Task) =
+    task.next = stack.top
+    stack.top = task
+    stack.count += 1
+
+  proc pop(stack: var TaskStack): Task =
+    result = stack.top
+    stack.top = stack.top.next
+    stack.count -= 1
+
+    doAssert:
+      if result.isNil: stack.count == 0
+      else: true
+
+  var pool: TLPoolAllocator
+  pool.initialize()
+
+  proc mainSingle() =
+    var stack: TaskStack
+
+    var pledge1: Pledge
+    pledge1.initialize(pool)
+    block: # Pledge 1
+      let task = wv_allocPtr(Task, zero = true)
+      let delayed = task.delayedUntil(pledge1, pool)
+      doAssert delayed
+
+    doAssert stack.count == 0
+
+    pledge1.fulfillImpl(stack, add)
+
+    doAssert stack.count == 1
+
+    block: # Pledge 1 - late
+      let task = wv_allocPtr(Task, zero = true)
+
+      let delayed = task.delayedUntil(pledge1, pool)
+
doAssert not delayed + + doAssert stack.count == 1 # enqueuing is left as an exercise to the late thread. + + var pledge2: Pledge + pledge2.initialize(pool) + block: + block: + let task = wv_allocPtr(Task, zero = true) + let delayed = task.delayedUntil(pledge2, pool) + doAssert delayed + block: + let task = wv_allocPtr(Task, zero = true) + let delayed = task.delayedUntil(pledge2, pool) + doAssert delayed + + doAssert stack.count == 1 + pledge2.fulfillImpl(stack, add) + doAssert stack.count == 3 + + mainSingle() + + proc mainLoop() = + var stack: TaskStack + + var pledge1: Pledge + pledge1.initialize(pool, 0, 10, 1) + block: # Pledge 1 + let task = wv_allocPtr(Task, zero = true) + let delayed = task.delayedUntil(pledge1, 3, pool) + doAssert delayed + + doAssert stack.count == 0 + + pledge1.fulfillIterImpl(3, stack, add) + + doAssert stack.count == 1 + + block: # Pledge 1 - late + let task = wv_allocPtr(Task, zero = true) + + let delayed = task.delayedUntil(pledge1, 3, pool) + doAssert not delayed + + doAssert stack.count == 1 # enqueuing is left as an exercise to the late thread. + + var pledge2: Pledge + pledge2.initialize(pool, 0, 10, 1) + block: + block: + let task = wv_allocPtr(Task, zero = true) + let delayed = task.delayedUntil(pledge2, 4, pool) + doAssert delayed + block: + let task = wv_allocPtr(Task, zero = true) + let delayed = task.delayedUntil(pledge2, 4, pool) + doAssert delayed + + doAssert stack.count == 1 + pledge2.fulfillIterImpl(4, stack, add) + doAssert stack.count == 3 + + mainLoop() diff --git a/weave/contexts.nim b/weave/contexts.nim index cc3b939..52706c5 100644 --- a/weave/contexts.nim +++ b/weave/contexts.nim @@ -7,7 +7,7 @@ import ./datatypes/[context_global, context_thread_local, sync_types, prell_deques, binary_worker_trees], - ./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch], + ./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch, pledges], ./memory/[persistacks, lookaside_lists, memory_pools, allocs], ./config, ./instrumentation/[profilers, loggers, contracts] @@ -129,6 +129,37 @@ proc flushAndDispose*(dq: var PrellDeque) = for task in items(leftovers): recycle(task) +# Pledges +# ---------------------------------------------------------------------------------- + +proc newPledge*(): Pledge = + ## Creates a pledge + ## Tasks associated with a pledge are only scheduled when the pledge is fulfilled. + ## A pledge can only be fulfilled once. + ## Pledges enable modeling precise producer-consumer data dependencies. + result.initialize(myMemPool()) + +proc newPledge*(start, stop, stride: SomeInteger): Pledge = + ## Creates a loop iteration pledge. + ## With a loop iteration pledge, tasks can be associated with a precise loop index. + ## + ## Tasks associated with a pledge are only scheduled when the pledge is fulfilled. + ## A pledge can only be fulfilled once. + ## Pledges enable modeling precise producer-consumer data dependencies. + result.initialize(myMemPool(), start.int32, stop.int32, stride.int32) + +proc fulfill*(pledge: Pledge) = + ## Fulfills a pledge + ## All ready tasks that depended on that pledge will be scheduled immediately. + ## A ready task is a task that has all its pledged dependencies fulfilled. + fulfillImpl(pledge, myWorker().deque, addFirst) + +proc fulfill*(pledge: Pledge, index: SomeInteger) = + ## Fulfills an iteration pledge + ## All ready tasks that depended on that pledge will be scheduled immediately. + ## A ready task is a task that has all its pledged dependencies fulfilled. 
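+  ##
+  ## Example (editor's sketch of a 2-stage pipeline over n chunks;
+  ## `produceChunk` is a hypothetical user proc):
+  ##
+  ##   let stageAReady = newPledge(0, n, 1)
+  ##   parallelFor i in 0 ..< n:
+  ##     captures: {stageAReady}
+  ##     produceChunk(i)        # hypothetical user proc
+  ##     stageAReady.fulfill(i) # unblocks tasks delayed on (stageAReady, i)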
+  fulfillIterImpl(pledge, int32(index), myWorker().deque, addFirst)
+
 # Dynamic Scopes
 # ----------------------------------------------------------------------------------
diff --git a/weave/datatypes/sync_types.nim b/weave/datatypes/sync_types.nim
index 320df80..99cd609 100644
--- a/weave/datatypes/sync_types.nim
+++ b/weave/datatypes/sync_types.nim
@@ -16,7 +16,7 @@ import
 # ----------------------------------------------------------------------------------

 const
-  TaskDataSize* = 128 # Up to 256 for the whole thing
+  TaskDataSize* = 144 # Up to 256 for the whole thing

 type
   # Task
@@ -39,12 +39,13 @@ type
     stop*: int
     stride*: int
     # 64 bytes
+    futures*: pointer # LinkedList of futures required by the current task
+    futureSize*: uint8 # Size of the future result type if relevant
+    hasFuture*: bool # If a task is associated with a future, the future is stored at data[0]
+    isLoop*: bool
+    isInitialIter*: bool # Awaitable for-loops return true for the initial iter
     when FirstVictim == LastVictim:
       victim*: WorkerID
-    isLoop*: bool
-    hasFuture*: bool # If a task is associated with a future, the future is stored at data[0]
-    futureSize*: uint8 # Size of the future result type if relevant
-    futures*: pointer # LinkedList of futures required by the current task
     # 79 bytes
     # User data - including the FlowVar channel to send back result.
     # It is very likely that User data contains a pointer (the Flowvar channel)
diff --git a/weave/parallel_for.nim b/weave/parallel_for.nim
index 3338846..8584d13 100644
--- a/weave/parallel_for.nim
+++ b/weave/parallel_for.nim
@@ -15,8 +15,10 @@ import
   # Internal
   ./parallel_macros, ./parallel_reduce,
   ./contexts, ./runtime, ./config,
-  ./instrumentation/contracts,
-  ./datatypes/flowvars, ./await_fsm
+  ./instrumentation/[loggers, contracts],
+  ./datatypes/flowvars, ./await_fsm,
+  ./channels/pledges,
+  ./parallel_tasks

 when not compileOption("threads"):
   {.error: "This requires --threads:on compilation flag".}
@@ -94,12 +96,13 @@ template parallelForAwaitableWrapper(
       this.futures = cast[pointer](fvNode.next)

       LazyFV:
-        let dummyFV = cast[Flowvar[Dummy]](fvNode.lfv)
+        let dummyFV = cast[Flowvar[bool]](fvNode.lfv)
       EagerFV:
-        let dummyFV = cast[Flowvar[Dummy]](fvNode.chan)
+        let dummyFV = cast[Flowvar[bool]](fvNode.chan)

       debugSplit: log("Worker %2d: loop task 0x%.08x (iterations [%ld, %ld)) waiting for the remainder\n", myID(), this.fn, this.start, this.stop)
-      sync(dummyFV)
+      let isLastIter = sync(dummyFV)
+      ascertain: not isLastIter
       debugSplit: log("Worker %2d: loop task 0x%.08x (iterations [%ld, %ld)) complete\n", myID(), this.fn, this.start, this.stop)

       # The "sync" in the merge statement should have recycled the flowvar channel already
@@ -107,6 +110,38 @@ template parallelForAwaitableWrapper(
       # 2 deallocs for eager FV and 3 for Lazy FV
       recycleFVN(fvNode)

+proc parallelForSplitted(index, start, stop, stride, captured, capturedTy, dependsOn, body: NimNode): NimNode =
+  ## In case a parallelFor depends on an iteration pledge indexed by the loop variable,
+  ## we can't use a regular parallel loop with lazy splitting:
+  ## we need to split the loop eagerly so that each iteration can be started independently
+  ## as soon as the corresponding iteration pledge is fulfilled.
+  ## In that case, the loop cannot have futures.
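+  ##
+  ## Roughly, for `dependsOn: (pledge, index)` this expands to (editor's sketch):
+  ##
+  ##   proc loopBody(index: SomeInteger, captures: capturedTy) {.nimcall, gcsafe.} =
+  ##     body
+  ##   for index in countup(start, stop-1, stride):
+  ##     spawnDelayed((pledge, index), loopBody(index, captured))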
+  result = newStmtList()
+  let parForSplitted = ident("weaveTask_DelayedParForSplit_")
+  var fnCall = newCall(bindSym"spawnDelayed")
+
+  let pledge = dependsOn[0]
+
+  if captured.len > 0:
+    let captured = if captured.len > 1: captured
+                   else: captured[0]
+
+    result.add quote do:
+      proc `parForSplitted`(`index`: SomeInteger, captures: `capturedTy`) {.nimcall, gcsafe.} =
+        let `captured` = captures
+        `body`
+
+      for `index` in countup(`start`, `stop`-1, `stride`):
+        spawnDelayed((`pledge`, `index`), `parForSplitted`(`index`, `captured`))
+  else:
+    result.add quote do:
+      proc `parForSplitted`(`index`: SomeInteger) {.nimcall, gcsafe.} =
+        `body`
+
+      for `index` in countup(`start`, `stop`-1, `stride`):
+        spawnDelayed((`pledge`, `index`), `parForSplitted`(`index`))
+
 macro parallelForImpl(loopParams: untyped, stride: int, body: untyped): untyped =
   ## Parallel for loop
   ## Syntax:
@@ -142,6 +177,18 @@ macro parallelForImpl(loopParams: untyped, stride: int, body: untyped): untyped

   result.addSanityChecks(capturedTy, CapturedTy)

+  # Pledges
+  # --------------------------------------------------------
+  # TODO: support multiple pledges
+  let dependsOn = extractPledges(body)
+  # If the input dependency depends on the loop index,
+  # we need to eagerly split our lazily scheduled loop
+  # as iterations cannot be scheduled at the same time.
+  # It also cannot be awaited with regular sync.
+
+  if dependsOn.kind == nnkPar and dependsOn[1].eqIdent(idx):
+    return parallelForSplitted(idx, start, stop, stride, captured, capturedTy, dependsOn, body)
+
   # Package the body in a proc
   # --------------------------------------------------------
   let parForName = if withFuture: ident"weaveParallelForAwaitableSection"
@@ -180,27 +227,28 @@ macro parallelForImpl(loopParams: untyped, stride: int, body: untyped): untyped
         let `env` = cast[ptr `CapturedTy`](param)
       `fnCall`
   else:
-    let dummyFut = ident"dummyFut"
     futTy = nnkBracketExpr.newTree(
-      bindSym"Flowvar", bindSym"Dummy"
+      bindSym"Flowvar", bindSym"bool"
     )
     result.add quote do:
       proc `parForTask`(param: pointer) {.nimcall, gcsafe.} =
         let this = myTask()
         assert not isRootTask(this)

-        let `dummyFut` = cast[ptr `futTy`](param)
+        let lastLoopIter = cast[ptr FlowVar[bool]](param)
         when bool(`withArgs`):
           # This requires lazy futures to have a fixed max buffer size
-          let offset = cast[pointer](cast[ByteAddress](param) +% sizeof(`futTy`))
+          let offset = cast[pointer](cast[ByteAddress](param) +% sizeof(Flowvar[bool]))
           let `env` = cast[ptr `CapturedTy`](offset)
         `fnCall`
-        readyWith(`dummyFut`[], Dummy())
+        # The first loop iteration is the last to return in awaitable loops
+        readyWith(lastLoopIter[], this.isInitialIter)

   # Create the task
   # --------------------------------------------------------
   result.addLoopTask(
     parForTask, start, stop, stride, captured, CapturedTy,
+    dependsOn,
     futureIdent = future, resultFutureType = futTy
   )

@@ -230,11 +278,16 @@ macro parallelFor*(loopParams: untyped, body: untyped): untyped =
   ##     awaitable: myLoopHandle
   ##     echo a + b + i
   ##
-  ##   sync(myLoopHandle)
+  ##   let lastIter = sync(myLoopHandle)
   ##
   ## In templates and generic procedures, you need to use "mixin myLoopHandle"
   ## or declare the awaitable handle before the loop to work around Nim's early symbol resolution
+  ##
+  ## Awaiting a for-loop returns true if it was the last loop iteration to await.
+  ## This is useful for conditional execution (for example fulfilling a data dependency)
+  ## on nested loops.
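+  ##
+  ## For example (editor's sketch, `rowsReady` and `processRow` are
+  ## hypothetical), letting the last awaiting iteration fulfill a pledge
+  ## exactly once:
+  ##
+  ##   parallelFor i in 0 ..< n:
+  ##     awaitable: rowLoop
+  ##     captures: {rowsReady}  # hypothetical Pledge
+  ##     processRow(i)          # hypothetical user proc
+  ##
+  ##   if sync(rowLoop):
+  ##     rowsReady.fulfill()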
+ # TODO - support pledge in reduction if (body[0].kind == nnkCall and body[0][0].eqIdent"reduce") or (body.len >= 2 and body[1].kind == nnkCall and body[1][0].eqIdent"reduce"): @@ -254,7 +307,7 @@ macro parallelForStrided*(loopParams: untyped, stride: Positive, body: untyped): # -------------------------------------------------------- when isMainModule: - import ./instrumentation/loggers, ./runtime, ./runtime_fsm + import ./instrumentation/loggers, ./runtime, ./runtime_fsm, os block: proc main() = @@ -336,7 +389,7 @@ when isMainModule: awaitable: innerJ M[i][j] = 1000 * i + 1000 * j - sync(innerJ) + discard sync(innerJ) # Check that the sync worked for j in 0 ..< 200: let Mij = M[i][j] @@ -350,3 +403,35 @@ when isMainModule: echo "-------------------------" main5() echo "-------------------------" + + block: + proc main6() = + init(Weave) + + let pA = newPledge(0, 10, 1) + let pB = newPledge(0, 10, 1) + + parallelFor i in 0 ..< 10: + captures: {pA} + sleep(i * 10) + pA.fulfill(i) + echo "Step A - stream ", i, " at ", i * 10, " ms" + + parallelFor i in 0 ..< 10: + dependsOn: (pA, i) + captures: {pB} + sleep(i * 10) + pB.fulfill(i) + echo "Step B - stream ", i, " at ", 2 * i * 10, " ms" + + parallelFor i in 0 ..< 10: + dependsOn: (pB, i) + sleep(i * 10) + echo "Step C - stream ", i, " at ", 3 * i * 10, " ms" + + exit(Weave) + + echo "Dataflow loop parallelism" + echo "-------------------------" + main6() + echo "-------------------------" diff --git a/weave/parallel_for_staged.nim b/weave/parallel_for_staged.nim index eaf2e9d..38fb9b2 100644 --- a/weave/parallel_for_staged.nim +++ b/weave/parallel_for_staged.nim @@ -87,12 +87,13 @@ template parallelStagedAwaitableWrapper( this.futures = cast[pointer](fvNode.next) LazyFV: - let dummyFV = cast[Flowvar[Dummy]](fvNode.lfv) + let dummyFV = cast[Flowvar[bool]](fvNode.lfv) EagerFV: - let dummyFV = cast[Flowvar[Dummy]](fvNode.chan) + let dummyFV = cast[Flowvar[bool]](fvNode.chan) debugSplit: log("Worker %2d: loop task 0x%.08x (iterations [%ld, %ld)) waiting for the remainder\n", myID(), this.fn, this.start, this.stop) - sync(dummyFV) + let isLastIter = sync(dummyFV) + ascertain: not isLastIter debugSplit: log("Worker %2d: loop task 0x%.08x (iterations [%ld, %ld)) complete\n", myID(), this.fn, this.start, this.stop) # The "sync" in the merge statement should have recycled the flowvar channel already @@ -181,27 +182,27 @@ macro parallelForStagedImpl*(loopParams: untyped, stride: int, body: untyped): u let `env` = cast[ptr `CapturedTy`](param) `fnCall` else: - let dummyFut = ident"dummyFut" futTy = nnkBracketExpr.newTree( - bindSym"Flowvar", bindSym"Dummy" + bindSym"Flowvar", bindSym"bool" ) result.add quote do: proc `parStagedTask`(param: pointer) {.nimcall, gcsafe.} = let this = myTask() assert not isRootTask(this) - let `dummyFut` = cast[ptr `futTy`](param) + let lastLoopIter = cast[ptr FlowVar[bool]](param) when bool(`withArgs`): # This requires lazy futures to have a fixed max buffer size - let offset = cast[pointer](cast[ByteAddress](param) +% sizeof(`futTy`)) + let offset = cast[pointer](cast[ByteAddress](param) +% sizeof(Flowvar[bool])) let `env` = cast[ptr `CapturedTy`](offset) `fnCall` - `dummyFut`[].readyWith(Dummy()) + readyWith(lastLoopiter[], this.isInitialIter) # Create the task # -------------------------------------------------------- result.addLoopTask( parStagedTask, start, stop, stride, captured, CapturedTy, + dependsOn = nil, # TODO futureIdent = future, resultFutureType = futTy ) @@ -260,7 +261,7 @@ when isMainModule: echo 
"Thread ", getThreadID(Weave), ": localsum = ", localSum res[].atomicInc(localSum) - sync(paraSum) + discard sync(paraSum) init(Weave) let sum1M = sumReduce(1000000) diff --git a/weave/parallel_macros.nim b/weave/parallel_macros.nim index 9e8c009..5c7cbaa 100644 --- a/weave/parallel_macros.nim +++ b/weave/parallel_macros.nim @@ -11,7 +11,8 @@ import # Internal ./datatypes/[sync_types, flowvars], ./contexts, ./instrumentation/profilers, - ./scheduler + ./scheduler, + ./channels/pledges # Parallel for utilities # ---------------------------------------------------------- @@ -128,13 +129,13 @@ proc extractCaptures*(body: NimNode, c: int): tuple[captured, capturedTy: NimNod body[c] = nnkDiscardStmt.newTree(body[c].toStrLit) proc extractFutureAndCaptures*(body: NimNode): tuple[future, captured, capturedTy: NimNode] = - ## Extract the result future/flowvar and the vaptured variables if any + ## Extract the result future/flowvar and the captured variables if any ## out of a parallelFor / parallelForStrided / parallelForStaged / parallelForStagedStrided ## Returns a future, the captured variable and the captured type template findCapturesAwaitable(idx: int) = if body[idx][0].eqIdent"captures": assert result.captured.isNil and result.capturedTy.isNil, "The captured section can only be set once for a loop." - (result.captured, result.capturedTy) = extractCaptures(body, 0) + (result.captured, result.capturedTy) = extractCaptures(body, idx) elif body[idx][0].eqIdent"awaitable": body[idx][1].expectKind(nnkStmtList) body[idx][1][0].expectKind(nnkIdent) @@ -143,19 +144,31 @@ proc extractFutureAndCaptures*(body: NimNode): tuple[future, captured, capturedT # Remove the awaitable section body[idx] = nnkDiscardStmt.newTree(body[idx].toStrLit) - if body[0].kind == nnkCall: - findCapturesAwaitable(0) - if body.len > 1 and body[1].kind == nnkCall: - findCapturesAwaitable(1) + for i in 0 ..< body.len-1: + if body[i].kind == nnkCall: + findCapturesAwaitable(i) + +proc extractPledges*(body: NimNode): NimNode = + ## Extract the dependencies in/out (pledges) if any + template findPledges(idx: int) = + if body[idx][0].eqIdent"dependsOn": + assert result.isNil, "The dependsOn section can only be set once for a loop." 
+      result = body[idx][1][0]
+      # Remove the dependsOn section
+      body[idx] = nnkDiscardStmt.newTree(body[idx].toStrLit)
+
+  for i in 0 ..< body.len-1:
+    if body[i].kind == nnkCall:
+      findPledges(i)

 proc addSanityChecks*(statement, capturedTypes, capturedTypesSym: NimNode) =
   if capturedTypes.len > 0:
     statement.add quote do:
       static:
-        doAssert supportsCopyMem(`capturedTypesSym`), "\n\n parallelFor" &
-          " has arguments managed by GC (ref/seq/strings),\n" &
-          "  they cannot be distributed across threads.\n" &
-          "  Argument types: " & $`capturedTypes` & "\n\n"
+        # doAssert supportsCopyMem(`capturedTypesSym`), "\n\n parallelFor" &
+        #   " has arguments managed by GC (ref/seq/strings),\n" &
+        #   "  they cannot be distributed across threads.\n" &
+        #   "  Argument types: " & $`capturedTypes` & "\n\n"

         doAssert sizeof(`capturedTypesSym`) <= TaskDataSize, "\n\n parallelFor" &
           " has arguments that do not fit in the parallel tasks data buffer.\n" &
@@ -229,6 +242,7 @@ proc addLoopTask*(
   statement, asyncFn, start, stop, stride,
   capturedVars, CapturedTySym: NimNode,
+  dependsOn: NimNode,
   futureIdent, resultFutureType: NimNode
 ) =
   ## Add a loop task
@@ -248,6 +262,30 @@ proc addLoopTask*(

   let futureIdent = if hasFuture: futureIdent
                     else: ident("dummy")

+  # Dependencies
+  # ---------------------------------------------------
+  var scheduleBlock: NimNode
+  let task = ident"task"
+  if dependsOn.isNil:
+    scheduleBlock = newCall(bindSym"schedule", task)
+  elif dependsOn.kind == nnkIdent:
+    scheduleBlock = quote do:
+      if not delayedUntil(`task`, `dependsOn`, myMemPool()):
+        schedule(`task`)
+  else:
+    let (pledge, pledgeIndex) = (dependsOn[0], dependsOn[1])
+    if pledgeIndex.kind == nnkIntLit and pledgeIndex.intVal == NoIter:
+      scheduleBlock = quote do:
+        if not delayedUntil(`task`, `pledge`, myMemPool()):
+          schedule(`task`)
+    else:
+      # This is a dependency on a loop index from ANOTHER loop,
+      # not the loop that is currently scheduled.
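+      # delayedUntil registers the task on the pledge for that iteration
+      # and returns true if the task was actually delayed; if the pledge
+      # was already fulfilled it returns false and the task is scheduled
+      # eagerly instead.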
+      scheduleBlock = quote do:
+        if not delayedUntil(`task`, `pledge`, int32(`pledgeIndex`), myMemPool()):
+          schedule(`task`)
+
+  # ---------------------------------------------------
   if hasFuture:
     statement.add quote do:
       when not declared(`futureIdent`):
@@ -260,21 +298,24 @@
         # TODO profiling templates visibility issue
         timer_start(timer_enq_deq_task)
       block enq_deq_task:
-        let task = newTaskFromCache()
-        task.parent = myTask()
-        task.fn = `asyncFn`
-        task.isLoop = true
-        task.start = `start`
-        task.cur = `start`
-        task.stop = `stop`
-        task.stride = `stride`
-        task.hasFuture = true
-        task.futureSize = uint8(sizeof(`resultFutureType`.T))
+        let `task` = newTaskFromCache()
+        `task`.parent = myTask()
+        `task`.fn = `asyncFn`
+
+        `task`.start = `start`
+        `task`.cur = `start`
+        `task`.stop = `stop`
+        `task`.stride = `stride`
+
+        `task`.futureSize = uint8(sizeof(`resultFutureType`.T))
+        `task`.hasFuture = true
+        `task`.isLoop = true
+        `task`.isInitialIter = true
         when bool(`withArgs`):
-          cast[ptr (`resultFutureType`, `CapturedTySym`)](task.data.addr)[] = (`futureIdent`, `capturedVars`)
+          cast[ptr (`resultFutureType`, `CapturedTySym`)](`task`.data.addr)[] = (`futureIdent`, `capturedVars`)
         else:
-          cast[ptr `resultFutureType`](task.data.addr)[] = `futureIdent`
-        schedule(task)
+          cast[ptr `resultFutureType`](`task`.data.addr)[] = `futureIdent`
+        `scheduleBlock`
       when defined(WV_profile):
         timer_stop(timer_enq_deq_task)
   else:
@@ -286,17 +327,17 @@
       # TODO profiling templates visibility issue
       timer_start(timer_enq_deq_task)
       block enq_deq_task:
-        let task = newTaskFromCache()
-        task.parent = myTask()
-        task.fn = `asyncFn`
-        task.isLoop = true
-        task.start = `start`
-        task.cur = `start`
-        task.stop = `stop`
-        task.stride = `stride`
+        let `task` = newTaskFromCache()
+        `task`.parent = myTask()
+        `task`.fn = `asyncFn`
+        `task`.isLoop = true
+        `task`.start = `start`
+        `task`.cur = `start`
+        `task`.stop = `stop`
+        `task`.stride = `stride`
         when bool(`withArgs`):
-          cast[ptr `CapturedTySym`](task.data.addr)[] = `capturedVars`
-        schedule(task)
+          cast[ptr `CapturedTySym`](`task`.data.addr)[] = `capturedVars`
+        `scheduleBlock`
       when defined(WV_profile):
         timer_stop(timer_enq_deq_task)
diff --git a/weave/parallel_reduce.nim b/weave/parallel_reduce.nim
index 754e36b..2d1ace9 100644
--- a/weave/parallel_reduce.nim
+++ b/weave/parallel_reduce.nim
@@ -223,6 +223,7 @@ macro parallelReduceImpl*(loopParams: untyped, stride: int, body: untyped): unty
   # --------------------------------------------------------
   result.addLoopTask(
     parReduceTask,
     start, stop, stride, captured, CapturedTy,
+    dependsOn = nil, # TODO
     finalAccum, FutTy
   )
diff --git a/weave/parallel_tasks.nim b/weave/parallel_tasks.nim
index 509dec3..51dd75a 100644
--- a/weave/parallel_tasks.nim
+++ b/weave/parallel_tasks.nim
@@ -14,13 +14,14 @@ import
   # Internal
   ./scheduler, ./contexts, ./await_fsm,
   ./datatypes/[flowvars, sync_types],
-  ./instrumentation/[contracts, profilers]
+  ./instrumentation/[contracts, profilers],
+  ./channels/pledges

 # workaround visibility issues
 export forceFuture
 export profilers, contexts

-macro spawn*(funcCall: typed): untyped =
+proc spawnImpl(pledges: NimNode, funcCall: NimNode): NimNode =
   # We take typed argument so that overloading resolution
   # is already done and arguments are semchecked
   funcCall.expectKind(nnkCall)
@@ -47,10 +48,10 @@
   if withArgs:
     result.add quote do:
       static:
-        assert supportsCopyMem(`argsTy`), "\n\n" & `fnName` &
-          " has arguments managed by GC (ref/seq/strings),\n" &
-          "  they cannot be distributed across threads.\n" &
-          "  Argument types: " & $`argsTy` & "\n\n"
+        # assert supportsCopyMem(`argsTy`), "\n\n" & `fnName` &
+        #   " has arguments managed by GC (ref/seq/strings),\n" &
+        #   "  they cannot be distributed across threads.\n" &
+        #   "  Argument types: " & $`argsTy` & "\n\n"

         assert sizeof(`argsTy`) <= TaskDataSize, "\n\n" & `fnName` &
           " has arguments that do not fit in the async data buffer.\n" &
@@ -63,6 +64,32 @@
   var fnCall = newCall(fn)
   let data = ident("data") # typed pointer to data

+  # Schedule immediately or delay on dependencies
+  var scheduleBlock: NimNode
+  let task = ident"task"
+  if pledges.isNil:
+    scheduleBlock = newCall(bindSym"schedule", task)
+  elif pledges.len == 1:
+    let pledgeDesc = pledges[0]
+    if pledgeDesc.kind in {nnkIdent, nnkSym}:
+      scheduleBlock = quote do:
+        if not delayedUntil(`task`, `pledgeDesc`, myMemPool()):
+          schedule(`task`)
+    else:
+      pledgeDesc.expectKind({nnkPar, nnkTupleConstr})
+      let pledge = pledgeDesc[0]
+      let pledgeIndex = pledgeDesc[1]
+      scheduleBlock = quote do:
+        if not delayedUntil(`task`, `pledge`, int32(`pledgeIndex`), myMemPool()):
+          schedule(`task`)
+  else:
+    let delayedMulti = getAst(delayedUntilMulti(
+      task, newCall(bindSym"myMemPool"), pledges)
+    )
+    scheduleBlock = quote do:
+      if not `delayedMulti`:
+        schedule(`task`)
+
   if not needFuture: # TODO: allow awaiting on a Flowvar[void]
     if funcCall.len == 2: # With only 1 arg, the tuple syntax doesn't construct a tuple
@@ -89,12 +116,12 @@
       # TODO profiling templates visibility issue
       timer_start(timer_enq_deq_task)
       block enq_deq_task:
-        let task = newTaskFromCache()
-        task.parent = myTask()
-        task.fn = `async_fn`
+        let `task` = newTaskFromCache()
+        `task`.parent = myTask()
+        `task`.fn = `async_fn`
         when bool(`withArgs`):
-          cast[ptr `argsTy`](task.data.addr)[] = `args`
-        schedule(task)
+          cast[ptr `argsTy`](`task`.data.addr)[] = `args`
+        `scheduleBlock`
       when defined(WV_profile):
         timer_stop(timer_enq_deq_task)
@@ -136,14 +163,14 @@
       # TODO profiling templates visibility issue
       timer_start(timer_enq_deq_task)
       block enq_deq_task:
-        let task = newTaskFromCache()
-        task.parent = myTask()
-        task.fn = `async_fn`
-        task.has_future = true
-        task.futureSize = uint8(sizeof(`retType`))
+        let `task` = newTaskFromCache()
+        `task`.parent = myTask()
+        `task`.fn = `async_fn`
+        `task`.has_future = true
+        `task`.futureSize = uint8(sizeof(`retType`))
         let `fut` = newFlowvar(myMemPool(), `freshIdent`)
-        cast[ptr `futArgsTy`](task.data.addr)[] = `futArgs`
-        schedule(task)
+        cast[ptr `futArgsTy`](`task`.data.addr)[] = `futArgs`
+        `scheduleBlock`
       when defined(WV_profile):
         timer_stop(timer_enq_deq_task)
       # Return the future
@@ -153,11 +180,29 @@
   result = nnkBlockStmt.newTree(newEmptyNode(), result)
   # echo result.toStrLit

+macro spawn*(fnCall: typed): untyped =
+  ## Spawns the input function call asynchronously, potentially on another thread of execution.
+  ## If the function call returns a result, spawn will wrap it in a Flowvar.
+  ## You can use sync to block the current thread and extract the asynchronous result from the flowvar.
+  ## Spawn returns immediately.
+  result = spawnImpl(nil, fnCall)
+
+macro spawnDelayed*(pledges: varargs[typed], fnCall: typed): untyped =
+  ## Spawns the input function call asynchronously, potentially on another thread of execution.
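+  ## The pledges come first and the call last, e.g.
+  ##   spawnDelayed pledge1, pledge2, myProc(a, b)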
+  ## The function call will only be scheduled when its pledges are fulfilled.
+  ##
+  ## If the function call returns a result, spawnDelayed will wrap it in a Flowvar.
+  ## You can use sync to block the current thread and extract the asynchronous result from the flowvar.
+  ## spawnDelayed returns immediately.
+  ##
+  ## Ensure that the pledges of a delayed spawn can be fulfilled before syncing on its flowvar, or you will deadlock.
+  result = spawnImpl(pledges, fnCall)
+
 # Sanity checks
 # --------------------------------------------------------

 when isMainModule:
-  import ./runtime, ./runtime_fsm
+  import ./runtime, ./runtime_fsm, os

   block: # Async without result
@@ -197,3 +242,67 @@ when isMainModule:
       echo f

   main2()
+
+  block: # Delayed computation
+
+    proc echoA(pA: Pledge) =
+      echo "Display A, sleep 1s, create parallel streams 1 and 2"
+      sleep(1000)
+      pA.fulfill()
+
+    proc echoB1(pB1: Pledge) =
+      echo "Display B1, sleep 1s"
+      sleep(1000)
+      pB1.fulfill()
+
+    proc echoB2() =
+      echo "Display B2, exit stream"
+
+    proc echoC1() =
+      echo "Display C1, exit stream"
+
+    proc main() =
+      echo "Sanity check 3: Dataflow parallelism"
+      init(Weave)
+      let pA = newPledge()
+      let pB1 = newPledge()
+      spawnDelayed pB1, echoC1()
+      spawnDelayed pA, echoB2()
+      spawnDelayed pA, echoB1(pB1)
+      spawn echoA(pA)
+      exit(Weave)
+
+    main()
+
+  block: # Delayed computation with multiple dependencies
+
+    proc echoA(pA: Pledge) =
+      echo "Display A, sleep 1s, create parallel streams 1 and 2"
+      sleep(1000)
+      pA.fulfill()
+
+    proc echoB1(pB1: Pledge) =
+      echo "Display B1, sleep 1s"
+      sleep(1000)
+      pB1.fulfill()
+
+    proc echoB2(pB2: Pledge) =
+      echo "Display B2, no sleep"
+      pB2.fulfill()
+
+    proc echoC12() =
+      echo "Display C12, exit stream"
+
+    proc main() =
+      echo "Sanity check 4: Dataflow parallelism with multiple dependencies"
+      init(Weave)
+      let pA = newPledge()
+      let pB1 = newPledge()
+      let pB2 = newPledge()
+      spawnDelayed pB1, pB2, echoC12()
+      spawnDelayed pA, echoB2(pB2)
+      spawnDelayed pA, echoB1(pB1)
+      spawn echoA(pA)
+      exit(Weave)
+
+    main()
diff --git a/weave/victims.nim b/weave/victims.nim
index ceadbe4..cc13a28 100644
--- a/weave/victims.nim
+++ b/weave/victims.nim
@@ -249,6 +249,7 @@ proc splitAndSend*(task: Task, req: sink StealRequest, workSharing: bool) =
   upperSplit.start = split
   upperSplit.cur = split
   upperSplit.stop = task.stop
+  upperSplit.isInitialIter = false
   # Current task continues with lower half
   task.stop = split
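
Usage sketch: a minimal example of a delayed spawn whose call returns a value, not part of the patch itself. It assumes the top-level weave module re-exports Pledge, newPledge, fulfill, spawn, spawnDelayed and sync (the sanity checks above get them via ./runtime and ./channels/pledges), and that spawnDelayed accepts the parenthesized call form; per the doc comment above, the pledge must be fulfillable before syncing or the sync deadlocks.

  import weave

  proc double(x: int): int = 2 * x

  proc main() =
    init(Weave)
    let p = newPledge()
    # The task is registered on `p` but not scheduled until `p` is fulfilled.
    let fut = spawnDelayed(p, double(21))
    p.fulfill()      # without this, the sync below would deadlock
    echo sync(fut)   # 42
    exit(Weave)

  main()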