Skip to content

Commit

Permalink
Added robust parallel map.
Browse files Browse the repository at this point in the history
  • Loading branch information
mkschleg committed Apr 14, 2021
1 parent 8340c0b commit 1d09db1
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 20 deletions.
4 changes: 3 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "Reproduce"
uuid = "560a9c3a-0b8c-11e9-0329-d39dfcb85ed2"
authors = ["Matt <[email protected]>"]
version = "0.8.3"
version = "0.9.0"

[deps]
ArgParse = "c7e460c6-2fb9-53a9-8c5b-16f535851c63"
Expand All @@ -15,6 +15,7 @@ HDF5 = "f67ccb44-e63f-5c2f-98bd-6dc0ccc4ba2f"
JLD2 = "033835bb-8acc-5ee8-8aae-3f567f8a3819"
JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6"
Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
Parallelism = "c8c83da1-e5f9-4e2c-a857-b8617bac3554"
Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f"
ProgressMeter = "92933f4c-e287-5a05-a399-4b506db050ca"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
Expand All @@ -31,6 +32,7 @@ Git = "1"
HDF5 = "0.12, 0.13, 0.14, 0.15"
JLD2 = "0.1, 0.2, 0.3, 0.4"
JSON = "0.20, 0.21"
Parallelism = "0.1"
ProgressMeter = "1"
Reexport = "0.2, 1"
julia = "^1.3"
Expand Down
54 changes: 35 additions & 19 deletions src/parallel.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ using Logging
using SharedArrays
using JLD2
using Dates
using Parallelism
# using Config

include("slurm.jl")
Expand Down Expand Up @@ -217,7 +218,8 @@ function parallel_job(experiment_file::AbstractString,
########

# job_id_channel: a job id will appear here if a job is finished.
job_id_channel = RemoteChannel(()->Channel{Int}(length(args_iter)), 1)
job_id_channel = RemoteChannel(()->Channel{Int}(min(1000, length(args_iter))), 1)
prg_channel = RemoteChannel(()->Channel{Bool}(min(1000, length(args_iter))), 1)

# Include on first proc for pre-compiliation
@info "pre-compile"
Expand Down Expand Up @@ -248,27 +250,41 @@ function parallel_job(experiment_file::AbstractString,
@info "Experiment built on process $(myid())"
end

ProgressMeter.@showprogress pmap(args_iter) do (job_id, args)
if !checkpointing || !done_jobs[job_id]
finished = run_experiment(Main.RP_exp_func, job_id, args, extra_args, exception_dir;
expand_args=expand_args,
verbose=verbose,
store_exceptions=store_exceptions,
skip_exceptions=skip_exceptions)
if finished
Distributed.put!(job_id_channel, job_id)
end

if checkpointing && myid() == 2
# Deal w/ job_id_channel...
JLD2.@load checkpoint_file finished_jobs_arr
while isready(job_id_channel)
new_job_id = take!(job_id_channel)
finished_jobs_arr[new_job_id] = true
pgm = ProgressMeter.Progress(length(args_iter))

@sync begin
@async while Distributed.take!(prg_channel)
ProgressMeter.next!(pgm)
end

@async begin
robust_pmap(args_iter) do (job_id, args)
if !checkpointing || !done_jobs[job_id]
finished = run_experiment(Main.RP_exp_func, job_id, args, extra_args, exception_dir;
expand_args=expand_args,
verbose=verbose,
store_exceptions=store_exceptions,
skip_exceptions=skip_exceptions)
if finished
Distributed.put!(job_id_channel, job_id)
end

if checkpointing && myid() == 2
# Deal w/ job_id_channel...
JLD2.@load checkpoint_file finished_jobs_arr
while isready(job_id_channel)
new_job_id = take!(job_id_channel)
finished_jobs_arr[new_job_id] = true
end
JLD2.@save checkpoint_file finished_jobs_arr
end
end
JLD2.@save checkpoint_file finished_jobs_arr
Distributed.put!(prg_channel, true)
yield()
end
Distributed.put!(prg_channel, false)
end

end

if checkpointing
Expand Down

0 comments on commit 1d09db1

Please sign in to comment.