From 526ce23c3518a2989fd1f4dbcecc19054003c94a Mon Sep 17 00:00:00 2001 From: Amit Murthy Date: Mon, 20 Jan 2014 19:59:08 +0530 Subject: [PATCH 1/2] ArrayDist storing distribution info for darrays. Deprecated d* functions. --- base/arraydist.jl | 118 ++++++++++++++++++++++ base/darray.jl | 159 ++++++++---------------------- base/deprecated.jl | 16 +++ base/exports.jl | 9 +- base/sysimg.jl | 1 + doc/manual/parallel-computing.rst | 18 ++-- doc/stdlib/base.rst | 55 +++++------ test/parallel.jl | 2 +- 8 files changed, 216 insertions(+), 162 deletions(-) create mode 100644 base/arraydist.jl diff --git a/base/arraydist.jl b/base/arraydist.jl new file mode 100644 index 0000000000000..5ebe23e223cf6 --- /dev/null +++ b/base/arraydist.jl @@ -0,0 +1,118 @@ +abstract ArrayDist + +type SharedDist{N} <: ArrayDist + dims::NTuple{N,Int} + pids::Vector{Int} +end + +type ChunkedDist {N} <: ArrayDist + dims::NTuple{N,Int} + pids::Vector{Int} + + # indexes held by piece i + indexes::Array{NTuple{N,Range1{Int}},N} + + # cuts[d][i] = first index of chunk i in dimension d + cuts::Vector{Vector{Int}} + + dist::NTuple{N,Int} +end + +function SharedDist(dims; pids=workers()) + SharedDist{length(dims)}(dims, pids) +end +SharedDist(dims::Integer...; kwargs...) = SharedDist(dims; kwargs...) + +function ChunkedDist(dims; pids=workers(), dist=distbylargestdim(dims, length(pids))) + if isa(dist, Array) + dist = ntuple(x->dist[x], length(dist)) + end + + npids = length(pids) + nchunks = prod(dist) + + if nchunks > npids + error("Total requested number of chunks is greater than the number of workers") + elseif nchunks < npids + pids = pids[1:nchunks] + end + + idxs, cuts = chunk_idxs([dims...], dist) + assert(dims == map(last,last(idxs))) + + ChunkedDist{length(dims)}(dims, pids, idxs, cuts, dist) +end + +ChunkedDist(dims::Integer...; kwargs...) = ChunkedDist(dims; kwargs...) 
+ +## chunk index utilities ## + +# decide how to divide each dimension +# returns size of chunks array +# allocates largest factor to largest dim +function distbylargestdim(dims, ngroups) + dims = [dims...] + chunks = ones(Int, length(dims)) + f = sort!(collect(keys(factor(ngroups))), rev=true) + k = 1 + while ngroups > 1 + # repeatedly allocate largest factor to largest dim + if ngroups%f[k] != 0 + k += 1 + if k > length(f) + break + end + end + fac = f[k] + (d, dno) = findmax(dims) + # resolve ties to highest dim + dno = last(find(dims .== d)) + if dims[dno] >= fac + dims[dno] = div(dims[dno], fac) + chunks[dno] *= fac + end + ngroups = div(ngroups,fac) + end + chunks +end + +# get array of start indexes for dividing sz into nc chunks +function distbylargestdim(sz::Int, nc::Int) + if sz >= nc + iround(linspace(1, sz+1, nc+1)) + else + [[1:(sz+1)], zeros(Int, nc-sz)] + end +end + + +# compute indexes array for dividing dims into chunks +function chunk_idxs(dims, chunks) + cuts = map(distbylargestdim, dims, chunks) + n = length(dims) + idxs = Array(NTuple{n,Range1{Int}},chunks...) + cartesianmap(tuple(chunks...)) do cidx... + idxs[cidx...] = ntuple(n, i->(cuts[i][cidx[i]]:cuts[i][cidx[i]+1]-1)) + end + idxs, cuts +end + + +size(ad::ArrayDist) = ad.dims +procs(ad::ArrayDist) = ad.pids +nprocs(ad::ArrayDist) = length(ad.pids) + +getindex(cdist::ChunkedDist, i::Int) = cdist.indexes[i] +getindex(cdist::ChunkedDist, i::Int...) = cdist.indexes[i...] + +dist(cdist::ChunkedDist) = cdist.dist + +# find which piece holds index (I...) +function locate{N}(cdist::ChunkedDist{N}, I::Int...) 
+ ntuple(N, i->searchsortedlast(cdist.cuts[i], I[i])) +end + +localpartindex(cdist::ChunkedDist) = findfirst(cdist.pids, myid()) + + + diff --git a/base/darray.jl b/base/darray.jl index 87874ea6ab72c..7aa31cc93dc38 100644 --- a/base/darray.jl +++ b/base/darray.jl @@ -1,22 +1,13 @@ type DArray{T,N,A} <: AbstractArray{T,N} - dims::NTuple{N,Int} + cdist::ChunkedDist{N} chunks::Array{RemoteRef,N} - # pmap[i]==p ⇒ processor p has piece i - pmap::Vector{Int} - - # indexes held by piece i - indexes::Array{NTuple{N,Range1{Int}},N} - # cuts[d][i] = first index of chunk i in dimension d - cuts::Vector{Vector{Int}} - - function DArray(dims, chunks, pmap, indexes, cuts) + function DArray(cdist, chunks) # check invariants - assert(size(chunks) == size(indexes)) - assert(length(chunks) == length(pmap)) - assert(dims == map(last,last(indexes))) - new(dims, chunks, pmap, indexes, cuts) + assert(size(chunks) == dist(cdist)) + assert(length(chunks) == length(procs(cdist))) + new(cdist, chunks) end end @@ -26,134 +17,62 @@ typealias SubOrDArray{T,N} Union(DArray{T,N}, SubDArray{T,N}) ## core constructors ## # dist == size(chunks) -function DArray(init, dims, procs, dist) - np = prod(dist) - procs = procs[1:np] - idxs, cuts = chunk_idxs([dims...], dist) - chunks = Array(RemoteRef, dist...) - for i = 1:np - chunks[i] = remotecall(procs[i], init, idxs[i]) +function DArray(init, cdist::ChunkedDist) + chunks = Array(RemoteRef, dist(cdist)...) 
+ pids = procs(cdist) + for i = 1:nprocs(cdist) + chunks[i] = remotecall(pids[i], init, cdist[i]) end - p = max(1, localpartindex(procs)) - A = remotecall_fetch(procs[p], r->typeof(fetch(r)), chunks[p]) - DArray{eltype(A),length(dims),A}(dims, chunks, procs, idxs, cuts) + p = max(1, localpartindex(cdist)) + A = remotecall_fetch(pids[p], r->typeof(fetch(r)), chunks[p]) + DArray{eltype(A),length(size(cdist)),A}(cdist, chunks) end -function DArray(init, dims, procs) - if isempty(procs) - error("no processors") - end - DArray(init, dims, procs, defaultdist(dims,procs)) -end +DArray(init, dims, pids) = DArray(init, ChunkedDist(dims; pids=pids)) DArray(init, dims) = DArray(init, dims, workers()[1:min(nworkers(),maximum(dims))]) # new DArray similar to an existing one -DArray(init, d::DArray) = DArray(init, size(d), procs(d), [size(d.chunks)...]) +DArray(init, d::DArray) = DArray(init, distribution(d)) -size(d::DArray) = d.dims -procs(d::DArray) = d.pmap +size(d::DArray) = size(d.cdist) +procs(d::DArray) = procs(d.cdist) +distribution(d::DArray) = d.cdist chunktype{T,N,A}(d::DArray{T,N,A}) = A -## chunk index utilities ## - -# decide how to divide each dimension -# returns size of chunks array -function defaultdist(dims, procs) - dims = [dims...] 
- chunks = ones(Int, length(dims)) - np = length(procs) - f = sort!(collect(keys(factor(np))), rev=true) - k = 1 - while np > 1 - # repeatedly allocate largest factor to largest dim - if np%f[k] != 0 - k += 1 - if k > length(f) - break - end - end - fac = f[k] - (d, dno) = findmax(dims) - # resolve ties to highest dim - dno = last(find(dims .== d)) - if dims[dno] >= fac - dims[dno] = div(dims[dno], fac) - chunks[dno] *= fac - end - np = div(np,fac) - end - chunks -end - -# get array of start indexes for dividing sz into nc chunks -function defaultdist(sz::Int, nc::Int) - if sz >= nc - iround(linspace(1, sz+1, nc+1)) - else - [[1:(sz+1)], zeros(Int, nc-sz)] - end -end - -# compute indexes array for dividing dims into chunks -function chunk_idxs(dims, chunks) - cuts = map(defaultdist, dims, chunks) - n = length(dims) - idxs = Array(NTuple{n,Range1{Int}},chunks...) - cartesianmap(tuple(chunks...)) do cidx... - idxs[cidx...] = ntuple(n, i->(cuts[i][cidx[i]]:cuts[i][cidx[i]+1]-1)) - end - idxs, cuts -end - -function localpartindex(pmap::Vector{Int}) - mi = myid() - for i = 1:length(pmap) - if pmap[i] == mi - return i - end - end - return 0 -end - -localpartindex(d::DArray) = localpartindex(d.pmap) - function localpart{T,N,A}(d::DArray{T,N,A}) - lpidx = localpartindex(d) + lpidx = localpartindex(d.cdist) if lpidx == 0 convert(A, Array(T, ntuple(N,i->0)))::A else fetch(d.chunks[lpidx])::A end end -function localindexes(d::DArray) + +function localindexes{T,N}(d::DArray{T,N}) - lpidx = localpartindex(d) + lpidx = localpartindex(d.cdist) if lpidx == 0 - ntuple(ndims(d), i->1:0) + ntuple(N, i->1:0) else - d.indexes[lpidx] + d.cdist[lpidx] end end # find which piece holds index (I...) -function locate(d::DArray, I::Int...) - ntuple(ndims(d), i->searchsortedlast(d.cuts[i], I[i])) -end +locate(d::DArray, I::Int...) = locate(d.cdist, I...) chunk{T,N,A}(d::DArray{T,N,A}, i...) = fetch(d.chunks[i...])::A ## convenience constructors ## -dzeros(args...) = DArray(I->zeros(map(length,I)), args...) -dzeros(d::Int...) 
= dzeros(d) -dones(args...) = DArray(I->ones(map(length,I)), args...) -dones(d::Int...) = dones(d) -dfill(v, args...) = DArray(I->fill(v, map(length,I)), args...) -dfill(v, d::Int...) = dfill(v, d) -drand(args...) = DArray(I->rand(map(length,I)), args...) -drand(d::Int...) = drand(d) -drandn(args...) = DArray(I->randn(map(length,I)), args...) -drandn(d::Int...) = drandn(d) +zeros(cdist::ChunkedDist) = DArray(I->zeros(map(length,I)), cdist) +zeros{T}(::Type{T}, cdist::ChunkedDist) = DArray(I->zeros(T, map(length,I)), cdist) +ones(cdist::ChunkedDist) = DArray(I->ones(map(length,I)), cdist) +ones{T}(::Type{T}, cdist::ChunkedDist) = DArray(I->ones(T, map(length,I)), cdist) +fill(v, cdist::ChunkedDist) = DArray(I->fill(v, map(length,I)), cdist) +rand(cdist::ChunkedDist) = DArray(I->rand(map(length,I)), cdist) +randn(cdist::ChunkedDist) = DArray(I->randn(map(length,I)), cdist) + ## conversions ## @@ -170,7 +89,7 @@ function convert{S,T,N}(::Type{Array{S,N}}, d::DArray{T,N}) a = Array(S, size(d)) @sync begin for i = 1:length(d.chunks) - @async a[d.indexes[i]...] = chunk(d, i) + @async a[d.cdist[i]...] = chunk(d, i) end end a @@ -181,7 +100,7 @@ function convert{S,T,N}(::Type{Array{S,N}}, s::SubDArray{T,N}) d = s.parent if isa(I,(Range1{Int}...)) && S<:T && T<:S l = locate(d, map(first, I)...) - if isequal(d.indexes[l...], I) + if isequal(d.cdist[l...], I) # SubDArray corresponds to a chunk return chunk(d, l...) end @@ -229,11 +148,11 @@ end getindex(d::DArray, i::Int) = getindex(d, ind2sub(size(d), i)) getindex(d::DArray, i::Int...) = getindex(d, sub2ind(size(d), i...)) -function getindex{T}(d::DArray{T}, I::(Int...)) +function getindex{T,N}(d::DArray{T,N}, I::(Int...)) chidx = locate(d, I...) chunk = d.chunks[chidx...] - idxs = d.indexes[chidx...] - localidx = ntuple(ndims(d), i->(I[i]-first(idxs[i])+1)) + idxs = d.cdist[chidx...] 
+ localidx = ntuple(N, i->(I[i]-first(idxs[i])+1)) chunk[localidx...]::T end @@ -249,7 +168,7 @@ function setindex!(a::Array, d::DArray, I::Range1{Int}...) n = length(I) @sync begin for i = 1:length(d.chunks) - K = d.indexes[i] + K = d.cdist[i] @async a[[I[j][K[j]] for j=1:n]...] = chunk(d, i) end end @@ -267,7 +186,7 @@ function setindex!(a::Array, s::SubDArray, I::Range1{Int}...) offs = [isa(J[i],Int) ? J[i]-1 : first(J[i])-1 for i=1:n] @sync begin for i = 1:length(d.chunks) - K_c = {d.indexes[i]...} + K_c = {d.cdist[i]...} K = [ intersect(J[j],K_c[j]) for j=1:n ] if !any(isempty, K) idxs = [ I[j][K[j]-offs[j]] for j=1:n ] diff --git a/base/deprecated.jl b/base/deprecated.jl index 3f61a90f2424b..707536cb64606 100644 --- a/base/deprecated.jl +++ b/base/deprecated.jl @@ -395,6 +395,22 @@ eval(Sys, :(@deprecate shlib_list dllist)) @deprecate put put! @deprecate take take! +for dp in ["dzeros", "dones", "drand", "drandn"] + depf = symbol(dp) + newf = symbol(dp[2:end]) + @eval begin + @deprecate ($depf)(d::Int...) ($newf)(ChunkedDist(d)) + @deprecate ($depf)(dims) ($newf)(ChunkedDist(dims)) + @deprecate ($depf)(dims, pids) ($newf)(ChunkedDist(dims; pids=pids)) + @deprecate ($depf)(dims, pids, dist) ($newf)(ChunkedDist(dims; pids=pids, dist=dist)) + end +end + +@deprecate dfill(v, d::Int...) 
fill(v, ChunkedDist(d)) +@deprecate dfill(v, dims) fill(v, ChunkedDist(dims)) +@deprecate dfill(v, dims, pids) fill(v, ChunkedDist(dims; pids=pids)) +@deprecate dfill(v, dims, pids, dist) fill(v, ChunkedDist(dims; pids=pids, dist=dist)) + # 0.3 discontinued functions function nnz(X) diff --git a/base/exports.jl b/base/exports.jl index 49af1cd19abfd..91d06ceabca7e 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -18,6 +18,7 @@ export AbstractVector, AbstractVecOrMat, Array, + ArrayDist, Associative, Bidiagonal, BigFloat, @@ -29,6 +30,7 @@ export CFILE, Cholesky, CholeskyPivoted, + ChunkedDist, Cmd, Colon, Complex, @@ -89,6 +91,7 @@ export Schur, Set, SharedArray, + SharedDist, SparseMatrixCSC, StatStruct, StridedArray, @@ -1148,12 +1151,8 @@ export redisplay, # distributed arrays - dfill, distribute, - dones, - drand, - drandn, - dzeros, + distribution, localpart, localindexes, procs, diff --git a/base/sysimg.jl b/base/sysimg.jl index 0f50460ade6ed..8d763911b5ced 100644 --- a/base/sysimg.jl +++ b/base/sysimg.jl @@ -169,6 +169,7 @@ importall .Sort include("combinatorics.jl") # distributed arrays and memory-mapped arrays +include("arraydist.jl") include("darray.jl") include("mmap.jl") include("sharedarray.jl") diff --git a/doc/manual/parallel-computing.rst b/doc/manual/parallel-computing.rst index 12bfcffed403f..b43edd485c3ef 100644 --- a/doc/manual/parallel-computing.rst +++ b/doc/manual/parallel-computing.rst @@ -416,21 +416,23 @@ A ``DArray`` can also use arbitrary array-like types to represent the local chunks that store actual data. The data in a ``DArray`` is distributed by dividing the index space into some number of blocks in each dimension. -Common kinds of arrays can be constructed with functions beginning with -``d``:: +An object of type ``ChunkedDist`` represents the partitioning of the +index space. 
- dzeros(100,100,10) - dones(100,100,10) - drand(100,100,10) - drandn(100,100,10) - dfill(x, 100,100,10) +Common kinds of distributed arrays can be constructed with the ``ChunkedDist`` variants +of the regular array construction functions:: + zeros(ChunkedDist(100,100,10)) + ones(ChunkedDist(100,100,10)) + rand(ChunkedDist(100,100,10)) + randn(ChunkedDist(100,100,10)) + fill(x, ChunkedDist(100,100,10)) In the last case, each element will be initialized to the specified value ``x``. These functions automatically pick a distribution for you. For more control, you can specify which processors to use, and how the data should be distributed:: - dzeros((100,100), workers()[1:4], [1,4]) + zeros(ChunkedDist((100,100); pids=workers()[1:4], dist=[1,4])) The second argument specifies that the array should be created on the first four workers. When dividing data among a large number of processes, diff --git a/doc/stdlib/base.rst b/doc/stdlib/base.rst index 2a679213b82b8..91cc9e5aa114e 100644 --- a/doc/stdlib/base.rst +++ b/doc/stdlib/base.rst @@ -4595,44 +4595,43 @@ Parallel Computing Distributed Arrays ------------------ -.. function:: DArray(init, dims, [procs, dist]) +.. function:: DArray(init, cdist::ChunkedDist) Construct a distributed array. ``init`` is a function that accepts a tuple of index ranges. This function should allocate a local chunk of the distributed array and initialize it for the specified indices. - ``dims`` is the overall size of the distributed array. ``procs`` optionally specifies a vector of processor IDs to use. - If unspecified, the array is distributed over all worker processes only. Typically, when runnning in distributed mode, + A ``ChunkedDist(dims; pids::Vector, dist::Vector)`` represents the partitioning of overall array size ``dims`` + across workers identified by ``pids``. + + Keyword argument ``pids`` defaults to all worker processes only. 
Typically, when running in distributed mode, i.e., ``nprocs() > 1``, this would mean that no chunk of the distributed array exists on the process hosting the interactive julia prompt. - ``dist`` is an integer vector specifying how many chunks the distributed array should be divided into in each dimension. - - For example, the ``dfill`` function that creates a distributed array and fills it with a value ``v`` is implemented as: - - ``dfill(v, args...) = DArray(I->fill(v, map(length,I)), args...)`` - -.. function:: dzeros(dims, ...) - - Construct a distributed array of zeros. Trailing arguments are the same as those accepted by ``darray``. - -.. function:: dones(dims, ...) - - Construct a distributed array of ones. Trailing arguments are the same as those accepted by ``darray``. - -.. function:: dfill(x, dims, ...) - - Construct a distributed array filled with value ``x``. Trailing arguments are the same as those accepted by ``darray``. - -.. function:: drand(dims, ...) - - Construct a distributed uniform random array. Trailing arguments are the same as those accepted by ``darray``. - -.. function:: drandn(dims, ...) - - Construct a distributed normal random array. Trailing arguments are the same as those accepted by ``darray``. + + Keyword argument ``dist`` is an integer vector specifying how many chunks the distributed array should be divided + into in each dimension, and defaults to a partition along the larger dimensions first. 
+ + For example, the DArray variant of the ``fill`` function that creates a distributed array and fills it with a value ``v`` is implemented as: + ``fill(v, cdist::ChunkedDist) = DArray(I->fill(v, map(length,I)), cdist)`` + + + The following convenience constructors, which create a distributed version of the corresponding array, are available :: + + zeros(cdist::ChunkedDist) + zeros(::Type{T}, cdist::ChunkedDist) + ones(cdist::ChunkedDist) + ones(::Type{T}, cdist::ChunkedDist) + fill(v, cdist::ChunkedDist) + rand(cdist::ChunkedDist) + randn(cdist::ChunkedDist) + .. function:: distribute(a) Convert a local array to distributed +.. function:: distribution(d::DArray) + + Get the ChunkedDist representing the distribution of ``d`` + .. function:: localpart(d) Get the local piece of a distributed array. Returns an empty array if no local part exists on the calling process. diff --git a/test/parallel.jl b/test/parallel.jl index 73d07df09f043..831f2e1469c39 100644 --- a/test/parallel.jl +++ b/test/parallel.jl @@ -12,7 +12,7 @@ id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))] @test @fetchfrom id_other begin myid() end == id_other @fetch begin myid() end -d = drand((200,200), [id_me, id_other]) +d = rand(ChunkedDist((200,200); pids=[id_me, id_other])) s = convert(Matrix{Float64}, d[1:150, 1:150]) a = convert(Matrix{Float64}, d) @test a[1:150,1:150] == s From d7be99a30c2f8000c68a2405d49fc4e62d84c4e1 Mon Sep 17 00:00:00 2001 From: Amit Murthy Date: Thu, 20 Feb 2014 14:54:58 +0530 Subject: [PATCH 2/2] fixed mismatch between size/getindex of ChunkedDist --- base/arraydist.jl | 5 ++++- base/darray.jl | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/base/arraydist.jl b/base/arraydist.jl index 5ebe23e223cf6..7d7d620d9463b 100644 --- a/base/arraydist.jl +++ b/base/arraydist.jl @@ -98,10 +98,13 @@ function chunk_idxs(dims, chunks) end -size(ad::ArrayDist) = ad.dims procs(ad::ArrayDist) = ad.pids nprocs(ad::ArrayDist) = length(ad.pids) 
+size(cdist::ChunkedDist) = size(cdist.indexes) +size(sdist::SharedDist) = size(sdist.pids) + + getindex(cdist::ChunkedDist, i::Int) = cdist.indexes[i] getindex(cdist::ChunkedDist, i::Int...) = cdist.indexes[i...] diff --git a/base/darray.jl b/base/darray.jl index 7aa31cc93dc38..11fabea9f6753 100644 --- a/base/darray.jl +++ b/base/darray.jl @@ -34,7 +34,7 @@ DArray(init, dims) = DArray(init, dims, workers()[1:min(nworkers(),maximum(dims) # new DArray similar to an existing one DArray(init, d::DArray) = DArray(init, distribution(d)) -size(d::DArray) = size(d.cdist) +size(d::DArray) = d.cdist.dims procs(d::DArray) = procs(d.cdist) distribution(d::DArray) = d.cdist