Skip to content

Commit

Permalink
Merge pull request #12560 from JuliaLang/teh/shared_file
Browse files Browse the repository at this point in the history
Create SharedArray from disk file
  • Loading branch information
timholy committed Aug 12, 2015
2 parents 103f7a3 + 4fb455d commit c6283a1
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 43 deletions.
1 change: 1 addition & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1249,6 +1249,7 @@ export
# shared arrays
sdata,
indexpids,
localindexes,

# paths and file names
abspath,
Expand Down
2 changes: 1 addition & 1 deletion base/mmap.jl
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ function mmap{T,N}(io::IO,

file_desc = gethandle(io)
# platform-specific mmapping
@unix_only begin
@unix_only begin
prot, flags, iswrite = settings(file_desc, shared)
iswrite && grow && grow!(io, offset, len)
# mmap the file
Expand Down
132 changes: 91 additions & 41 deletions base/sharedarray.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
type SharedArray{T,N} <: DenseArray{T,N}
dims::NTuple{N,Int}
pids::Vector{Int}
refs::Array{RemoteRef}
refs::Vector{RemoteRef}

# The segname is currently used only in the test scripts to ensure that
# the shmem segment has been unlinked.
segname::AbstractString
segname::UTF8String

# Fields below are not to be serialized
# Local shmem map.
Expand All @@ -19,7 +19,7 @@ type SharedArray{T,N} <: DenseArray{T,N}
# the local partition into the array when viewed as a single dimensional array.
# this can be removed when @parallel or its equivalent supports looping on
# a subset of workers.
loc_subarr_1d
loc_subarr_1d::SubArray{T,1,Array{T,N},Tuple{UnitRange{Int}},1}

SharedArray(d,p,r,sn) = new(d,p,r,sn)
end
Expand All @@ -29,21 +29,7 @@ function SharedArray(T::Type, dims::NTuple; init=false, pids=Int[])

isbits(T) || throw(ArgumentError("type of SharedArray elements must be bits types, got $(T)"))

if isempty(pids)
# only use workers on the current host
pids = procs(myid())
if length(pids) > 1
pids = filter(x -> x != 1, pids)
end

onlocalhost = true
else
if !check_same_host(pids)
throw(ArgumentError("SharedArray requires all requested processes to be on the same machine."))
end

onlocalhost = myid() in procs(pids[1])
end
pids, onlocalhost = shared_pids(pids)

local shm_seg_name = ""
local s
Expand Down Expand Up @@ -114,11 +100,76 @@ end

SharedArray(T, I::Int...; kwargs...) = SharedArray(T, I; kwargs...)

# Create a SharedArray from a disk file
function SharedArray{T,N}(filename::AbstractString, ::Type{T}, dims::NTuple{N,Int}, offset::Integer=0; mode=nothing, init=false, pids::Vector{Int}=Int[])
isabspath(filename) || throw(ArgumentError("$filename is not an absolute path; try abspath(filename)?"))
isbits(T) || throw(ArgumentError("type of SharedArray elements must be bits types, got $(T)"))

pids, onlocalhost = shared_pids(pids)

# If not supplied, determine the appropriate mode
have_file = onlocalhost ? isfile(filename) : remotecall_fetch(pids[1], isfile, filename)
if mode == nothing
mode = have_file ? "r+" : "w+"
end
workermode = mode == "w+" ? "r+" : mode # workers don't truncate!

# Ensure the file will be readable
mode in ("r", "r+", "w+", "a+") || throw(ArgumentError("mode must be readable, but $mode is not"))
init==false || mode in ("r+", "w+", "a+") || throw(ArgumentError("cannot initialize unwritable array (mode = $mode)"))
mode == "r" && !isfile(filename) && throw(ArgumentError("file $filename does not exist, but mode $mode cannot create it"))

# Create the file if it doesn't exist, map it if it does
refs = Array(RemoteRef, length(pids))
func_mmap = mode -> open(filename, mode) do io
Mmap.mmap(io, Array{T,N}, dims, offset; shared=true)
end
local s
if onlocalhost
s = func_mmap(mode)
refs[1] = remotecall(pids[1], () -> func_mmap(workermode))
else
refs[1] = remotecall_wait(pids[1], () -> func_mmap(mode))
end

# Populate the rest of the workers
for i = 2:length(pids)
refs[i] = remotecall(pids[i], () -> func_mmap(workermode))
end

# Wait till all the workers have mapped the segment
for i in 1:length(refs)
wait(refs[i])
end

S = SharedArray{T,N}(dims, pids, refs, filename)

if onlocalhost
init_loc_flds(S)
# In the event that myid() is not part of pids, s will not be set
# in the init function above, hence setting it here if available.
S.s = s
else
S.pidx = 0
end

# if present, init function is called on each of the parts
if isa(init, Function)
@sync begin
for p in pids
@async remotecall_wait(p, init, S)
end
end
end
S
end

typealias SharedVector{T} SharedArray{T,1}
typealias SharedMatrix{T} SharedArray{T,2}

length(S::SharedArray) = prod(S.dims)
size(S::SharedArray) = S.dims
linearindexing{S<:SharedArray}(::Type{S}) = LinearFast()

function reshape{T,N}(a::SharedArray{T}, dims::NTuple{N,Int})
(length(a) != prod(dims)) && throw(DimensionMismatch("dimensions must be consistent with array size"))
Expand Down Expand Up @@ -155,6 +206,25 @@ function deepcopy_internal(S::SharedArray, stackdict::ObjectIdDict)
return R
end

function shared_pids(pids)
if isempty(pids)
# only use workers on the current host
pids = procs(myid())
if length(pids) > 1
pids = filter(x -> x != 1, pids)
end

onlocalhost = true
else
if !check_same_host(pids)
throw(ArgumentError("SharedArray requires all requested processes to be on the same machine."))
end

onlocalhost = myid() in procs(pids[1])
end
pids, onlocalhost
end

function range_1dim(S::SharedArray, pidx)
l = length(S)
nw = length(S.pids)
Expand All @@ -175,14 +245,14 @@ end

sub_1dim(S::SharedArray, pidx) = sub(S.s, range_1dim(S, pidx))

function init_loc_flds{T}(S::SharedArray{T})
function init_loc_flds{T,N}(S::SharedArray{T,N})
if myid() in S.pids
S.pidx = findfirst(S.pids, myid())
S.s = fetch(S.refs[S.pidx])
S.loc_subarr_1d = sub_1dim(S, S.pidx)
else
S.pidx = 0
S.loc_subarr_1d = Array(T, 0)
S.loc_subarr_1d = sub(Array(T, ntuple(d->0,N)), 1:0)
end
end

Expand All @@ -209,30 +279,10 @@ end

convert(::Type{Array}, S::SharedArray) = S.s

# pass through getindex and setindex! - they always work on the complete array unlike DArrays
getindex(S::SharedArray) = getindex(S.s)
# pass through getindex and setindex! - unlike DArrays, these always work on the complete array
getindex(S::SharedArray, I::Real) = getindex(S.s, I)
getindex(S::SharedArray, I::AbstractArray) = getindex(S.s, I)
getindex(S::SharedArray, I::Colon) = getindex(S.s, I)
@generated function getindex(S::SharedArray, I::Union{Real,AbstractVector,Colon}...)
N = length(I)
Isplat = Expr[:(I[$d]) for d = 1:N]
quote
getindex(S.s, $(Isplat...))
end
end

setindex!(S::SharedArray, x) = setindex!(S.s, x)
setindex!(S::SharedArray, x, I::Real) = setindex!(S.s, x, I)
setindex!(S::SharedArray, x, I::AbstractArray) = setindex!(S.s, x, I)
setindex!(S::SharedArray, x, I::Colon) = setindex!(S.s, x, I)
@generated function setindex!(S::SharedArray, x, I::Union{Real,AbstractVector,Colon}...)
N = length(I)
Isplat = Expr[:(I[$d]) for d = 1:N]
quote
setindex!(S.s, x, $(Isplat...))
end
end

function fill!(S::SharedArray, v)
vT = convert(eltype(S), v)
Expand Down
73 changes: 72 additions & 1 deletion test/parallel.jl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,18 @@ end

# TODO : Need a similar test of shmem cleanup for OSX

# SharedArray tests
##### SharedArray tests

function check_pids_all(S::SharedArray)
pidtested = falses(size(S))
for p in procs(S)
idxes_in_p = remotecall_fetch(p, D -> parentindexes(D.loc_subarr_1d)[1], S)
@test all(sdata(S)[idxes_in_p] .== p)
pidtested[idxes_in_p] = true
end
@test all(pidtested)
end

d = Base.shmem_rand(1:100, dims)
a = convert(Array, d)

Expand Down Expand Up @@ -86,6 +97,66 @@ for p in procs(d)
@test d[idxl] == p
end

### SharedArrays from a file

# Mapping an existing file
fn = tempname()
open(fn, "w") do io
write(io, 1:30)
end
sz = (6,5)
Atrue = reshape(1:30, sz)

S = SharedArray(fn, Int, sz)
@test S == Atrue
@test length(procs(S)) > 1
@sync begin
for p in procs(S)
@async remotecall_wait(p, D->fill!(D.loc_subarr_1d, myid()), S)
end
end
check_pids_all(S)

filedata = similar(Atrue)
open(fn, "r") do io
read!(io, filedata)
end
@test filedata == sdata(S)

# Error for write-only files
@test_throws ArgumentError SharedArray(fn, Int, sz, mode="w")

# Error for file doesn't exist, but not allowed to create
@test_throws ArgumentError SharedArray(tempname(), Int, sz, mode="r")

# Creating a new file
fn2 = tempname()
S = SharedArray(fn2, Int, sz, init=D->D[localindexes(D)] = myid())
@test S == filedata
filedata2 = similar(Atrue)
open(fn2, "r") do io
read!(io, filedata2)
end
@test filedata == filedata2

# Appending to a file
fn3 = tempname()
open(fn3, "w") do io
write(io, ones(UInt8, 4))
end
S = SharedArray(fn3, UInt8, sz, 4, mode="a+", init=D->D[localindexes(D)]=0x02)
len = prod(sz)+4
@test filesize(fn3) == len
filedata = Array(UInt8, len)
open(fn3, "r") do io
read!(io, filedata)
end
@test all(filedata[1:4] .== 0x01)
@test all(filedata[5:end] .== 0x02)


### Utility functions

# reshape

d = Base.shmem_fill(1.0, (10,10,10))
Expand Down

0 comments on commit c6283a1

Please sign in to comment.