Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ArrayDist storing distribution info for darrays. Deprecated d* functions... #5452

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions base/arraydist.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
abstract ArrayDist

type SharedDist{N} <: ArrayDist
dims::NTuple{N,Int}
pids::Vector{Int}
end

type ChunkedDist {N} <: ArrayDist
dims::NTuple{N,Int}
pids::Vector{Int}

# indexes held by piece i
indexes::Array{NTuple{N,Range1{Int}},N}

# cuts[d][i] = first index of chunk i in dimension d
cuts::Vector{Vector{Int}}

dist::NTuple{N,Int}
end

function SharedDist(dims; pids=workers())
SharedDist{length(dims)}(dims, pids)
end
SharedDist(dims::Integer...; kwargs...) = SharedDist(dims; kwargs...)

function ChunkedDist(dims; pids=workers(), dist=distbylargestdim(dims, length(pids)))
if isa(dist, Array)
dist = ntuple(x->dist[x], length(dist))
end

npids = length(pids)
nchunks = prod(dist)

if nchunks > npids
error("Total requested number of chunks is greater than the number of workers")
elseif nchunks < npids
pids = pids[1:nchunks]
end

idxs, cuts = chunk_idxs([dims...], dist)
assert(dims == map(last,last(idxs)))

ChunkedDist{length(dims)}(dims, pids, idxs, cuts, dist)
end

ChunkedDist(dims::Integer...; kwargs...) = ChunkedDist(dims; kwargs...)

## chunk index utilities ##

# decide how to divide each dimension
# returns size of chunks array
# allocates largest factor to largest dim
function distbylargestdim(dims, ngroups)
dims = [dims...]
chunks = ones(Int, length(dims))
f = sort!(collect(keys(factor(ngroups))), rev=true)
k = 1
while ngroups > 1
# repeatedly allocate largest factor to largest dim
if ngroups%f[k] != 0
k += 1
if k > length(f)
break
end
end
fac = f[k]
(d, dno) = findmax(dims)
# resolve ties to highest dim
dno = last(find(dims .== d))
if dims[dno] >= fac
dims[dno] = div(dims[dno], fac)
chunks[dno] *= fac
end
ngroups = div(ngroups,fac)
end
chunks
end

# get array of start indexes for dividing sz into nc chunks
function distbylargestdim(sz::Int, nc::Int)
if sz >= nc
iround(linspace(1, sz+1, nc+1))
else
[[1:(sz+1)], zeros(Int, nc-sz)]
end
end


# compute indexes array for dividing dims into chunks
function chunk_idxs(dims, chunks)
cuts = map(distbylargestdim, dims, chunks)
n = length(dims)
idxs = Array(NTuple{n,Range1{Int}},chunks...)
cartesianmap(tuple(chunks...)) do cidx...
idxs[cidx...] = ntuple(n, i->(cuts[i][cidx[i]]:cuts[i][cidx[i]+1]-1))
end
idxs, cuts
end


size(ad::ArrayDist) = ad.dims
Copy link
Sponsor Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be inconsistent with getindex below; i.e. the size does not give the valid range of indexes for getindex. That could be confusing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. How about size(ChunkedDist) returning the size of the indexes, while size(SharedDist) returns the length of the pids vector. Both describe the size of the distribution.

size(DArray) and size(SharedArray) will return the size of the complete array. There will not be a function to retrieve dims from an ArrayDist - folks have to use ad.dims

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just pushed an update.

procs(ad::ArrayDist) = ad.pids
nprocs(ad::ArrayDist) = length(ad.pids)

getindex(cdist::ChunkedDist, i::Int) = cdist.indexes[i]
getindex(cdist::ChunkedDist, i::Int...) = cdist.indexes[i...]

dist(cdist::ChunkedDist) = cdist.dist

# find which piece holds index (I...)
function locate{N}(cdist::ChunkedDist{N}, I::Int...)
ntuple(N, i->searchsortedlast(cdist.cuts[i], I[i]))
end

localpartindex(cdist::ChunkedDist) = findfirst(cdist.pids, myid())



159 changes: 39 additions & 120 deletions base/darray.jl
Original file line number Diff line number Diff line change
@@ -1,22 +1,13 @@
type DArray{T,N,A} <: AbstractArray{T,N}
dims::NTuple{N,Int}
cdist::ChunkedDist{N}

chunks::Array{RemoteRef,N}

# pmap[i]==p ⇒ processor p has piece i
pmap::Vector{Int}

# indexes held by piece i
indexes::Array{NTuple{N,Range1{Int}},N}
# cuts[d][i] = first index of chunk i in dimension d
cuts::Vector{Vector{Int}}

function DArray(dims, chunks, pmap, indexes, cuts)
function DArray(cdist, chunks)
# check invariants
assert(size(chunks) == size(indexes))
assert(length(chunks) == length(pmap))
assert(dims == map(last,last(indexes)))
new(dims, chunks, pmap, indexes, cuts)
assert(size(chunks) == dist(cdist))
assert(length(chunks) == length(procs(cdist)))
new(cdist, chunks)
end
end

Expand All @@ -26,134 +17,62 @@ typealias SubOrDArray{T,N} Union(DArray{T,N}, SubDArray{T,N})
## core constructors ##

# dist == size(chunks)
function DArray(init, dims, procs, dist)
np = prod(dist)
procs = procs[1:np]
idxs, cuts = chunk_idxs([dims...], dist)
chunks = Array(RemoteRef, dist...)
for i = 1:np
chunks[i] = remotecall(procs[i], init, idxs[i])
function DArray(init, cdist::ChunkedDist)
chunks = Array(RemoteRef, dist(cdist)...)
pids = procs(cdist)
for i = 1:nprocs(cdist)
chunks[i] = remotecall(pids[i], init, cdist[i])
end
p = max(1, localpartindex(procs))
A = remotecall_fetch(procs[p], r->typeof(fetch(r)), chunks[p])
DArray{eltype(A),length(dims),A}(dims, chunks, procs, idxs, cuts)
p = max(1, localpartindex(cdist))
A = remotecall_fetch(pids[p], r->typeof(fetch(r)), chunks[p])
DArray{eltype(A),length(size(cdist)),A}(cdist, chunks)
end

function DArray(init, dims, procs)
if isempty(procs)
error("no processors")
end
DArray(init, dims, procs, defaultdist(dims,procs))
end
DArray(init, dims, pids) = DArray(init, ChunkedDist(dims; pids=pids))
DArray(init, dims) = DArray(init, dims, workers()[1:min(nworkers(),maximum(dims))])

# new DArray similar to an existing one
DArray(init, d::DArray) = DArray(init, size(d), procs(d), [size(d.chunks)...])
DArray(init, d::DArray) = DArray(init, distribution(d))

size(d::DArray) = d.dims
procs(d::DArray) = d.pmap
size(d::DArray) = size(d.cdist)
procs(d::DArray) = procs(d.cdist)
distribution(d::DArray) = d.cdist

chunktype{T,N,A}(d::DArray{T,N,A}) = A

## chunk index utilities ##

# decide how to divide each dimension
# returns size of chunks array
function defaultdist(dims, procs)
dims = [dims...]
chunks = ones(Int, length(dims))
np = length(procs)
f = sort!(collect(keys(factor(np))), rev=true)
k = 1
while np > 1
# repeatedly allocate largest factor to largest dim
if np%f[k] != 0
k += 1
if k > length(f)
break
end
end
fac = f[k]
(d, dno) = findmax(dims)
# resolve ties to highest dim
dno = last(find(dims .== d))
if dims[dno] >= fac
dims[dno] = div(dims[dno], fac)
chunks[dno] *= fac
end
np = div(np,fac)
end
chunks
end

# get array of start indexes for dividing sz into nc chunks
function defaultdist(sz::Int, nc::Int)
if sz >= nc
iround(linspace(1, sz+1, nc+1))
else
[[1:(sz+1)], zeros(Int, nc-sz)]
end
end

# compute indexes array for dividing dims into chunks
function chunk_idxs(dims, chunks)
cuts = map(defaultdist, dims, chunks)
n = length(dims)
idxs = Array(NTuple{n,Range1{Int}},chunks...)
cartesianmap(tuple(chunks...)) do cidx...
idxs[cidx...] = ntuple(n, i->(cuts[i][cidx[i]]:cuts[i][cidx[i]+1]-1))
end
idxs, cuts
end

function localpartindex(pmap::Vector{Int})
mi = myid()
for i = 1:length(pmap)
if pmap[i] == mi
return i
end
end
return 0
end

localpartindex(d::DArray) = localpartindex(d.pmap)

function localpart{T,N,A}(d::DArray{T,N,A})
lpidx = localpartindex(d)
lpidx = localpartindex(d.cdist)
if lpidx == 0
convert(A, Array(T, ntuple(N,i->0)))::A
else
fetch(d.chunks[lpidx])::A
end
end
function localindexes(d::DArray)

function localindexes{T,N}(d::DArray{T,N})
lpidx = localpartindex(d)
if lpidx == 0
ntuple(ndims(d), i->1:0)
ntuple(N, i->1:0)
else
d.indexes[lpidx]
d.cdist[lpidx]
end
end

# find which piece holds index (I...)
function locate(d::DArray, I::Int...)
ntuple(ndims(d), i->searchsortedlast(d.cuts[i], I[i]))
end
locate(d::DArray, I::Int...) = locate(d.cdist, I...)

chunk{T,N,A}(d::DArray{T,N,A}, i...) = fetch(d.chunks[i...])::A

## convenience constructors ##

dzeros(args...) = DArray(I->zeros(map(length,I)), args...)
dzeros(d::Int...) = dzeros(d)
dones(args...) = DArray(I->ones(map(length,I)), args...)
dones(d::Int...) = dones(d)
dfill(v, args...) = DArray(I->fill(v, map(length,I)), args...)
dfill(v, d::Int...) = dfill(v, d)
drand(args...) = DArray(I->rand(map(length,I)), args...)
drand(d::Int...) = drand(d)
drandn(args...) = DArray(I->randn(map(length,I)), args...)
drandn(d::Int...) = drandn(d)
zeros(cdist::ChunkedDist) = DArray(I->zeros(map(length,I)), cdist)
zeros{T}(::Type{T}, cdist::ChunkedDist) = DArray(I->zeros(T, map(length,I)), cdist)
ones(cdist::ChunkedDist) = DArray(I->ones(map(length,I)), cdist)
ones{T}(::Type{T}, cdist::ChunkedDist) = DArray(I->ones(T, map(length,I)), cdist)
fill(v, cdist::ChunkedDist) = DArray(I->fill(v, map(length,I)), cdist)
rand(cdist::ChunkedDist) = DArray(I->rand(map(length,I)), cdist)
randn(cdist::ChunkedDist) = DArray(I->randn(map(length,I)), cdist)


## conversions ##

Expand All @@ -170,7 +89,7 @@ function convert{S,T,N}(::Type{Array{S,N}}, d::DArray{T,N})
a = Array(S, size(d))
@sync begin
for i = 1:length(d.chunks)
@async a[d.indexes[i]...] = chunk(d, i)
@async a[d.cdist[i]...] = chunk(d, i)
end
end
a
Expand All @@ -181,7 +100,7 @@ function convert{S,T,N}(::Type{Array{S,N}}, s::SubDArray{T,N})
d = s.parent
if isa(I,(Range1{Int}...)) && S<:T && T<:S
l = locate(d, map(first, I)...)
if isequal(d.indexes[l...], I)
if isequal(d.cdist[l...], I)
# SubDArray corresponds to a chunk
return chunk(d, l...)
end
Expand Down Expand Up @@ -229,11 +148,11 @@ end
getindex(d::DArray, i::Int) = getindex(d, ind2sub(size(d), i))
getindex(d::DArray, i::Int...) = getindex(d, sub2ind(size(d), i...))

function getindex{T}(d::DArray{T}, I::(Int...))
function getindex{T,N}(d::DArray{T,N}, I::(Int...))
chidx = locate(d, I...)
chunk = d.chunks[chidx...]
idxs = d.indexes[chidx...]
localidx = ntuple(ndims(d), i->(I[i]-first(idxs[i])+1))
idxs = d.cdist[chidx...]
localidx = ntuple(N, i->(I[i]-first(idxs[i])+1))
chunk[localidx...]::T
end

Expand All @@ -249,7 +168,7 @@ function setindex!(a::Array, d::DArray, I::Range1{Int}...)
n = length(I)
@sync begin
for i = 1:length(d.chunks)
K = d.indexes[i]
K = d.cdist[i]
@async a[[I[j][K[j]] for j=1:n]...] = chunk(d, i)
end
end
Expand All @@ -267,7 +186,7 @@ function setindex!(a::Array, s::SubDArray, I::Range1{Int}...)
offs = [isa(J[i],Int) ? J[i]-1 : first(J[i])-1 for i=1:n]
@sync begin
for i = 1:length(d.chunks)
K_c = {d.indexes[i]...}
K_c = {d.cdist[i]...}
K = [ intersect(J[j],K_c[j]) for j=1:n ]
if !any(isempty, K)
idxs = [ I[j][K[j]-offs[j]] for j=1:n ]
Expand Down
16 changes: 16 additions & 0 deletions base/deprecated.jl
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,22 @@ eval(Sys, :(@deprecate shlib_list dllist))
@deprecate put put!
@deprecate take take!

for dp in ["dzeros", "dones", "drand", "drandn"]
depf = symbol(dp)
newf = symbol(dp[2:end])
@eval begin
@deprecate ($depf)(d::Int...) ($newf)(ChunkedDist(d))
@deprecate ($depf)(dims) ($newf)(ChunkedDist(dims))
@deprecate ($depf)(dims, pids) ($newf)(ChunkedDist(dims; pids=pids))
@deprecate ($depf)(dims, pids, dist) ($newf)(ChunkedDist(dims; pids=pids, dist=dist))
end
end

@deprecate dfill(v, d::Int...) fill(v, ChunkedDist(d))
@deprecate dfill(v, dims) fill(v, ChunkedDist(dims))
@deprecate dfill(v, dims, pids) fill(v, ChunkedDist(dims; pids=pids))
@deprecate dfill(v, dims, pids, dist) fill(v, ChunkedDist(dims; pids=pids, dist=dist))

# 0.3 discontinued functions

function nnz(X)
Expand Down
Loading