Skip to content

Commit 5247748

Browse files
authored
Merge pull request #100 from mkschleg/dev_slurm_task
Finished re-write of parallel.
2 parents fc6f5bb + dcc8ce0 commit 5247748

File tree

9 files changed

+584
-235
lines changed

9 files changed

+584
-235
lines changed

examples/toml_parallel.jl

+13-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
#!/cvmfs/soft.computecanada.ca/easybuild/software/2020/avx2/Core/julia/1.8.5/bin/julia
2+
3+
#SBATCH --mail-type=ALL
4+
#SBATCH -o job_out/%x_%a.out # Standard output
5+
#SBATCH -e job_out/%x_%a.err # Standard error
6+
#SBATCH --account=def-whitem
7+
8+
19
using Pkg
210
Pkg.activate(".")
311

@@ -16,15 +24,18 @@ function main()
1624
"--numworkers"
1725
arg_type=Int
1826
default=4
27+
"--threads_per_worker"
28+
arg_type=Int
29+
default=1
1930
"--numjobs"
2031
action=:store_true
2132
end
2233
parsed = parse_args(as)
2334

24-
experiment = Reproduce.parse_experiment_from_config(parsed["config"], parsed["path"])
35+
experiment = Reproduce.parse_experiment_from_config(parsed["config"], parsed["path"]; num_workers=parsed["numworkers"], num_threads_per_worker=parsed["threads_per_worker"])
2536

2637
pre_experiment(experiment)
27-
ret = job(experiment; num_workers=parsed["numworkers"])
38+
ret = job(experiment)
2839
post_experiment(experiment, ret)
2940

3041
end

src/Reproduce.jl

+2
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ include("save.jl")
6161

6262
include("iterators.jl")
6363

64+
include("comp_envs.jl")
65+
6466

6567
export Experiment,
6668
create_experiment_dir,

src/comp_envs.jl

+78
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
"""
    IN_SLURM()

Return `true` when both `SLURM_JOBID` and `SLURM_NTASKS` are present in `ENV`,
and `false` otherwise.
"""
IN_SLURM() = haskey(ENV, "SLURM_JOBID") && haskey(ENV, "SLURM_NTASKS")
2+
3+
"""
    TaskJob(id)

Compute environment that carries a single integer task `id`.
"""
struct TaskJob
    id::Int
end
6+
7+
"""
    LocalParallel(num_tasks, threads_per_task)

Compute environment described by a number of tasks and a number of threads per task.
"""
struct LocalParallel
    num_tasks::Int
    threads_per_task::Int
end
11+
12+
"""
    SlurmParallel(num_tasks, threads_per_task, job_name)

Compute environment described by a number of tasks, a number of threads per task,
and the name of the job.
"""
struct SlurmParallel
    num_tasks::Int
    threads_per_task::Int
    job_name::String
end
17+
18+
"""
    SlurmTaskArray(array_idx, task_dispatcher, array_size, job_name)

Compute environment for one element of a task array.

# Fields
- `array_idx`: index of this element within the array.
- `task_dispatcher`: nested environment (`TaskJob`, `LocalParallel`, or
  `SlurmParallel`) associated with this array element.
- `array_size`: number of elements in the array.
- `job_name`: name of the job.
"""
struct SlurmTaskArray
    array_idx::Int
    task_dispatcher::Union{TaskJob, LocalParallel, SlurmParallel} # task arrays
    array_size::Int
    job_name::String
end
24+
25+
"""
    get_job_name(comp_env)

Return the job name associated with `comp_env`.

The fallback is the literal `"job"`. A `SlurmParallel` returns its `job_name`, and a
`SlurmTaskArray` returns its `job_name` followed by `_` and its `array_idx`.
"""
get_job_name(comp_env) = "job"
get_job_name(comp_env::SlurmParallel) = comp_env.job_name
get_job_name(comp_env::SlurmTaskArray) = string(comp_env.job_name, "_", comp_env.array_idx)
28+
29+
"""
    get_comp_env(; kwargs...)

Build the compute-environment object for the current process.

Calls `get_slurm_comp_env` when `IN_SLURM()` is true and `get_local_comp_env`
otherwise, forwarding every keyword argument unchanged.
"""
function get_comp_env(; kwargs...)
    if IN_SLURM()
        return get_slurm_comp_env(; kwargs...)
    else
        return get_local_comp_env(; kwargs...)
    end
end
36+
37+
"""
    get_local_comp_env(; num_workers=Sys.CPU_THREADS - 1, threads_per_worker=1, kwargs...)

Build the compute environment when not running under Slurm.

If `RP_TASK_ID` is set in `ENV`, return a `TaskJob` with that id. Otherwise return a
`LocalParallel` whose task count comes from `RP_NTASKS` (default `num_workers`) and
whose threads per task come from `RP_CPUS_PER_TASK` (default `threads_per_worker`).

# Keywords
- `num_workers`: task count used when `RP_NTASKS` is not set.
- `threads_per_worker`: threads per task used when `RP_CPUS_PER_TASK` is not set.
- `kwargs...`: accepted and ignored.

# Throws
- `ArgumentError` if one of the environment variables read is not a valid integer.
"""
function get_local_comp_env(;
    num_workers=Sys.CPU_THREADS - 1, threads_per_worker=1, kwargs...
)
    if haskey(ENV, "RP_TASK_ID")
        # Parse the variable's value, not its name.
        return TaskJob(parse(Int, ENV["RP_TASK_ID"]))
    end

    ntasks = parse(Int, get(ENV, "RP_NTASKS", string(num_workers)))
    threads_per_task = parse(Int, get(ENV, "RP_CPUS_PER_TASK", string(threads_per_worker)))
    return LocalParallel(ntasks, threads_per_task)
end
48+
49+
"""
    get_slurm_comp_env(; kwargs...)

Build the compute environment from Slurm-related environment variables.

Without `SLURM_ARRAY_TASK_ID`, return a `SlurmParallel` built from `SLURM_NTASKS`,
`SLURM_CPUS_PER_TASK` (default `1`) and `SLURM_JOB_NAME`.

With `SLURM_ARRAY_TASK_ID`, return a `SlurmTaskArray`. Its size is
`RP_CUSTOM_ARRAY_TASK_COUNT` when set, else `SLURM_ARRAY_TASK_COUNT`. Its dispatcher is:
- `SlurmParallel` when `SLURM_NTASKS != 1`;
- `TaskJob` when `SLURM_NTASKS == 1` and either `RP_TASK_ID` is set or more than one
  CPU per task was requested;
- `LocalParallel` otherwise.

`kwargs...` are accepted and ignored.

# Throws
- `KeyError` if a required environment variable is missing.
- `ArgumentError` if a variable that is read is not a valid integer.
"""
function get_slurm_comp_env(; kwargs...)
    if !haskey(ENV, "SLURM_ARRAY_TASK_ID")
        return SlurmParallel(
            parse(Int, ENV["SLURM_NTASKS"]),
            parse(Int, get(ENV, "SLURM_CPUS_PER_TASK", "1")),
            ENV["SLURM_JOB_NAME"],
        )
    end

    array_id = parse(Int, ENV["SLURM_ARRAY_TASK_ID"])
    # A user-supplied array size takes precedence over the one reported by Slurm.
    array_size = if haskey(ENV, "RP_CUSTOM_ARRAY_TASK_COUNT")
        parse(Int, ENV["RP_CUSTOM_ARRAY_TASK_COUNT"])
    else
        parse(Int, ENV["SLURM_ARRAY_TASK_COUNT"])
    end
    ntasks = parse(Int, ENV["SLURM_NTASKS"])
    cpus_per_task = parse(Int, get(ENV, "SLURM_CPUS_PER_TASK", "1"))

    dispatcher = if ntasks != 1
        SlurmParallel(ntasks, cpus_per_task, ENV["SLURM_JOB_NAME"])
    elseif haskey(ENV, "RP_TASK_ID") || cpus_per_task > 1
        # Read the value of RP_TASK_ID (the original parsed the literal name and
        # always threw).
        # NOTE(review): when RP_TASK_ID is unset but cpus_per_task > 1 there is no
        # explicit id; falling back to the array index is an assumption — confirm.
        TaskJob(parse(Int, get(ENV, "RP_TASK_ID", string(array_id))))
    else
        # Otherwise do local parallel (i.e. only on a single node!).
        LocalParallel(ntasks, cpus_per_task)
    end

    return SlurmTaskArray(array_id, dispatcher, array_size, ENV["SLURM_JOB_NAME"])
end

src/experiment.jl

+50-62
Original file line numberDiff line numberDiff line change
@@ -10,47 +10,7 @@ using Distributed
1010

1111

1212

13-
IN_SLURM() = ("SLURM_JOBID" keys(ENV)) && ("SLURM_NTASKS" keys(ENV))
14-
15-
function get_comp_env()
16-
if "SLURM_JOBID" keys(ENV) && "SLURM_NTASKS" keys(ENV)
17-
SlurmParallel(parse(Int, ENV["SLURM_NTASKS"]))
18-
elseif "SLURM_ARRAY_TASK_ID" keys(ENV)
19-
SlurmTaskArray(parse(Int, ENV["SLURM_ARRAY_TASK_ID"])) # this needs to be fixed.
20-
elseif "RP_TASK_ID" keys(ENV)
21-
LocalTask(parse(Int, ENV["RP_TASK_ID"]))
22-
else
23-
if "RP_NTASKS" keys(ENV)
24-
LocalParallel(parse(Int, ENV["RP_NTASKS"]))
25-
else
26-
LocalParallel(0)
27-
end
28-
end
29-
end
30-
31-
32-
get_task_id(comp_env) = comp_env.id
33-
is_task_env(comp_env) = false
34-
35-
struct SlurmTaskArray
36-
id::Int
37-
end
38-
39-
is_task_env(comp_env::SlurmTaskArray) = true
40-
41-
struct SlurmParallel
42-
num_procs::Int
43-
end
44-
45-
struct LocalTask
46-
id::Int
47-
end
4813

49-
is_task_env(comp_env::LocalTask) = true
50-
51-
struct LocalParallel
52-
num_procs::Int
53-
end
5414

5515

5616
# what does experiment do? Can it be simplified? Can parts of it be decomposed?
@@ -62,13 +22,22 @@ struct JobMetadata
6222
end
6323

6424
"""
    Metadata{ST, CE}

Descriptive data attached to an experiment.

# Fields
- `name`: job name (the five-argument constructor fills it from `get_job_name`).
- `save_type`: value of type `ST` describing how results are saved.
- `comp_env`: compute environment, of type `CE`.
- `details_loc`: experiment directory.
- `hash`: experiment hash; used in the names of the saved settings files.
- `config`: path of the configuration file, or `nothing`.
- `job_log_dir`: jobs directory (the five-argument constructor fills it from
  `get_jobs_dir`).
"""
struct Metadata{ST, CE}
    name::String
    save_type::ST
    comp_env::CE
    details_loc::String
    hash::UInt64
    config::Union{String, Nothing}
    job_log_dir::String
end
7133

34+
"""
    get_jobs_dir(comp_env, details_loc)

Return the `jobs` sub-directory of `details_loc`.

`comp_env` is accepted but does not currently affect the result.
"""
function get_jobs_dir(comp_env, details_loc)
    return joinpath(details_loc, "jobs")
end
35+
36+
"""
    Metadata(save_type, comp_env, dir, exp_hash, config)

Construct a `Metadata`, deriving `name` from `get_job_name(comp_env)` and
`job_log_dir` from `get_jobs_dir(comp_env, dir)`. `dir` becomes `details_loc`.
"""
function Metadata(save_type, comp_env, dir, exp_hash, config)
    job_name = get_job_name(comp_env)
    jobs_dir = get_jobs_dir(comp_env, dir)
    return Metadata(job_name, save_type, comp_env, dir, exp_hash, config, jobs_dir)
end
7241

7342
struct Experiment{MD<:Metadata, I}
7443
job_metadata::JobMetadata
@@ -116,7 +85,7 @@ This function:
11685
function pre_experiment(exp::Experiment; kwargs...)
    metadata = exp.metadata
    create_experiment_dir(metadata.details_loc)
    # Keyword arguments are forwarded to the save backend's initialiser only.
    experiment_save_init(metadata.save_type, exp; kwargs...)
    # Directory setup is dispatched on the experiment's compute environment.
    return experiment_dir_setup(exp)
end
12190

12291
"""
@@ -201,36 +170,55 @@ end
201170
get_settings_dir(details_loc) = joinpath(details_loc, "settings")
202171
get_settings_file(hash::UInt) = "settings_0x"*string(hash, base=16)*".jld2"
203172
get_config_copy_file(hash::UInt) = "config_0x"*string(hash, base=16)*".jld2"
204-
get_jobs_dir(details_loc) = joinpath(details_loc, "jobs")
205173

206-
"""
207-
add_experiment
174+
"""
    experiment_dir_setup(exp::Experiment)

Set up the experiment directory by dispatching on the compute environment stored in
`exp.metadata.comp_env`.
"""
function experiment_dir_setup(exp::Experiment)
    return experiment_dir_setup(exp.metadata.comp_env, exp)
end
208177

209-
This adds the experiment to the directory (remember directories can contain multiple experiments).
210-
"""
211-
function add_experiment(exp::Experiment)
212-
213-
comp_env = exp.metadata.comp_env
214-
if is_task_env(comp_env)
215-
if get_task_id(comp_env) != 1
216-
task_id = comp_env.id
217-
@info "Only add experiment for task id == 1... id : $(task_id) $(task_id == 1)"
218-
return
219-
end
178+
# LocalParallel: create the jobs folder, then save the experiment settings.
function experiment_dir_setup(comp_env::LocalParallel, exp::Experiment)
    create_jobs_folder(exp)
    return save_experiment_settings(exp)
end
183+
184+
# SlurmParallel: create the jobs folder, then save the experiment settings.
function experiment_dir_setup(comp_env::SlurmParallel, exp::Experiment)
    create_jobs_folder(exp)
    return save_experiment_settings(exp)
end
189+
190+
# SlurmTaskArray: every array element creates the jobs folder, but only the element
# with array index 1 saves the experiment settings.
function experiment_dir_setup(comp_env::SlurmTaskArray, exp::Experiment)
    create_jobs_folder(exp)
    array_idx = comp_env.array_idx
    if array_idx != 1
        @info "Only save settings for array index == 1: array index = $(array_idx)"
        return nothing
    end
    return save_experiment_settings(exp)
end
221200

201+
# TaskJob: only the task with id 1 saves the experiment settings; every other task
# logs a message and returns without touching the directory.
function experiment_dir_setup(comp_env::TaskJob, exp::Experiment)
    task_id = comp_env.id
    if task_id != 1
        @info "Only add experiment for task id == 1... id : $(task_id) $(task_id == 1)"
        return nothing
    end
    # NOTE(review): unlike the other methods, this one never calls
    # `create_jobs_folder` — confirm that is intended.
    return save_experiment_settings(exp)
end
210+
211+
"""
    create_jobs_folder(exp::Experiment)

Create the directory `exp.metadata.job_log_dir` via `_safe_mkpath`.
"""
function create_jobs_folder(exp::Experiment)
    return _safe_mkpath(exp.metadata.job_log_dir)
end
223214

224-
@info "Adding Experiment to $(exp_dir)"
215+
function save_experiment_settings(exp::Experiment)# exp_dir, exp_hash)
216+
exp_dir = exp.metadata.details_loc
217+
exp_hash = exp.metadata.hash
225218

226219
settings_dir = get_settings_dir(exp_dir)
227220
_safe_mkdir(settings_dir)
228221

229-
if comp_env isa SlurmParallel
230-
_safe_mkdir(get_jobs_dir(exp_dir))
231-
end
232-
233-
exp_hash = exp.metadata.hash
234222
settings_file = joinpath(settings_dir, "settings_0x"*string(exp_hash, base=16)*".jld2")
235223

236224
args_iter = exp.args_iter
@@ -245,7 +233,7 @@ function add_experiment(exp::Experiment)
245233
config_file = joinpath(settings_dir, "config_0x"*string(exp_hash, base=16)*splitext(config)[end])
246234
cp(config, config_file; force=true)
247235
end
248-
236+
249237
end
250238

251239
function post_experiment(exp::Experiment, job_ret)

0 commit comments

Comments
 (0)