Skip to content

Commit

Permalink
[WIP] Dataflow graph parallelism v2 (#94)
Browse files Browse the repository at this point in the history
* Stash pledges, blocked by nim-lang/Nim#13048

* Workaround nim-lang/Nim#13048

* Pass the pledges implementation tests

* Prepare for supporting mixed single and iteration pledges

* move pledge access counter to the impl. + Move file to channels subfolder

* add iteration pledges

* Sanity checks for loop pledges

* create fulfill public API

* Prepare parallel_tasks for supporting pledges

* Add dataflow graph parallelism 💥 🎆

* Add dataflow support to parallel for loops

* Fix atomics + assert templates visibility on some platforms

* remove anti overwrite check (not true as compilers don't always zero-init)

* Please the template early symbol resolution god

* Nestable GEMM reaching 2.87TFlops for a speedup of 17.96 on 18 cores machines

* dependent loops

* Prepare for multiple pledge dependencies support

* Support joining dataflow graph

* bindSym + augment TaskDataSize

* Awaitable for loop now returns true if it was the last iteration to await

* Fix + test against vendor BLAS, still one non-nestable barrier :/ (but consistently 17.5x speedup)

* typos

* Allow failure on C++
  • Loading branch information
mratsim authored Jan 7, 2020
1 parent 5d90172 commit 85b90b3
Show file tree
Hide file tree
Showing 23 changed files with 1,149 additions and 149 deletions.
22 changes: 12 additions & 10 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ strategy:
matrix:
# Nim requires enforcing ARCH="x86" and ucpu
# for 32-bit targets as it seems like Azure machines are 64-bit
# TODO: C++ is allowed to fail
# Nim doesn't compile pledges properly in C++ mode
Windows_devel_32bit:
VM: 'windows-latest'
ARCH: x86
Expand All @@ -15,21 +17,21 @@ strategy:
PLATFORM: x64
CHANNEL: devel
WEAVE_TEST_LANG: c
Windows_cpp_devel_64bit:
VM: 'windows-latest'
PLATFORM: x64
CHANNEL: devel
WEAVE_TEST_LANG: cpp
# Windows_cpp_devel_64bit:
# VM: 'windows-latest'
# PLATFORM: x64
# CHANNEL: devel
# WEAVE_TEST_LANG: cpp
Linux_devel_64bit:
VM: 'ubuntu-16.04'
PLATFORM: x64
CHANNEL: devel
WEAVE_TEST_LANG: c
Linux_cpp_devel_64bit:
VM: 'ubuntu-16.04'
PLATFORM: x64
CHANNEL: devel
WEAVE_TEST_LANG: cpp
# Linux_cpp_devel_64bit:
# VM: 'ubuntu-16.04'
# PLATFORM: x64
# CHANNEL: devel
# WEAVE_TEST_LANG: cpp
Linux_devel_32bit:
VM: 'ubuntu-16.04'
PLATFORM: x86
Expand Down
47 changes: 47 additions & 0 deletions benchmarks/matmul_gemm_blas/all_gemm.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Weave
# Copyright (c) 2019 Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

import
./laser_omp_gemm,
./mkl_gemm, # OpenBLAS and MKL cannot be linked at the same time
./weave_gemm,
./gemm_bench_common,
./gemm_bench_config,
../../weave

# This aggregates all benchmarks in one binary.
# Warning: Bench results are not reliable, it seems like threads/calls
# interfere with each other, even when only calling OpenMP-based code.

when isMainModule:
  import std/[random, sequtils]

  # Fixed seed so every library sees the same operands across runs.
  randomize(42) # For reproducibility

  # Shared random inputs for all benchmarked GEMM implementations.
  let matA = newSeqWith(M*K, float32 rand(-0.1..0.1))
  let matB = newSeqWith(K*N, float32 rand(-0.1..0.1))

  warmup()
  echo "Warning: The aggregate bench is unreliable, the libraries interfere with each other."

  block:
    reportConfig("Intel MKL + Laser OMP + Weave", float32, (M, K), (K, N))
    # MKL serves as the reference result for the error check below.
    let mklRef = benchMKL(matA, matB, (M,K), (K,N), NbSamples)

    # let laserRes = benchLaserGEMM(matA, matB, (M,K), (K,N), NbSamples)

    init(Weave)
    let weaveRes = benchWeaveGEMM(matA, matB, (M,K), (K,N), NbSamples)
    exit(Weave)

    # Sanity check: Weave's output must closely match the MKL reference.
    let errWeave = mean_relative_error(weaveRes, mklRef)
    echo "Mean Relative Error of Weave vs reference: ", errWeave
    doAssert errWeave <= 1e-5'f32, $errWeave

    # let errLaser = mean_relative_error(laserRes, mklRef)
    # echo "Mean Relative Error of Laser vs reference: ", errLaser
    # doAssert errLaser <= 1e-5'f32, $errLaser
6 changes: 6 additions & 0 deletions benchmarks/matmul_gemm_blas/all_gemm.nim.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
clibdir:"/opt/intel/mkl/lib/intel64"
passl:"/opt/intel/mkl/lib/intel64/libmkl_intel_lp64.a"
passl:"-lmkl_core"
passl:"-lmkl_intel_thread"
passl:"-liomp5"
dynlibOverride:"mkl_intel_lp64"
36 changes: 34 additions & 2 deletions benchmarks/matmul_gemm_blas/gemm_bench_common.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Apache v2 License
# Mamy Ratsimbazafy
# Laser
# Copyright (c) 2018-Present Mamy André-Ratsimbazafy
# Distributed under the Apache v2 License (license terms are at http://www.apache.org/licenses/LICENSE-2.0).
# This file may not be copied, modified, or distributed except according to those terms.

import std/[sequtils, times, monotimes, stats, strformat, random]

Expand Down Expand Up @@ -87,3 +89,33 @@ template bench*(name: string, req_ops: int, initialisation, body: untyped) {.dir
let global_stop = getMonoTime()
let globalElapsed = inMilliseconds(global_stop - global_start)
printStats(name, result)

proc relative_error*[T: SomeFloat](y, y_true: T): T {.inline.} =
  ## Symmetric relative error, |y_true - y| / max(|y_true|, |y|).
  ## Using max of the magnitudes (instead of the usual |y_true|)
  ## makes the measure symmetric in its arguments and avoids a
  ## division by zero: when both values are zero the error is zero.
  let scale = max(abs(y), abs(y_true))
  if scale == T(0):
    T(0)
  else:
    abs(y_true - y) / scale

proc absolute_error*[T: SomeFloat](y, y_true: T): T {.inline.} =
  ## Absolute error between two scalars, |y_true - y|.
  abs(y - y_true)

proc mean_relative_error*[T: SomeFloat](y, y_true: seq[T]): T {.inline.} =
  ## Mean of the elementwise symmetric relative errors between `y`
  ## and the reference `y_true`.
  ## Both sequences must have the same length (asserted).
  ## Returns 0 for empty inputs; the original `sum / len` would
  ## compute 0/0 and yield NaN on an empty sequence.
  doAssert y.len == y_true.len
  if y.len == 0:
    return T(0)
  result = T(0)
  for i in 0 ..< y.len:
    result += relative_error(y[i], y_true[i])
  result = result / y.len.T

proc mean_absolute_error*[T: SomeFloat](y, y_true: seq[T]): T {.inline.} =
  ## Mean of the elementwise absolute errors |y_true[i] - y[i]|.
  ## Both sequences must have the same length (asserted).
  ## Returns 0 for empty inputs; the original `sum / len` would
  ## compute 0/0 and yield NaN on an empty sequence.
  doAssert y.len == y_true.len
  if y.len == 0:
    return T(0)
  result = T(0)
  for i in 0 ..< y.len:
    result += absolute_error(y[i], y_true[i])
  result = result / y.len.T
16 changes: 10 additions & 6 deletions benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_packing_weave.nim
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ proc pack_A_mc_kc*[T; ukernel: static MicroKernel](
proc pack_B_kc_nc*[T; ukernel: static MicroKernel](
packedB: ptr UncheckedArray[T],
kc, nc: int,
B: MatrixView[T]) =
B: MatrixView[T], kcTileReady: Pledge) =
## Packs panel [kc, nc] for ~B (half-L1 cache)
## Pads if needed
##
Expand All @@ -76,10 +76,10 @@ proc pack_B_kc_nc*[T; ukernel: static MicroKernel](
let unroll_stop = nc.round_step_down(NR)

# 1. Pack n matrices of size kc*nr, n = nc/nr
parallelForStrided j in 0 ..< unroll_stop, stride = NR:
captures: {kc, buffer, B}
parallelFor k in 0 ..< kc:
captures: {j, kc, buffer, B}
parallelFor k in 0 ..< kc:
awaitable: kcLoop
captures: {kc, buffer, B, unroll_stop}
for j in countup(0, unroll_stop-1, NR):
for jj in 0 ..< NR:
buffer[j*kc + k*NR + jj] = B[k, j+jj]

Expand All @@ -93,4 +93,8 @@ proc pack_B_kc_nc*[T; ukernel: static MicroKernel](
for j in remainder ..< NR: # Pad with 0 if packing over the edge
offBuf[k*NR + j] = 0.T

syncRoot(Weave)
# Note: the tail is processed in the calling thread
# so waiting there guarantees proper data dependencies
# provided the "k" loop is not nested (i.e. does real work instead of enqueueing tasks)
discard sync(kcLoop)
kcTileReady.fulfill()
38 changes: 18 additions & 20 deletions benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_weave.nim
Original file line number Diff line number Diff line change
Expand Up @@ -155,16 +155,18 @@ proc gemm_impl[T; ukernel: static MicroKernel](
prefetch(tiles.b, Write, LowTemporalLocality)
let kc = min(K - pc, tiles.kc) # Deal with edges # A[0:M, pc:pc+kc]

let kcncTileReady = newPledge()
let kcncB = vB.stride(pc, 0) # B[pc:pc+kc, jc:jc+nc]
pack_B_kc_nc[T, ukernel](tiles.b, kc, nc, kcncB) # PackB panel [kc, nc] (nc is large or unknown)
spawn pack_B_kc_nc[T, ukernel]( # PackB panel [kc, nc] (nc is large or unknown)
tiles.b, kc, nc, kcncB, kcncTileReady)

# First time writing to C, we scale it, otherwise accumulate
let beta = if pc == 0: beta else: 1.T

# ####################################
# 3. for ic = 0,...,m−1 in steps of mc
parallelFor icb in 0 ..< tiles.ic_num_tasks:
captures: {pc, tiles, nc, kc, alpha, beta, vA, vC, M}
captures: {kcncTileReady, pc, tiles, nc, kc, alpha, beta, vA, vC, M}

let packA = tiles.a + icb * tiles.upanelA_size
prefetch(packA, Write, LowTemporalLocality)
Expand All @@ -174,12 +176,16 @@ proc gemm_impl[T; ukernel: static MicroKernel](
let mckcA = vA.stride(ic, pc) # A[ic:ic+mc, pc:pc+kc]
pack_A_mc_kc[T, ukernel](packA, mc, kc, mckcA) # PackA block [mc, kc]

gebp_mkernel[T, ukernel]( # GEBP macrokernel:
spawnDelayed(
kcncTileReady,
gebp_mkernel[T, ukernel]( # GEBP macrokernel:
mc, nc, kc, # C[ic:ic+mc, jc:jc+nc] =
alpha, packA, tiles.b, # αA[ic:ic+mc, pc:pc+kc] * B[pc:pc+kc, jc:jc+nc] +
beta, vC.stride(ic, 0) # βC[ic:ic+mc, jc:jc+nc]
)
syncRoot(Weave)
)
# Only trigger the next phase when the last one is finished
syncRoot(Weave) # TODO: this is the last non-nestable barrier

# ############################################################
#
Expand Down Expand Up @@ -282,10 +288,9 @@ when isMainModule:
b[0][0].unsafeAddr, 2, 1,
0.0, res_ab[0][0].addr, 2, 1
)

syncRoot(Weave)
# echo "expected: ", ab
# echo "result: ", res_ab

doAssert res_ab == ab, $res_ab
echo "SUCCESS\n"

Expand All @@ -309,10 +314,9 @@ when isMainModule:
b[0][0].unsafeAddr, 2, 1,
0.0, res_ab[0][0].addr, 2, 1
)

syncRoot(Weave)
# echo "expected: ", ab
# echo "result: ", res_ab

doAssert res_ab == ab, $res_ab
echo "SUCCESS\n"

Expand All @@ -334,10 +338,9 @@ when isMainModule:
b[0][0].unsafeAddr, 2, 1,
0.0, res_ab[0][0].addr, 2, 1
)

syncRoot(Weave)
# echo "expected: ", ab
# echo "result: ", res_ab

doAssert res_ab == ab, $res_ab
echo "SUCCESS\n"

Expand All @@ -360,10 +363,9 @@ when isMainModule:
b[0][0].unsafeAddr, 4, 1,
0, res_ab[0][0].addr, 4, 1
)

syncRoot(Weave)
# echo "expected: ", ab
# echo "result: ", res_ab

doAssert res_ab == ab, $res_ab
echo "SUCCESS\n"

Expand Down Expand Up @@ -393,10 +395,9 @@ when isMainModule:
b[0][0].unsafeAddr, 4, 1,
0, res_ab[0][0].addr, 4, 1
)

syncRoot(Weave)
# echo "expected: ", ab
# echo "result: ", res_ab

doAssert res_ab == ab, $res_ab
echo "SUCCESS\n"

Expand Down Expand Up @@ -424,10 +425,9 @@ when isMainModule:
b[0][0].unsafeAddr, 2, 1,
0, res_ab[0][0].addr, 2, 1
)

syncRoot(Weave)
# echo "expected: ", ab
# echo "result: ", res_ab

doAssert res_ab == ab, $res_ab
echo "SUCCESS\n"

Expand Down Expand Up @@ -461,10 +461,9 @@ when isMainModule:
b[0][0].unsafeAddr, 8, 1,
0, res_ab[0][0].addr, 8, 1
)

syncRoot(Weave)
# echo "expected: ", ab
# echo "result: ", res_ab

doAssert res_ab == ab, $res_ab
echo "SUCCESS\n"

Expand Down Expand Up @@ -507,10 +506,9 @@ when isMainModule:
b[0][0].unsafeAddr, 8, 1,
0, res_ab[0][0].addr, 8, 1
)

syncRoot(Weave)
# echo "expected: ", ab
# echo "result: ", res_ab

doAssert res_ab == ab, $res_ab
echo "SUCCESS\n"

Expand Down
4 changes: 2 additions & 2 deletions benchmarks/matmul_gemm_blas/laser_omp_gemm.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ when not defined(vcc):
else:
{.pragma: restrict, codegenDecl: "$# __restrict $#".}

proc benchLaserGEMM(a, b: seq[float32], ashape, bshape: MatrixShape, nb_samples: int): seq[float32] =
proc benchLaserGEMM*(a, b: seq[float32], ashape, bshape: MatrixShape, nb_samples: int): seq[float32] =
let req_ops = gemm_required_ops(ashape, bshape)
let out_shape = gemm_out_shape(ashape, bshape)
let out_size = out_shape.M * out_shape.N
Expand Down Expand Up @@ -48,4 +48,4 @@ when isMainModule:
let a = newSeqWith(M*K, float32 rand(-0.1..0.1))
let b = newSeqWith(K*N, float32 rand(-0.1..0.1))

let mkl = benchLaserGEMM(a, b, (M,K), (K,N), NbSamples)
let laser = benchLaserGEMM(a, b, (M,K), (K,N), NbSamples)
2 changes: 1 addition & 1 deletion benchmarks/matmul_gemm_blas/mkl_gemm.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ proc gemm*(ORDER: OrderType, TRANSA, TRANSB: TransposeType, M, N, K: int, ALPHA:
A: ptr float64, LDA: int, B: ptr float64, LDB: int, BETA: float64, C: ptr float64, LDC: int)
{. dynlib: blas, importc: "cblas_dgemm" .}

proc benchMKL(a, b: seq[float32], ashape, bshape: MatrixShape, nb_samples: int): seq[float32] =
proc benchMKL*(a, b: seq[float32], ashape, bshape: MatrixShape, nb_samples: int): seq[float32] =
let req_ops = gemm_required_ops(ashape, bshape)
let out_shape = gemm_out_shape(ashape, bshape)
let out_size = out_shape.M * out_shape.N
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/matmul_gemm_blas/openblas_gemm.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ proc gemm*(ORDER: OrderType, TRANSA, TRANSB: TransposeType, M, N, K: int, ALPHA:
A: ptr float64, LDA: int, B: ptr float64, LDB: int, BETA: float64, C: ptr float64, LDC: int)
{. dynlib: blas, importc: "cblas_dgemm" .}

proc benchOpenBLAS(a, b: seq[float32], ashape, bshape: MatrixShape, nb_samples: int): seq[float32] =
proc benchOpenBLAS*(a, b: seq[float32], ashape, bshape: MatrixShape, nb_samples: int): seq[float32] =
let req_ops = gemm_required_ops(ashape, bshape)
let out_shape = gemm_out_shape(ashape, bshape)
let out_size = out_shape.M * out_shape.N
Expand Down Expand Up @@ -53,4 +53,4 @@ when isMainModule:
let a = newSeqWith(M*K, float32 rand(-0.1..0.1))
let b = newSeqWith(K*N, float32 rand(-0.1..0.1))

let mkl = benchOpenBLAS(a, b, (M,K), (K,N), NbSamples)
let openblas = benchOpenBLAS(a, b, (M,K), (K,N), NbSamples)
11 changes: 6 additions & 5 deletions benchmarks/matmul_gemm_blas/weave_gemm.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ when not compileOption("threads"):

import
./gemm_bench_common, ./gemm_bench_config,
./gemm_pure_nim/gemm_weave
./gemm_pure_nim/gemm_weave,
../../weave

when not defined(vcc):
{.pragma: restrict, codegenDecl: "$# __restrict__ $#".}
else:
{.pragma: restrict, codegenDecl: "$# __restrict $#".}

proc benchWeaveGEMM(a, b: seq[float32], ashape, bshape: MatrixShape, nb_samples: int): seq[float32] =
proc benchWeaveGEMM*(a, b: seq[float32], ashape, bshape: MatrixShape, nb_samples: int): seq[float32] =
let req_ops = gemm_required_ops(ashape, bshape)
let out_shape = gemm_out_shape(ashape, bshape)
let out_size = out_shape.M * out_shape.N
Expand All @@ -35,13 +36,13 @@ proc benchWeaveGEMM(a, b: seq[float32], ashape, bshape: MatrixShape, nb_samples:
b_ptr, N, 1,
0'f32, c_ptr, N, 1
)
syncRoot(Weave) # Weave gemm is async and returns immediately

# Bench
when isMainModule:
import std/[random, sequtils]
import ../../weave

randomize(42) # FOr reproducibility
randomize(42) # For reproducibility
# warmup()
reportConfig("Weave (Pure Nim)", float32, (M, K), (K, N))

Expand All @@ -50,5 +51,5 @@ when isMainModule:
let b = newSeqWith(K*N, float32 rand(-0.1..0.1))

init(Weave)
let mkl = benchWeaveGEMM(a, b, (M,K), (K,N), NbSamples)
let weave = benchWeaveGEMM(a, b, (M,K), (K,N), NbSamples)
exit(Weave)
Loading

0 comments on commit 85b90b3

Please sign in to comment.