diff --git a/conftest.py b/conftest.py index e72525fff..5abc80fd2 100644 --- a/conftest.py +++ b/conftest.py @@ -13,6 +13,7 @@ # limitations under the License. +import numpy as np from pyspark.sql import SparkSession import pytest import os @@ -50,6 +51,16 @@ def spark(spark_builder): sess = spark_builder.getOrCreate() return sess.newSession() +def pytest_addoption(parser): + parser.addoption('--random-seed', action='store', type=int, help='Seed to use for random number generator') + +@pytest.fixture(scope="function") +def rg(pytestconfig): + seed = pytestconfig.getoption('random_seed') + seed_seq = np.random.SeedSequence(seed) + print(f'Creating random number generator with seed {seed_seq.entropy}') + return np.random.default_rng(seed_seq) + def pytest_runtest_setup(item): min_spark_version = next((mark.args[0] for mark in item.iter_markers(name='min_spark')), None) if min_spark_version: diff --git a/docs/source/environment.yml b/docs/source/environment.yml index c56ffd3ff..0c4fe1bcb 100644 --- a/docs/source/environment.yml +++ b/docs/source/environment.yml @@ -6,6 +6,7 @@ dependencies: - python=3.7 - nptyping=1.3.0 - numpy>=1.17.4 + - opt_einsum>=3.2.0 - pandas>=0.25.3 - pip - scikit-learn=0.22.1 diff --git a/python/environment.yml b/python/environment.yml index 144f31da1..939ea0f58 100644 --- a/python/environment.yml +++ b/python/environment.yml @@ -8,10 +8,11 @@ dependencies: - click=7.1.1 # Docs notebook source generation - databricks-cli=0.9.1 # Docs notebook source generation - jinja2 - - jupyter + - jupyterlab - nomkl # Skip MKL for local development - nptyping=1.3.0 - numpy=1.17.5 + - opt_einsum>=3.2.0 - pandas=0.25.3 - pip - pyarrow=0.15.1 diff --git a/python/glow/gwas/functions.py b/python/glow/gwas/functions.py new file mode 100644 index 000000000..073668552 --- /dev/null +++ b/python/glow/gwas/functions.py @@ -0,0 +1,117 @@ +from pyspark.sql import SparkSession, functions as fx +from pyspark.sql.types import StructField, StructType, FloatType, DoubleType, ArrayType +from typing import Any, Callable, Dict, List, TypeVar, Union +import numpy as np +import pandas as pd +from nptyping import Float, NDArray +from typeguard import typechecked +import opt_einsum as oe +from ..wgr.linear_model.functions import __assert_all_present, __check_binary + +_VALUES_COLUMN_NAME = '_glow_regression_values' +_GENOTYPES_COLUMN_NAME = 'genotypes' + + +def _check_spark_version(spark: SparkSession) -> bool: + if int(spark.version.split('.')[0]) < 3: + raise AttributeError( + 'Pandas based regression tests are only supported on Spark 3.0 or greater') + + +def _output_schema(input_fields: List[StructField], result_fields: List[StructField]) -> StructType: + + fields = [field for field in input_fields if field.name != _VALUES_COLUMN_NAME] + result_fields + return StructType(fields) + + +def _validate_covariates_and_phenotypes(covariate_df, phenotype_df, is_binary): + for col in covariate_df: + __assert_all_present(covariate_df, col, 'covariate') + if not covariate_df.empty: + if phenotype_df.shape[0] != covariate_df.shape[0]: + raise ValueError( + f'phenotype_df and covariate_df must have the same number of rows ({phenotype_df.shape[0]} != {covariate_df.shape[0]}' + ) + if is_binary: + __check_binary(phenotype_df) + + +def _regression_sql_type(dt): + if dt == np.float32: + return FloatType() + elif dt == np.float64: + return DoubleType() + else: + raise ValueError('dt must be np.float32 or np.float64') + + +def _prepare_genotype_df(genotype_df, values_column, sql_type): + if isinstance(values_column, str): + if values_column == _GENOTYPES_COLUMN_NAME: + raise ValueError(f'The values column should not be called "{_GENOTYPES_COLUMN_NAME}"') + out = (genotype_df.withColumn(_VALUES_COLUMN_NAME, + fx.col(values_column).cast( + ArrayType(sql_type))).drop(values_column)) + else: + out = genotype_df.withColumn(_VALUES_COLUMN_NAME, values_column.cast(ArrayType(sql_type))) + + if _GENOTYPES_COLUMN_NAME in [field.name for field in genotype_df.schema]: + out = out.drop(_GENOTYPES_COLUMN_NAME) + return out + + +@typechecked +def _add_intercept(C: NDArray[(Any, Any), Float], num_samples: int) -> NDArray[(Any, Any), Float]: + intercept = np.ones((num_samples, 1)) + return np.hstack((intercept, C)) if C.size else intercept + + +def _einsum(subscripts: str, *operands: NDArray) -> NDArray: + ''' + A wrapper around np.einsum to ensure uniform options. + ''' + return oe.contract(subscripts, *operands, casting='no', optimize='dp', memory_limit='max_input') + + +def _have_same_elements(idx1: pd.Index, idx2: pd.Index) -> bool: + return idx1.sort_values().equals(idx2.sort_values()) + + +T = TypeVar('T') + + +def _loco_dispatch(genotype_pdf: pd.DataFrame, state: Union[T, Dict[str, T]], f: Callable, *args): + ''' + Given a pandas DataFrame, dispatch into one or more calls of the linear regression kernel + depending whether we have one Y matrix or one Y matrix per contig. + ''' + if isinstance(state, dict): + return genotype_pdf.groupby('contigName', sort=False, as_index=False)\ + .apply(lambda pdf: f(pdf, state[pdf['contigName'].iloc[0]], *args)) + else: + return f(genotype_pdf, state, *args) + + +@typechecked +def _loco_make_state(Y: NDArray[(Any, Any), Float], phenotype_df: pd.DataFrame, + offset_df: pd.DataFrame, f: Callable[..., T], *args) -> Union[T, Dict[str, T]]: + if not offset_df.empty: + if not _have_same_elements(phenotype_df.columns, offset_df.columns): + raise ValueError(f'phenotype_df and offset_df should have the same column names.') + if offset_df.index.nlevels == 1: # Indexed by sample id + if not _have_same_elements(phenotype_df.index, offset_df.index): + raise ValueError(f'phenotype_df and offset_df should have the same index.') + return f(Y, phenotype_df, offset_df, *args) + elif offset_df.index.nlevels == 2: # Indexed by sample id and contig + all_contigs = offset_df.index.get_level_values(1).unique() + state_dict = {} + for contig in all_contigs: + offset_for_contig = offset_df.xs(contig, level=1) + if not _have_same_elements(phenotype_df.index, offset_for_contig.index): + raise ValueError( + 'When using a multi-indexed offset_df, the offsets for each contig ' + 'should have the same index as phenotype_df') + state_dict[contig] = f(Y, phenotype_df, offset_for_contig, *args) + return state_dict + else: + return f(Y, phenotype_df, None, *args) diff --git a/python/glow/gwas/lin_reg.py b/python/glow/gwas/lin_reg.py index b9057f2e5..1b6808f36 100644 --- a/python/glow/gwas/lin_reg.py +++ b/python/glow/gwas/lin_reg.py @@ -8,12 +8,11 @@ from scipy import stats from typeguard import typechecked from ..wgr.linear_model.functions import __assert_all_present +from . import functions as gwas_fx +from .functions import _VALUES_COLUMN_NAME, _GENOTYPES_COLUMN_NAME __all__ = ['linear_regression'] -_VALUES_COLUMN_NAME = '_linreg_values' -_GENOTYPES_COLUMN_NAME = 'genotypes' - @typechecked def linear_regression(genotype_df: DataFrame, @@ -84,38 +83,14 @@ def linear_regression(genotype_df: DataFrame, - ``pvalue``: P value estimated from a two sided T-test - ``phenotype``: The phenotype name as determined by the column names of ``phenotype_df`` ''' - if int(genotype_df.sql_ctx.sparkSession.version.split('.')[0]) < 3: - raise AttributeError( - 'Pandas based linear regression is only supported on Spark 3.0 or greater') - - # Validate input - for col in covariate_df: - __assert_all_present(covariate_df, col, 'covariate') - if not covariate_df.empty: - if phenotype_df.shape[0] != covariate_df.shape[0]: - raise ValueError( - f'phenotype_df and covariate_df must have the same number of rows ({phenotype_df.shape[0]} != {covariate_df.shape[0]}' - ) - - if dt == np.float32: - sql_type = FloatType() - elif dt == np.float64: - sql_type = DoubleType() - else: - raise ValueError('dt must be np.float32 or np.float64') - - if isinstance(values_column, str): - if values_column == _GENOTYPES_COLUMN_NAME: - raise ValueError(f'The values column should not be called "{_GENOTYPES_COLUMN_NAME}"') - genotype_df = (genotype_df.withColumn(_VALUES_COLUMN_NAME, - fx.col(values_column).cast( - ArrayType(sql_type))).drop(values_column)) - else: - genotype_df = genotype_df.withColumn(_VALUES_COLUMN_NAME, - values_column.cast(ArrayType(sql_type))) - - if _GENOTYPES_COLUMN_NAME in [field.name for field in genotype_df.schema]: - genotype_df = genotype_df.drop(_GENOTYPES_COLUMN_NAME) + + gwas_fx._check_spark_version(genotype_df.sql_ctx.sparkSession) + + gwas_fx._validate_covariates_and_phenotypes(covariate_df, phenotype_df, is_binary=False) + + sql_type = gwas_fx._regression_sql_type(dt) + + genotype_df = gwas_fx._prepare_genotype_df(genotype_df, values_column, sql_type) # Construct output schema result_fields = [ @@ -125,13 +100,11 @@ def linear_regression(genotype_df: DataFrame, StructField('pvalue', sql_type), StructField('phenotype', StringType()) ] - fields = [field for field in genotype_df.schema.fields if field.name != _VALUES_COLUMN_NAME - ] + result_fields - result_struct = StructType(fields) + result_struct = gwas_fx._output_schema(genotype_df.schema.fields, result_fields) C = covariate_df.to_numpy(dt, copy=True) if fit_intercept: - C = _add_intercept(C, phenotype_df.shape[0]) + C = gwas_fx._add_intercept(C, phenotype_df.shape[0]) # Prepare covariate basis and phenotype residuals Q = np.linalg.qr(C)[0] @@ -140,54 +113,19 @@ def linear_regression(genotype_df: DataFrame, np.nan_to_num(Y, copy=False) _residualize_in_place(Y, Q) - if not offset_df.empty: - if not _have_same_elements(phenotype_df.columns, offset_df.columns): - raise ValueError(f'phenotype_df and offset_df should have the same column names.') - if offset_df.index.nlevels == 1: # Indexed by sample id - if not _have_same_elements(phenotype_df.index, offset_df.index): - raise ValueError(f'phenotype_df and offset_df should have the same index.') - Y_state = _create_YState_from_offset(Y, Y_mask, phenotype_df, offset_df, dt) - elif offset_df.index.nlevels == 2: # Indexed by sample id and contig - all_contigs = offset_df.index.get_level_values(1).unique() - Y_state = {} - for contig in all_contigs: - offset_for_contig = offset_df.xs(contig, level=1) - if not _have_same_elements(phenotype_df.index, offset_for_contig.index): - raise ValueError( - 'When using a multi-indexed offset_df, the offsets for each contig ' - 'should have the same index as phenotype_df') - Y_state[contig] = _create_YState_from_offset(Y, Y_mask, phenotype_df, - offset_for_contig, dt) - else: - Y_state = _create_YState(Y, Y_mask) + Y_state = gwas_fx._loco_make_state(Y, phenotype_df, offset_df, + lambda y, pdf, odf: _create_YState(y, pdf, odf, Y_mask, dt)) dof = C.shape[0] - C.shape[1] - 1 def map_func(pdf_iterator): for pdf in pdf_iterator: - yield _linear_regression_dispatch(pdf, Y_state, Y_mask, Q, dof, - phenotype_df.columns.to_series().astype('str')) + yield gwas_fx._loco_dispatch(pdf, Y_state, _linear_regression_inner, Y_mask, Q, dof, + phenotype_df.columns.to_series().astype('str')) return genotype_df.mapInPandas(map_func, result_struct) -def _have_same_elements(idx1: pd.Index, idx2: pd.Index) -> bool: - return idx1.sort_values().equals(idx2.sort_values()) - - -def _create_YState_from_offset(Y: NDArray[(Any, Any), Float], Y_mask: NDArray[(Any, Any), Float], - phenotype_df: pd.DataFrame, offset_df: pd.DataFrame, - dt) -> NDArray[(Any, Any), Float]: - Y = (pd.DataFrame(Y, phenotype_df.index, phenotype_df.columns) - offset_df).to_numpy(dt) - return _create_YState(Y, Y_mask) - - -def _create_YState(Y: NDArray[(Any, Any), Float], - Y_mask: NDArray[(Any, Any), Float]) -> NDArray[(Any, Any), Float]: - Y *= Y_mask - return YState(Y, np.sum(Y * Y, axis=0)) - - @dataclass class YState: ''' @@ -197,18 +135,12 @@ class YState: YdotY: NDArray[(Any), Float] -@typechecked -def _add_intercept(C: NDArray[(Any, Any), Float], num_samples: int) -> NDArray[(Any, Any), Float]: - intercept = np.ones((num_samples, 1)) - return np.hstack((intercept, C)) if C.size else intercept - - -@typechecked -def _einsum(subscripts: str, *operands: NDArray) -> NDArray: - ''' - A wrapper around np.einsum to ensure uniform options. - ''' - return np.einsum(subscripts, *operands, casting='no') +def _create_YState(Y: NDArray[(Any, Any), Float], phenotype_df: pd.DataFrame, + offset_df: pd.DataFrame, Y_mask: NDArray[(Any, Any), Float], dt) -> YState: + if offset_df is not None: + Y = (pd.DataFrame(Y, phenotype_df.index, phenotype_df.columns) - offset_df).to_numpy(dt) + Y *= Y_mask + return YState(Y, np.sum(Y * Y, axis=0)) @typechecked @@ -222,19 +154,6 @@ def _residualize_in_place(M: NDArray[(Any, Any), Float], return M -def _linear_regression_dispatch(genotype_pdf: pd.DataFrame, - Y_state: Union[YState, Dict[str, YState]], *args) -> pd.DataFrame: - ''' - Given a pandas DataFrame, dispatch into one or more calls of the linear regression kernel - depending whether we have one Y matrix or one Y matrix per contig. - ''' - if isinstance(Y_state, dict): - return genotype_pdf.groupby('contigName', sort=False, as_index=False)\ - .apply(lambda pdf: _linear_regression_inner(pdf, Y_state[pdf['contigName'].iloc[0]], *args)) - else: - return _linear_regression_inner(genotype_pdf, Y_state, *args) - - @typechecked def _linear_regression_inner(genotype_pdf: pd.DataFrame, Y_state: YState, Y_mask: NDArray[(Any, Any), Float], Q: NDArray[(Any, Any), Float], @@ -255,7 +174,7 @@ def _linear_regression_inner(genotype_pdf: pd.DataFrame, Y_state: YState, ''' X = _residualize_in_place(np.column_stack(genotype_pdf[_VALUES_COLUMN_NAME].array), Q) XdotY = Y_state.Y.T @ X - XdotX_reciprocal = 1 / _einsum('sp,sg,sg->pg', Y_mask, X, X) + XdotX_reciprocal = 1 / gwas_fx._einsum('sp,sg,sg->pg', Y_mask, X, X) betas = XdotY * XdotX_reciprocal standard_error = np.sqrt((Y_state.YdotY[:, None] * XdotX_reciprocal - betas * betas) / dof) T = betas / standard_error diff --git a/python/glow/gwas/log_reg.py b/python/glow/gwas/log_reg.py new file mode 100644 index 000000000..b765aafce --- /dev/null +++ b/python/glow/gwas/log_reg.py @@ -0,0 +1,201 @@ +from pyspark.sql.types import StringType, StructField +from typing import Any, Optional +import pandas as pd +import numpy as np +from pyspark.sql import DataFrame +import statsmodels.api as sm +from dataclasses import dataclass +from typeguard import typechecked +from nptyping import Float, NDArray +from scipy import stats +import opt_einsum as oe +from . import functions as gwas_fx +from .functions import _VALUES_COLUMN_NAME + +__all__ = ['logistic_regression'] + +fallback_none = 'none' + + +@typechecked +def logistic_regression( + genotype_df: DataFrame, + phenotype_df: pd.DataFrame, + covariate_df: pd.DataFrame = pd.DataFrame({}), + offset_df: pd.DataFrame = pd.DataFrame({}), + correction: str = 'none', # TODO: Make approx-firth default + pvalue_threshold: float = 0.05, + fit_intercept: bool = True, + values_column: str = 'values', + dt: type = np.float64) -> DataFrame: + ''' + Uses logistic regression to test for association between genotypes and one or more binary + phenotypes. This is a distributed version of the method from regenie: + https://www.biorxiv.org/content/10.1101/2020.06.19.162354v2 + + On the driver node, we fit a logistic regression model based on the covariates for each + phenotype: + + logit(y) ~ C + + where y is a phenotype vector and C is the covariate matrix. + + We compute the probability predictions h_hat and broadcast the residuals (y - y_hat), gamma vectors + (where gamma is defined as y_hat * (1 - Y_hat)), and (C.T gamma C)^-1 matrices. In each task, + we then adjust the new genotypes based on the null fit, perform a score test as a fast scan + for potentially significant variants, and then test variants with p values below a threshold + using a more selective, more expensive test. + + Args: + genotype_df : Spark DataFrame containing genomic data + phenotype_df : Pandas DataFrame containing phenotypic data + covariate_df : An optional Pandas DataFrame containing covariates + offset_df : An optional Pandas DataFrame containing the phenotype offset. This value will be used + as an offset in the covariate only and per variant logistic regression models. The ``offset_df`` may + have one or two levels of indexing. If one level, the index should be the same as the ``phenotype_df``. + If two levels, the level 0 index should be the same as the ``phenotype_df``, and the level 1 index + should be the contig name. The two level index scheme allows for per-contig offsets like + LOCO predictions from GloWGR. + correction : Which test to use for variants that meet a significance threshold for the score test + pvalue_threshold : Variants with a pvalue below this threshold will be tested using the ``correction`` method. + fit_intercept : Whether or not to add an intercept column to the covariate DataFrame + values_column : A column name or column expression to test with linear regression. If a column name is provided, + ``genotype_df`` should have a column with this name and a numeric array type. If a column expression + is provided, the expression should return a numeric array type. + dt : The numpy datatype to use in the linear regression test. Must be ``np.float32`` or ``np.float64``. + + Returns: + A Spark DataFrame that contains + + - All columns from ``genotype_df`` except the ``values_column`` and the ``genotypes`` column if one exists + - ``tvalue``: The chi squared test statistic according to the score test or the correction method + - ``pvalue``: P value estimated from the test statistic + - ``phenotype``: The phenotype name as determiend by the column names of ``phenotype_df`` + ''' + + gwas_fx._check_spark_version(genotype_df.sql_ctx.sparkSession) + gwas_fx._validate_covariates_and_phenotypes(covariate_df, phenotype_df, is_binary=True) + sql_type = gwas_fx._regression_sql_type(dt) + genotype_df = gwas_fx._prepare_genotype_df(genotype_df, values_column, sql_type) + result_fields = [ + # TODO: Probably want to put effect size and stderr here for approx-firth + StructField('tvalue', sql_type), + StructField('pvalue', sql_type), + StructField('phenotype', StringType()) + ] + + result_struct = gwas_fx._output_schema(genotype_df.schema.fields, result_fields) + C = covariate_df.to_numpy(dt, copy=True) + if fit_intercept: + C = gwas_fx._add_intercept(C, phenotype_df.shape[0]) + Y = phenotype_df.to_numpy(dt, copy=True) + Y_mask = ~(np.isnan(Y)) + np.nan_to_num(Y, copy=False) + + state = gwas_fx._loco_make_state( + Y, phenotype_df, offset_df, + lambda y, pdf, odf: _create_log_reg_state(y, pdf, odf, C, Y_mask)) + + phenotype_names = phenotype_df.columns.to_series().astype('str') + + def map_func(pdf_iterator): + for pdf in pdf_iterator: + yield gwas_fx._loco_dispatch(pdf, state, _logistic_regression_inner, C, Y_mask, + correction, phenotype_names) + + return genotype_df.mapInPandas(map_func, result_struct) + + +@typechecked +def _logistic_null_model_predictions( + y: NDArray[Any, Float], + X: NDArray[Any, Float], + y_mask: NDArray[Any, bool], + # nptying can't handle optional NDArrays, so don't verify the type for offset + offset +) -> NDArray[Any, Float]: + if offset is not None: + offset = offset[y_mask] + model = sm.GLM(y[y_mask], + X[y_mask], + family=sm.families.Binomial(), + offset=offset, + missing='ignore') + fit_result = model.fit() + predictions = model.predict(fit_result.params) + + # Store 0 as prediction for samples with missing phenotypes + remapped_predictions = np.zeros(y.shape) + remapped_predictions[y_mask] = predictions + return remapped_predictions + + +@dataclass +class LogRegState: + inv_CtGammaC: NDArray[(Any, Any), Float] + gamma: NDArray[(Any, Any), Float] + Y_res: NDArray[(Any, Any), Float] + + +@typechecked +def _create_log_reg_state( + Y: NDArray[(Any, Any), Float], + phenotype_df: pd.DataFrame, # Unused, only to share code with lin_reg.py + offset_df: Optional[pd.DataFrame], + C: NDArray[(Any, Any), Float], + Y_mask: NDArray[(Any, Any), Float]) -> LogRegState: + Y_pred = np.row_stack([ + _logistic_null_model_predictions( + Y[:, i], C, Y_mask[:, i], + offset_df.iloc[:, i].to_numpy() if offset_df is not None else None) + for i in range(Y.shape[1]) + ]) + gamma = Y_pred * (1 - Y_pred) + CtGammaC = C.T @ (gamma[:, :, None] * C) + inv_CtGammaC = np.linalg.inv(CtGammaC) + return LogRegState(inv_CtGammaC, gamma, (Y - Y_pred.T) * Y_mask) + + +def _logistic_residualize(X: NDArray[(Any, Any), Float], C: NDArray[(Any, Any), Float], + Y_mask: NDArray[(Any, Any), Float], gamma: NDArray[(Any, Any), Float], + inv_CtGammaC: NDArray[(Any, Any), Float]) -> NDArray[(Any, Any), Float]: + ''' + Residualize the genotype vectors given the null model predictions. + X_res = X - C(C.T gamma C)^-1 C.T gamma X + ''' + X_hat = gwas_fx._einsum('ic,pcd,ds,ps,sg,sp->igp', C, inv_CtGammaC, C.T, gamma, X, Y_mask) + return X[:, :, None] - X_hat + + +def _logistic_regression_inner(genotype_pdf: pd.DataFrame, log_reg_state: LogRegState, + C: NDArray[(Any, Any), Float], Y_mask: NDArray[(Any, Any), Float], + fallback_method: str, phenotype_names: pd.Series) -> pd.DataFrame: + ''' + Tests a block of genotypes for association with binary traits. We first residualize + the genotypes based on the null model fit, then perform a fast score test to check for + possible significance. + + We use semantic indices for the einsum expressions: + s, i: sample (or individual) + g: genotype + p: phenotype + c, d: covariate + ''' + X = np.column_stack(genotype_pdf[_VALUES_COLUMN_NAME].array) + with oe.shared_intermediates(): + X_res = _logistic_residualize(X, C, Y_mask, log_reg_state.gamma, log_reg_state.inv_CtGammaC) + num = gwas_fx._einsum('sgp,sp->pg', X_res, log_reg_state.Y_res)**2 + denom = gwas_fx._einsum('sgp,sgp,ps->pg', X_res, X_res, log_reg_state.gamma) + t_values = np.ravel(num / denom) + p_values = stats.chi2.sf(t_values, 1) + + if fallback_method != fallback_none: + # TODO: Call approx firth here + () + + del genotype_pdf[_VALUES_COLUMN_NAME] + out_df = pd.concat([genotype_pdf] * log_reg_state.Y_res.shape[1]) + out_df['tvalue'] = list(np.ravel(t_values)) + out_df['pvalue'] = list(np.ravel(p_values)) + out_df['phenotype'] = phenotype_names.repeat(genotype_pdf.shape[0]).tolist() + return out_df diff --git a/python/glow/gwas/tests/test_linear_regression.py b/python/glow/gwas/tests/test_lin_reg.py similarity index 72% rename from python/glow/gwas/tests/test_linear_regression.py rename to python/glow/gwas/tests/test_lin_reg.py index de9586a97..852b2d91b 100644 --- a/python/glow/gwas/tests/test_linear_regression.py +++ b/python/glow/gwas/tests/test_lin_reg.py @@ -1,6 +1,7 @@ import numpy as np import statsmodels.api as sm import pandas as pd +import glow.gwas.functions as gwas_fx import glow.gwas.lin_reg as lr import pytest import glow @@ -10,7 +11,7 @@ def run_linear_regression(genotype_df, phenotype_df, covariate_df, fit_intercept phenotype_names = phenotype_df.columns.astype('str').to_series() C = covariate_df.to_numpy('float64', copy=True) if fit_intercept: - C = lr._add_intercept(C, genotype_df.shape[0]) + C = gwas_fx._add_intercept(C, genotype_df.shape[0]) if not C.size: C = np.zeros((genotype_df.shape[0], 1)) Y = phenotype_df.to_numpy('float64', copy=True) @@ -18,7 +19,7 @@ def run_linear_regression(genotype_df, phenotype_df, covariate_df, fit_intercept Y[~Y_mask] = 0 Q = np.linalg.qr(C)[0] Y = lr._residualize_in_place(Y, Q) - Y_state = lr._create_YState(Y, Y_mask) + Y_state = lr._create_YState(Y, phenotype_df, None, Y_mask, np.float64) dof = C.shape[0] - C.shape[1] - 1 pdf = pd.DataFrame({lr._VALUES_COLUMN_NAME: list(genotype_df.to_numpy('float64').T)}) @@ -64,7 +65,7 @@ def statsmodels_baseline(genotype_df, C = covariate_df.to_numpy('float64') num_samples = C.shape[0] if C.size else genotype_df.shape[0] if fit_intercept: - C = lr._add_intercept(C, num_samples) + C = gwas_fx._add_intercept(C, num_samples) Y = phenotype_df.to_numpy('float64') X = genotype_df.to_numpy('float64') phenotype_df.columns = phenotype_df.columns.astype('str') @@ -168,55 +169,55 @@ def test_r_glm_covariates(): assert_glow_equals_golden(genotype_df, phenotype_df, covariate_df, fit_intercept=False) -def test_multiple(): +def test_multiple(rg): num_samples = 100 - genotype_df = pd.DataFrame(np.random.random((num_samples, 10))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, 25))) - covariate_df = pd.DataFrame(np.random.random((num_samples, 5))) + genotype_df = pd.DataFrame(rg.random((num_samples, 10))) + phenotype_df = pd.DataFrame(rg.random((num_samples, 25))) + covariate_df = pd.DataFrame(rg.random((num_samples, 5))) assert_glow_equals_golden(genotype_df, phenotype_df, covariate_df) -def test_missing(): +def test_missing(rg): num_samples = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, 1))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, 1))) + genotype_df = pd.DataFrame(rg.random((num_samples, 1))) + phenotype_df = pd.DataFrame(rg.random((num_samples, 1))) phenotype_df.loc[0, 0] = np.nan - covariate_df = pd.DataFrame(np.random.random((num_samples, 3))) + covariate_df = pd.DataFrame(rg.random((num_samples, 3))) glow = run_linear_regression(genotype_df, phenotype_df, covariate_df) baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df) assert regression_results_equal(glow, baseline) @pytest.mark.min_spark('3') -def test_missing_spark(spark): +def test_missing_spark(spark, rg): num_samples = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, 1))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, 3))) + genotype_df = pd.DataFrame(rg.random((num_samples, 1))) + phenotype_df = pd.DataFrame(rg.random((num_samples, 3))) phenotype_df.loc[0, 0] = np.nan phenotype_df.loc[[1, 3, 5], 1] = np.nan - covariate_df = pd.DataFrame(np.random.random((num_samples, 3))) + covariate_df = pd.DataFrame(rg.random((num_samples, 3))) glow = run_linear_regression_spark(spark, genotype_df, phenotype_df, covariate_df) baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df) assert regression_results_equal(glow, baseline) @pytest.mark.min_spark('3') -def test_multiple_spark(spark): +def test_multiple_spark(spark, rg): num_samples = 100 - genotype_df = pd.DataFrame(np.random.random((num_samples, 10))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, 25))) - covariate_df = pd.DataFrame(np.random.random((num_samples, 5))) + genotype_df = pd.DataFrame(rg.random((num_samples, 10))) + phenotype_df = pd.DataFrame(rg.random((num_samples, 25))) + covariate_df = pd.DataFrame(rg.random((num_samples, 5))) baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df) results = run_linear_regression_spark(spark, genotype_df, phenotype_df, covariate_df) assert regression_results_equal(baseline, results) @pytest.mark.min_spark('3') -def test_propagate_extra_cols(spark): +def test_propagate_extra_cols(spark, rg): num_samples = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, 3))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, 5))) - covariate_df = pd.DataFrame(np.random.random((num_samples, 2))) + genotype_df = pd.DataFrame(rg.random((num_samples, 3))) + phenotype_df = pd.DataFrame(rg.random((num_samples, 5))) + covariate_df = pd.DataFrame(rg.random((num_samples, 2))) extra_cols = pd.DataFrame({'genotype_idx': range(3), 'animal': 'monkey'}) results = run_linear_regression_spark(spark, genotype_df, phenotype_df, covariate_df, extra_cols) @@ -228,11 +229,11 @@ def test_propagate_extra_cols(spark): @pytest.mark.min_spark('3') -def test_different_values_column(spark): +def test_different_values_column(spark, rg): num_samples = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, 3))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, 5))) - covariate_df = pd.DataFrame(np.random.random((num_samples, 2))) + genotype_df = pd.DataFrame(rg.random((num_samples, 3))) + phenotype_df = pd.DataFrame(rg.random((num_samples, 5))) + covariate_df = pd.DataFrame(rg.random((num_samples, 2))) results = run_linear_regression_spark(spark, genotype_df, phenotype_df, @@ -242,52 +243,52 @@ def test_different_values_column(spark): @pytest.mark.min_spark('3') -def test_intercept_no_covariates(spark): +def test_intercept_no_covariates(spark, rg): num_samples = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, 10))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, 25))) + genotype_df = pd.DataFrame(rg.random((num_samples, 10))) + phenotype_df = pd.DataFrame(rg.random((num_samples, 25))) # No error run_linear_regression_spark(spark, genotype_df, phenotype_df, pd.DataFrame({})) @pytest.mark.min_spark('3') -def test_validates_missing_covariates(spark): +def test_validates_missing_covariates(spark, rg): num_samples = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, 3))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, 5))) - covariate_df = pd.DataFrame(np.random.random((num_samples, 2))) + genotype_df = pd.DataFrame(rg.random((num_samples, 3))) + phenotype_df = pd.DataFrame(rg.random((num_samples, 5))) + covariate_df = pd.DataFrame(rg.random((num_samples, 2))) covariate_df.loc[0, 0] = np.nan with pytest.raises(ValueError): run_linear_regression_spark(spark, genotype_df, phenotype_df, covariate_df) @pytest.mark.min_spark('3') -def test_validate_same_number_of_rows(spark): - genotype_df = pd.DataFrame(np.random.random((4, 3))) - phenotype_df = pd.DataFrame(np.random.random((4, 5))) - covariate_df = pd.DataFrame(np.random.random((5, 2))) +def test_validate_same_number_of_rows(spark, rg): + genotype_df = pd.DataFrame(rg.random((4, 3))) + phenotype_df = pd.DataFrame(rg.random((4, 5))) + covariate_df = pd.DataFrame(rg.random((5, 2))) with pytest.raises(ValueError): run_linear_regression_spark(spark, genotype_df, phenotype_df, covariate_df) -def test_error_for_old_spark(spark): +def test_error_for_old_spark(spark, rg): if spark.version.startswith('2'): num_samples = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, 10))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, 25))) + genotype_df = pd.DataFrame(rg.random((num_samples, 10))) + phenotype_df = pd.DataFrame(rg.random((num_samples, 25))) with pytest.raises(AttributeError): run_linear_regression_spark(spark, genotype_df, phenotype_df, pd.DataFrame({})) @pytest.mark.min_spark('3') -def test_simple_offset(spark): +def test_simple_offset(spark, rg): num_samples = 25 num_pheno = 6 num_geno = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, num_geno))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, num_pheno))) - covariate_df = pd.DataFrame(np.random.random((num_samples, 2))) - offset_df = pd.DataFrame(np.random.random((num_samples, num_pheno))) + genotype_df = pd.DataFrame(rg.random((num_samples, num_geno))) + phenotype_df = pd.DataFrame(rg.random((num_samples, num_pheno))) + covariate_df = pd.DataFrame(rg.random((num_samples, 2))) + offset_df = pd.DataFrame(rg.random((num_samples, num_pheno))) results = run_linear_regression_spark(spark, genotype_df, phenotype_df, @@ -298,15 +299,15 @@ def test_simple_offset(spark): @pytest.mark.min_spark('3') -def test_multi_offset(spark): +def test_multi_offset(spark, rg): num_samples = 25 num_pheno = 25 num_geno = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, num_geno))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, num_pheno))) - covariate_df = pd.DataFrame(np.random.random((num_samples, 10))) + genotype_df = pd.DataFrame(rg.random((num_samples, num_geno))) + phenotype_df = pd.DataFrame(rg.random((num_samples, num_pheno))) + covariate_df = pd.DataFrame(rg.random((num_samples, 10))) offset_index = pd.MultiIndex.from_product([phenotype_df.index, ['chr1', 'chr2']]) - offset_df = pd.DataFrame(np.random.random((num_samples * 2, num_pheno)), index=offset_index) + offset_df = pd.DataFrame(rg.random((num_samples * 2, num_pheno)), index=offset_index) extra_cols = pd.DataFrame({'contigName': ['chr1', 'chr2'] * 5}) results = run_linear_regression_spark(spark, genotype_df, @@ -321,20 +322,19 @@ def test_multi_offset(spark): @pytest.mark.min_spark('3') -def test_multi_offset_with_missing(spark): +def test_multi_offset_with_missing(spark, rg): num_samples = 25 num_pheno = 24 num_geno = 18 contigs = ['chr1', 'chr2', 'chr3'] - genotype_df = pd.DataFrame(np.random.random((num_samples, num_geno))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, num_pheno))) + genotype_df = pd.DataFrame(rg.random((num_samples, num_geno))) + phenotype_df = pd.DataFrame(rg.random((num_samples, num_pheno))) missing = np.triu(np.ones(phenotype_df.shape)) missing[:, -1] = 0 phenotype_df[missing.astype('bool')] = np.nan - covariate_df = pd.DataFrame(np.random.random((num_samples, 10))) + covariate_df = pd.DataFrame(rg.random((num_samples, 10))) offset_index = pd.MultiIndex.from_product([phenotype_df.index, contigs]) - offset_df = pd.DataFrame(np.random.random((num_samples * len(contigs), num_pheno)), - index=offset_index) + offset_df = pd.DataFrame(rg.random((num_samples * len(contigs), num_pheno)), index=offset_index) extra_cols = pd.DataFrame({'contigName': contigs * 6}) results = run_linear_regression_spark(spark, genotype_df, @@ -344,53 +344,54 @@ def test_multi_offset_with_missing(spark): extra_cols=extra_cols) baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df, [offset_df.xs(contig, level=1) for contig in contigs] * 6) + assert regression_results_equal(results, baseline) @pytest.mark.min_spark('3') -def test_offset_wrong_columns(spark): +def test_offset_wrong_columns(spark, rg): num_samples = 25 num_pheno = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, 10))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, num_pheno))) - offset_df = pd.DataFrame(np.random.random((num_samples, num_pheno)), columns=range(10, 20)) + genotype_df = pd.DataFrame(rg.random((num_samples, 10))) + phenotype_df = pd.DataFrame(rg.random((num_samples, num_pheno))) + offset_df = pd.DataFrame(rg.random((num_samples, num_pheno)), columns=range(10, 20)) with pytest.raises(ValueError): run_linear_regression_spark(spark, genotype_df, phenotype_df, offset_df=offset_df) @pytest.mark.min_spark('3') -def test_offset_wrong_index(spark): +def test_offset_wrong_index(spark, rg): num_samples = 25 num_pheno = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, 10))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, num_pheno))) - offset_df = pd.DataFrame(np.random.random((num_samples, num_pheno)), index=range(1, 26)) + genotype_df = pd.DataFrame(rg.random((num_samples, 10))) + phenotype_df = pd.DataFrame(rg.random((num_samples, num_pheno))) + offset_df = pd.DataFrame(rg.random((num_samples, num_pheno)), index=range(1, 26)) with pytest.raises(ValueError): run_linear_regression_spark(spark, genotype_df, phenotype_df, offset_df=offset_df) @pytest.mark.min_spark('3') -def test_offset_wrong_multi_index(spark): +def test_offset_wrong_multi_index(spark, rg): num_samples = 25 num_pheno = 10 contigs = ['chr1', 'chr2'] - genotype_df = pd.DataFrame(np.random.random((num_samples, 10))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, num_pheno))) - offset_df = pd.DataFrame(np.random.random((num_samples * len(contigs), num_pheno)), + genotype_df = pd.DataFrame(rg.random((num_samples, 10))) + phenotype_df = pd.DataFrame(rg.random((num_samples, num_pheno))) + offset_df = pd.DataFrame(rg.random((num_samples * len(contigs), num_pheno)), pd.MultiIndex.from_product([range(1, 26), contigs])) with pytest.raises(ValueError): run_linear_regression_spark(spark, genotype_df, phenotype_df, offset_df=offset_df) @pytest.mark.min_spark('3') -def test_offset_different_index_order(spark): +def test_offset_different_index_order(spark, rg): num_samples = 25 num_pheno = 6 num_geno = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, num_geno))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, num_pheno))) + genotype_df = pd.DataFrame(rg.random((num_samples, num_geno))) + phenotype_df = pd.DataFrame(rg.random((num_samples, num_pheno))) phenotype_df.columns = phenotype_df.columns.astype('str') - covariate_df = pd.DataFrame(np.random.random((num_samples, 2))) - offset_df = pd.DataFrame(np.random.random((num_samples, num_pheno))) + covariate_df = pd.DataFrame(rg.random((num_samples, 2))) + offset_df = pd.DataFrame(rg.random((num_samples, num_pheno))) offset_df.columns = offset_df.columns.astype('str') baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df, [offset_df] * num_geno) @@ -407,11 +408,11 @@ def test_offset_different_index_order(spark): @pytest.mark.min_spark('3') -def test_cast_genotypes(spark): +def test_cast_genotypes(spark, rg): num_samples = 10 - genotype_df = pd.DataFrame(np.random.randint(0, 10, (num_samples, 10))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, 5))) - covariate_df = pd.DataFrame(np.random.random((num_samples, 5))) + genotype_df = pd.DataFrame(rg.integers(0, 10, (num_samples, 10))) + phenotype_df = pd.DataFrame(rg.random((num_samples, 5))) + covariate_df = pd.DataFrame(rg.random((num_samples, 5))) baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df) results = run_linear_regression_spark(spark, genotype_df, phenotype_df, covariate_df) assert results['effect'].dtype == np.float64 @@ -419,11 +420,11 @@ def test_cast_genotypes(spark): @pytest.mark.min_spark('3') -def test_cast_genotypes_float32(spark): +def test_cast_genotypes_float32(spark, rg): num_samples = 10 - genotype_df = pd.DataFrame(np.random.randint(0, 10, (num_samples, 10))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, 5))) - covariate_df = pd.DataFrame(np.random.random((num_samples, 5))) + genotype_df = pd.DataFrame(rg.integers(0, 10, (num_samples, 10))) + phenotype_df = pd.DataFrame(rg.random((num_samples, 5))) + covariate_df = pd.DataFrame(rg.random((num_samples, 5))) baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df) results = run_linear_regression_spark(spark, genotype_df, @@ -435,21 +436,21 @@ def test_cast_genotypes_float32(spark): @pytest.mark.min_spark('3') -def test_bad_datatype(spark): +def test_bad_datatype(spark, rg): num_samples = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, 10))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, 5))) - covariate_df = pd.DataFrame(np.random.random((num_samples, 5))) + genotype_df = pd.DataFrame(rg.random((num_samples, 10))) + phenotype_df = pd.DataFrame(rg.random((num_samples, 5))) + covariate_df = pd.DataFrame(rg.random((num_samples, 5))) with pytest.raises(ValueError): run_linear_regression_spark(spark, genotype_df, phenotype_df, covariate_df, dt=np.int32) @pytest.mark.min_spark('3') -def test_bad_column_name(spark): +def test_bad_column_name(spark, rg): num_samples = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, 10))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, 5))) - covariate_df = pd.DataFrame(np.random.random((num_samples, 5))) + genotype_df = pd.DataFrame(rg.random((num_samples, 10))) + phenotype_df = pd.DataFrame(rg.random((num_samples, 5))) + covariate_df = pd.DataFrame(rg.random((num_samples, 5))) with pytest.raises(ValueError): run_linear_regression_spark(spark, genotype_df, @@ -459,12 +460,12 @@ def test_bad_column_name(spark): @pytest.mark.min_spark('3') -def test_values_expr(spark): +def test_values_expr(spark, rg): from pyspark.sql.functions import array, lit num_samples = 5 genotype_df = spark.range(1).withColumn('genotypes', lit(42)) - phenotype_df = pd.DataFrame(np.random.random((num_samples, 5))) - covariate_df = pd.DataFrame(np.random.random((num_samples, 2))) + phenotype_df = pd.DataFrame(rg.random((num_samples, 5))) + covariate_df = pd.DataFrame(rg.random((num_samples, 2))) array_vals = [lit(i) for i in range(num_samples)] results = lr.linear_regression(genotype_df, phenotype_df, diff --git a/python/glow/gwas/tests/test_log_reg.py b/python/glow/gwas/tests/test_log_reg.py new file mode 100644 index 000000000..080834755 --- /dev/null +++ b/python/glow/gwas/tests/test_log_reg.py @@ -0,0 +1,314 @@ +import glow.gwas.log_reg as lr +import glow.gwas.functions as gwas_fx +import statsmodels.api as sm +import pandas as pd +import numpy as np +import pytest + + +def run_score_test(genotype_df, phenotype_df, covariate_df, fit_intercept=True): + C = covariate_df.to_numpy(copy=True) + if fit_intercept: + C = gwas_fx._add_intercept(C, phenotype_df.shape[0]) + Y = phenotype_df.to_numpy(copy=True) + Y_mask = ~np.isnan(Y) + Y[~Y_mask] = 0 + state = lr._create_log_reg_state(Y, pd.DataFrame(), None, C, Y_mask) + values_df = pd.DataFrame({gwas_fx._VALUES_COLUMN_NAME: list(genotype_df.to_numpy().T)}) + return lr._logistic_regression_inner(values_df, state, C, Y_mask, lr.fallback_none, + phenotype_df.columns.to_series().astype('str')) + + +def statsmodels_baseline(genotype_df, + phenotype_df, + covariate_df, + offset_dfs=None, + fit_intercept=True): + if fit_intercept: + covariate_df = sm.add_constant(covariate_df) + p_values = [] + t_values = [] + for phenotype in phenotype_df: + for genotype_idx in range(genotype_df.shape[1]): + mask = ~np.isnan(phenotype_df[phenotype].to_numpy()) + if offset_dfs is not None: + offset = offset_dfs[genotype_idx][phenotype] + else: + offset = None + model = sm.GLM(phenotype_df[phenotype], + covariate_df, + offset=offset, + family=sm.families.Binomial(), + missing='drop') + params = model.fit().params + results = model.score_test(params, + exog_extra=genotype_df.iloc[:, genotype_idx].array[mask]) + t_values.append(results[0]) + p_values.append(results[1]) + return pd.DataFrame({ + 'tvalue': np.concatenate(t_values), + 'pvalue': np.concatenate(p_values), + 'phenotype': phenotype_df.columns.to_series().astype('str').repeat(genotype_df.shape[1]) + }) + + +def run_logistic_regression_spark(spark, + genotype_df, + phenotype_df, + covariate_df=pd.DataFrame({}), + extra_cols=pd.DataFrame({}), + values_column='values', + **kwargs): + pdf = pd.DataFrame({values_column: genotype_df.to_numpy().T.tolist()}) + if not extra_cols.empty: + pdf = pd.concat([pdf, extra_cols], axis=1) + pdf['idx'] = pdf.index + results = (lr.logistic_regression(spark.createDataFrame(pdf), + phenotype_df, + covariate_df, + values_column=values_column, + **kwargs).toPandas().sort_values(['phenotype', + 'idx']).drop('idx', axis=1)) + return results + + +def regression_results_equal(df1, df2, rtol=1e-5): + df1 = df1.sort_values('phenotype', kind='mergesort') + df2 = df2.sort_values('phenotype', kind='mergesort') + strings_equal = np.array_equal(df1.phenotype.array, df2.phenotype.array) + numerics_equal = np.allclose(df1.select_dtypes(exclude=['object']), + df2.select_dtypes(exclude=['object']), + rtol=rtol) + return strings_equal and numerics_equal + + +def assert_glow_equals_golden(genotype_df, phenotype_df, covariate_df, fit_intercept=True): + glow = run_score_test(genotype_df, phenotype_df, covariate_df, fit_intercept=fit_intercept) + golden = statsmodels_baseline(genotype_df, + phenotype_df, + covariate_df, + fit_intercept=fit_intercept) + assert regression_results_equal(glow, golden) + + +def random_phenotypes(shape, rg): + return rg.integers(low=0, high=2, size=shape).astype(np.float64) + + +def test_spector_non_missing(): + ds = sm.datasets.spector.load_pandas() + phenotype_df = pd.DataFrame({ds.endog.name: ds.endog}) + genotype_df = ds.exog.loc[:, ['GPA']] + covariate_df = ds.exog.drop('GPA', axis=1) + assert_glow_equals_golden(genotype_df, phenotype_df, covariate_df) + + +def test_spector_missing(): + ds = sm.datasets.spector.load_pandas() + phenotype_df = pd.DataFrame({ds.endog.name: ds.endog}) + phenotype_df.iloc[[0, 3, 10, 25], 0] = np.nan + genotype_df = ds.exog.loc[:, ['GPA']] + covariate_df = ds.exog.drop('GPA', axis=1) + assert_glow_equals_golden(genotype_df, phenotype_df, covariate_df) + + +def test_spector_no_intercept(): + ds = sm.datasets.spector.load_pandas() + phenotype_df = pd.DataFrame({ds.endog.name: ds.endog}) + phenotype_df.iloc[[0, 3, 10, 25], 0] = np.nan + genotype_df = ds.exog.loc[:, ['GPA']] + covariate_df = ds.exog.drop('GPA', axis=1) + assert_glow_equals_golden(genotype_df, phenotype_df, covariate_df, fit_intercept=False) + + +def test_multiple(rg): + n_sample = 50 + n_cov = 10 + n_pheno = 25 + phenotype_df = pd.DataFrame(random_phenotypes((n_sample, n_pheno), rg)) + covariate_df = pd.DataFrame(rg.random((n_sample, n_cov))) + genotype_df = pd.DataFrame(rg.random((n_sample, 1))) + assert_glow_equals_golden(genotype_df, phenotype_df, covariate_df) + + +def test_multiple_missing(rg): + n_sample = 50 + n_cov = 2 + n_pheno = 31 + phenotype_df = pd.DataFrame(random_phenotypes((n_sample, n_pheno), rg)) + Y = phenotype_df.to_numpy() + Y[np.tril_indices_from(Y, k=-20)] = np.nan + assert phenotype_df.isna().sum().sum() > 0 + covariate_df = pd.DataFrame(rg.random((n_sample, n_cov))) + genotype_df = pd.DataFrame(rg.random((n_sample, 1))) + assert_glow_equals_golden(genotype_df, phenotype_df, covariate_df) + + +@pytest.mark.min_spark('3') +def test_multiple_spark(spark, rg): + n_sample = 40 + n_cov = 5 + n_pheno = 5 + phenotype_df = pd.DataFrame(random_phenotypes((n_sample, n_pheno), rg)) + covariate_df = pd.DataFrame(rg.random((n_sample, n_cov))) + genotype_df = pd.DataFrame(rg.random((n_sample, 1))) + run_score_test(genotype_df, phenotype_df, covariate_df) + glow = run_logistic_regression_spark(spark, genotype_df, phenotype_df, covariate_df) + golden = statsmodels_baseline(genotype_df, phenotype_df, covariate_df) + assert regression_results_equal(glow, golden) + + +def random_mask(size, missing_per_column, rg): + base = np.ones(size[0], dtype=bool) + base[:missing_per_column] = False + return np.column_stack([rg.permutation(base) for _ in range(size[1])]) + + +@pytest.mark.min_spark('3') +def test_multiple_spark_missing(spark, rg): + n_sample = 50 + n_cov = 5 + n_pheno = 5 + phenotype_df = pd.DataFrame(random_phenotypes((n_sample, n_pheno), rg)) + Y = phenotype_df.to_numpy() + Y[~random_mask(Y.shape, 10, rg)] = np.nan + assert phenotype_df.isna().sum().sum() > 0 + covariate_df = pd.DataFrame(rg.random((n_sample, n_cov))) + genotype_df = pd.DataFrame(rg.random((n_sample, 1))) + glow = run_logistic_regression_spark(spark, genotype_df, phenotype_df, covariate_df) + golden = statsmodels_baseline(genotype_df, phenotype_df, covariate_df) + assert regression_results_equal(glow, golden) + + +@pytest.mark.min_spark('3') +def test_spark_no_intercept(spark, rg): + n_sample = 50 + n_cov = 5 + n_pheno = 5 + phenotype_df = pd.DataFrame(random_phenotypes((n_sample, n_pheno), rg)) + Y = phenotype_df.to_numpy() + Y[~random_mask(Y.shape, 15, rg)] = np.nan + assert phenotype_df.isna().sum().sum() > 0 + covariate_df = pd.DataFrame(rg.random((n_sample, n_cov))) + genotype_df = pd.DataFrame(rg.random((n_sample, 1))) + glow = run_logistic_regression_spark(spark, + genotype_df, + phenotype_df, + covariate_df, + fit_intercept=False) + golden = statsmodels_baseline(genotype_df, phenotype_df, covariate_df, fit_intercept=False) + assert regression_results_equal(glow, golden) + + +@pytest.mark.min_spark('3') +def test_simple_offset(spark, rg): + num_samples = 25 + num_pheno = 6 + num_geno = 10 + genotype_df = pd.DataFrame(rg.random((num_samples, num_geno))) + phenotype_df = pd.DataFrame(random_phenotypes((num_samples, num_pheno), rg)) + covariate_df = pd.DataFrame(rg.random((num_samples, 2))) + offset_df = pd.DataFrame(rg.random((num_samples, num_pheno))) + results = run_logistic_regression_spark(spark, + genotype_df, + phenotype_df, + covariate_df, + offset_df=offset_df) + baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df, [offset_df] * num_geno) + assert regression_results_equal(results, baseline) + + +@pytest.mark.min_spark('3') +def test_multi_offset(spark, rg): + num_samples = 50 + num_pheno = 25 + num_geno = 10 + genotype_df = pd.DataFrame(rg.random((num_samples, num_geno))) + phenotype_df = pd.DataFrame(random_phenotypes((num_samples, num_pheno), rg)) + covariate_df = pd.DataFrame(rg.random((num_samples, 10))) + offset_index = pd.MultiIndex.from_product([phenotype_df.index, ['chr1', 'chr2']]) + offset_df = pd.DataFrame(rg.random((num_samples * 2, num_pheno)), index=offset_index) + extra_cols = pd.DataFrame({'contigName': ['chr1', 'chr2'] * 5}) + results = run_logistic_regression_spark(spark, + genotype_df, + phenotype_df, + covariate_df, + offset_df=offset_df, + extra_cols=extra_cols) + baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df, + [offset_df.xs('chr1', level=1), + offset_df.xs('chr2', level=1)] * 5) + assert regression_results_equal(results, baseline) + + +@pytest.mark.min_spark('3') +def test_cast_genotypes_float32(spark, rg): + num_samples = 50 + genotype_df = pd.DataFrame(rg.integers(0, 10, (num_samples, 10))) + phenotype_df = pd.DataFrame(random_phenotypes((num_samples, 5), rg)) + covariate_df = pd.DataFrame(rg.random((num_samples, 5))) + baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df) + results = run_logistic_regression_spark(spark, + genotype_df, + phenotype_df, + covariate_df, + dt=np.float32) + assert results['pvalue'].dtype == np.float32 + assert regression_results_equal(baseline, results, rtol=1e-4) # Higher rtol for float32 + + +@pytest.mark.min_spark('3') +def test_multi_offset_with_missing(spark, rg): + num_samples = 25 + num_pheno = 24 + num_geno = 18 + contigs = ['chr1', 'chr2', 'chr3'] + genotype_df = pd.DataFrame(rg.random((num_samples, num_geno))) + phenotype_df = pd.DataFrame(random_phenotypes((num_samples, num_pheno), rg)) + phenotype_df.iloc[0, 0] = np.nan + phenotype_df.iloc[1, 0] = np.nan + covariate_df = pd.DataFrame(rg.random((num_samples, 2))) + offset_index = pd.MultiIndex.from_product([phenotype_df.index, contigs]) + offset_df = pd.DataFrame(rg.random((num_samples * len(contigs), num_pheno)), index=offset_index) + extra_cols = pd.DataFrame({'contigName': contigs * 6}) + results = run_logistic_regression_spark(spark, + genotype_df, + phenotype_df, + covariate_df, + offset_df=offset_df, + extra_cols=extra_cols) + baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df, + [offset_df.xs(contig, level=1) for contig in contigs] * 6) + assert regression_results_equal(results, baseline) + + +def test_error_for_old_spark(spark, rg): + if spark.version.startswith('2'): + num_samples = 10 + genotype_df = pd.DataFrame(rg.random((num_samples, 10))) + phenotype_df = pd.DataFrame(random_phenotypes((num_samples, 25), rg)) + with pytest.raises(AttributeError): + run_logistic_regression_spark(spark, genotype_df, phenotype_df, pd.DataFrame({})) + + +@pytest.mark.min_spark('3') +def test_intercept_no_covariates(spark, rg): + num_samples = 10 + genotype_df = pd.DataFrame(rg.random((num_samples, 10))) + phenotype_df = pd.DataFrame(random_phenotypes((num_samples, 2), rg)) + # No error + run_logistic_regression_spark(spark, genotype_df, phenotype_df, pd.DataFrame({})) + + +@pytest.mark.min_spark('3') +def test_propagate_extra_cols(spark, rg): + num_samples = 50 + genotype_df = pd.DataFrame(rg.random((num_samples, 3))) + phenotype_df = pd.DataFrame(random_phenotypes((num_samples, 5), rg)) + covariate_df = pd.DataFrame(rg.random((num_samples, 2))) + extra_cols = pd.DataFrame({'genotype_idx': range(3), 'animal': 'monkey'}) + results = run_logistic_regression_spark(spark, genotype_df, phenotype_df, covariate_df, + extra_cols) + assert sorted(results['genotype_idx'].tolist()) == [0] * 5 + [1] * 5 + [2] * 5 + assert results.animal[results.animal == 'monkey'].all() + assert results.columns.tolist() == ['genotype_idx', 'animal', 'tvalue', 'pvalue', 'phenotype'] diff --git a/python/setup.py b/python/setup.py index 1297af50e..e79b10166 100644 --- a/python/setup.py +++ b/python/setup.py @@ -23,7 +23,9 @@ install_requires=[ 'nptyping==1.3.0', 'numpy>=1.17.4', + 'opt_einsum>=3.2.0', 'pandas>=0.25.3', + 'statsmodels>=0.10.0', 'typeguard==2.9.1', ], author='The Glow Authors',