-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.nf
335 lines (304 loc) · 11.6 KB
/
main.nf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
#!/usr/bin/env nextflow
process preprocess {
container 'ghcr.io/ucl-medical-genomics/hygeia_two_group:v0.1.2'
publishDir "${params.output_dir}", mode: 'copy'
cpus 4
memory '16 GB'
input:
tuple val(case_group), val(case_ids), path(case_files), val(control_group), val(control_ids), path(control_files)
path cpg_file_path
val chrom
output:
tuple(
val(chrom),
// preprocessed data output
path("preprocessed_data/positions_${chrom}.txt"), // positions_chr
path("preprocessed_data/n_total_reads_case_${chrom}.txt"), // n_total_reads_case_chr
path("preprocessed_data/n_total_reads_control_${chrom}.txt"), // n_total_reads_control_chr
path("preprocessed_data/n_methylated_reads_case_${chrom}.txt"), // n_methylated_reads_case_chr
path("preprocessed_data/n_methylated_reads_control_${chrom}.txt"), // n_methylated_reads_control_chr
path("preprocessed_data/cpg_sites_merged_${chrom}.txt"), // cpg_sites_merged_chr
)
script:
if (params.debug) {
"""
mkdir -p preprocessed_data
touch preprocessed_data/positions_${chrom}.txt
touch preprocessed_data/n_total_reads_case_${chrom}.txt
touch preprocessed_data/n_total_reads_control_${chrom}.txt
touch preprocessed_data/n_methylated_reads_case_${chrom}.txt
touch preprocessed_data/n_methylated_reads_control_${chrom}.txt
touch preprocessed_data/cpg_sites_merged_${chrom}.txt
"""
} else {
def caseIdArgs = case_ids.collect { "--case_id_names '$it'" }.join(" ")
def caseFileArgs = case_files.collect { "--case_data_path '$it'" }.join(" ")
def controlIdArgs = control_ids.collect { "--control_id_names '$it'" }.join(" ")
def controlFileArgs = control_files.collect { "--control_data_path '$it'" }.join(" ")
"""
which hygeia
ls
pwd
hygeia preprocess \
${caseIdArgs} \
${caseFileArgs} \
${controlIdArgs} \
${controlFileArgs} \
--cpg_file_path ${cpg_file_path} \
--output_path preprocessed_data \
--chrom ${chrom}
"""
}
}
process estimateParametersAndRegimes {
container 'ghcr.io/ucl-medical-genomics/hygeia_single_group:v0.1.2'
publishDir "${params.output_dir}", mode: 'copy', pattern: 'single_group_estimation/*'
cpus 4
memory '16 GB'
input:
tuple(
val(chrom),
path(positions_chr, stageAs: "preprocessed_data/*"),
path(n_total_reads_case_chr, stageAs: "preprocessed_data/*"),
path(n_total_reads_control_chr, stageAs: "preprocessed_data/*"),
path(n_methylated_reads_case_chr, stageAs: "preprocessed_data/*"),
path(n_methylated_reads_control_chr, stageAs: "preprocessed_data/*"),
path(cpg_sites_merged_chr, stageAs: "preprocessed_data/*"),
)
output:
tuple(
val(chrom),
// preprocessed data output
path(positions_chr),
path(n_total_reads_case_chr),
path(n_total_reads_control_chr),
path(n_methylated_reads_case_chr),
path(n_methylated_reads_control_chr),
path(cpg_sites_merged_chr),
// single_group_estimation files
path("single_group_estimation/regimes_${chrom}.csv"), // regime_probabilities_csv
path("single_group_estimation/theta_trace_${chrom}.csv"), // theta_trace_csv
path("single_group_estimation/p_${chrom}.csv"), // p_csv
path("single_group_estimation/kappa_${chrom}.csv"), // kappa_csv
path("single_group_estimation/omega_${chrom}.csv"), // omega_csv
path("single_group_estimation/theta_${chrom}.csv"), // theta_csv
)
script:
if (params.debug) {
"""
mkdir -p single_group_estimation
touch single_group_estimation/regimes_${chrom}.csv
touch single_group_estimation/theta_trace_${chrom}.csv
touch single_group_estimation/p_${chrom}.csv
touch single_group_estimation/kappa_${chrom}.csv
touch single_group_estimation/omega_${chrom}.csv
touch single_group_estimation/theta_${chrom}.csv
"""
} else {
"""
hygeia estimate_parameters_and_regimes \
--mu ${params.meteor_mu} \
--sigma ${params.meteor_sigma} \
--u ${params.min_cpg_sites_between_change_points} \
--n_methylated_reads_csv_file ${n_methylated_reads_control_chr} \
--genomic_positions_csv_file ${positions_chr} \
--n_total_reads_csv_file ${n_total_reads_control_chr} \
--regime_probabilities_csv_file single_group_estimation/regimes_${chrom}.csv \
--theta_trace_csv_file single_group_estimation/theta_trace_${chrom}.csv \
--p_csv_file single_group_estimation/p_${chrom}.csv \
--kappa_csv_file single_group_estimation/kappa_${chrom}.csv \
--omega_csv_file single_group_estimation/omega_${chrom}.csv \
--theta_file single_group_estimation/theta_${chrom}.csv \
--estimate_regime_probabilities \
--estimate_parameters
"""
}
}
process infer {
container 'ghcr.io/ucl-medical-genomics/hygeia_two_group:v0.1.2'
publishDir "${params.output_dir}/two_group_output", mode: 'copy', pattern: "infer_out_${chrom}_${inference_seed}/*"
cpus 16
memory { 200.GB + (task.attempt * 50.GB ) }
input:
tuple(
val(chrom),
// preprocessed data output
path(positions_chr, stageAs: 'preprocessed_data/*'),
path(n_total_reads_case_chr, stageAs: 'preprocessed_data/*'),
path(n_total_reads_control_chr, stageAs: 'preprocessed_data/*'),
path(n_methylated_reads_case_chr, stageAs: 'preprocessed_data/*'),
path(n_methylated_reads_control_chr, stageAs: 'preprocessed_data/*'),
path(cpg_sites_merged_chr, stageAs: 'preprocessed_data/*'),
// single_group_estimation files
path(regime_probabilities_csv, stageAs: "single_group_estimation/*"),
path(theta_trace_csv, stageAs: "single_group_estimation/*"),
path(p_csv, stageAs: "single_group_estimation/*"),
path(kappa_csv, stageAs: "single_group_estimation/*"),
path(omega_csv, stageAs: "single_group_estimation/*"),
path(theta_csv, stageAs: "single_group_estimation/*"),
)
each inference_seed
output:
tuple(
val(chrom),
// preprocessed data output
path(positions_chr),
path(n_total_reads_case_chr),
path(n_total_reads_control_chr),
path(n_methylated_reads_case_chr),
path(n_methylated_reads_control_chr),
path(cpg_sites_merged_chr),
// single_group_estimation files
path(regime_probabilities_csv),
path(theta_trace_csv),
path(p_csv),
path(kappa_csv),
path(omega_csv),
path(theta_csv),
// inference output
path("infer_out_${chrom}_${inference_seed}/*"), // two_group_results
val(inference_seed)
)
script:
if (params.debug) {
"""
mkdir -p infer_out_${chrom}_${inference_seed}
touch infer_out_${chrom}_${inference_seed}/dummy_file
"""
} else {
"""
hygeia infer \
--mu ${params.meteor_mu} \
--sigma ${params.meteor_sigma} \
--chrom ${chrom} \
--single_group_dir ./single_group_estimation \
--data_dir ./preprocessed_data \
--results_dir infer_out_${chrom}_${inference_seed} \
--seed ${inference_seed}
"""
}
}
process aggregate_results {
container 'ghcr.io/ucl-medical-genomics/hygeia_two_group:v0.1.2'
publishDir "${params.output_dir}/aggregated", mode: 'copy'
cpus 8
memory '24 GB'
input:
tuple(
val(chrom),
// preprocessed data output
path(positions_chr, stageAs: 'preprocessed_data/*'),
path(n_total_reads_case_chr, stageAs: 'preprocessed_data/*'),
path(n_total_reads_control_chr, stageAs: 'preprocessed_data/*'),
path(n_methylated_reads_case_chr, stageAs: 'preprocessed_data/*'),
path(n_methylated_reads_control_chr, stageAs: 'preprocessed_data/*'),
path(cpg_sites_merged_chr, stageAs: 'preprocessed_data/*'),
// single_group_estimation files
path(regime_probabilities_csv, stageAs: "single_group_estimation/*"),
path(theta_trace_csv, stageAs: "single_group_estimation/*"),
path(p_csv, stageAs: "single_group_estimation/*"),
path(kappa_csv, stageAs: "single_group_estimation/*"),
path(omega_csv, stageAs: "single_group_estimation/*"),
path(theta_csv, stageAs: "single_group_estimation/*"),
// inference output
path("infer_out_${chrom}_${inference_seed}/*", stageAs: "out/**"), // two_group_results
val(inference_seed)
)
output:
tuple(
val(chrom),
path("aggregated_out_${chrom}/*"), // aggregated_out
)
script:
if (params.debug) {
"""
mkdir -p aggregated_out_${chrom}
touch aggregated_out_${chrom}/dummy_file
"""
} else {
"""
# Merge all the results into one folder
mkdir -p merged_out_${chrom}
for i in out/*/*; do
ln -f -s ../\$i "merged_out_${chrom}/\$(basename \$i)"
done
hygeia aggregate \
--data_dir ./preprocessed_data/ \
--results_dir merged_out_${chrom} \
--seeds ${params.num_of_inference_seeds} \
--chrom ${chrom} \
--output_dir aggregated_out_${chrom} \
--num_particles 2400
"""
}
}
process get_dmps {
container 'ghcr.io/ucl-medical-genomics/hygeia_two_group:v0.1.2'
publishDir "${params.output_dir}/dmps/", mode: 'copy'
cpus 4
memory '24 GB'
input:
tuple(
val(chrom),
path(aggregated_out_chr, stageAs: 'aggregated_data/*')
)
output:
path "dmps_${chrom}"
script:
if (params.debug) {
"""
mkdir -p dmps_${chrom}
touch dmps_${chrom}/dummy_file
"""
} else {
"""
hygeia get_dmps \
--results_dir aggregated_data \
--output_dir dmps_${chrom} \
--chrom ${chrom}
"""
}
}
workflow {
Channel
.of(params.chroms.split(','))
.set { chroms }
Channel
.of(0..params.num_of_inference_seeds - 1)
.set { inference_seeds }
Channel
.fromPath(params.sample_sheet)
.splitCsv(header: true, sep: ',', strip: true)
.map { row -> tuple(row.group, row.id, row.file) }
.groupTuple()
.collect()
.set { samples }
preprocess(samples, params.cpg_file_path, chroms)
estimateParametersAndRegimes(preprocess.out)
infer(
estimateParametersAndRegimes.out,
inference_seeds
)
infer.out
.groupTuple()
.map { r -> tuple(
r[0], // chrom
r[1].first(), // positions_chr
r[2].first(), // n_total_reads_case_chr
r[3].first(), // n_total_reads_control_chr
r[4].first(), // n_methylated_reads_case_chr
r[5].first(), // n_methylated_reads_control_chr
r[6].first(), // cpg_sites_merged_chr
r[7].first(), // regime_probabilities_csv
r[8].first(), // theta_trace_csv
r[9].first(), // p_csv
r[10].first(), // kappa_csv
r[11].first(), // omega_csv
r[12].first(), // theta_csv
r[13], // infer_out
r[14], // inference_seed
)}
.set { merged_infer_outputs }
aggregate_results(merged_infer_outputs)
get_dmps(aggregate_results.out)
}