Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

user defined transports #9434

Merged
merged 1 commit into from
Jan 22, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ $(build_docdir):
@mkdir -p $@/examples
@cp -R doc/devdocs doc/manual doc/stdlib $@
@cp -R examples/*.jl $@/examples/
@cp -R examples/clustermanager $@/examples/

git-submodules:
ifneq ($(NO_GIT), 1)
Expand Down
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ Library improvements

* Split `Triangular` type into `UpperTriangular`, `LowerTriangular`, `UnitUpperTriagular` and `UnitLowerTriangular` ([#9779])

* ClusterManager - Performance improvements([#9309]) and support for changing transports([#9434])

Deprecated or removed
---------------------

Expand Down
11 changes: 9 additions & 2 deletions base/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,15 @@ function process_options(args::Vector{UTF8String})
if args[i]=="-q" || args[i]=="--quiet"
quiet = true
elseif args[i]=="--worker"
start_worker()
# doesn't return
worker_arg = (i == length(args)) ? "" : args[i+1]

if worker_arg == "custom"
i += 1
else
start_worker()
# doesn't return
end

elseif args[i]=="--bind-to"
i+=1 # has already been processed
elseif args[i]=="-e" || args[i]=="--eval"
Expand Down
2 changes: 2 additions & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export
BitArray,
BitMatrix,
BitVector,
BufferStream,
CartesianIndex,
CartesianRange,
CFILE,
Expand Down Expand Up @@ -1218,6 +1219,7 @@ export
nprocs,
nworkers,
pmap,
process_messages,
procs,
put!,
remotecall,
Expand Down
9 changes: 8 additions & 1 deletion base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,8 @@ function rmprocs(args...; waitfor = 0.0)
else
if haskey(map_pid_wrkr, i)
push!(rmprocset, i)
remote_do(i, exit)
w = map_pid_wrkr[i]
kill(w.manager, i, w.config)
end
end
end
Expand Down Expand Up @@ -1286,6 +1287,12 @@ function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)
(s, s)
end

function kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
remote_do(pid, exit) # For TCP based transports this will result in a close of the socket
# at our end, which will result in a cleanup of the worker.
nothing
end

function connect_w2w(pid::Int, config::WorkerConfig)
(rhost, rport) = get(config.connect_at)
config.host = rhost
Expand Down
44 changes: 44 additions & 0 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -960,3 +960,47 @@ mark(x::AsyncStream) = mark(x.buffer)
unmark(x::AsyncStream) = unmark(x.buffer)
reset(x::AsyncStream) = reset(x.buffer)
ismarked(x::AsyncStream) = ismarked(x.buffer)

# BufferStream's are non-OS streams, backed by a regular IOBuffer
type BufferStream <: AsyncStream
buffer::IOBuffer
r_c::Condition
close_c::Condition
is_open::Bool

BufferStream() = new(PipeBuffer(), Condition(), Condition(), true)
end

isopen(s::BufferStream) = s.is_open
close(s::BufferStream) = (s.is_open = false; notify(s.r_c; all=true); notify(s.close_c; all=true); nothing)

function wait_readnb(s::BufferStream, nb::Int)
while isopen(s) && nb_available(s.buffer) < nb
wait(s.r_c)
end

(nb_available(s.buffer) < nb) && error("closed BufferStream")
end

function eof(s::BufferStream)
wait_readnb(s,1)
!isopen(s) && nb_available(s.buffer)<=0
end

show(io::IO, s::BufferStream) = print(io,"BufferStream() bytes waiting:",nb_available(s.buffer),", isopen:", s.is_open)

nb_available(s::BufferStream) = nb_available(s.buffer)

function wait_readbyte(s::BufferStream, c::UInt8)
while isopen(s) && search(s.buffer,c) <= 0
wait(s.r_c)
end
end

wait_close(s::BufferStream) = if isopen(s) wait(s.close_c); end
start_reading(s::BufferStream) = nothing

write(s::BufferStream, b::UInt8) = (rv=write(s.buffer, b); notify(s.r_c; all=true);rv)
write{T}(s::BufferStream, a::Array{T}) = (rv=write(s.buffer, a); notify(s.r_c; all=true);rv)
write(s::BufferStream, p::Ptr, nb::Integer) = (rv=write(s.buffer, p, nb); notify(s.r_c; all=true);rv)

105 changes: 92 additions & 13 deletions doc/manual/parallel-computing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -674,21 +674,46 @@ retained.
ClusterManagers
---------------

Julia worker processes can also be spawned on arbitrary machines,
enabling Julia's natural parallelism to function quite transparently
in a cluster environment. The :class:`ClusterManager` interface provides a
way to specify a means to launch and manage worker processes.
The launching, management and networking of julia processes into a logical
cluster is done via cluster managers. A ``ClusterManager`` is responsible for

Thus, a custom cluster manager would need to:
- launching worker processes in a cluster environment
- managing events during the lifetime of each worker
- optionally, a cluster manager can also provide data transport

- be a subtype of the abstract :class:`ClusterManager`
- implement :func:`launch`, a method responsible for launching new workers
- implement :func:`manage`, which is called at various events during a worker's lifetime
A julia cluster has the following characteristics:
- The initial julia process, also called the ``master`` is special and has a julia id of 1.
- Only the ``master`` process can add or remove worker processes.
- All processes can directly communicate with each other.

Connections between workers (using the in-built TCP/IP transport) is established in the following manner:
- ``addprocs`` is called on the master process with a ``ClusterManager`` object
- ``addprocs`` calls the appropriate ``launch`` method which spawns required
number of worker processes on appropriate machines
- Each worker starts listening on a free port and writes out its host, port information to STDOUT
- The cluster manager captures the stdout's of each worker and makes it available to the master process
- The master process parses this information and sets up TCP/IP connections to each worker
- Every worker is also notified of other workers in the cluster
- Each worker connects to all workers whose julia id is less than its own id
- In this way a mesh network is established, wherein every worker is directly connected with every other worker


While the default transport layer uses plain TCP sockets, it is possible for a julia cluster to provide
its own transport.

Julia provides two in-built cluster managers:

- :class:`LocalManager`, used when :func:`addprocs` or :func:`addprocs(::Integer) <addprocs>` are called
- :class:`SSHManager`, used when :func:`addprocs(::Array) <addprocs>` is called with a list of hostnames
- ``LocalManager``, used when :func:`addprocs` or :func:`addprocs(np::Integer) <addprocs>` are called
- ``SSHManager``, used when :func:`addprocs(hostnames::Array) <addprocs>` is called with a list of hostnames

:class:`LocalManager` is used to launch additional workers on the same host, thereby leveraging multi-core
and multi-processor hardware.

Thus, a minimal cluster manager would need to:

- be a subtype of the abstract :class:`ClusterManager`
- implement :func:`launch`, a method responsible for launching new workers
- implement :func:`manage`, which is called at various events during a worker's lifetime

:func:`addprocs(manager::FooManager) <addprocs>` requires ``FooManager`` to implement::

Expand Down Expand Up @@ -730,8 +755,8 @@ argument. Optionally ``--bind-to bind_addr[:port]`` may also be specified to ena
to connect to it at the specified ``bind_addr`` and ``port``. Useful for multi-homed hosts.


For every worker launched, the :func:`launch` method must add a :clas`WorkerConfig`
object with appropriate fields initialized to ``launched`` ::
For every worker launched, the :func:`launch` method must add a :class:`WorkerConfig`
object (with appropriate fields initialized) to ``launched`` ::

type WorkerConfig
# Common fields relevant to all cluster managers
Expand All @@ -753,6 +778,8 @@ object with appropriate fields initialized to ``launched`` ::
sshflags::Nullable{Cmd}
max_parallel::Nullable{Integer}

connect_at::Nullable{Any}

.....
end

Expand All @@ -778,7 +805,6 @@ required to connect to the workers from the master process.
``userdata`` is provided for custom cluster managers to store their own worker specific information.



:func:`manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol) <manage>` is called at different
times during the worker's lifetime with different ``op`` values:

Expand All @@ -789,6 +815,59 @@ times during the worker's lifetime with different ``op`` values:
interrupt signal.
- with ``:finalize`` for cleanup purposes.


Cluster Managers with custom transports
---------------------------------------

Replacing the default TCP/IP all-to-all socket connections with a custom transport layer is a little more involved.
Each julia process has as many communication tasks as the workers it is connected to. For example, consider a julia cluster of
32 processes in a all-to-all mesh network:

- Each julia process thus has 31 communication tasks
- Each task handles all incoming messages from a single remote worker in a message processing loop
- The message processing loop waits on an ``AsyncStream`` object - for example, a TCP socket in the default implementation, reads an entire
message, processes it and waits for the next one
- Sending messages to a process is done directly from any julia task - not just communication tasks - again, via the appropriate
``AsyncStream`` object

Replacing the default transport involves the new implementation to setup connections to remote workers, and to provide appropriate
``AsyncStream`` objects that the message processing loops can wait on. The manager specific callbacks to be implemented are::

connect(manager::FooManager, pid::Integer, config::WorkerConfig)
kill(manager::FooManager, pid::Int, config::WorkerConfig)

The default implementation (which uses TCP/IP sockets) is implemented as ``connect(manager::ClusterManager, pid::Integer, config::WorkerConfig)``.

``connect`` should return a pair of ``AsyncStream`` objects, one for reading data sent from worker ``pid``,
and the other to write data that needs to be sent to worker ``pid``. Custom cluster managers can use an in-memory ``BufferStream``
as the plumbing to proxy data between the custom, possibly non-AsyncStream transport and julia's in-built parallel infrastructure.

A ``BufferStream`` is an in-memory ``IOBuffer`` which behaves like an ``AsyncStream``.

Folder ``examples/clustermanager/0mq`` is an example of using ZeroMQ is connect julia workers in a star network with a 0MQ broker in the middle.
Note: The julia processes are still all *logically* connected to each other - any worker can message any other worker directly without any
awareness of 0MQ being used as the transport layer.

When using custom transports:
- julia workers must be started with arguments ``--worker custom``. Just ``--worker`` will result in the newly launched
workers defaulting to the socket transport implementation
- For every logical connection with a worker, :func:`process_messages(rd::AsyncStream, wr::AsyncStream)` must be called
This launches a new task that handles reading and writing of messages from/to the worker represented by the ``AsyncStream`` objects
- :func:`init_worker(manager::FooManager)` must be called as part of worker process initializaton
- Field ``connect_at::Any`` in :class:`WorkerConfig` can be set by the cluster manager when ``launch`` is called. The value of
this field is passed in in all ``connect`` callbacks. Typically, it carries information on *how to connect* to a worker. For example,
the TCP/IP socket transport uses this field to specify the ``(host, port)`` tuple at which to connect to a worker


``kill(manager, pid, config)`` is called to remove a worker from the cluster.
On the master process, the corresponding ``AsyncStream`` objects must be closed by the implementation to ensure proper cleanup. The default
implementation simply executes an ``exit()`` call on the specified remote worker.

``examples/clustermanager/simple`` is an example that shows a simple implementation using unix domain sockets for cluster setup




.. rubric:: Footnotes

.. [#mpi2rma] In this context, MPI refers to the MPI-1 standard. Beginning with MPI-2, the MPI standards committee introduced a new set of communication mechanisms, collectively referred to as Remote Memory Access (RMA). The motivation for adding RMA to the MPI standard was to facilitate one-sided communication patterns. For additional information on the latest MPI standard, see http://www.mpi-forum.org/docs.
2 changes: 1 addition & 1 deletion doc/stdlib/parallel.rst
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ General Parallel Computing Support
.. function:: addprocs(n::Integer; exeflags=``) -> List of process identifiers

Launches workers using the in-built ``LocalManager`` which only launches workers on the local host.
This can be used to take advantage of multiple cores. `addprocs(4)`` will add 4 processes on the local machine.
This can be used to take advantage of multiple cores. ``addprocs(4)`` will add 4 processes on the local machine.

.. function:: addprocs() -> List of process identifiers

Expand Down
27 changes: 27 additions & 0 deletions examples/clustermanager/0mq/README
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
This is a proof-of-concept that uses ZeroMQ as transport.
It uses a star topology as opposed to the native mesh network.

Package ZMQ must be installed. All workers only run on localhost.

All Julia nodes only connect to a "broker" process that listens on known ports
8100 and 8101 via ZMQ sockets.


All commands must be run from `examples/clustermanager/0mq` directory

First, start the broker. In a new console type:
julia broker.jl

This does not return.

Next, start a Julia REPL and type:
include("ZMQCM.jl")
ZMQCM.start_master(4) # start with four workers


Alternatively, head.jl, a test script could be run. It just launches the requested number of workers,
executes a simple command on all of them and exits.
julia head.jl 4

NOTE: As stated this is a proof-of-concept. A real Julia cluster using ZMQ will probably use
different ZMQ socket types and optimize the transport.
Loading