diff --git a/haptools/data/covariates.py b/haptools/data/covariates.py index 45ad1f30..64270501 100644 --- a/haptools/data/covariates.py +++ b/haptools/data/covariates.py @@ -8,16 +8,17 @@ import numpy as np from .data import Data +from .phenotypes import Phenotypes -class Covariates(Data): +class Covariates(Phenotypes): """ A class for processing covariates from a file Attributes ---------- data : np.array - The covariates in an n (samples) x 1 (covariate value) array + The covariates in an n (samples) x m (covariates) array fname : Path The path to the read-only file containing the data samples : tuple[str] @@ -33,124 +34,5 @@ class Covariates(Data): """ def __init__(self, fname: Path, log: Logger = None): - super().__init__(fname, log) - self.samples = tuple() - self.names = tuple() - - @classmethod - def load(cls: Covariates, fname: Path, samples: list[str] = None) -> Covariates: - """ - Load covariates from a TSV file - - Read the file contents and standardize the covariates - - Parameters - ---------- - fname - See documentation for :py:attr:`~.Data.fname` - samples : list[str], optional - See documentation for :py:meth:`~.Data.Covariates.read` - - Returns - ------- - covariates - A Covariates object with the data loaded into its properties - """ - covariates = cls(fname) - covariates.read(samples) - return covariates - - def read(self, samples: list[str] = None): - """ - Read covariates from a TSV file into a numpy matrix stored in :py:attr:`~.Covariates.data` - - Parameters - ---------- - samples : list[str], optional - A subset of the samples from which to extract covariates - - Defaults to loading covariates from all samples - - Raises - ------ - AssertionError - If the provided file doesn't follow the expected format - """ - super().read() - # load all info into memory - # use hook_compressed to automatically handle gz files - with hook_compressed(self.fname, mode="rt") as covars: - covar_text = reader(covars, delimiter="\t") - header = next(covar_text) - # there should at least two columns - if len(header) < 2: - raise ValueError("The covariates TSV should have at least two columns.") - # the first column should be called "sample" - if header[0] != "sample": - self.log.warning( - "The first column of the covariates TSV should contain sample IDs" - " and should be named 'sample' in the header line" - ) - # convert to list and subset samples if need be - if samples: - samples = set(samples) - covar_text = [covar for covar in covar_text if covar[0] in samples] - else: - covar_text = list(covar_text) - # the second column should be castable to a float - try: - float(covar_text[0][1]) - except: - self.log.error( - "Every column in the covariates file (besides the sample column) must" - " be numeric." - ) - # fill out the samples and data properties - data = list(zip(*covar_text)) - self.samples = data[0] - self.names = tuple(header[1:]) - # coerce strings to floats - self.data = np.transpose(np.array(data[1:], dtype="float64")) - - def __iter__(self, samples: list[str] = None) -> Iterator[namedtuple]: - """ - Read covariates from a TSV line by line without storing anything - - Parameters - ---------- - samples : list[str], optional - A subset of the samples from which to extract covariates - - Defaults to loading covariates from all samples - - Yields - ------ - Iterator[namedtuple] - An iterator over each line in the file, where each line is encoded as a - namedtuple containing each of the class properties - """ - with hook_compressed(self.fname, mode="rt") as covars: - covar_text = reader(covars, delimiter="\t") - header = next(covar_text) - # there should at least two columns - if len(header) < 2: - raise ValueError("The covariates TSV should have at least two columns.") - # the first column should be called "sample" - if header[0] != "sample": - self.log.warning( - "The first column of the covariates TSV should contain sample IDs" - " and should be named 'sample' in the header line" - ) - header = tuple(header[1:]) - Record = namedtuple("Record", "data samples names") - for covar in covar_text: - if samples is None or covar[0] in samples: - try: - yield Record( - np.array(covar[1:], dtype="float64"), covar[0], header - ) - except: - self.log.error( - "Every column in the covariates file (besides the sample" - " column) must be numeric." - ) + super(Phenotypes, self).__init__(fname, log) + self._ext = 'covar' diff --git a/haptools/data/haplotypes.py b/haptools/data/haplotypes.py index a6fd254a..a8ccdb88 100644 --- a/haptools/data/haplotypes.py +++ b/haptools/data/haplotypes.py @@ -87,7 +87,7 @@ def fmt_str(self) -> str: """ Convert an Extra into a fmt string - Retruns + Returns ------- str A python format string (ex: "{beta:.3f}") diff --git a/haptools/data/phenotypes.py b/haptools/data/phenotypes.py index a27c2535..c2d55792 100644 --- a/haptools/data/phenotypes.py +++ b/haptools/data/phenotypes.py @@ -1,7 +1,9 @@ from __future__ import annotations from csv import reader from pathlib import Path +from io import TextIOBase from collections import namedtuple +from collections.abc import Iterable from logging import getLogger, Logger from fileinput import hook_compressed @@ -17,11 +19,13 @@ class Phenotypes(Data): Attributes ---------- data : np.array - The phenotypes in an n (samples) x 1 (phenotype value) array + The phenotypes in an n (samples) x m (phenotypes) array fname : Path The path to the read-only file containing the data samples : tuple The names of each of the n samples + names : tuple[str] + The names of the phenotypes log: Logger A logging instance for recording debug statements. @@ -33,6 +37,8 @@ class Phenotypes(Data): def __init__(self, fname: Path, log: Logger = None): super().__init__(fname, log) self.samples = tuple() + self.names = tuple() + self._ext = 'pheno' @classmethod def load(cls: Phenotypes, fname: Path, samples: list[str] = None) -> Phenotypes: @@ -75,36 +81,26 @@ def read(self, samples: list[str] = None): If the provided file doesn't follow the expected format """ super().read() - # load all info into memory - # use hook_compressed to automatically handle gz files - with hook_compressed(self.fname, mode="rt") as phens: - phen_text = reader(phens, delimiter="\t") - # convert to list and subset samples if need be - if samples: - samples = set(samples) - phen_text = [phen for phen in phen_text if phen[0] in samples] - else: - phen_text = list(phen_text) - # there should only be two columns - if len(phen_text[0]) != 2: - self.log.warning("The phenotype TSV should only have two columns.") - # the second column should be castable to a float - try: - float(phen_text[0][1]) - except: - self.log.error("The second column of the TSV file must numeric.") + # call self.__iter__() to obtain the data and samples + data, self.samples = zip(*self.__iter__(samples)) + self.log.info(f"Loaded {len(self.samples)} samples from .{self._ext} file") # fill out the samples and data properties - self.samples, self.data = zip(*phen_text) - # coerce strings to floats - self.data = np.array(self.data, dtype="float64") + # collect data in a np array + self.data = np.array(data) - def __iter__(self, samples: list[str] = None) -> Iterator[namedtuple]: + def _iterate(self, phens: TextIOBase, phen_text: Iterable, samples: set[str] = None): """ - Read phenotypes from a TSV line by line without storing anything + A generator over the lines of a TSV + + This is a helper function for :py:meth:`~.Phenotypes.__iter__` Parameters ---------- - samples : list[str], optional + phens: TextIOBase + The file handler for the stream + phen_text: Iterable + The csv.reader object containing the lines of text from the file as lists + samples : set[str], optional A subset of the samples from which to extract phenotypes Defaults to loading phenotypes from all samples @@ -115,17 +111,59 @@ def __iter__(self, samples: list[str] = None) -> Iterator[namedtuple]: An iterator over each line in the file, where each line is encoded as a namedtuple containing each of the class properties """ - with hook_compressed(self.fname, mode="rt") as phens: - phen_text = reader(phens, delimiter="\t") - Record = namedtuple("Record", "data samples") - for phen in phen_text: - if samples is None or phen[0] in samples: - try: - yield Record(float(phen[1]), phen[0]) - except: - self.log.error( - "The second column of the TSV file must numeric." - ) + self.log.info(f"Loading {len(self.names)} columns from .{self._ext} file") + Record = namedtuple("Record", "data samples") + for phen in phen_text: + if samples is None or phen[0] in samples: + try: + yield Record( + np.array(phen[1:], dtype="float64"), phen[0] + ) + except: + self.log.error( + f"Every column in the .{self._ext} file (besides the sample" + " column) must be numeric." + ) + phens.close() + + def __iter__(self, samples: list[str] = None) -> Iterable[namedtuple]: + """ + Read phenotypes from a TSV line by line without storing anything + + Parameters + ---------- + samples : list[str], optional + A subset of the samples from which to extract phenotypes + + Defaults to loading phenotypes from all samples + + Returns + ------ + Iterable[namedtuple] + See documentation for :py:meth:`~.Phenotypes._iterate` + """ + phens = hook_compressed(self.fname, mode="rt") + phen_text = reader(phens, delimiter="\t") + # ignore all of the comment lines + while True: + header = next(phen_text) + if not header[0].startswith('#') or header[0].startswith('#IID'): + break + + # there should be at least two columns + if len(header) < 2: + raise ValueError(f"The .{self._ext} file should have at least two columns.") + # the first column should be called "#IID" + if header[0] != "#IID": + self.log.warning( + f"The first column of the .{self._ext} file should contain sample IDs" + " and should be named '#IID' in the header line" + ) + self.names = tuple(header[1:]) + samples = set(samples) if samples else None + # call another function to force the lines above to be run immediately + # see https://stackoverflow.com/a/36726497 + return self._iterate(phens, phen_text, samples) def standardize(self): """ @@ -133,4 +171,4 @@ def standardize(self): This function modifies :py:attr:`~.Genotypes.data` in-place """ - self.data = (self.data - np.mean(self.data)) / np.std(self.data) + self.data = (self.data - np.mean(self.data, axis=0)) / np.std(self.data, axis=0) diff --git a/tests/test_data.py b/tests/test_data.py index 94c31190..db97049c 100644 --- a/tests/test_data.py +++ b/tests/test_data.py @@ -240,94 +240,129 @@ def test_subset_genotypes(self): assert np.array_equal(gts_sub.variants, expected_variants) -def test_load_phenotypes(caplog): - # create a phenotype vector with shape: num_samples x 1 - expected = np.array([1, 1, 2, 2, 0]) - - # can we load the data from the phenotype file? - phens = Phenotypes(DATADIR.joinpath("simple.tsv")) - phens.read() - np.testing.assert_allclose(phens.data, expected) - assert phens.samples == ("HG00096", "HG00097", "HG00099", "HG00100", "HG00101") - - # try loading the data again - it should warn b/c we've already done it - phens.read() - assert len(caplog.records) == 1 and caplog.records[0].levelname == "WARNING" - - expected = (expected - np.mean(expected)) / np.std(expected) - phens.standardize() - np.testing.assert_allclose(phens.data, expected) - - -def test_load_phenotypes_iterate(): - # create a phenotype vector with shape: num_samples x 1 - expected = np.array([1, 1, 2, 2, 0]) - samples = ("HG00096", "HG00097", "HG00099", "HG00100", "HG00101") - - # can we load the data from the phenotype file? - phens = Phenotypes(DATADIR.joinpath("simple.tsv")) - for idx, line in enumerate(phens): - np.testing.assert_allclose(line.data, expected[idx]) - assert line.samples == samples[idx] - - -def test_load_phenotypes_subset(): - # create a phenotype vector with shape: num_samples x 1 - expected = np.array([1, 1, 2, 2, 0]) - - # subset for just the samples we want - expected = expected[[1, 3]] - - # can we load the data from the phenotype file? - phens = Phenotypes(DATADIR.joinpath("simple.tsv")) - samples = ["HG00097", "HG00100"] - phens.read(samples=samples) - np.testing.assert_allclose(phens.data, expected) - assert phens.samples == tuple(samples) - - -def test_load_covariates(caplog): - # create a covariate vector with shape: num_samples x num_covars - expected = np.array([(0, 4), (1, 20), (1, 33), (0, 15), (0, 78)]) - - # can we load the data from the covariates file? - covars = Covariates(DATADIR.joinpath("covars.tsv")) - covars.read() - np.testing.assert_allclose(covars.data, expected) - assert covars.samples == ("HG00096", "HG00097", "HG00099", "HG00100", "HG00101") - assert covars.names == ("sex", "age") - - # try loading the data again - it should warn b/c we've already done it - covars.read() - assert len(caplog.records) == 1 and caplog.records[0].levelname == "WARNING" - - -def test_load_covariates_iterate(): - # create a covariate vector with shape: num_samples x num_covars - expected = np.array([(0, 4), (1, 20), (1, 33), (0, 15), (0, 78)]) - samples = ("HG00096", "HG00097", "HG00099", "HG00100", "HG00101") - - # can we load the data from the covariates file? - covars = Covariates(DATADIR.joinpath("covars.tsv")) - for idx, line in enumerate(covars): - np.testing.assert_allclose(line.data, expected[idx]) - assert line.samples == samples[idx] - assert line.names == ("sex", "age") - - -def test_load_covariates_subset(): - # create a covariate vector with shape: num_samples x num_covars - expected = np.array([(0, 4), (1, 20), (1, 33), (0, 15), (0, 78)]) - - # subset for just the samples we want - expected = expected[[1, 3]] - - # can we load the data from the covariate file? - covars = Covariates(DATADIR.joinpath("covars.tsv")) - samples = ["HG00097", "HG00100"] - covars.read(samples=samples) - np.testing.assert_allclose(covars.data, expected) - assert covars.samples == tuple(samples) +class TestPhenotypes: + def _get_expected_phenotypes(self): + # create a phen matrix with shape: samples x phenotypes + expected = np.array([ + [1, 1, 2, 2, 0], + [3, 6, 8, 1, 4], + ], dtype="float64").T + return expected + + def _get_fake_phenotypes(self): + gts = Phenotypes(fname=None) + gts.data = self._get_expected_phenotypes() + gts.samples = ("HG00096", "HG00097", "HG00099", "HG00100", "HG00101") + gts.names = ("height", "bmi") + return gts + + def test_load_phenotypes(self, caplog): + expected_phen = self._get_fake_phenotypes() + expected = expected_phen.data + + # can we load the data from the phenotype file? + phens = Phenotypes(DATADIR.joinpath("simple.pheno")) + phens.read() + np.testing.assert_allclose(phens.data, expected) + assert phens.samples == expected_phen.samples + + # try loading the data again - it should warn b/c we've already done it + caplog.clear() + phens.read() + assert len(caplog.records) > 0 and caplog.records[0].levelname == "WARNING" + + expected = (expected - np.mean(expected, axis=0)) / np.std(expected, axis=0) + phens.standardize() + np.testing.assert_allclose(phens.data, expected) + + + def test_load_phenotypes_iterate(self): + expected_phen = self._get_fake_phenotypes() + expected = expected_phen.data + samples = expected_phen.samples + + # can we load the data from the phenotype file? + phens = Phenotypes(DATADIR.joinpath("simple.pheno")) + for idx, line in enumerate(phens): + np.testing.assert_allclose(line.data, expected[idx]) + assert line.samples == samples[idx] + + + def test_load_phenotypes_subset(self): + expected_phen = self._get_fake_phenotypes() + expected = expected_phen.data + samples = ["HG00097", "HG00100"] + + # subset for just the samples we want + expected = expected[[1, 3]] + + # can we load the data from the phenotype file? + phens = Phenotypes(DATADIR.joinpath("simple.pheno")) + phens.read(samples=samples) + np.testing.assert_allclose(phens.data, expected) + assert phens.samples == tuple(samples) + + +class TestCovariates: + def _get_expected_covariates(self): + # create a covar matrix with shape: samples x covariates + expected = np.array([(0, 4), (1, 20), (1, 33), (0, 15), (0, 78)]) + return expected + + def _get_fake_covariates(self): + gts = Phenotypes(fname=None) + gts.data = self._get_expected_covariates() + gts.samples = ("HG00096", "HG00097", "HG00099", "HG00100", "HG00101") + gts.names = ("sex", "age") + return gts + + def test_load_covariates(self, caplog): + # create a covariate vector with shape: num_samples x num_covars + expected_covar = self._get_fake_covariates() + expected = expected_covar.data + samples = expected_covar.samples + + # can we load the data from the covariates file? + covars = Covariates(DATADIR.joinpath("simple.covar")) + covars.read() + np.testing.assert_allclose(covars.data, expected) + assert covars.samples == samples + assert covars.names == ("sex", "age") + + # try loading the data again - it should warn b/c we've already done it + caplog.clear() + covars.read() + assert len(caplog.records) > 0 and caplog.records[0].levelname == "WARNING" + + + def test_load_covariates_iterate(self): + # create a covariate vector with shape: num_samples x num_covars + expected_covar = self._get_fake_covariates() + expected = expected_covar.data + samples = expected_covar.samples + + # can we load the data from the covariates file? + covars = Covariates(DATADIR.joinpath("simple.covar")) + for idx, line in enumerate(covars): + np.testing.assert_allclose(line.data, expected[idx]) + assert line.samples == samples[idx] + assert covars.names == ("sex", "age") + + + def test_load_covariates_subset(self): + # create a covariate vector with shape: num_samples x num_covars + expected_covar = self._get_fake_covariates() + expected = expected_covar.data + samples = ["HG00097", "HG00100"] + + # subset for just the samples we want + expected = expected[[1, 3]] + + # can we load the data from the covariate file? + covars = Covariates(DATADIR.joinpath("simple.covar")) + covars.read(samples=samples) + np.testing.assert_allclose(covars.data, expected) + assert covars.samples == tuple(samples) class TestHaplotypes: