Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create SharedArray from disk file #12560

Merged
merged 5 commits into from
Aug 12, 2015
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1247,6 +1247,7 @@ export
# shared arrays
sdata,
indexpids,
localindexes,

# paths and file names
abspath,
Expand Down
131 changes: 90 additions & 41 deletions base/sharedarray.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
type SharedArray{T,N} <: DenseArray{T,N}
dims::NTuple{N,Int}
pids::Vector{Int}
refs::Array{RemoteRef}
refs::Vector{RemoteRef}

# The segname is currently used only in the test scripts to ensure that
# the shmem segment has been unlinked.
segname::AbstractString
segname::UTF8String

# Fields below are not to be serialized
# Local shmem map.
Expand All @@ -19,7 +19,7 @@ type SharedArray{T,N} <: DenseArray{T,N}
# the local partition into the array when viewed as a single dimensional array.
# this can be removed when @parallel or its equivalent supports looping on
# a subset of workers.
loc_subarr_1d
loc_subarr_1d::SubArray{T,1,Array{T,N},Tuple{UnitRange{Int}},1}

SharedArray(d,p,r,sn) = new(d,p,r,sn)
end
Expand All @@ -29,21 +29,7 @@ function SharedArray(T::Type, dims::NTuple; init=false, pids=Int[])

isbits(T) || throw(ArgumentError("type of SharedArray elements must be bits types, got $(T)"))

if isempty(pids)
# only use workers on the current host
pids = procs(myid())
if length(pids) > 1
pids = filter(x -> x != 1, pids)
end

onlocalhost = true
else
if !check_same_host(pids)
throw(ArgumentError("SharedArray requires all requested processes to be on the same machine."))
end

onlocalhost = myid() in procs(pids[1])
end
pids, onlocalhost = shared_pids(pids)

local shm_seg_name = ""
local s
Expand Down Expand Up @@ -114,11 +100,75 @@ end

# Varargs convenience form: the integer dimensions are gathered into a tuple
# and forwarded, together with any keyword arguments, to the tuple-based method.
function SharedArray(T, I::Int...; kwargs...)
    return SharedArray(T, I; kwargs...)
end

# Create a SharedArray from a disk file
#
# Memory-maps `filename` as an `Array{T,N}` of size `dims` on every process in
# `pids` and wraps the mappings in a SharedArray whose segname field is set to
# `filename`. `offset` is forwarded unchanged to Mmap.mmap.
#
# Keyword arguments:
#   mode - file open mode; must be one of "r", "r+", "w+", "a+". When left as
#          `nothing` it becomes "r+" if the file already exists, "w+" otherwise.
#   init - when it is a Function it is called as init(S) on every process in
#          `pids` after all mappings exist. Any value other than `false`
#          requires a writable mode.
#   pids - processes that map the file; resolved through shared_pids.
#
# Raises an ErrorException (via `error`) for a non-absolute path, a mode that
# is not readable, or `init` combined with a read-only mode, and an
# ArgumentError when T is not a bits type.
function SharedArray{T,N}(filename::AbstractString, ::Type{T}, dims::NTuple{N,Int}, offset::Integer=0; mode=nothing, init=false, pids::Vector{Int}=Int[])
isabspath(filename) || error("$filename is not an absolute path; try abspath(filename)?")
isbits(T) || throw(ArgumentError("type of SharedArray elements must be bits types, got $(T)"))

pids, onlocalhost = shared_pids(pids)

# If not supplied, determine the appropriate mode
# The existence check runs locally when this process is on the workers' host,
# otherwise it is performed on the first worker.
have_file = onlocalhost ? isfile(filename) : remotecall_fetch(pids[1], isfile, filename)
# NOTE(review): `mode === nothing` would be the more robust identity test here.
if mode == nothing
mode = have_file ? "r+" : "w+"
end
workermode = mode == "w+" ? "r+" : mode # workers don't truncate!

# Ensure the file will be readable
mode in ("r", "r+", "w+", "a+") || error("mode must be readable, but I got $mode")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can these be ArgumentError's? Also, using a personal pronoun in an error message is a bit weird.

Copy link
Sponsor Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good points all round, will fix.

init==false || mode in ("r+", "w+", "a+") || error("cannot initialize array unless it is writable")

# Create the file if it doesn't exist, map it if it does
refs = Array(RemoteRef, length(pids))
# Opens the file with the given mode, maps it, and lets the do-block close the
# stream once the mapping has been created.
func_mmap = mode -> open(filename, mode) do io
Mmap.mmap(io, Array{T,N}, dims, offset; shared=true)
end
local s
if onlocalhost
# The calling process maps first using the requested mode (so a "w+" open
# happens here); the first worker then maps with the non-truncating mode.
s = func_mmap(mode)
refs[1] = remotecall(pids[1], () -> func_mmap(workermode))
else
# Not on the workers' host: the first worker opens with the requested mode,
# and we wait for it before the remaining workers map the file.
refs[1] = remotecall_wait(pids[1], () -> func_mmap(mode))
end

# Populate the rest of the workers
for i = 2:length(pids)
refs[i] = remotecall(pids[i], () -> func_mmap(workermode))
end

# Wait till all the workers have mapped the segment
for i in 1:length(refs)
wait(refs[i])
end

S = SharedArray{T,N}(dims, pids, refs, filename)

if onlocalhost
init_loc_flds(S)
# In the event that myid() is not part of pids, s will not be set
# in the init function above, hence setting it here if available.
S.s = s
else
# No local mapping is set up in this branch; record that this process
# has no index among the participants.
S.pidx = 0
end

# if present, init function is called on each of the parts
if isa(init, Function)
@sync begin
for p in pids
@async remotecall_wait(p, init, S)
end
end
end
S
end

# Convenience aliases for one- and two-dimensional shared arrays.
typealias SharedVector{T} SharedArray{T,1}
typealias SharedMatrix{T} SharedArray{T,2}

# Size queries are answered from the stored `dims` tuple.
length(S::SharedArray) = prod(S.dims)
size(S::SharedArray) = S.dims
# Declare fast linear indexing for every SharedArray subtype.
linearindexing{S<:SharedArray}(::Type{S}) = LinearFast()

function reshape{T,N}(a::SharedArray{T}, dims::NTuple{N,Int})
(length(a) != prod(dims)) && throw(DimensionMismatch("dimensions must be consistent with array size"))
Expand Down Expand Up @@ -155,6 +205,25 @@ function deepcopy_internal(S::SharedArray, stackdict::ObjectIdDict)
return R
end

# Resolve the set of processes that will share an array.
#
# Returns a tuple `(pids, onlocalhost)`:
#   - with an empty `pids`, the processes on the current host are used (process 1
#     is dropped when other processes are available) and `onlocalhost` is true;
#   - otherwise `pids` is returned unchanged after checking that all of them are
#     on one machine (ArgumentError if not), and `onlocalhost` tells whether the
#     calling process is on the same host as `pids[1]`.
function shared_pids(pids)
    if !isempty(pids)
        check_same_host(pids) || throw(ArgumentError("SharedArray requires all requested processes to be on the same machine."))
        return pids, (myid() in procs(pids[1]))
    end

    # only use workers on the current host
    host_pids = procs(myid())
    if length(host_pids) > 1
        host_pids = filter(x -> x != 1, host_pids)
    end
    return host_pids, true
end

function range_1dim(S::SharedArray, pidx)
l = length(S)
nw = length(S.pids)
Expand All @@ -175,14 +244,14 @@ end

# View into `S.s` restricted to the index range that range_1dim reports for
# the participant at position `pidx`.
function sub_1dim(S::SharedArray, pidx)
    return sub(S.s, range_1dim(S, pidx))
end

function init_loc_flds{T}(S::SharedArray{T})
function init_loc_flds{T,N}(S::SharedArray{T,N})
if myid() in S.pids
S.pidx = findfirst(S.pids, myid())
S.s = fetch(S.refs[S.pidx])
S.loc_subarr_1d = sub_1dim(S, S.pidx)
else
S.pidx = 0
S.loc_subarr_1d = Array(T, 0)
S.loc_subarr_1d = sub(Array(T, ntuple(d->0,N)), 1:0)
end
end

Expand All @@ -209,30 +278,10 @@ end

# Converting to a plain Array hands back the array stored in `S.s` itself.
convert(::Type{Array}, S::SharedArray) = S.s

# pass through getindex and setindex! - they always work on the complete array unlike DArrays
getindex(S::SharedArray) = getindex(S.s)
# pass through getindex and setindex! - unlike DArrays, these always work on the complete array
getindex(S::SharedArray, I::Real) = getindex(S.s, I)
getindex(S::SharedArray, I::AbstractArray) = getindex(S.s, I)
getindex(S::SharedArray, I::Colon) = getindex(S.s, I)
# Multi-index form: the generated body spells out one `I[d]` argument per
# supplied index and forwards the call to the array in `S.s`.
@generated function getindex(S::SharedArray, I::Union{Real,AbstractVector,Colon}...)
N = length(I)
Isplat = Expr[:(I[$d]) for d = 1:N]
quote
getindex(S.s, $(Isplat...))
end
end

# Assignment counterparts of the methods above; each forwards to `S.s`.
setindex!(S::SharedArray, x) = setindex!(S.s, x)
setindex!(S::SharedArray, x, I::Real) = setindex!(S.s, x, I)
setindex!(S::SharedArray, x, I::AbstractArray) = setindex!(S.s, x, I)
setindex!(S::SharedArray, x, I::Colon) = setindex!(S.s, x, I)
# Multi-index assignment: same construction as the generated getindex, with the
# value `x` placed ahead of the explicitly listed indexes.
@generated function setindex!(S::SharedArray, x, I::Union{Real,AbstractVector,Colon}...)
N = length(I)
Isplat = Expr[:(I[$d]) for d = 1:N]
quote
setindex!(S.s, x, $(Isplat...))
end
end

function fill!(S::SharedArray, v)
vT = convert(eltype(S), v)
Expand Down
73 changes: 72 additions & 1 deletion test/parallel.jl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,18 @@ end

# TODO : Need a similar test of shmem cleanup for OSX

# SharedArray tests
##### SharedArray tests

# Verify that every element of `S` holds the id of the process whose local
# 1-d chunk covers it, and that the chunks together cover the whole array.
function check_pids_all(S::SharedArray)
    # Evaluated on each process: the parent indexes behind its local chunk
    local_indexes = D -> parentindexes(D.loc_subarr_1d)[1]
    covered = falses(size(S))
    for pid in procs(S)
        chunk = remotecall_fetch(pid, local_indexes, S)
        @test all(sdata(S)[chunk] .== pid)
        covered[chunk] = true
    end
    @test all(covered)
end

d = Base.shmem_rand(1:100, dims)
a = convert(Array, d)

Expand Down Expand Up @@ -86,6 +97,66 @@ for p in procs(d)
@test d[idxl] == p
end

### SharedArrays from a file

# Mapping an existing file
fn = tempname()
open(fn, "w") do io
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests should probably remove their temporary files when they're done.

Copy link
Sponsor Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 13846e5

write(io, 1:30)
end
sz = (6,5)
Atrue = reshape(1:30, sz)

# Map the existing file; no mode is given, so the constructor picks one itself
S = SharedArray(fn, Int, sz)
@test S == Atrue
@test length(procs(S)) > 1
# Every participating process stamps its own id into its local chunk
@sync begin
    for p in procs(S)
        @async remotecall_wait(p, D->fill!(D.loc_subarr_1d, myid()), S)
    end
end
check_pids_all(S)

# Writes made through the mapping must show up in the file contents
filedata = similar(Atrue)
open(fn, "r") do io
    read!(io, filedata)
end
@test filedata == sdata(S)

# Error for write-only files
@test_throws ErrorException SharedArray(fn, Int, sz, mode="w")

# Error for file doesn't exist, but not allowed to create
@test_throws SystemError SharedArray(tempname(), Int, sz, mode="r")

# Creating a new file
fn2 = tempname()
S = SharedArray(fn2, Int, sz, init=D->D[localindexes(D)] = myid())
@test S == filedata
filedata2 = similar(Atrue)
open(fn2, "r") do io
    read!(io, filedata2)
end
@test filedata == filedata2

# Appending to a file
fn3 = tempname()
open(fn3, "w") do io
    write(io, ones(UInt8, 4))
end
# Map past the 4 existing bytes and fill the mapped region with 0x02
S = SharedArray(fn3, UInt8, sz, 4, mode="a+", init=D->D[localindexes(D)]=0x02)
len = prod(sz)+4
@test filesize(fn3) == len
filedata = Array(UInt8, len)
open(fn3, "r") do io
    read!(io, filedata)
end
@test all(filedata[1:4] .== 0x01)
@test all(filedata[5:end] .== 0x02)

# Remove the temporary files created by the tests above so the test run does
# not leave them behind. The arrays may still have the files mapped, and
# deleting a mapped file typically fails on Windows, so cleanup is skipped there.
if OS_NAME != :Windows
    rm(fn)
    rm(fn2)
    rm(fn3)
end


### Utility functions

# reshape

d = Base.shmem_fill(1.0, (10,10,10))
Expand Down