[WIP] Dataflow graph parallelism #92

Closed · wants to merge 9 commits

Conversation

@mratsim (Owner) commented Jan 4, 2020

Heavy work in progress, ideas welcome.

This is research work on implementing data-flow parallelism (also called stream parallelism, pipeline parallelism, or data-driven task parallelism).

Needs, research, and other runtime approaches are detailed starting from the following comment: #31 (comment).

The direct practical goal is to be able to call Weave's matrix multiplication from a parallel loop. In many cases we have a batch of small matrices, say 64 images of size 224x224 (the base image size in the ImageNet dataset), and it is more efficient to exploit parallelism at the batch level rather than inside each multiplication. This is currently impossible in Weave (or in OpenMP, for that matter).

The reason is that the current implementation requires a barrier after the outer parallel for to represent the data dependencies:

parallelFor icb in 0 ..< tiles.ic_num_tasks:
  captures: {pc, tiles, nc, kc, alpha, beta, vA, vC, M}
  let packA = tiles.a + icb * tiles.upanelA_size
  prefetch(packA, Write, LowTemporalLocality)
  let ic = icb * tiles.mc
  let mc = min(M-ic, tiles.mc)                   # C[ic:ic+mc, jc:jc+nc]
  let mckcA = vA.stride(ic, pc)                  # A[ic:ic+mc, pc:pc+kc]
  pack_A_mc_kc[T, ukernel](packA, mc, kc, mckcA) # Pack A block [mc, kc]
  gebp_mkernel[T, ukernel](                      # GEBP macrokernel:
    mc, nc, kc,                                  #   C[ic:ic+mc, jc:jc+nc] =
    alpha, packA, tiles.b,                       #     αA[ic:ic+mc, pc:pc+kc] * B[pc:pc+kc, jc:jc+nc] +
    beta, vC.stride(ic, 0)                       #     βC[ic:ic+mc, jc:jc+nc]
  )

syncRoot(Weave)

But that barrier can only be called from the root task / main thread, which prevents nesting this code inside another, outer parallel region.
Furthermore, precise barriers are fundamentally incompatible with work-stealing: there is no way to know which threads will execute a given code path, so some threads may never reach the barrier and the runtime deadlocks.

Instead, we can tell the runtime the actual dependencies: which data is needed to continue the computation.

Note that this is pretty much uncharted territory at the moment; I'm already worried about the overhead, and it will require several refinements.

@mratsim (Owner, Author) commented Jan 4, 2020

A higher-level analysis of the current Promise design and the design space.

Storage at node-level

First, let's get out of the way how promises and their associated tasks are stored.

Currently we have a poor man's minimal working example:

type
  Worker* = object
    ## Distributed binary tree
    ##
    ## Each worker/thread is a node ID that will determine
    ## its parent and the children it oversees if any
    ##
    ## a node N will have as a left child node N*2+1
    ## and as a right child node N*2+2
    ##
    ## This tree tracks if workers backed off from stealing
    ## if they didn't find any task and now take a passive role:
    ## waiting for tasks to be shared instead of actively stealing.
    ##
    ## This is a "thread-local" structure, updates are received asynchronously
    ## via the StealRequest channel.
    # ⚠️ : We use 0-based indexing. If you are familiar with binary trees
    #      indexed from 1, this is the correspondence table:
    #                   root at 0    root at 1
    #      Left child   ix*2 + 1     ix*2
    #      Right child  ix*2 + 2     ix*2 + 1
    #      Parent       (ix-1)/2     ix/2
    ID*: WorkerID
    left*: WorkerID
    right*: WorkerID
    parent*: WorkerID
    workSharingRequests*: BoundedQueue[2, StealRequest]
    deque*: PrellDeque[Task]
    # ⚠️ - without gc:destructors / gc:arc / newruntime, the destructors
    #      are not properly triggered, and those are refcounted types.
    #      The promises will also clog the memory pool.
    # TODO: tasks can depend on multiple promises and a mix of normal and loop tasks.
    # TODO: We assume that a worker holds the whole set of loop tasks
    promises*: seq[Promise]
    dependentTasks*: seq[Task]
    loopPromises*: seq[ConsumerLoopPromises]
    dependentLoopTasks*: seq[seq[Task]]
    currentTask*: Task
    leftIsWaiting*: bool
    rightIsWaiting*: bool
    isWaiting*: bool
This doesn't support multiple dependencies per delayed task or a mix of loop and normal dependencies.

Instead we should at least have seq[tuple[task: Task, deps: seq[Deps]]] (a sketch follows the list below).
Deps needs to be a variant type that erases the Promise/ConsumerLoopPromise difference.
As an optimization we could either:

  • use a SmallSeq instead of a seq, with the first few items always stack-allocated
  • use a ptr UncheckedArray to a memory-pooled block (256 bytes to describe dependencies ought to be enough)
  • use both
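
A minimal sketch of what that could look like, assuming hypothetical DepKind/Deps/DelayedTask names (Promise, ConsumerLoopPromises and Task are the existing Weave types):

type
  DepKind = enum
    dkSingle    # depends on a plain Promise
    dkLoopIter  # depends on one iteration handled by a loop promise

  Deps = object
    case kind: DepKind
    of dkSingle:
      promise: Promise
    of dkLoopIter:
      loopPromise: ConsumerLoopPromises
      iteration: int

  DelayedTask = tuple[task: Task, deps: seq[Deps]]

The SmallSeq / memory-pooled block optimizations would only change how the deps field is stored, not this shape.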

On the communication patterns

Current design

At a higher level what is implemented has the following characteristics:

  • Producers push to an SPMC broadcast channel (the promise) when the data is ready
  • Only interested consumers subscribe to that channel
  • The last consumer/producer releases the promise's reserved memory.

==> In distributed-systems speak, this is a pub/sub system.

On the consumer side:

  • They keep a data structure of pending tasks associated with their dependencies.
  • They poll all the pending tasks to see if the dependencies have been resolved.
  • Loop dependencies require the producer's loop-splitting tree to be mirrored on each consumer via a dispatched tree.

==> On the consumer side, this is similar to pulling from an intermediate message broker.
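
As a rough illustration of that consumer-side polling, assuming the parallel promises / dependentTasks sequences from the Worker object above and a hypothetical isFulfilled accessor (schedule is the runtime's existing scheduling entry point):

# Hedged sketch of the pub/sub consumer side: each worker periodically scans
# its pending delayed tasks and schedules those whose promise was fulfilled.
proc pollPendingTasks(w: var Worker) =
  var i = 0
  while i < w.dependentTasks.len:
    if w.promises[i].isFulfilled():   # `isFulfilled` is an assumed accessor
      schedule(w.dependentTasks[i])   # hand the task to the normal scheduler
      w.promises.del(i)               # O(1) unordered removal
      w.dependentTasks.del(i)
    else:
      inc i

The cost is a full scan of the pending tasks on every poll, which is exactly the overhead the alternative design below tries to avoid.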

Alternative design

Alternatively, we could do something like this:

  • Each worker has an unbounded MPSC channel for FulfilledPromise messages
  • Producers push a fulfilled promise to the root worker
  • The root worker propagates the fulfilled promise to its direct children
  • Children propagate it to their own children, and so on.
  • Workers also regularly consume the incoming fulfilled-promise messages:
    they discard a message if it is irrelevant, or schedule(task) the dependent task if it is relevant.

==> In distributed-systems speak, this is a gossipsub pattern.

The interesting parts are:

  • we avoid polling all the delayed tasks: the worker can keep a map (dependency -> Task) and do the lookup in O(1) (a sketch follows below)
  • we avoid memory management with atomic refcounting
  • we don't need a "ConsumerLoopPromise" data structure; we only need a hashing function that can hash a (Promise) and a (Promise + LoopIndex)

The potentially dead-on-arrival parts are:

  • Assume a binary tree of workers and a loop of 100 iterations: Worker 0 will receive 100 fulfilled-promise messages in total, relay 200 (100 to each of its two children), which in turn relay 400, and so on.
    So we get a lot more messaging overhead in the system.
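
To make the O(1) dependency lookup concrete, here is a rough sketch of how a worker could handle an incoming fulfilled-promise message in this scheme (PromiseId, hasChild and sendFulfilled are assumed names; schedule is the runtime's existing scheduling entry point):

import std/tables

type PromiseId = uint64            # in this scheme a promise is just a hash

var delayedByPromise {.threadvar.}: Table[PromiseId, seq[Task]]

proc onFulfilled(w: Worker, id: PromiseId) =
  # Gossip the message down the binary tree of workers.
  if w.hasChild(w.left):  sendFulfilled(w.left, id)
  if w.hasChild(w.right): sendFulfilled(w.right, id)
  # O(1) lookup of the delayed tasks waiting on this promise,
  # instead of scanning every pending task.
  var waiting: seq[Task]
  if delayedByPromise.pop(id, waiting):
    for task in waiting:
      schedule(task)               # enter the normal scheduling path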

Comparison

The main theoretical overhead trade-off becomes:

PubSub                                           GossipSub
Only required messages published                 Gossip: probably lots of published messages + latency
Tasks/dependencies array scan                    MPSC channel polling
Mirrored LoopPromises with (2 * n log(n)) cost   unified in MPSC channel polling + hash
Atomic refcounting                               Hash (stack)
Heap-allocated promises                          Hash (uint64)
Task Promise + LoopTask Promise                  Hash (uint64)

Unaddressed part

In both cases, we have to prevent a thread from discarding a promise and then requiring it after the fact.
Example to be cooked up.

@mratsim (Owner, Author) commented Jan 4, 2020

Let's address the unaddressed part

Example 1: a simple producer task and a consumer task pseudocode

proc foo(p: Promise, A, B: Matrix) =
  A[0, 0] = 123
  p.fulfill()

proc bar(A: Matrix, i, j: int) =
  assert A[0, 0] == 123 # This would fail if bar is executed before foo
  A[i, j] = A[j, i]

proc main(A, B: Matrix) =

  var p = newPromise()

  spawn foo(p, A, B)
  # foo() may be finished before the next line
  spawnDelayed p, bar(A, 1, 2)
  # main can exit immediately
  # either p is a heap reference
  # or it is a unique ID captured by value

init(Weave)
main()
exit(Weave)

In this example, the main thread schedules a task, then a delayed task, and exits immediately.

In the pubsub case

For the Promise to be valid, it needs to be heap-allocated in a pubsub implementation, as channels are created on an as-needed basis.

As soon as it is passed via spawn, the refcount is incremented, so we are fine even if the main thread exits.

In the gossipsub case

A promise is just a hash and can be captured by value, so we don't need memory management here and the main thread can exit the proc.

However, here is what can happen:

  1. Thread A receives the foo() task, finishes it and publishes the fulfill message
  2. Thread B receives the promise-fulfill message, doesn't depend on it, discards it
  3. Thread B receives the delayed task
  4. Thread B waits for a fulfill message that will never come again, and the runtime deadlocks.

Can we do better?
If we prevent a thread from distributing its delayed tasks, we can ensure that no messages are missed as long as consumers are declared before producers: the thread holding the delayed task is already subscribed to incoming fulfill messages before the producer is even spawned.

proc main(A, B: Matrix) =

  var p = newPromise()

  # foo() cannot be finished before the next line
  spawnDelayed p, bar(A, 1, 2)
  spawn foo(p, A, B)
  # main can exit immediately
  # either p is a heap reference
  # or it is a unique ID captured by value

This however still seems quite brittle: users won't get an error message saying "please declare consumers before producers", it leaks abstraction constraints, and it may not handle nested producer-consumer relationships (or at least it's not obvious to me).

@mratsim (Owner, Author) commented Jan 4, 2020

Some ideas on reliable request-reply from ZeroMQ: http://zguide.zeromq.org/page%3aall#reliable-request-reply

@mratsim (Owner, Author) commented Jan 4, 2020

Another idea: let's colocate the Promise and its dependent tasks on the same worker.

Case 1: delayed task scheduled before the promise is fulfilled

A promise is an ephemeral MPSC channel. On creation it has no owner.

When a worker encounters a delayed task that depends on the promise, it sends the task to the promise channel. When a worker fulfills a promise (a producer), it becomes the owner of the promise channel: it takes the tasks that are in the channel and schedules them.

Case 2: promise fulfilled before the delayed task is scheduled

When a producer fulfills a promise, it needs to set a "fulfilled" or "closed" flag on the channel; new consumers can check it and schedule their tasks themselves.
The producer can then proceed to drain the channel.
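
A minimal sketch of that protocol, under the assumption of a PromiseChannel type with an atomic fulfilled flag plus send/tryRecv MPSC primitives (all names here are illustrative, not Weave's actual API):

import std/atomics

type
  PromiseChannel = object
    fulfilled: Atomic[bool]      # set by the producer when the data is ready
    # ... plus an MPSC queue of Task, elided

proc onDelayedTask(p: var PromiseChannel, t: Task) =
  ## Consumer side, covering Case 1 and Case 2.
  if p.fulfilled.load(moAcquire):
    schedule(t)                  # promise already fulfilled: schedule it ourselves
  else:
    p.send(t)                    # park the task; the producer will drain it

proc onFulfill(p: var PromiseChannel) =
  ## Producer side: close the channel, then drain the parked tasks.
  p.fulfilled.store(true, moRelease)
  var t: Task
  while p.tryRecv(t):
    schedule(t)

A real implementation would also have to handle the window between the consumer's flag check and its send (for example by re-checking the flag after sending), otherwise a task sent just after the producer's drain could be stranded.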

Case 3: Memory reclamation

Assume the following interleaving of Consumer 1 (C1), the producer (P) and Consumer 2 (C2):
-> C1 encounters a delayed task T1, sends it to the Channel
-> P fulfills the promise, drains the Channel, schedules T1
-> C2 encounters a delayed task T2, sees that the Channel is closed, schedules T2
=> When do we delete and reclaim the Channel's memory?

Similar to the PubSub protocol, the Promise is refcounted: when only one reference remains, its holder, whether producer or consumer, can delete the Channel.
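
A tiny sketch of that reclamation, extending the PromiseChannel sketch above with an assumed atomic refCount field (recycle stands in for returning the block to the memory pool):

proc release(p: ptr PromiseChannel) =
  # Whoever drops the last reference returns the channel to the pool.
  if p.refCount.fetchSub(1, moAcquireRelease) == 1:
    recycle(p)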

Case 4: A task depends on 2 promises

When a task depends on 2 promises we need a tiebreaker to decide which one receives the task, for example the channel with the lowest byte representation. The task is sent to that first promise (P1).
The delayed task also keeps a pointer to the second promise (P2), so that it's not collected.

2 cases:

  • The worker fulfills P1, drains the tasks, then checks the state of P2: if it's also done, it schedules the task, decrements its refcount on P2 and recycles it if necessary.
  • The worker fulfills P1, drains the tasks, then checks the state of P2: if it's not done, it sends the task to P2 and removes the task's reference to P1.
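
Under the same assumptions as the earlier sketches (a hypothetical nextPromise field on the task and the release proc above), the producer-side handling of Case 4 could look like:

proc onFulfillMultiDep(p1: var PromiseChannel) =
  p1.fulfilled.store(true, moRelease)
  var t: Task
  while p1.tryRecv(t):
    let p2 = t.nextPromise           # assumed: pointer to the other dependency, nil if none
    if p2.isNil:
      schedule(t)                    # single dependency, same as before
    elif p2.fulfilled.load(moAcquire):
      schedule(t)                    # both dependencies done
      release(p2)                    # drop the task's refcount on P2
    else:
      p2[].send(t)                   # park the task in P2
      release(addr p1)               # the task no longer references P1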

Analysis

Latency

Compared to the other 2 schemes, delayed tasks always enter the normal scheduling cycle as soon as possible, i.e. as soon as the producer finishes or as soon as the task is created, whichever comes last.

Overhead

  • No unbounded polling/scanning of "are you ready?" (pubsub)
  • No exponential number of messages sent (gossipsub), only 1 per dependency
  • No need for a graph library to map (promise1, promise2) => Task:
    we only need to map promise1 => Task, and on fulfill we send the task to promise2

Ergonomics

  • No ordering constraint on the producing and consuming tasks

Slight bonus

  • A worker that fulfills a promise and then schedules the dependent tasks will still have the data hot in cache.

Unsure

For loops, does that mean 1 channel per loop iteration? "Memory is cheap", they said.
Even if we have dependencies on 1 million loop iterations, 1 million channels will be less overhead than the trillions of fibonacci tasks that are spawned in the benchmark, though memory usage would spike to 256 MB (1 channel = 1 WV_MemBlockSize = 2 x WV_CacheLinePadding = 2 x 128 bytes = 256 bytes, and 1,000,000 x 256 bytes = 256 MB).

@mratsim added the "enhancement :shipit:", "help wanted 👥" and "question ❓" labels on Jan 4, 2020
@mratsim (Owner, Author) commented Jan 7, 2020

closed by #94
