From 862fe30feb71f0c015eb65eb39b583e4c66a5a7a Mon Sep 17 00:00:00 2001 From: Matt Cieslak Date: Thu, 1 Feb 2024 15:16:09 -0500 Subject: [PATCH] [ENH] Add ReconScalars node to recon workflows (#683) Co-authored-by: Taylor Salo --- .circleci/ScalarMapper.sh | 39 +++ .circleci/config.yml | 22 +- qsiprep/data/pipelines/bundle_scalar_map.json | 55 +++++ qsiprep/interfaces/interchange.py | 3 +- qsiprep/interfaces/recon_scalars.py | 227 +++++++++++++++++- qsiprep/interfaces/scalar_mapping.py | 171 +++++++++++++ qsiprep/workflows/recon/amico.py | 15 +- qsiprep/workflows/recon/build_workflow.py | 30 ++- qsiprep/workflows/recon/converters.py | 3 +- qsiprep/workflows/recon/dipy.py | 96 ++++++-- qsiprep/workflows/recon/dsi_studio.py | 28 ++- qsiprep/workflows/recon/mrtrix.py | 12 +- qsiprep/workflows/recon/pyafq.py | 3 +- qsiprep/workflows/recon/scalar_mapping.py | 131 ++++++++++ qsiprep/workflows/recon/tortoise.py | 3 +- 15 files changed, 786 insertions(+), 52 deletions(-) create mode 100644 .circleci/ScalarMapper.sh create mode 100644 qsiprep/data/pipelines/bundle_scalar_map.json create mode 100644 qsiprep/interfaces/scalar_mapping.py create mode 100644 qsiprep/workflows/recon/scalar_mapping.py diff --git a/.circleci/ScalarMapper.sh b/.circleci/ScalarMapper.sh new file mode 100644 index 00000000..0ab6ecdc --- /dev/null +++ b/.circleci/ScalarMapper.sh @@ -0,0 +1,39 @@ +#!/bin/bash + +cat << DOC + +Test the TORTOISE recon workflow +================================= + +All supported reconstruction workflows get tested + + +Inputs: +------- + - qsiprep multi shell results (data/DSDTI_fmap) + +DOC +set +e +source ./get_data.sh +TESTDIR=${PWD} +TESTNAME=scalar_mapper_test +get_config_data ${TESTDIR} +get_bids_data ${TESTDIR} multishell_output +CFG=${TESTDIR}/data/nipype.cfg + +# Test MRtrix3 multishell msmt with ACT +setup_dir ${TESTDIR}/${TESTNAME} +TEMPDIR=${TESTDIR}/${TESTNAME}/work +OUTPUT_DIR=${TESTDIR}/${TESTNAME}/derivatives 
+BIDS_INPUT_DIR=${TESTDIR}/data/multishell_output/qsiprep +export FS_LICENSE=${TESTDIR}/data/license.txt +QSIPREP_CMD=$(run_qsiprep_cmd ${BIDS_INPUT_DIR} ${OUTPUT_DIR}) + +${QSIPREP_CMD} \ + -w ${TEMPDIR} \ + --recon-input ${BIDS_INPUT_DIR} \ + --sloppy \ + --stop-on-first-crash \ + --recon-spec bundle_scalar_map \ + --recon-only \ + -vv \ No newline at end of file diff --git a/.circleci/config.yml b/.circleci/config.yml index bfdb7837..d1bd4949 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -272,6 +272,18 @@ jobs: cd .circleci bash PyAFQReconExternalTrk.sh + Recon_ScalarMap: + <<: *dockersetup + steps: + - checkout + - run: *runinstall + - run: + name: Test scalar_mapping workflow + no_output_timeout: 1h + command: | + cd .circleci + bash ScalarMapper.sh + Recon_AMICO: <<: *dockersetup resource_class: medium+ @@ -319,7 +331,7 @@ jobs: echo "the command ``git fetch --tags --verbose`` and push" echo "them to your fork with ``git push origin --tags``" fi - sed -i -E "s/(__version__ = )'[A-Za-z0-9.-]+'/\1'${CIRCLE_TAG:-$THISVERSION}'/" wrapper/qsiprep_docker.py + sed -i -E "s/(__version__ = )'[A-Za-z0-9.-]+'/\1'${CIRCLE_TAG:-$THISVERSION}'/" wrapper/qsiprep-container/qsiprep_docker.py sed -i -E "s/(var version = )'[A-Za-z0-9.-]+'/\1'${CIRCLE_TAG:-$THISVERSION}'/" docs/citing.rst sed -i "s/title = {qsiprep}/title = {qsiprep ${CIRCLE_TAG:-$THISVERSION}}/" qsiprep/data/boilerplate.bib # Build docker image @@ -609,6 +621,13 @@ workflows: tags: only: /.*/ + - Recon_ScalarMap: + requires: + - build + filters: + tags: + only: /.*/ + - deployable: requires: - build_docs @@ -630,6 +649,7 @@ workflows: - Recon_AMICO - Recon_PYAFQ - Recon_PYAFQExternalTrk + - Recon_ScalarMap filters: branches: only: master diff --git a/qsiprep/data/pipelines/bundle_scalar_map.json b/qsiprep/data/pipelines/bundle_scalar_map.json new file mode 100644 index 00000000..2c342804 --- /dev/null +++ b/qsiprep/data/pipelines/bundle_scalar_map.json @@ -0,0 +1,55 @@ +{ + "name": 
"bundle_scalar_map", + "space" : "T1w", + "atlases": [ ], + "anatomical": [ ], + "nodes": [ + { + "name": "DIPYdki", + "software": "Dipy", + "action": "DKI_reconstruction", + "input": "qsiprep", + "output_suffix": "DKI", + "parameters": { + "write_mif": false, + "write_fibgz": false + } + }, + { + "name": "dsistudio_gqi", + "software": "DSI Studio", + "action": "reconstruction", + "input": "qsiprep", + "output_suffix": "gqi", + "parameters": {"method": "gqi"} + }, + { + "name": "autotrackgqi", + "software": "DSI Studio", + "action": "autotrack", + "input": "dsistudio_gqi", + "output_suffix": "AutoTrackGQI", + "parameters": { + "track_id": "Fasciculus,Cingulum,Aslant,Corticos,Thalamic_R,Reticular,Optic,Fornix,Corpus", + "tolerance": "22,26,30", + "track_voxel_ratio": 2.0, + "yield_rate": 0.000001 + } + }, + { + "name": "gqi_scalars", + "software": "DSI Studio", + "action": "export", + "input": "dsistudio_gqi", + "output_suffix": "gqiscalar" + }, + { + "name": "bundle_means", + "software": "qsiprep", + "action": "bundle_map", + "input": "autotrackgqi", + "scalars_from": ["gqi_scalars", "DIPYdki"], + "output_suffix": "bundlemap" + } + ] +} diff --git a/qsiprep/interfaces/interchange.py b/qsiprep/interfaces/interchange.py index a1c35186..3a534a05 100644 --- a/qsiprep/interfaces/interchange.py +++ b/qsiprep/interfaces/interchange.py @@ -48,6 +48,7 @@ class _ReconWorkflowInputsInputSpec(BaseInterfaceInputSpec): class _ReconWorkflowInputsOutputSpec(TraitedSpec): pass + class ReconWorkflowInputs(SimpleInterface): input_spec = _ReconWorkflowInputsInputSpec output_spec = _ReconWorkflowInputsOutputSpec @@ -83,4 +84,4 @@ def _run_interface(self, runtime): for name in anatomical_workflow_outputs: _ReconAnatomicalDataInputSpec.add_class_trait(name, traits.Any) - _ReconAnatomicalDataOutputSpec.add_class_trait(name, traits.Any) \ No newline at end of file + _ReconAnatomicalDataOutputSpec.add_class_trait(name, traits.Any) diff --git a/qsiprep/interfaces/recon_scalars.py 
b/qsiprep/interfaces/recon_scalars.py index cab3cb48..09ed94a1 100644 --- a/qsiprep/interfaces/recon_scalars.py +++ b/qsiprep/interfaces/recon_scalars.py @@ -2,6 +2,12 @@ # -*- coding: utf-8 -*- # emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- # vi: set ft=python sts=4 ts=4 sw=4 et: +""" +Classes that collect scalar images and metadata from Recon Workflows +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + +""" import os import os.path as op from pkg_resources import resource_filename as pkgr @@ -23,8 +29,7 @@ class ReconScalars(SimpleInterface): input_spec = ReconScalarsInputSpec output_spec = ReconScalarsOutputSpec scalar_metadata = {} - _ignore_traits = ("workflow_name") - + _ignore_traits = ("workflow_name", "scalar_metadata") def __init__(self, from_file=None, resource_monitor=None, **inputs): # Get self._results defined @@ -52,7 +57,8 @@ def _run_interface(self, runtime): continue result = self.scalar_metadata[input_name].copy() result["path"] = op.abspath(inputs[input_name]) - result["variable_name"] = self.inputs.workflow_name + "_" + input_name + result["workflow_name"] = self.inputs.workflow_name + result["variable_name"] = input_name results.append(result) self._results["scalar_info"] = results return runtime @@ -110,3 +116,218 @@ class _TORTOISEReconScalarInputSpec(ReconScalarsInputSpec): class TORTOISEReconScalars(ReconScalars): input_spec = _TORTOISEReconScalarInputSpec scalar_metadata = tortoise_scalars + + +# Scalars produced in the AMICO recon workflow +amico_scalars = { + "icvf_image": { + "desc": "Intracellular volume fraction from NODDI" + }, + "isovf_image": { + "desc": "Isotropic volume fraction from NODDI" + }, + "od_image": { + "desc": "OD from NODDI" + } +} + +class _AMICOReconScalarInputSpec(ReconScalarsInputSpec): + pass + +for input_name in amico_scalars: + _AMICOReconScalarInputSpec.add_class_trait(input_name, File(exists=True)) + +class AMICOReconScalars(ReconScalars): + input_spec = 
_AMICOReconScalarInputSpec + scalar_metadata = amico_scalars + + +# Scalars produced by DSI Studio +dsistudio_scalars = { + "qa_file": { + "desc": "Fractional Anisotropy from a tensor fit" + }, + "dti_fa_file": { + "desc": "Radial Diffusivity from a tensor fit" + }, + "txx_file": { + "desc": "Tensor fit txx" + }, + "txy_file": { + "desc": "Tensor fit txy" + }, + "txz_file": { + "desc": "Tensor fit txz" + }, + "tyy_file": { + "desc": "Tensor fit tyy" + }, + "tyz_file": { + "desc": "Tensor fit tyz" + }, + "tzz_file": { + "desc": "Tensor fit tzz" + }, + "rd1_file": { + "desc": "RD1" + }, + "rd2_file": { + "desc": "RD2" + }, + "ha_file": { + "desc": "HA" + }, + "md_file": { + "desc": "Mean Diffusivity" + }, + "ad_file": { + "desc": "AD" + }, + "rd_file": { + "desc": "Radial Diffusivity" + }, + "gfa_file": { + "desc": "Generalized Fractional Anisotropy" + }, + "iso_file": { + "desc": "Isotropic Diffusion" + }, + "rdi_file": { + "desc": "RDI" + }, + "nrdi02L_file": { + "desc": "NRDI at 02L" + }, + "nrdi04L_file": { + "desc": "NRDI at 04L" + }, + "nrdi06L_file": { + "desc": "NRDI at 06L" + }, +} + +class _DSIStudioReconScalarInputSpec(ReconScalarsInputSpec): + pass + +for input_name in dsistudio_scalars: + _DSIStudioReconScalarInputSpec.add_class_trait(input_name, File(exists=True)) + +class DSIStudioReconScalars(ReconScalars): + input_spec = _DSIStudioReconScalarInputSpec + scalar_metadata = dsistudio_scalars + + +dipy_dki_scalars = { + 'dki_fa': { + "desc": "DKI FA" + }, + 'dki_md': { + "desc": "DKI MD" + }, + 'dki_rd': { + "desc": "DKI RD" + }, + 'dki_ad': { + "desc": "DKI AD" + }, + 'dki_kfa': { + "desc": "DKI KFA" + }, + 'dki_mk': { + "desc": "DKI MK" + }, + 'dki_ak': { + "desc": "DKI AK" + }, + 'dki_rk': { + "desc": "DKI RK" + }, + 'dki_mkt': { + "desc": "DKI MKT" + } +} +class _DIPYDKIReconScalarInputSpec(ReconScalarsInputSpec): + pass + +for input_name in dipy_dki_scalars: + _DIPYDKIReconScalarInputSpec.add_class_trait(input_name, File(exists=True)) + +class 
DIPYDKIReconScalars(ReconScalars): + input_spec = _DIPYDKIReconScalarInputSpec + scalar_metadata = dipy_dki_scalars + + +# DIPY implementation of MAPMRI +dipy_mapmri_scalars = { + "qiv_file": { + "desc": "q-space inverse variance from MAPMRI" + }, + "msd_file": { + "desc": "mean square displacement from MAPMRI" + }, + "lapnorm_file": { + "desc": "Laplacian norm from regularized MAPMRI (MAPL)" + }, + "rtop_file": { + "desc": "Return to origin probability from MAPMRI" + }, + "rtap_file": { + "desc": "Return to axis probability from MAPMRI" + }, + "rtpp_file": { + "desc": "Return to plane probability from MAPMRI" + }, + "ng_file": { + "desc": "Non-Gaussianity from MAPMRI" + }, + "ngpar_file": { + "desc": "Non-Gaussianity parallel from MAPMRI" + }, + "ngperp_file": { + "desc": "Non-Gaussianity perpendicular from MAPMRI" + }, +} + + +class _DIPYMAPMRIReconScalarInputSpec(ReconScalarsInputSpec): + pass + + +for input_name in dipy_mapmri_scalars: + _DIPYMAPMRIReconScalarInputSpec.add_class_trait(input_name, File(exists=True)) + + +class DIPYMAPMRIReconScalars(ReconScalars): + input_spec = _DIPYMAPMRIReconScalarInputSpec + scalar_metadata = dipy_mapmri_scalars + + +# Same as DIPY implementation of 3dSHORE, but with brainsuite bases +brainsuite_3dshore_scalars = dipy_mapmri_scalars.copy() +brainsuite_3dshore_scalars.update({ + "cnr_image": { + "desc": "Contrast to noise ratio for 3dSHORE fit" + }, + "alpha_image": { + "desc": "alpha used when fitting in each voxel" + }, + "r2_image": { + "desc": "r^2 of the 3dSHORE fit" + }, + "regularization_image": { + "desc": "regularization of the 3dSHORE fit" + }, +}) + + +class _BrainSuite3dSHOREReconScalarInputSpec(ReconScalarsInputSpec): + pass + + +for input_name in brainsuite_3dshore_scalars: + _BrainSuite3dSHOREReconScalarInputSpec.add_class_trait(input_name, File(exists=True)) + + +class BrainSuite3dSHOREReconScalars(ReconScalars): + input_spec = _BrainSuite3dSHOREReconScalarInputSpec + scalar_metadata = 
brainsuite_3dshore_scalars \ No newline at end of file diff --git a/qsiprep/interfaces/scalar_mapping.py b/qsiprep/interfaces/scalar_mapping.py new file mode 100644 index 00000000..2119a909 --- /dev/null +++ b/qsiprep/interfaces/scalar_mapping.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +import os +import os.path as op +import subprocess +from pkg_resources import resource_filename as pkgr +import nibabel as nb +import numpy as np +from nipype import logging +from nipype.utils.filemanip import fname_presuffix +from nipype.interfaces.base import ( + traits, TraitedSpec, BaseInterfaceInputSpec, File, SimpleInterface, isdefined, + InputMultiObject +) +import nilearn.image as nim +from nilearn.maskers import NiftiMasker +import pandas as pd +from .bids import get_bids_params + +class ScalarMapperInputSpec(BaseInterfaceInputSpec): + scalars_from = InputMultiObject(traits.Str()) + recon_scalars = InputMultiObject(traits.Any()) + dwiref_image = File(exists=True) + + +class ScalarMapperOutputSpec(TraitedSpec): + mapped_scalars = traits.List(File(exists=True)) + + +class ScalarMapper(SimpleInterface): + input_spec = ScalarMapperInputSpec + output_spec = ScalarMapperOutputSpec + + def _load_scalars(self): + self.recon_scalars = self.inputs.recon_scalars + for scalar_obj in self.recon_scalars: + scalar_obj["image"] = nim.load_img(scalar_obj["path"]) + + def _update_with_bids_info(self, summary_row_list): + # Add BIDS info to the summarized scalars + bids_info = get_bids_params(self.inputs.dwiref_image) + for summary_row in summary_row_list: + summary_row.update(bids_info) + + def _run_interface(self, runtime): + self._load_scalars() + self._do_mapping(runtime) + return runtime + + +# For mapping to bundles +class _BundleMapperInputSpec(ScalarMapperInputSpec): + tck_files = InputMultiObject( + File(exists=True), + desc="Paths to tck 
files") + bundle_names = InputMultiObject(traits.Str()) + + +class _BundleMapperOutputSpec(ScalarMapperOutputSpec): + bundle_summary = File(exists=True) + bundle_profiles = File(exists=True) + + +def _get_tdi_img(dwiref_image, tck_file, output_tdi_file): + output = subprocess.run( + ["tckmap", "-template", dwiref_image, "-contrast", "tdi", "-force", + tck_file, output_tdi_file], + check=True) + return nim.load_img(output_tdi_file) + +class BundleMapper(ScalarMapper): + input_spec = _BundleMapperInputSpec + output_spec = _BundleMapperOutputSpec + + def _do_mapping(self, runtime): + bundle_dfs = [] + for tck_name, tck_file in zip(self.inputs.bundle_names, self.inputs.tck_files): + output_tdi_file = fname_presuffix( + tck_file, + suffix="_tdi.nii", + newpath=runtime.cwd, + use_ext=False) + + # Create a TDI, where streamline count is mapped to voxels + tdi_img = _get_tdi_img( + self.inputs.dwiref_image, + tck_file, + output_tdi_file) + + # Create a Masker from all voxels containing streamlines + msk_img = nim.math_img("a>0", a=tdi_img) + bundle_masker = NiftiMasker(msk_img) + + # Get a weighting vector from the TDI + tdi_weights = bundle_masker.fit_transform(tdi_img).squeeze() + tdi_weights = tdi_weights / tdi_weights.sum() + + # Start gathering stats with the TDI first + bundle_dfs.append( + calculate_mask_stats( + bundle_masker, + tck_name, + "bundle", + {"image": tdi_img, + "variable_name": "tdi", + "workflow_name": "scalar_to_bundle", + "desc": "Streamline counts per voxel"}) + ) + + # Then get the same stats for the scalars + for recon_scalar in self.recon_scalars: + bundle_dfs.append( + calculate_mask_stats(bundle_masker, tck_name, "bundle", + recon_scalar, tdi_weights)) + self._update_with_bids_info(bundle_dfs) + summary_file = op.join(runtime.cwd, "bundle_stats.tsv") + summary_df = pd.DataFrame(bundle_dfs) + + summary_df.to_csv(summary_file, index=False, sep="\t") + self._results['bundle_summary'] = summary_file + + +# For mapping to atlases +class 
_AtlasMapperInputSpec(ScalarMapperInputSpec): + atlas_configs = traits.Any() + + +class _AtlasMapperOutputSpec(ScalarMapperOutputSpec): + region_stats = File(exists=True, mandatory=True) + + +class AtlasMapper(ScalarMapper): + input_spec = _AtlasMapperInputSpec + output_spec = _AtlasMapperOutputSpec + +def calculate_mask_stats(masker, mask_name, mask_variable_name, recon_scalar, weighting_vector=None): + + # Get the scalar data in the masked region + voxel_data = masker.fit_transform(recon_scalar["image"]).squeeze() + # Find out how much of this scalar is finite + nz_voxel_data = voxel_data.copy() + nz_voxel_data[nz_voxel_data == 0] = np.nan + nz_voxel_data[~np.isfinite(voxel_data)] = np.nan + + # Make a prettier variable name + variable_name = recon_scalar["variable_name"].replace("_image", "").replace("_file", "") + + results = { + mask_variable_name: mask_name, + "variable_name": variable_name, + "workflow": recon_scalar["workflow_name"], + "zero_proportion": np.sum(np.isnan(nz_voxel_data)) / voxel_data.shape[0], + "mean": np.mean(voxel_data), + "stdev": np.std(voxel_data), + "median": np.median(voxel_data), + "masked_mean": np.nanmean(nz_voxel_data), + "masked_median": np.nanmedian(nz_voxel_data), + "masked_stdev": np.nanstd(nz_voxel_data) + } + + if weighting_vector is not None: + results["weighted_mean"] = np.sum(voxel_data * weighting_vector) + nz_weighting_vector = weighting_vector.copy() + nz_weighting_vector[np.isnan(nz_voxel_data)] = np.nan + nz_weighting_vector = nz_weighting_vector / np.nansum(nz_weighting_vector) + results["masked_weighted_mean"] = np.nansum( + nz_voxel_data * nz_weighting_vector ) + + return results \ No newline at end of file diff --git a/qsiprep/workflows/recon/amico.py b/qsiprep/workflows/recon/amico.py index e4b5e001..013a5b0b 100644 --- a/qsiprep/workflows/recon/amico.py +++ b/qsiprep/workflows/recon/amico.py @@ -14,6 +14,7 @@ from ...interfaces.amico import NODDI from ...interfaces.reports import CLIReconPeaksReport from 
...interfaces.converters import NODDItoFIBGZ +from ...interfaces.recon_scalars import AMICOReconScalars LOGGER = logging.getLogger('nipype.interface') @@ -48,10 +49,13 @@ def init_amico_noddi_fit_wf(omp_nthreads, available_anatomical_data, outputnode = pe.Node( niu.IdentityInterface( fields=['directions_image', 'icvf_image', 'od_image', - 'isovf_image', 'config_file', 'fibgz']), + 'isovf_image', 'config_file', 'fibgz', 'recon_scalars']), name="outputnode") workflow = Workflow(name=name) + recon_scalars = pe.Node(AMICOReconScalars(workflow_name=name), + name="recon_scalars", + run_without_submitting=True) plot_reports = params.pop("plot_reports", True) desc = """NODDI Reconstruction @@ -81,12 +85,17 @@ def init_amico_noddi_fit_wf(omp_nthreads, available_anatomical_data, ('isovf_image', 'isovf_image'), ('config_file', 'config_file'), ]), + (noddi_fit, recon_scalars, [ + ('icvf_image', 'icvf_image'), + ('od_image', 'od_image'), + ('isovf_image', 'isovf_image'), + ]), + (recon_scalars, outputnode, [("scalar_info", "recon_scalars")]), (noddi_fit, convert_to_fibgz, [ ('directions_image', 'directions_file'), ('icvf_image', 'icvf_file'), ('od_image', 'od_file'), - ('isovf_image', 'isovf_file'), - ]), + ('isovf_image', 'isovf_file')]), (inputnode, convert_to_fibgz, [('dwi_mask', 'mask_file')]), (convert_to_fibgz, outputnode, [('fibgz_file', 'fibgz')])]) if plot_reports: diff --git a/qsiprep/workflows/recon/build_workflow.py b/qsiprep/workflows/recon/build_workflow.py index 2efc92ce..9854bf16 100644 --- a/qsiprep/workflows/recon/build_workflow.py +++ b/qsiprep/workflows/recon/build_workflow.py @@ -18,8 +18,11 @@ from .dynamics import init_controllability_wf from .utils import init_conform_dwi_wf, init_discard_repeated_samples_wf from .steinhardt import init_steinhardt_order_param_wf +from .scalar_mapping import init_scalar_to_bundle_wf from ...engine import Workflow -from ...interfaces.interchange import (default_input_set, recon_workflow_input_fields) +from 
...interfaces.interchange import ( + default_input_set, recon_workflow_input_fields +) LOGGER = logging.getLogger('nipype.interface') @@ -41,7 +44,6 @@ def init_dwi_recon_workflow(workflow_spec, output_dir, inputnode = pe.Node( niu.IdentityInterface(fields=recon_workflow_input_fields), name='inputnode') - # Read nodes from workflow spec, make sure we can implement them nodes_to_add = [] for node_spec in workflow_spec['nodes']: @@ -58,13 +60,22 @@ def init_dwi_recon_workflow(workflow_spec, output_dir, workflow.add_nodes(nodes_to_add) _check_repeats(workflow.list_node_names()) + # Create a node that gathers scalar outputs from those that produce them + scalar_gatherer = pe.Node(niu.Merge(len(nodes_to_add)), name="scalar_gatherer") + # Now that all nodes are in the workflow, connect them - for node_spec in workflow_spec['nodes']: + for node_num, node_spec in enumerate(workflow_spec['nodes'], start=1): # get the nipype node object node_name = node_spec['name'] node = workflow.get_node(node_name) + consuming_scalars = node_spec.get("scalars_from", []) + if consuming_scalars: + workflow.connect(scalar_gatherer, "out", node, "inputnode.collected_scalars") + else: + workflow.connect(node, "outputnode.recon_scalars", scalar_gatherer, "in%d"%node_num) + if node_spec.get('input', 'qsiprep') == 'qsiprep': # directly connect all the qsiprep outputs to every node workflow.connect([ @@ -127,6 +138,15 @@ def workflow_from_spec(omp_nthreads, available_anatomical_data, node_spec, output_suffix = node_spec.get("output_suffix", "") node_name = node_spec.get("name", None) parameters = node_spec.get("parameters", {}) + + # It makes more sense intuitively to have scalars_from in the + # root of a recon spec "node". 
But to pass it to the workflow + # it needs to go in parameters + if "scalars_from" in node_spec and node_spec["scalars_from"]: + if parameters.get("scalars_from"): + LOGGER.warning("overwriting scalars_from in parameters") + parameters["scalars_from"] = node_spec["scalars_from"] + if skip_odf_plots: LOGGER.info("skipping ODF plots for %s", node_name) parameters['plot_reports'] = False @@ -141,6 +161,7 @@ def workflow_from_spec(omp_nthreads, available_anatomical_data, node_spec, "params": parameters} + # DSI Studio operations if software == "DSI Studio": if node_spec["action"] == "reconstruction": @@ -202,6 +223,8 @@ def workflow_from_spec(omp_nthreads, available_anatomical_data, node_spec, return init_qsiprep_to_fsl_wf(**kwargs) if node_spec['action'] == 'steinhardt_order_parameters': return init_steinhardt_order_param_wf(**kwargs) + if node_spec['action'] == 'bundle_map': + return init_scalar_to_bundle_wf(**kwargs) raise Exception("Unknown node %s" % node_spec) @@ -209,3 +232,4 @@ def workflow_from_spec(omp_nthreads, available_anatomical_data, node_spec, def _as_connections(attr_list, src_prefix='', dest_prefix=''): return [(src_prefix + item, dest_prefix + item) for item in attr_list] + diff --git a/qsiprep/workflows/recon/converters.py b/qsiprep/workflows/recon/converters.py index bb9e761f..d37b8579 100644 --- a/qsiprep/workflows/recon/converters.py +++ b/qsiprep/workflows/recon/converters.py @@ -69,12 +69,13 @@ def init_qsiprep_to_fsl_wf(omp_nthreads, available_anatomical_data, """Converts QSIPrep outputs (images, bval, bvec) to fsl standard orientation""" inputnode = pe.Node(niu.IdentityInterface(fields=recon_workflow_input_fields), name="inputnode") - to_reorient = ["mask_file", "dwi_file", "bval_file", "bvec_file"] + to_reorient = ["mask_file", "dwi_file", "bval_file", "bvec_file", "recon_scalars"] outputnode = pe.Node( niu.IdentityInterface( fields=to_reorient), name="outputnode") workflow = Workflow(name=name) + outputnode.inputs.recon_scalars = [] 
convert_dwi_to_fsl = pe.Node( ConformDwi(orientation="LAS"), name="convert_to_fsl") diff --git a/qsiprep/workflows/recon/dipy.py b/qsiprep/workflows/recon/dipy.py index 04029556..8cff5f44 100644 --- a/qsiprep/workflows/recon/dipy.py +++ b/qsiprep/workflows/recon/dipy.py @@ -13,6 +13,9 @@ from ...interfaces.interchange import recon_workflow_input_fields from ...engine import Workflow from ...interfaces.reports import CLIReconPeaksReport +from ...interfaces.recon_scalars import ( + DIPYDKIReconScalars, DIPYMAPMRIReconScalars, BrainSuite3dSHOREReconScalars) + LOGGER = logging.getLogger('nipype.interface') @@ -100,7 +103,7 @@ def init_dipy_brainsuite_shore_recon_wf(omp_nthreads, available_anatomical_data, niu.IdentityInterface( fields=['shore_coeffs_image', 'rtop_image', 'alpha_image', 'r2_image', 'cnr_image', 'regularization_image', 'fibgz', 'fod_sh_mif', - 'dwi_file', 'bval_file', 'bvec_file', 'b_file']), + 'dwi_file', 'bval_file', 'bvec_file', 'b_file', 'recon_scalars']), name="outputnode") workflow = Workflow(name=name) @@ -108,7 +111,12 @@ def init_dipy_brainsuite_shore_recon_wf(omp_nthreads, available_anatomical_data, : """ plot_reports = params.pop("plot_reports", True) - recon_shore = pe.Node(BrainSuiteShoreReconstruction(**params), name="recon_shore") + recon_shore = pe.Node( + BrainSuiteShoreReconstruction(**params), name="recon_shore") + recon_scalars = pe.Node( + BrainSuite3dSHOREReconScalars(workflow_name="name"), + name="recon_scalars", + run_without_submitting=True) doing_extrapolation = params.get("extrapolate_scheme") in ("HCP", "ABCD") workflow.connect([ @@ -127,8 +135,16 @@ def init_dipy_brainsuite_shore_recon_wf(omp_nthreads, available_anatomical_data, ('extrapolated_dwi', 'dwi_file'), ('extrapolated_bvals', 'bval_file'), ('extrapolated_bvecs', 'bvec_file'), - ('extrapolated_b', 'b_file')]) - ]) + ('extrapolated_b', 'b_file')]), + (recon_shore, recon_scalars, [ + ('rtop_image', 'rtop_file'), + ('alpha_image', 'alpha_image'), + ('r2_image', 
'r2_image'), + ('cnr_image', 'cnr_image'), + ('regularization_image', 'regularization_image')]), + (recon_scalars, outputnode, [("scalar_info", "recon_scalars")]) + ]) + if plot_reports: plot_peaks = pe.Node( @@ -330,7 +346,7 @@ def init_dipy_mapmri_recon_wf(omp_nthreads, available_anatomical_data, name="dip outputnode = pe.Node( niu.IdentityInterface( fields=['mapmri_coeffs', 'rtop', 'rtap', 'rtpp', 'fibgz', 'fod_sh_mif', - 'parng', 'perng', 'ng', 'qiv', 'lapnorm', 'msd']), + 'parng', 'perng', 'ng', 'qiv', 'lapnorm', 'msd', 'recon_scalars']), name="outputnode") workflow = Workflow(name=name) @@ -339,7 +355,10 @@ def init_dipy_mapmri_recon_wf(omp_nthreads, available_anatomical_data, name="dip : """ plot_reports = params.pop("plot_reports", True) recon_map = pe.Node(MAPMRIReconstruction(**params), name="recon_map") - + recon_scalars = pe.Node( + DIPYMAPMRIReconScalars(workflow_name=name), + name="recon_scalars", + run_without_submitting=True) workflow.connect([ (inputnode, recon_map, [('dwi_file', 'dwi_file'), ('bval_file', 'bval_file'), @@ -356,7 +375,18 @@ def init_dipy_mapmri_recon_wf(omp_nthreads, available_anatomical_data, name="dip ('qiv', 'qiv'), ('lapnorm', 'lapnorm'), ('fibgz', 'fibgz'), - ('fod_sh_mif', 'fod_sh_mif')])]) + ('fod_sh_mif', 'fod_sh_mif')]), + (recon_map, recon_scalars, [ + ('rtop', 'rtop_file'), + ('rtap', 'rtap_file'), + ('rtpp', 'rtpp_file'), + ('ng', 'ng_file'), + ('parng', 'ngpar_file'), + ('perng', 'ngperp_file'), + ('msd', 'msd_file'), + ('qiv', 'qiv_file'), + ('lapnorm', 'lapnorm_file')]), + (recon_scalars, outputnode, [("scalar_info", "recon_scalars")])]) if plot_reports: plot_peaks = pe.Node( CLIReconPeaksReport(), @@ -440,9 +470,11 @@ def init_dipy_dki_recon_wf(omp_nthreads, available_anatomical_data, name="dipy_d niu.IdentityInterface( fields=['tensor', 'fa', 'md', 'rd', 'ad', 'colorFA', 'kfa', 'mk', 'ak', 'rk', - 'mkt']), + 'mkt', 'recon_scalars']), name="outputnode") - + recon_scalars = 
pe.Node(DIPYDKIReconScalars(workflow_name=name), + run_without_submitting=True, + name="recon_scalars") workflow = Workflow(name=name) desc = """Dipy Reconstruction @@ -450,24 +482,36 @@ def init_dipy_dki_recon_wf(omp_nthreads, available_anatomical_data, name="dipy_d plot_reports = params.pop("plot_reports", True) recon_dki = pe.Node(KurtosisReconstruction(**params), name="recon_dki") - workflow.connect([ - (inputnode, recon_dki, [('dwi_file', 'dwi_file'), - ('bval_file', 'bval_file'), - ('bvec_file', 'bvec_file'), - ('dwi_mask', 'mask_file')]), - (recon_dki, outputnode, [('tensor', 'tensor'), - ('fa', 'fa'), - ('md', 'md'), - ('rd', 'rd'), - ('ad', 'ad'), - ('colorFA', 'colorFA'), - ('kfa', 'kfa'), - ('mk', 'mk'), - ('ak', 'ak'), - ('rk', 'rk'), - ('mkt', 'mkt'), - ('fibgz', 'fibgz')]) + (inputnode, recon_dki, [ + ('dwi_file', 'dwi_file'), + ('bval_file', 'bval_file'), + ('bvec_file', 'bvec_file'), + ('dwi_mask', 'mask_file')]), + (recon_dki, outputnode, [ + ('tensor', 'tensor'), + ('fa', 'fa'), + ('md', 'md'), + ('rd', 'rd'), + ('ad', 'ad'), + ('colorFA', 'colorFA'), + ('kfa', 'kfa'), + ('mk', 'mk'), + ('ak', 'ak'), + ('rk', 'rk'), + ('mkt', 'mkt'), + ('fibgz', 'fibgz')]), + (recon_dki, recon_scalars, [ + ('fa', 'dki_fa'), + ('md', 'dki_md'), + ('rd', 'dki_rd'), + ('ad', 'dki_ad'), + ('kfa', 'dki_kfa'), + ('mk', 'dki_mk'), + ('ak', 'dki_ak'), + ('rk', 'dki_rk'), + ('mkt', 'dki_mkt')]), + (recon_scalars, outputnode, [("scalar_info", "recon_scalars")]) ]) if plot_reports and False: diff --git a/qsiprep/workflows/recon/dsi_studio.py b/qsiprep/workflows/recon/dsi_studio.py index 44b68759..0331b5f9 100644 --- a/qsiprep/workflows/recon/dsi_studio.py +++ b/qsiprep/workflows/recon/dsi_studio.py @@ -17,6 +17,7 @@ import logging from ...interfaces.bids import ReconDerivativesDataSink +from ...interfaces.recon_scalars import DSIStudioReconScalars from ...interfaces.converters import DSIStudioTrkToTck from ...interfaces.interchange import recon_workflow_input_fields from 
...engine import Workflow @@ -51,9 +52,10 @@ def init_dsi_studio_recon_wf(omp_nthreads, available_anatomical_data, name="dsi_ name="inputnode") outputnode = pe.Node( niu.IdentityInterface( - fields=['fibgz']), + fields=['fibgz', 'recon_scalars']), name="outputnode") workflow = Workflow(name=name) + outputnode.inputs.recon_scalars = [] plot_reports = params.pop("plot_reports", True) desc = """DSI Studio Reconstruction @@ -186,8 +188,9 @@ def init_dsi_studio_tractography_wf(omp_nthreads, available_anatomical_data, nam niu.IdentityInterface( fields=recon_workflow_input_fields + ['fibgz']), name="inputnode") - outputnode = pe.Node(niu.IdentityInterface(fields=['trk_file', 'fibgz']), + outputnode = pe.Node(niu.IdentityInterface(fields=['trk_file', 'fibgz', 'recon_scalars']), name="outputnode") + outputnode.inputs.recon_scalars = [] plot_reports = params.pop("plot_reports", True) workflow = Workflow(name=name) desc = """DSI Studio Tractography @@ -264,8 +267,9 @@ def init_dsi_studio_autotrack_wf(omp_nthreads, available_anatomical_data, niu.IdentityInterface( fields=recon_workflow_input_fields + ['fibgz']), name="inputnode") - outputnode = pe.Node(niu.IdentityInterface(fields=['tck_files', "bundle_names"]), + outputnode = pe.Node(niu.IdentityInterface(fields=['tck_files', "bundle_names", "recon_scalars"]), name="outputnode") + outputnode.inputs.recon_scalars = [] desc = """DSI Studio Automatic Tractography : Automatic Tractography was run in DSI Studio (version %s) and @@ -398,10 +402,11 @@ def init_dsi_studio_connectivity_wf(omp_nthreads, available_anatomical_data, nam """ inputnode = pe.Node( niu.IdentityInterface( - fields=recon_workflow_input_fields + ['fibgz', 'trk_file', 'atlas_configs']), + fields=recon_workflow_input_fields + ['fibgz', 'trk_file', 'atlas_configs', 'recon_scalars']), name="inputnode") - outputnode = pe.Node(niu.IdentityInterface(fields=['matfile']), + outputnode = pe.Node(niu.IdentityInterface(fields=['matfile', "recon_scalars"]), 
name="outputnode") + outputnode.inputs.recon_scalars = [] plot_reports = params.pop("plot_reports", True) workflow = pe.Workflow(name=name) calc_connectivity = pe.Node( @@ -471,10 +476,14 @@ def init_dsi_studio_export_wf(omp_nthreads, available_anatomical_data, name="dsi "rd2", "ha", "md", "ad", "rd", "gfa", "iso", "rdi", "nrdi02L", "nrdi04L", "nrdi06L"] outputnode = pe.Node( - niu.IdentityInterface(fields=[name + "_file" for name in scalar_names]), + niu.IdentityInterface(fields=[name + "_file" for name in scalar_names] + ["recon_scalars"]), name="outputnode") workflow = pe.Workflow(name=name) export = pe.Node(DSIStudioExport(to_export=",".join(scalar_names)), name='export') + recon_scalars = pe.Node( + DSIStudioReconScalars(workflow_name=name), + name="recon_scalars", + n_procs=1) fixhdr_nodes = {} for scalar_name in scalar_names: output_name = scalar_name + '_file' @@ -482,7 +491,8 @@ def init_dsi_studio_export_wf(omp_nthreads, available_anatomical_data, name="dsi connections = [(export, fixhdr_nodes[scalar_name], [(output_name, 'dsi_studio_nifti')]), (inputnode, fixhdr_nodes[scalar_name], [('dwi_file', 'correct_header_nifti')]), - (fixhdr_nodes[scalar_name], outputnode, [('out_file', scalar_name)])] + (fixhdr_nodes[scalar_name], outputnode, [('out_file', output_name)]), + (fixhdr_nodes[scalar_name], recon_scalars, [("out_file", output_name)])] if output_suffix: connections += [(fixhdr_nodes[scalar_name], pe.Node( @@ -492,6 +502,8 @@ def init_dsi_studio_export_wf(omp_nthreads, available_anatomical_data, name="dsi [('out_file', 'in_file')])] workflow.connect(connections) - workflow.connect([(inputnode, export, [('fibgz', 'input_file')])]) + workflow.connect([ + (inputnode, export, [('fibgz', 'input_file')]), + (recon_scalars, outputnode, [("scalar_info", "recon_scalars")])]) return workflow diff --git a/qsiprep/workflows/recon/mrtrix.py b/qsiprep/workflows/recon/mrtrix.py index ffe357db..78ddce8d 100644 --- a/qsiprep/workflows/recon/mrtrix.py +++ 
b/qsiprep/workflows/recon/mrtrix.py @@ -88,9 +88,10 @@ def init_mrtrix_csd_recon_wf(omp_nthreads, available_anatomical_data, name="mrtr outputnode = pe.Node( niu.IdentityInterface( fields=['fod_sh_mif', 'wm_odf', 'wm_txt', 'gm_odf', 'gm_txt', 'csf_odf', - 'csf_txt']), + 'csf_txt', 'recon_scalars']), name="outputnode") workflow = Workflow(name=name) + outputnode.inputs.recon_scalars = [] plot_reports = params.pop("plot_reports", True) desc = """MRtrix3 Reconstruction @@ -346,10 +347,11 @@ def init_global_tractography_wf(omp_nthreads, available_anatomical_data, name="m name="inputnode") outputnode = pe.Node( niu.IdentityInterface( - fields=['fod_sh_mif', 'wm_odf', 'iso_fraction', 'tck_file']), + fields=['fod_sh_mif', 'wm_odf', 'iso_fraction', 'tck_file', 'recon_scalars']), name="outputnode") workflow = pe.Workflow(name=name) + outputnode.inputs.recon_scalars = [] plot_reports = params.pop("plot_reports", True) create_mif = pe.Node(MRTrixIngress(), name='create_mif') @@ -439,10 +441,11 @@ def init_mrtrix_tractography_wf(omp_nthreads, available_anatomical_data, name="m niu.IdentityInterface(fields=recon_workflow_input_fields + ['fod_sh_mif']), name="inputnode") outputnode = pe.Node( - niu.IdentityInterface(fields=['tck_file', 'sift_weights']), + niu.IdentityInterface(fields=['tck_file', 'sift_weights', 'recon_scalars']), name="outputnode") workflow = pe.Workflow(name=name) + outputnode.inputs.recon_scalars = [] plot_reports = params.pop("plot_reports", True) # Resample anat mask tracking_params = params.get("tckgen", {}) @@ -527,8 +530,9 @@ def init_mrtrix_connectivity_wf(omp_nthreads, available_anatomical_data, name="m niu.IdentityInterface( fields=recon_workflow_input_fields + ['tck_file', 'sift_weights', 'atlas_configs']), name="inputnode") - outputnode = pe.Node(niu.IdentityInterface(fields=['matfile']), + outputnode = pe.Node(niu.IdentityInterface(fields=['matfile', 'recon_scalars']), name="outputnode") + outputnode.inputs.recon_scalars = [] plot_reports = 
params.pop("plot_reports", True) workflow = pe.Workflow(name=name) conmat_params = params.get("tck2connectome", {}) diff --git a/qsiprep/workflows/recon/pyafq.py b/qsiprep/workflows/recon/pyafq.py index 327f842d..06d98426 100644 --- a/qsiprep/workflows/recon/pyafq.py +++ b/qsiprep/workflows/recon/pyafq.py @@ -62,8 +62,9 @@ def init_pyafq_wf(omp_nthreads, available_anatomical_data, fields=recon_workflow_input_fields + ['tck_file']), name="inputnode") outputnode = pe.Node( - niu.IdentityInterface(fields=['afq_dir']), + niu.IdentityInterface(fields=['afq_dir', 'recon_scalars']), name="outputnode") + outputnode.inputs.recon_scalars = [] kwargs = _parse_qsiprep_params_dict(params) kwargs["omp_nthreads"] = omp_nthreads diff --git a/qsiprep/workflows/recon/scalar_mapping.py b/qsiprep/workflows/recon/scalar_mapping.py new file mode 100644 index 00000000..e7df66c5 --- /dev/null +++ b/qsiprep/workflows/recon/scalar_mapping.py @@ -0,0 +1,131 @@ +""" +Scalar mapping workflows +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. autofunction:: init_scalar_to_bundle_wf +.. 
autofunction:: init_scalar_to_atlas_wf + +""" +import nipype.pipeline.engine as pe +import nipype.interfaces.utility as niu +import logging +from ...interfaces.bids import ReconDerivativesDataSink +from ...interfaces.interchange import recon_workflow_input_fields +from ...engine import Workflow +from ...interfaces.scalar_mapping import BundleMapper +LOGGER = logging.getLogger('nipype.workflow') + + +def init_scalar_to_bundle_wf(omp_nthreads, available_anatomical_data, + name="scalar_to_bundle", output_suffix="", params={}): + """Map scalar images to bundles + + + + Inputs + tck_files + MRtrix3 format tck files for each bundle + bundle_names + Names that describe which bundles are present in `tck_files` + recon_scalars + List of dictionaries containing scalar info + + Outputs + + bundle_summaries + summary statistics in tsv format + + """ + inputnode = pe.Node( + niu.IdentityInterface( + fields=recon_workflow_input_fields + + ["tck_files", "bundle_names", "recon_scalars", "collected_scalars"]), + name="inputnode") + outputnode = pe.Node( + niu.IdentityInterface(fields=['bundle_summary']), name="outputnode") + workflow = Workflow(name=name) + bundle_mapper = pe.Node( + BundleMapper(**params), + name="bundle_mapper") + workflow.connect([ + (inputnode, bundle_mapper, [ + ("collected_scalars", "recon_scalars"), + ("tck_files", "tck_files"), + ("dwi_ref", "dwiref_image"), + ("bundle_names", "bundle_names")]), + (bundle_mapper, outputnode, [ + ("bundle_summary", "bundle_summary")]) + ]) + if output_suffix: + + ds_bundle_summaries = pe.Node( + ReconDerivativesDataSink(desc="bundlemap", + suffix=output_suffix), + name='ds_bundle_summaries', + run_without_submitting=True) + workflow.connect([ + (bundle_mapper, ds_bundle_summaries, [("bundle_summary", "in_file")]) + ]) + + return workflow + + +def init_scalar_to_atlas_wf(omp_nthreads, available_anatomical_data, + name="scalar_to_template", output_suffix="", params={}): + """Map scalar images to atlas regions + + Inputs + 
tck_files + MRtrix3 format tck files for each bundle + bundle_names + Names that describe which bundles are present in `tck_files` + recon_scalars + List of dictionaries containing scalar info + + Outputs + bundle_summaries + summary statistics in tsv format + + """ + inputnode = pe.Node( + niu.IdentityInterface( + fields=recon_workflow_input_fields + + ["tck_files", "bundle_names", "recon_scalars"]), + name="inputnode") + outputnode = pe.Node( + niu.IdentityInterface(fields=['bundle_summaries']), name="outputnode") + workflow = Workflow(name=name) + bundle_mapper = pe.Node( + BundleMapper(**params), + name="bundle_mapper") + workflow.connect([ + (inputnode, bundle_mapper, [ + ("recon_scalars", "recon_scalars"), + ("tck_files", "tck_files"), + ("dwi_ref", "dwiref_image")]) + ]) + if output_suffix: + + ds_bundle_summaries = pe.Node( + ReconDerivativesDataSink(desc="bundlemap", + suffix=output_suffix), + name='ds_bundle_summaries', + run_without_submitting=True) + workflow.connect([ + (bundle_mapper, ds_bundle_summaries, [("bundle_summaries", "in_file")]) + ]) + return workflow + + +def init_scalar_to_template_wf(omp_nthreads, available_anatomical_data, + name="scalar_to_template", output_suffix="", params={}): + """Maps scalar data to a volumetric template""" + raise NotImplementedError() + return workflow + + +def init_scalar_to_surface_wf(omp_nthreads, available_anatomical_data, + name="scalar_to_surface", output_suffix="", params={}): + """Maps scalar data to a surface.""" + raise NotImplementedError() + return workflow diff --git a/qsiprep/workflows/recon/tortoise.py b/qsiprep/workflows/recon/tortoise.py index 1308114a..16b38930 100644 --- a/qsiprep/workflows/recon/tortoise.py +++ b/qsiprep/workflows/recon/tortoise.py @@ -100,7 +100,8 @@ def init_tortoise_estimator_wf( ('dwi_file', 'dwi_file'), ('bval_file', 'bval_file'), ('bvec_file', 'bvec_file'), - ('dwi_mask', 'mask_file')]), + (recon_scalars, outputnode, [("scalar_info", "recon_scalars")])]) # 
EstimateTensor if tensor_opts: