inject study_type in EBI and improvements to current automatic processing pipeline (#3023)

* inject study_type in ebi and improvements to current automatic processing pipeline

* addressing @ElDeveloper's comments
antgonza authored Aug 4, 2020
1 parent d9275b7 commit 68ffc5a
Showing 3 changed files with 61 additions and 22 deletions.
5 changes: 5 additions & 0 deletions qiita_ware/ebi.py
@@ -356,6 +356,11 @@ def generate_study_xml(self):
study_title = ET.SubElement(descriptor, 'STUDY_TITLE')
study_title.text = escape(clean_whitespace(self.study_title))

# study type is deprecated and not displayed anywhere on EBI-ENA;
# however, it is required for submission, so we just inject it as 'Other'
ET.SubElement(
descriptor, 'STUDY_TYPE', {'existing_study_type': 'Other'})

study_abstract = ET.SubElement(descriptor, 'STUDY_ABSTRACT')
study_abstract.text = clean_whitespace(escape(self.study_abstract))

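A minimal sketch of the XML this injection yields, using only the standard-library ElementTree; a bare DESCRIPTOR element stands in here for the real descriptor built inside generate_study_xml:

import xml.etree.ElementTree as ET

# bare stand-in for the descriptor element built in generate_study_xml
descriptor = ET.Element('DESCRIPTOR')
ET.SubElement(descriptor, 'STUDY_TYPE', {'existing_study_type': 'Other'})

print(ET.tostring(descriptor, encoding='unicode'))
# <DESCRIPTOR><STUDY_TYPE existing_study_type="Other" /></DESCRIPTOR>

The serialized element matches the string asserted in the test_ebi.py change below.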
1 change: 1 addition & 0 deletions qiita_ware/test/test_ebi.py
@@ -1314,6 +1314,7 @@ def test_parse_EBI_reply(self):
<STUDY_TITLE>
Identification of the Microbiomes for Cannabis Soils
</STUDY_TITLE>
<STUDY_TYPE existing_study_type="Other" />
<STUDY_ABSTRACT>
This is a preliminary study to examine the microbiota associated with \
the Cannabis plant. Soils samples from the bulk soil, soil associated with \
77 changes: 55 additions & 22 deletions scripts/qiita-auto-processing
@@ -33,6 +33,7 @@ user = User('[email protected]')
# 'version': the version of the plugin,
# 'cmd_name': the command we want to run,
# 'input_name': the name of the input parameter of that command
# 'ignore_parameters': list of parameters to ignore, for example: threads
# 'parent_artifact_name': name of the parent output, input for this command
# 'parameters_names': list of the names of the parameter sets we want to run
# }
@@ -41,21 +42,24 @@ full_pipelines = [
'data_type': ['Metagenomic'],
'artifact_type': 'per_sample_FASTQ',
'previous-step': None,
'requirements': dict(),
'steps': [
{'previous-step': None,
'plugin': 'qp-shogun',
'version': '012020',
'cmd_name': 'Atropos v1.1.24',
'input_name': 'input',
'ignore_parameters': ['Number of threads used'],
'parent_artifact_name': None,
'parameters_names': ['KAPA HyperPlus with iTru']},
{'previous-step': 'Atropos v1.1.24',
'plugin': 'qp-shogun',
'version': '012020',
'cmd_name': 'Shogun v1.0.7',
'version': '072020',
'cmd_name': 'Shogun v1.0.8',
'input_name': 'input',
'ignore_parameters': ['Number of threads'],
'parent_artifact_name': 'Adapter trimmed files',
'parameters_names': ['wol_bowtie2', 'rep94_bowtie2']}
'parameters_names': ['wol_bowtie2', 'rep200_bowtie2']}
]},
{'name': 'Target Gene Processing',
'data_type': ['16S', '18S', 'ITS'],
@@ -73,6 +77,7 @@ full_pipelines = [
'version': '1.9.1',
'cmd_name': 'Trimming',
'input_name': 'input_data',
'ignore_parameters': [],
'parent_artifact_name': None,
'parameters_names': ['90 base pairs',
'100 base pairs',
@@ -83,13 +88,15 @@
'version': '1.9.1',
'cmd_name': 'Pick closed-reference OTUs',
'input_name': 'input_data',
'ignore_parameters': [],
'parent_artifact_name': 'Trimmed Demultiplexed',
'parameters_names': ['Defaults - parallel']},
{'previous-step': 'Trimming',
'plugin': 'deblur',
'version': '1.1.0',
'cmd_name': 'Deblur',
'input_name': 'Demultiplexed sequences',
'ignore_parameters': [],
'parent_artifact_name': 'Trimmed Demultiplexed',
'parameters_names': ['Defaults']}
]},
@@ -122,6 +129,22 @@ def _check_requirements(requirements, template):
return satisfied


def _check_parameters(jobs, cmd):
params = [{k: str(v) for k, v in j.parameters.values.items()
if k not in cmd['ignore_parameters']} for j in jobs]
return params


def _submit_workflows(artifact_process):
for artifact in artifact_process:
if artifact['workflow'] is None:
continue
# nodes will return in position [0] the first job created
first_job = list(artifact['workflow'].graph.nodes())[0]
if first_job.status == 'in_construction':
artifact['workflow'].submit()


# Step 1. Loop over the full_pipelines to process each step
for pipeline in full_pipelines:
# Step 2. From the steps generate the list of commands to add to the
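A quick sketch of what the new _check_parameters helper returns, assuming the function defined above is in scope; the job object is a hypothetical SimpleNamespace stand-in for a real Qiita job (which would come from artifact.jobs()), and 'Database' is an invented parameter name:

from types import SimpleNamespace

# hypothetical job mimicking a Qiita job's .parameters.values dict
job = SimpleNamespace(parameters=SimpleNamespace(
    values={'Number of threads': 5, 'Database': 'wol'}))
cmd = {'ignore_parameters': ['Number of threads']}

print(_check_parameters([job], cmd))
# [{'Database': 'wol'}]  -> values stringified, ignored keys dropped

This is why runs that only differ in thread count are still treated as matching parameter sets later in the script.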
@@ -149,6 +172,7 @@ for pipeline in full_pipelines:
'previous-step': step['previous-step'],
'parent_artifact_name': step['parent_artifact_name'],
'input_name': step['input_name'],
'ignore_parameters': step['ignore_parameters'],
'parameters': parameters})

# Step 2. - for children. Get their commands. We currently only support
@@ -161,7 +185,9 @@
if c['previous-step'] == commands[0]['command-name']]

# Step 3. Find all preparations/artifacts to which we can add the pipeline
artifacts_all = [a for study in Study.iter()
# ... as a first pass we will only process study 10317 (AGP) ...
# artifacts_all = [a for study in Study.iter()
artifacts_all = [a for study in [Study(10317)]
# loop over all artifacts of artifact_type within the study
for a in study.artifacts(
artifact_type=pipeline['artifact_type'])
@@ -172,7 +198,10 @@
artifacts_compliant = []
for a in artifacts_all:
st = a.study.sample_template
pt = a.prep_templates[0]
pts = a.prep_templates
if not pts:
continue
pt = pts[0]

# {'sandbox', 'awaiting_approval', 'private', 'public'}
if a.visibility in ('sandbox', 'awaiting_approval'):
@@ -194,23 +223,29 @@
# of Step 4 but for debugging it makes sense to separate
artifact_process = []
children_compliant = []
cmd = commands[0]
for a in artifacts_compliant:
cmd = commands[0]
# getting all jobs, including hidden ones, in case the job failed
jobs = a.jobs(cmd=cmd['command'], show_hidden=True)
params = [j.parameters.values for j in jobs]
params = _check_parameters(jobs, cmd)

# checking that all required parameters of this command exist
missing_parameters = []
for p in cmd['parameters']:
p = p['values']
p.update({cmd['input_name']: str(a.id)})
if p not in params:
p_to_compare = p.copy()
for k in cmd['ignore_parameters']:
del p_to_compare[k]
if p_to_compare not in params:
missing_parameters.append(p)
else:
for c in a.children:
if c.processing_parameters.values == p:
children_compliant.append(c)
cpp = c.processing_parameters
if cpp.command.name == cmd['command-name']:
cparams = _check_parameters([cpp], cmd)
if cparams == p_to_compare:
children_compliant.append(c)
if missing_parameters:
# note that we are building a dict for each artifact so we can
# save the workflow id, useful for when we run this in a terminal
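Worked through with hypothetical values (parameter names invented, 'input' standing in for cmd['input_name']), the comparison above drops the ignored keys before the membership test, so a job that already ran with a different thread count does not mark the parameter set as missing:

# hypothetical values illustrating the p_to_compare membership test
ignore_parameters = ['Number of threads']
params = [{'Database': 'wol', 'input': '1234'}]     # as returned by _check_parameters

p = {'Database': 'wol', 'Number of threads': '15'}  # one of cmd['parameters']
p.update({'input': '1234'})                         # inject the artifact id

p_to_compare = p.copy()
for k in ignore_parameters:
    del p_to_compare[k]

print(p_to_compare in params)  # True -> already processed, nothing missing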
@@ -224,14 +259,18 @@
for cmd_id, cmd in enumerate(children_cmds):
# getting all jobs, including hidden ones, in case the job failed
jobs = a.jobs(cmd=cmd['command'], show_hidden=True)
params = [j.parameters.values for j in jobs]
params = _check_parameters(jobs, cmd)

# checking that all required parameters of this command exist
missing_parameters = []
for p in cmd['parameters']:
p = p['values']
p.update({cmd['input_name']: str(c.id)})
if p not in params:
p.update({cmd['input_name']: str(a.id)})
p_to_compare = p.copy()
for k in cmd['ignore_parameters']:
del p_to_compare[k]

if p_to_compare not in params:
missing_parameters.append(p)
if missing_parameters:
artifact_process.append(
@@ -266,9 +305,9 @@
# now we can add the rest of the parameters to the workflow for
# the first command
for params in artifact['missing_parameters'][1:]:
params.update({cmd['input_name']: str(a.id)})
job_params = Parameters.load(cmd['command'], values_dict=params)
artifact['workflow'].add(job_params)
artifact['workflow'].add(
job_params, req_params={cmd['input_name']: str(a.id)})

for cmd in commands[cmd_id + 1:]:
# get jobs from the workflow to which we can add this new command
@@ -286,10 +325,4 @@
cmd['parent_artifact_name']: cmd['input_name']}})

# Step 7. submit the workflows!
for artifact in artifact_process:
if artifact['workflow'] is None:
continue
# nodes will return in position [0] the first job created
first_job = list(artifact['workflow'].graph.nodes())[0]
if first_job.status == 'in_construction':
artifact['workflow'].submit()
_submit_workflows(artifact_process)
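And a tiny usage sketch of _submit_workflows, assuming the helper defined earlier in this script is in scope; the workflow and job here are hypothetical SimpleNamespace mocks, since the real workflows are the ones assembled in the steps above:

from types import SimpleNamespace

# mock workflow whose first graph node is a job still in construction
job = SimpleNamespace(status='in_construction')
wf = SimpleNamespace(graph=SimpleNamespace(nodes=lambda: [job]),
                     submit=lambda: print('submitted'))

_submit_workflows([{'workflow': None}, {'workflow': wf}])
# the None entry is skipped; wf is submitted because its first job is
# still 'in_construction'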
