Skip to content

Commit

Permalink
Merge pull request #5914 from amitmurthy/amitm/map!
Browse files Browse the repository at this point in the history
map!, similar and eltype for DArray and SharedArrays
  • Loading branch information
amitmurthy committed Feb 25, 2014
2 parents 7bae6aa + c7c85b2 commit 7975b98
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 1 deletion.
17 changes: 17 additions & 0 deletions base/darray.jl
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ DArray(init, dims) = DArray(init, dims, workers()[1:min(nworkers(),maximum(dims)
# new DArray similar to an existing one
DArray(init, d::DArray) = DArray(init, size(d), procs(d), [size(d.chunks)...])

similar(d::DArray, T, dims::Dims)= DArray(I->Array(T, map(length,I)), dims, procs(d))
similar(d::DArray, T)= similar(d, T, size(d))
similar{T}(d::DArray{T}, dims::Dims)= similar(d, T, dims)
similar{T}(d::DArray{T})= similar(d, T, size(d))

eltype{T}(d::DArray{T}) = T

size(d::DArray) = d.dims
procs(d::DArray) = d.pmap

Expand Down Expand Up @@ -298,3 +305,13 @@ map(f::Callable, d::DArray) = DArray(I->map(f, localpart(d)), d)
reduce(f::Function, d::DArray) =
mapreduce(fetch, f,
{ @spawnat p reduce(f, localpart(d)) for p in procs(d) })


function map!(f::Callable, d::DArray)
@sync begin
for p in procs(d)
@spawnat p map!(f, localpart(d))
end
end
end

27 changes: 26 additions & 1 deletion base/sharedarray.jl
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,31 @@ function shmem_randn(dims; kwargs...)
end
shmem_randn(I::Int...; kwargs...) = shmem_randn(I; kwargs...)

similar(S::SharedArray, T, dims::Dims) = similar(S.s, T, dims)
similar(S::SharedArray, T, dims::Dims) = SharedArray(T, dims; pids=procs(S))
similar(S::SharedArray, T) = similar(S, T, size(S))
similar(S::SharedArray, dims::Dims) = similar(S, eltype(S), dims)
similar(S::SharedArray) = similar(S, eltype(S), size(S))

eltype(S::SharedArray) = eltype(S.s)

map(f::Callable, S::SharedArray) = (S2 = similar(S); S2[:] = S[:]; map!(f, S2); S2)

reduce(f::Function, S::SharedArray) =
mapreduce(fetch, f,
{ @spawnat p reduce(f, S.loc_subarr_1d) for p in procs(S) })


function map!(f::Callable, S::SharedArray)
@sync begin
for p in procs(S)
@spawnat p begin
for idx in localindexes(S)
S.s[idx] = f(S.s[idx])
end
end
end
end
end


function print_shmem_limits(slen)
Expand Down Expand Up @@ -284,3 +308,4 @@ function assert_same_host(procs)
return (first_privip != getipaddr()) ? false : true
end


17 changes: 17 additions & 0 deletions test/parallel.jl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ a = convert(Matrix{Float64}, d)
@test fetch(@spawnat id_me localpart(d)[1,1]) == d[1,1]
@test fetch(@spawnat id_other localpart(d)[1,1]) == d[1,101]

d=DArray(I->fill(myid(), map(length,I)), (10,10), [id_me, id_other])
d2 = map(x->1, d)
@test reduce(+, d2) == 100

@test reduce(+, d) == ((50*id_me) + (50*id_other))
map!(x->1, d)
@test reduce(+, d) == 100


@unix_only begin

Expand Down Expand Up @@ -84,6 +92,15 @@ A = convert(SharedArray, AA)
B = convert(SharedArray, AA')
@test B*A == AA'*AA

d=SharedArray(Int64, (10,10); init = D->fill!(D.loc_subarr_1d, myid()), pids=[id_me, id_other])
d2 = map(x->1, d)
@test reduce(+, d2) == 100

@test reduce(+, d) == ((50*id_me) + (50*id_other))
map!(x->1, d)
@test reduce(+, d) == 100


end # @unix_only(SharedArray tests)


Expand Down

0 comments on commit 7975b98

Please sign in to comment.