From 1d09db123b91d50b064315a0159827343eab4412 Mon Sep 17 00:00:00 2001 From: Matthew Schlegel Date: Wed, 14 Apr 2021 14:52:28 -0600 Subject: [PATCH] Added robust parallel map. --- Project.toml | 4 +++- src/parallel.jl | 54 ++++++++++++++++++++++++++++++++----------------- 2 files changed, 38 insertions(+), 20 deletions(-) diff --git a/Project.toml b/Project.toml index 315780e..edacd99 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "Reproduce" uuid = "560a9c3a-0b8c-11e9-0329-d39dfcb85ed2" authors = ["Matt "] -version = "0.8.3" +version = "0.9.0" [deps] ArgParse = "c7e460c6-2fb9-53a9-8c5b-16f535851c63" @@ -15,6 +15,7 @@ HDF5 = "f67ccb44-e63f-5c2f-98bd-6dc0ccc4ba2f" JLD2 = "033835bb-8acc-5ee8-8aae-3f567f8a3819" JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6" Logging = "56ddb016-857b-54e1-b83d-db4d58db5568" +Parallelism = "c8c83da1-e5f9-4e2c-a857-b8617bac3554" Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f" ProgressMeter = "92933f4c-e287-5a05-a399-4b506db050ca" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" @@ -31,6 +32,7 @@ Git = "1" HDF5 = "0.12, 0.13, 0.14, 0.15" JLD2 = "0.1, 0.2, 0.3, 0.4" JSON = "0.20, 0.21" +Parallelism = "0.1" ProgressMeter = "1" Reexport = "0.2, 1" julia = "^1.3" diff --git a/src/parallel.jl b/src/parallel.jl index 1b18488..993a5ae 100644 --- a/src/parallel.jl +++ b/src/parallel.jl @@ -5,6 +5,7 @@ using Logging using SharedArrays using JLD2 using Dates +using Parallelism # using Config include("slurm.jl") @@ -217,7 +218,8 @@ function parallel_job(experiment_file::AbstractString, ######## # job_id_channel: a job id will appear here if a job is finished. - job_id_channel = RemoteChannel(()->Channel{Int}(length(args_iter)), 1) + job_id_channel = RemoteChannel(()->Channel{Int}(min(1000, length(args_iter))), 1) + prg_channel = RemoteChannel(()->Channel{Bool}(min(1000, length(args_iter))), 1) # Include on first proc for pre-compiliation @info "pre-compile" @@ -248,27 +250,41 @@ function parallel_job(experiment_file::AbstractString, @info "Experiment built on process $(myid())" end - ProgressMeter.@showprogress pmap(args_iter) do (job_id, args) - if !checkpointing || !done_jobs[job_id] - finished = run_experiment(Main.RP_exp_func, job_id, args, extra_args, exception_dir; - expand_args=expand_args, - verbose=verbose, - store_exceptions=store_exceptions, - skip_exceptions=skip_exceptions) - if finished - Distributed.put!(job_id_channel, job_id) - end - - if checkpointing && myid() == 2 - # Deal w/ job_id_channel... - JLD2.@load checkpoint_file finished_jobs_arr - while isready(job_id_channel) - new_job_id = take!(job_id_channel) - finished_jobs_arr[new_job_id] = true + pgm = ProgressMeter.Progress(length(args_iter)) + + @sync begin + @async while Distributed.take!(prg_channel) + ProgressMeter.next!(pgm) + end + + @async begin + robust_pmap(args_iter) do (job_id, args) + if !checkpointing || !done_jobs[job_id] + finished = run_experiment(Main.RP_exp_func, job_id, args, extra_args, exception_dir; + expand_args=expand_args, + verbose=verbose, + store_exceptions=store_exceptions, + skip_exceptions=skip_exceptions) + if finished + Distributed.put!(job_id_channel, job_id) + end + + if checkpointing && myid() == 2 + # Deal w/ job_id_channel... + JLD2.@load checkpoint_file finished_jobs_arr + while isready(job_id_channel) + new_job_id = take!(job_id_channel) + finished_jobs_arr[new_job_id] = true + end + JLD2.@save checkpoint_file finished_jobs_arr + end end - JLD2.@save checkpoint_file finished_jobs_arr + Distributed.put!(prg_channel, true) + yield() end + Distributed.put!(prg_channel, false) end + end if checkpointing