diff --git a/base/deprecated.jl b/base/deprecated.jl index 79a55fdefadcf..620bf3abd71e4 100644 --- a/base/deprecated.jl +++ b/base/deprecated.jl @@ -274,3 +274,13 @@ function amap(f::Function, A::AbstractArray, axis::Integer) return R end + +function addprocs_scyld(np::Integer) + error("Base.addprocs_scyld is deprecated - add package ClusterManagers and then use ClusterManagers.addprocs_scyld instead.") +end +export addprocs_scyld + +function addprocs_sge(np::Integer) + error("Base.addprocs_sge is deprecated - add package ClusterManagers and then use ClusterManagers.addprocs_sge instead.") +end +export addprocs_sge \ No newline at end of file diff --git a/base/exports.jl b/base/exports.jl index da14d06898ac8..d7106131484db 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -1092,8 +1092,6 @@ export # multiprocessing addprocs, - addprocs_scyld, - addprocs_sge, fetch, isready, yield, @@ -1110,6 +1108,7 @@ export remotecall_wait, take, wait, + ClusterManager, # distributed arrays distribute, diff --git a/base/multi.jl b/base/multi.jl index 25f2e4d86d0c5..6de0280eefb52 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -3,8 +3,6 @@ ## julia starts with one process, and processors can be added using: ## addprocs(n) using exec ## addprocs({"host1","host2",...}) using remote execution -## addprocs_scyld(n) using Scyld ClusterWare -## addprocs_sge(n) using Sun Grid Engine batch queue ## ## remotecall(w, func, args...) - ## tell a worker to call a function on the given arguments. @@ -210,7 +208,7 @@ function add_workers(pg::ProcessGroup, w::Array{Any,1}) for i=1:length(w) send_msg_now(w[i], :join_pgrp, w[i].id, all_locs) end - :ok + [w[i].id for i in 1:length(w)] end myid() = LPROC.id @@ -860,7 +858,7 @@ function create_message_handler_loop(sock::AsyncStream) #returns immediately end) end -function disable_parallel_libs() +function disable_threaded_libs() blas_set_num_threads(1) end @@ -879,7 +877,7 @@ function start_worker(out::IO) # close STDIN; workers will not use it #close(STDIN) - disable_parallel_libs() + disable_threaded_libs() ccall(:jl_install_sigint_handler, Void, ()) @@ -896,42 +894,86 @@ function start_worker(out::IO) exit(0) end - -function start_remote_workers(machines, cmds, tunnel=false, sshflags=``) - n = length(cmds) - outs = cell(n) +function start_cluster_workers(n, config) w = cell(n) - for i=1:n - outs[i],_ = readsfrom(cmds[i]) - outs[i].line_buffered = true + cman = config[:cman] + + # Get the cluster manager to launch the instance + (insttype, instances) = cman.launch_cb(n, config) + + + if insttype == :io_only + read_cb_response(inst) = + begin + (host, port) = read_worker_host_port(inst) + inst, host, port + end + elseif insttype == :io_host + read_cb_response(inst) = + begin + io = inst[1] + (_, port) = read_worker_host_port(io) + io, inst[2], port + end + elseif insttype == :io_host_port + read_cb_response(inst) = (inst[1], inst[2], inst[3]) + elseif insttype == :host_port + read_cb_response(inst) = (nothing, inst[1], inst[2]) + elseif insttype == :cmd + read_cb_response(inst) = + begin + io,_ = readsfrom(detach(inst)) + io.line_buffered = true + (host, port) = read_worker_host_port(io) + io, host, port + end + else + error("Unsupported format from Cluster Manager callback") end + for i=1:n - local hostname::String, port::Int16 - stream = outs[i] - stream.line_buffered = true - while true - conninfo = readline(stream) - private_hostname, port = parse_connection_info(conninfo) - if private_hostname != "" - break - end + (io, host, port) = read_cb_response(instances[i]) + w[i] = create_worker(host, port, io, config) + end + w +end + +function read_worker_host_port (io::IO) + io.line_buffered = true + while true + conninfo = readline(io) + private_hostname, port = parse_connection_info(conninfo) + if private_hostname != "" + return private_hostname, port end - - s = split(machines[i],'@') - if length(s) > 1 - user = s[1] - hostname = s[2] - else + end +end + +function create_worker(hostname, port, stream, config) + tunnel = config[:tunnel] + + s = split(hostname,'@') + if length(s) > 1 + user = s[1] + hostname = s[2] + else + if haskey(ENV, "USER") user = ENV["USER"] - hostname = s[1] - end - - if tunnel - w[i] = Worker(hostname, port, user, sshflags) - else - w[i] = Worker(hostname, port) + elseif tunnel + error("USER must be specified either in the environment or as part of the hostname when tunnel option is used.") end - let wrker = w[i] + hostname = s[1] + end + + if tunnel + sshflags = config[:sshflags] + w = Worker(hostname, port, user, sshflags) + else + w = Worker(hostname, port) + end + + if isa(stream, AsyncStream) + let wrker = w # redirect console output from workers to the client's stdout: start_reading(stream,function(stream::AsyncStream,nread::Int) if nread>0 @@ -950,6 +992,7 @@ function start_remote_workers(machines, cmds, tunnel=false, sshflags=``) w end + function parse_connection_info(str) m = match(r"^julia_worker:(\d+)#(.*)", str) if m != nothing @@ -977,135 +1020,69 @@ function ssh_tunnel(user, host, port, sshflags) localp end -#function worker_ssh_cmd(host, key) -# `ssh -i $key -n $host "sh -l -c \"cd $JULIA_HOME && ./julia-release-basic --worker\""` -#end -# start and connect to processes via SSH. -# optionally through an SSH tunnel. -# the tunnel is only used from the head (process 1); the nodes are assumed -# to be mutually reachable without a tunnel, as is often the case in a cluster. -function addprocs(machines::AbstractVector; - tunnel=false, dir=JULIA_HOME, exename="./julia-release-basic", sshflags::Cmd=``) - add_workers(PGRP, - start_remote_workers(machines, - map(m->detach(`ssh -n $sshflags $m "sh -l -c \"cd $dir && $exename --worker\""`), - machines), - tunnel, sshflags)) -end - -#function addprocs_ssh(machines, keys) -# if !(isa(keys, Array)) && isa(machines,Array) -# key = keys -# keys = [ key for x = 1:length(machines)] -# cmdargs = { {machines[x],keys[x]} for x = 1:length(machines)} -# else -# cmdargs = {{machines,keys}} -# end #if/else -# add_workers(PGRP, start_remote_workers(machines, map(x->worker_ssh_cmd(x[1],x[2]), cmdargs))) -#end +abstract ClusterManager -worker_local_cmd() = `$JULIA_HOME/julia-release-basic --bind-to $bind_addr --worker` - -function addprocs(np::Integer) - disable_parallel_libs() - add_workers(PGRP, start_remote_workers({ "localhost" for i=1:np }, - { worker_local_cmd() for i=1:np })) -end - -function start_scyld_workers(np::Integer) - home = JULIA_HOME - beomap_cmd = `beomap --no-local --np $np` - out,beomap_proc = readsfrom(beomap_cmd) - wait(beomap_proc) - if !success(beomap_proc) - error("node availability inaccessible (could not run beomap)") - end - nodes = split(chomp(readline(out)),':') - outs = cell(np) - for (i,node) in enumerate(nodes) - cmd = detach(`bpsh $node sh -l -c "cd $home && ./julia-release-basic --worker"`) - outs[i],_ = readsfrom(cmd) - outs[i].line_buffered = true - end - workers = cell(np) - for (i,stream) in enumerate(outs) - local hostname::String, port::Int16 - stream.line_buffered = true - while true - conninfo = readline(stream) - hostname, port = parse_connection_info(conninfo) - if hostname != "" - break - end - end - workers[i] = Worker(hostname, port) - let worker = workers[i] - # redirect console output from workers to the client's stdout: - start_reading(stream,function(stream::AsyncStream,nread::Int) - if(nread>0) - try - line = readbytes(stream.buffer, nread) - print("\tFrom worker $(worker.id):\t",line) - catch err - println(STDERR,"\tError parsing reply from worker $(worker.id):\t",err) - return false - end - end - true - end) +function launch_procs(n::Integer, config::Dict) + dir = config[:dir] + exename = config[:exename] + exeflags = config[:exeflags] + + cman = config[:cman] + if cman.ssh + sshflags = config[:sshflags] + outs=cell(n) + for i in 1:n + m = cman.machines[i] + cmd = detach(`ssh -n $sshflags $m "sh -l -c \"cd $dir && $exename $exeflags\""`) + io,_ = readsfrom(cmd) + io.line_buffered = true + local port::Int16 + (_, port) = read_worker_host_port (io) + + # We ignore the hostname printed by the worker, since the worker may be behind a NATed interface, + # we just use the hostname specified by the caller as part of the machine name + outs[i] = (io, m, port) end + return (:io_host_port, outs) + + else + worker_local_cmd = `$(dir)/$(exename) --bind-to $bind_addr $exeflags` + return (:cmd, {worker_local_cmd for i in 1:n}) end - workers end -function addprocs_scyld(np::Integer) - disable_parallel_libs() - add_workers(PGRP, start_scyld_workers(np)) +immutable RegularCluster <: ClusterManager + launch_cb::Function + ssh::Bool + machines + + RegularCluster(; ssh=false, machines=[]) = new(launch_procs, ssh, machines) end -function start_sge_workers(n) - home = JULIA_HOME - sgedir = joinpath(pwd(),"SGE") - run(`mkdir -p $sgedir`) - qsub_cmd = `echo $home/julia-release-basic --worker` |> `qsub -N JULIA -terse -cwd -j y -o $sgedir -t 1:$n` - out,qsub_proc = readsfrom(qsub_cmd) - if !success(qsub_proc) - error("batch queue not available (could not run qsub)") - end - id = chomp(split(readline(out),'.')[1]) - println("job id is $id") - print("waiting for job to start"); - workers = cell(n) - for i=1:n - # wait for each output stream file to get created - fname = "$sgedir/JULIA.o$(id).$(i)" - local fl, hostname, port - fexists = false - sleep(0.5) - while !fexists - try - fl = open(fname) - try - conninfo = readline(fl) - hostname, port = parse_connection_info(conninfo) - finally - close(fl) - end - fexists = (hostname != "") - catch - print("."); - sleep(0.5) - end +# start and connect to processes via SSH. +# optionally through an SSH tunnel. +# the tunnel is only used from the head (process 1); the nodes are assumed +# to be mutually reachable without a tunnel, as is often the case in a cluster. +function addprocs(instances::Union(AbstractVector, Integer); + tunnel=false, dir=JULIA_HOME, exename="./julia-release-basic", sshflags::Cmd=``, cman=nothing) + + config={:dir=>dir, :exename=>exename, :exeflags=>` --worker `, :tunnel=>tunnel, :sshflags=>sshflags} + disable_threaded_libs() + + if isa(instances, AbstractVector) && (cman == nothing) + config[:cman] = RegularCluster(ssh=true, machines=instances) + return add_workers(PGRP, start_cluster_workers(length(instances), config)) + else + if isa(cman, ClusterManager) + config[:cman] = cman + else + config[:cman] = RegularCluster() end - #print("hostname=$hostname, port=$port\n") - workers[i] = Worker(hostname, port) + return add_workers(PGRP, start_cluster_workers(instances, config)) end - print("\n") - workers end -addprocs_sge(n) = add_workers(PGRP, start_sge_workers(n)) ## higher-level functions: spawn, pmap, pfor, etc. ##