Skip to content

Commit

Permalink
Merge pull request #10419 from amitmurthy/amitm/worker_init
Browse files Browse the repository at this point in the history
remove argument for option --worker , export init_worker
  • Loading branch information
amitmurthy committed Mar 9, 2015
2 parents 345f404 + c7d7a94 commit 4cdb615
Show file tree
Hide file tree
Showing 11 changed files with 48 additions and 57 deletions.
35 changes: 12 additions & 23 deletions base/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ end
try_include(path::AbstractString) = isfile(path) && include(path)

# initialize the local proc network address / port
function init_bind_addr(opts::JLOptions)
function init_bind_addr()
opts = JLOptions()
if opts.bindto != C_NULL
bind_to = split(bytestring(opts.bindto), ":")
bind_addr = string(parseip(bind_to[1]))
Expand Down Expand Up @@ -248,11 +249,7 @@ let reqarg = Set(UTF8String["--home", "-H",
end
# startup worker
if Bool(opts.worker)
assert(opts.worker == 1 || opts.worker == 2)
# if default, start worker process otherwise if custom pass through
if opts.worker == 1
start_worker() # does not return
end
start_worker() # does not return
end
# load file immediately on all processors
if opts.load != C_NULL
Expand Down Expand Up @@ -345,17 +342,6 @@ function init_load_path()
push!(LOAD_PATH,abspath(JULIA_HOME,"..","share","julia","site",vers))
end

# start local process as head "master" process with process id 1
# register this process as a local worker
function init_head_sched()
# start in "head node" mode
global PGRP
global LPROC
LPROC.id = 1
assert(length(PGRP.workers) == 0)
register_worker(LPROC)
end

function load_juliarc()
# If the user built us with a specific Base.SYSCONFDIR, check that location first for a juliarc.jl file
# If it is not found, then continue on to the relative path based on JULIA_HOME
Expand Down Expand Up @@ -393,11 +379,18 @@ function early_init()
end
end

# starts the gc message task (for distrubuted gc) and
# registers worker process termination method
function init_parallel()
start_gc_msgs_task()
atexit(terminate_all_workers)

init_bind_addr()

# start in "head node" mode, if worker, will override later.
global PGRP
global LPROC
LPROC.id = 1
assert(length(PGRP.workers) == 0)
register_worker(LPROC)
end

import .Terminals
Expand All @@ -406,10 +399,6 @@ import .REPL
function _start()
opts = JLOptions()
try
init_parallel()
init_bind_addr(opts)
# if this process is not a worker, schedule head process
Bool(opts.worker) || init_head_sched()
(quiet,repl,startup,color_set,history_file) = process_options(opts,copy(ARGS))

local term
Expand Down
1 change: 1 addition & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1206,6 +1206,7 @@ export
addprocs,
ClusterManager,
fetch,
init_worker,
interrupt,
isready,
launch,
Expand Down
4 changes: 2 additions & 2 deletions base/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ function launch_on_machine(manager::SSHManager, machine, cnt, params, launched,
if length(machine_bind) > 1
exeflags = `--bind-to $(machine_bind[2]) $exeflags`
end
exeflags = `$exeflags --worker=default`
exeflags = `$exeflags --worker`

machine_def = machine_bind[1]
machine_def = split(machine_def, ':')
Expand Down Expand Up @@ -184,7 +184,7 @@ function launch(manager::LocalManager, params::Dict, launched::Array, c::Conditi

for i in 1:manager.np
io, pobj = open(detach(
setenv(`$(julia_cmd(exename)) $exeflags --bind-to $(LPROC.bind_addr) --worker=default`, dir=dir)), "r")
setenv(`$(julia_cmd(exename)) $exeflags --bind-to $(LPROC.bind_addr) --worker`, dir=dir)), "r")
wconfig = WorkerConfig()
wconfig.process = pobj
wconfig.io = io
Expand Down
9 changes: 9 additions & 0 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,15 @@ function init_worker(manager::ClusterManager=DefaultClusterManager())
global cluster_manager
cluster_manager = manager
disable_threaded_libs()

# Since our pid has yet to be set, ensure no RemoteRefs have been created or addprocs() called.
assert(nprocs() <= 1)
assert(isempty(PGRP.refs))
assert(isempty(client_refs))

# System is started in head node mode, cleanup entries related to the same
empty!(PGRP.workers)
empty!(map_pid_wrkr)
end


Expand Down
1 change: 1 addition & 0 deletions base/sysimg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ function __init__()
fdwatcher_init()
early_init()
init_load_path()
init_parallel()
end

include("precompile.jl")
Expand Down
19 changes: 13 additions & 6 deletions doc/manual/parallel-computing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -611,10 +611,17 @@ The :func:`launch` method takes the following arguments:

The :func:`launch` method is called asynchronously in a separate task. The termination of this task
signals that all requested workers have been launched. Hence the :func:`launch` function MUST exit as soon
as all the requested workers have been launched. The Julia worker MUST be launched with a ``--worker``
argument. Optionally ``--bind-to bind_addr[:port]`` may also be specified to enable other workers
to connect to it at the specified ``bind_addr`` and ``port``. Useful for multi-homed hosts.
as all the requested workers have been launched.

Newly launched workers are connected to each other, and the master process, in a all-to-all manner.
Specifying command argument, ``--worker`` results in the launched processes initializing themselves
as workers and connections being setup via TCP/IP sockets. Optionally ``--bind-to bind_addr[:port]``
may also be specified to enable other workers to connect to it at the specified ``bind_addr`` and ``port``.
This is useful for multi-homed hosts.

For non-TCP/IP transports, for example, an implementation may choose to use MPI as the transport,
``--worker`` must NOT be specified. Instead newly launched workers should call ``init_worker()``
before using any of the parallel constructs

For every worker launched, the :func:`launch` method must add a :class:`WorkerConfig`
object (with appropriate fields initialized) to ``launched`` ::
Expand Down Expand Up @@ -712,11 +719,11 @@ Note: The julia processes are still all *logically* connected to each other - an
awareness of 0MQ being used as the transport layer.

When using custom transports:
- julia workers must be started with arguments ``--worker=custom``. Just ``--worker`` or ``--worker=default`` will result in the newly launched
workers defaulting to the socket transport implementation
- julia workers must NOT be started with ``--worker``. Starting with ``--worker`` will result in the newly launched
workers defaulting to the TCP/IP socket transport implementation
- For every logical connection with a worker, ``process_messages(rd::AsyncStream, wr::AsyncStream)`` must be called.
This launches a new task that handles reading and writing of messages from/to the worker represented by the ``AsyncStream`` objects
- ``init_worker(manager::FooManager)`` must be called as part of worker process initializaton
- ``init_worker(manager::FooManager)`` MUST be called as part of worker process initializaton
- Field ``connect_at::Any`` in :class:`WorkerConfig` can be set by the cluster manager when ``launch`` is called. The value of
this field is passed in in all ``connect`` callbacks. Typically, it carries information on *how to connect* to a worker. For example,
the TCP/IP socket transport uses this field to specify the ``(host, port)`` tuple at which to connect to a worker
Expand Down
2 changes: 1 addition & 1 deletion examples/clustermanager/0mq/ZMQCM.jl
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ end
function launch(manager::ZMQCMan, params::Dict, launched::Array, c::Condition)
#println("launch $(params[:np])")
for i in 1:params[:np]
io, pobj = open (`julia --worker=custom worker.jl $i`, "r")
io, pobj = open (`julia worker.jl $i`, "r")

wconfig = WorkerConfig()
wconfig.userdata = Dict(:zid=>i, :io=>io)
Expand Down
13 changes: 7 additions & 6 deletions examples/clustermanager/simple/UnixDomainCM.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ function launch(manager::UnixDomainCM, params::Dict, launched::Array, c::Conditi
for i in 1:manager.np
sockname = tempname()
try
cmd = `$(params[:exename]) --worker=custom $(@__FILE__) udwrkr $sockname`
cmd = `$(params[:exename]) $(@__FILE__) udwrkr $sockname`
io, pobj = open (cmd, "r")

wconfig = WorkerConfig()
Expand Down Expand Up @@ -68,11 +68,12 @@ function start_worker(sockname)
end

function manage(manager::UnixDomainCM, id::Int, config::WorkerConfig, op)
if op == :deregister
try
rm(get(config.userdata)[:sockname])
end
end
# Does not seem to be required, filesystem entry cleanup is happening automatically on process exit
# if op == :deregister
# try
# rm(get(config.userdata)[:sockname])
# end
# end
nothing
end

Expand Down
6 changes: 0 additions & 6 deletions src/jlapi.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@ DLLEXPORT void jl_init_with_image(const char *julia_home_dir, const char *image_
if (image_relative_path != NULL)
jl_options.image_file = image_relative_path;
julia_init(JL_IMAGE_JULIA_HOME);
//TODO: these should be part of Multi.__init__()
//currently, we have them here since we may not want them
//getting unconditionally set from Base.__init__()
jl_eval_string("Base.init_parallel()");
jl_eval_string("Base.init_bind_addr(Base.JLOptions())");
jl_eval_string("Base.init_head_sched()");
jl_exception_clear();
}

Expand Down
3 changes: 0 additions & 3 deletions src/julia.h
Original file line number Diff line number Diff line change
Expand Up @@ -1480,9 +1480,6 @@ extern DLLEXPORT jl_options_t jl_options;
#define JL_OPTIONS_FAST_MATH_OFF 2
#define JL_OPTIONS_FAST_MATH_DEFAULT 0

#define JL_OPTIONS_WORKER_DEFAULT 1
#define JL_OPTIONS_WORKER_CUSTOM 2

// Version information
#include "julia_version.h"

Expand Down
12 changes: 2 additions & 10 deletions ui/repl.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ void parse_opts(int *argcp, char ***argvp)
{ "math-mode", required_argument, 0, opt_math_mode },
// hidden command line options
{ "build", required_argument, 0, 'b' },
{ "worker", optional_argument, 0, opt_worker },
{ "worker", no_argument, 0, opt_worker },
{ "bind-to", required_argument, 0, opt_bind_to },
{ "lisp", no_argument, &lisp_prompt, 1 },
{ 0, 0, 0, 0 }
Expand Down Expand Up @@ -329,15 +329,7 @@ void parse_opts(int *argcp, char ***argvp)
jl_options.image_file = NULL;
break;
case opt_worker:
if (optarg != NULL)
if (!strcmp(optarg,"default"))
jl_options.worker = JL_OPTIONS_WORKER_DEFAULT;
else if (!strcmp(optarg,"custom"))
jl_options.worker = JL_OPTIONS_WORKER_CUSTOM;
else
jl_errorf("julia: invalid argument to --worker={default|custom} (%s)\n", optarg);
else
jl_options.worker = JL_OPTIONS_WORKER_DEFAULT;
jl_options.worker = 1;
break;
case opt_bind_to:
jl_options.bindto = strdup(optarg);
Expand Down

3 comments on commit 4cdb615

@andreasnoack
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amitmurthy This series of commits broke MPIManager. After executing addprocs(manager) the REPL just hangs and I get something like

julia> addprocs(manager)
^CERROR: InterruptException:
 in read_worker_host_port at multi.jl:1018
 in connect at managers.jl:217
 in connect_n_create_worker at multi.jl:1202
 in start_cluster_workers at multi.jl:1112
 in addprocs at multi.jl:1067

@amitmurthy
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was dependent on JuliaParallel/MPI.jl#38 and #10453, both of which have been merged. Can you try it out now?

@andreasnoack
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just did, and it works. Thanks.

Please sign in to comment.