diff --git a/conftest.py b/conftest.py
index e72525fff..5abc80fd2 100644
--- a/conftest.py
+++ b/conftest.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
+import numpy as np
 from pyspark.sql import SparkSession
 import pytest
 import os
@@ -50,6 +51,16 @@ def spark(spark_builder):
     sess = spark_builder.getOrCreate()
     return sess.newSession()
 
+def pytest_addoption(parser):
+    parser.addoption('--random-seed', action='store', type=int, help='Seed to use for random number generator')
+
+@pytest.fixture(scope="function")
+def rg(pytestconfig):
+    seed = pytestconfig.getoption('random_seed')
+    seed_seq = np.random.SeedSequence(seed)
+    print(f'Creating random number generator with seed {seed_seq.entropy}')
+    return np.random.default_rng(seed_seq)
+
 def pytest_runtest_setup(item):
     min_spark_version = next((mark.args[0] for mark in item.iter_markers(name='min_spark')), None)
     if min_spark_version:
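The `rg` fixture above hands each test a `numpy.random.Generator` seeded either from `--random-seed` or from fresh OS entropy; the printed `seed_seq.entropy` value can be passed back via `pytest --random-seed <value>` to reproduce a failing run. A minimal sketch of the fixture contract (the test body below is illustrative, not part of this diff):

import numpy as np

def test_example(rg):
    # `rg` is a numpy Generator, so tests call rg.random / rg.integers /
    # rg.permutation instead of the legacy np.random module-level functions
    draws = rg.random((10, 3))            # uniform floats in [0, 1)
    labels = rg.integers(0, 2, size=10)   # ints in {0, 1}; the high bound is exclusive
    assert draws.shape == (10, 3)
    assert set(np.unique(labels)) <= {0, 1}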
diff --git a/python/glow/gwas/approx_firth_correction.py b/python/glow/gwas/approx_firth_correction.py
new file mode 100644
index 000000000..8888f15d6
--- /dev/null
+++ b/python/glow/gwas/approx_firth_correction.py
@@ -0,0 +1,183 @@
+from typing import Any, Optional
+import pandas as pd
+from pandas import Series
+import numpy as np
+from dataclasses import dataclass
+from typeguard import typechecked
+from nptyping import Float, NDArray
+from scipy import stats
+
+
+@dataclass
+class LogLikelihood:
+    pi: NDArray[(Any,), Float]
+    G: NDArray[(Any, Any), Float]  # diag(pi(1-pi))
+    I: NDArray[(Any, Any), Float]  # Fisher information matrix, X'GX
+    deviance: Float  # -2 * penalized log likelihood
+
+
+@dataclass
+class FirthFit:
+    beta: NDArray[(Any,), Float]
+    log_likelihood: LogLikelihood
+
+
+@dataclass
+class ApproxFirthState:
+    logit_offset: NDArray[(Any, Any), Float]
+    null_model_deviance: NDArray[(Any,), Float]
+
+
+@typechecked
+def _calculate_log_likelihood(
+        beta: NDArray[(Any,), Float],
+        X: NDArray[(Any, Any), Float],
+        y: NDArray[(Any,), Float],
+        offset: NDArray[(Any,), Float],
+        eps: float = 1e-15) -> LogLikelihood:
+
+    pi = 1 - 1 / (np.exp(X @ beta + offset) + 1)
+    G = np.diagflat(pi * (1-pi))
+    I = np.atleast_2d(X.T @ G @ X)
+    unpenalized_log_likelihood = y @ np.log(pi + eps) + (1-y) @ np.log(1-pi + eps)
+    _, logdet = np.linalg.slogdet(I)
+    penalty = 0.5 * logdet
+    deviance = -2 * (unpenalized_log_likelihood + penalty)
+    return LogLikelihood(pi, G, I, deviance)
+
+
+@typechecked
+def _fit_firth(
+        beta_init: NDArray[(Any,), Float],
+        X: NDArray[(Any, Any), Float],
+        y: NDArray[(Any,), Float],
+        offset: NDArray[(Any,), Float],
+        convergence_limit: float = 1e-5,
+        deviance_tolerance: float = 1e-6,
+        max_iter: int = 250,
+        max_step_size: int = 25,
+        max_half_steps: int = 25) -> Optional[FirthFit]:
+    '''
+    Firth's bias-reduced penalized-likelihood logistic regression.
+
+    :param beta_init: Initial beta values
+    :param X: Independent variable (covariate for null fit, genotype for SNP fit)
+    :param y: Dependent variable (phenotype)
+    :param offset: Offset (phenotype offset only for null fit, also with covariate offset for SNP fit)
+    :param convergence_limit: Convergence is reached if all entries of the penalized score have magnitude smaller than this value
+    :param deviance_tolerance: Non-inferiority margin when halving step size
+    :param max_iter: Maximum number of Firth iterations
+    :param max_step_size: Maximum step size during a Firth iteration
+    :param max_half_steps: Maximum number of half-steps during a Firth iteration
+    :return: None if the fit failed
+    '''
+
+    n_iter = 0
+    beta = beta_init
+    log_likelihood = _calculate_log_likelihood(beta, X, y, offset)
+    while n_iter < max_iter:
+        invI = np.linalg.pinv(log_likelihood.I)
+
+        # build hat matrix
+        rootG_X = np.sqrt(log_likelihood.G) @ X
+        h = np.diagonal(rootG_X @ invI @ rootG_X.T)
+        U = X.T @ (y - log_likelihood.pi + h * (0.5 - log_likelihood.pi))
+
+        # f' / f''
+        delta = invI @ U
+
+        # force absolute step size to be less than max_step_size for each entry of beta
+        step_size = np.linalg.norm(delta, np.inf)
+        mx = step_size / max_step_size
+        if mx > 1:
+            delta = delta / mx
+
+        new_log_likelihood = _calculate_log_likelihood(beta + delta, X, y, offset)
+
+        # if the penalized log likelihood decreased, recompute with step-halving
+        n_half_steps = 0
+        while new_log_likelihood.deviance >= log_likelihood.deviance + deviance_tolerance:
+            if n_half_steps == max_half_steps:
+                print(f"Too many half-steps! {new_log_likelihood.deviance} vs {log_likelihood.deviance}")
+                return None
+            delta /= 2
+            new_log_likelihood = _calculate_log_likelihood(beta + delta, X, y, offset)
+            n_half_steps += 1
+
+        beta = beta + delta
+        log_likelihood = new_log_likelihood
+
+        if np.linalg.norm(U, np.inf) < convergence_limit:
+            break
+
+        n_iter += 1
+
+    if n_iter == max_iter:
+        print("Too many iterations!")
+        return None
+
+    return FirthFit(beta, log_likelihood)
+
+
+@typechecked
+def create_approx_firth_state(
+        Y: NDArray[(Any, Any), Float],
+        offset_df: Optional[pd.DataFrame],
+        C: NDArray[(Any, Any), Float],
+        Y_mask: NDArray[(Any, Any), Float],
+        fit_intercept: bool) -> ApproxFirthState:
+    '''
+    Performs the null fit for approximate Firth.
+
+    :return: Deviance (-2 * penalized log likelihood) of the null fit per phenotype, and the logit offset
+             folding in covariate effects for the SNP fit
+    '''
+
+    num_Y = Y.shape[1]
+    null_model_deviance = np.zeros(num_Y)
+    logit_offset = np.zeros(Y.shape)
+
+    for i in range(num_Y):
+        y = Y[:, i]
+        y_mask = Y_mask[:, i]
+        offset = offset_df.iloc[:, i].to_numpy() if offset_df is not None else np.zeros(y.shape)
+        b0_null_fit = np.zeros(C.shape[1])
+        if fit_intercept:
+            b0_null_fit[-1] = (0.5 + y.sum()) / (y_mask.sum() + 1)
+            b0_null_fit[-1] = np.log(b0_null_fit[-1] / (1 - b0_null_fit[-1])) - offset.mean()
+        # In regenie, this may retry with max_step_size=5, max_iter=5000
+        firth_fit_result = _fit_firth(b0_null_fit, C, y, offset)
+        if firth_fit_result is None:
+            raise ValueError("Null fit failed!")
+        null_model_deviance[i] = firth_fit_result.log_likelihood.deviance
+        logit_offset[:, i] = offset + (C @ firth_fit_result.beta)
+
+    return ApproxFirthState(logit_offset, null_model_deviance)
+
+
+@typechecked
+def correct_approx_firth(
+        x_res: NDArray[(Any,), Float],
+        y_res: NDArray[(Any,), Float],
+        logit_offset: NDArray[(Any,), Float],
+        null_model_deviance: Float) -> Optional[Series]:
+    '''
+    Calculate LRT statistics for a SNP using the approximate Firth method.
+
+    :return: None if the Firth fit did not converge, LRT statistics otherwise
+    '''
+
+    firth_fit = _fit_firth(
+        np.zeros(1),
+        np.expand_dims(x_res, axis=1),
+        y_res,
+        logit_offset
+    )
+    if firth_fit is None:
+        return None
+    # Likelihood-ratio test: the drop in deviance from null fit to SNP fit is
+    # asymptotically chi-squared with 1 degree of freedom
+    tvalue = -1 * (firth_fit.log_likelihood.deviance - null_model_deviance)
+    pvalue = stats.chi2.sf(tvalue, 1)
+    effect = firth_fit.beta.item()
+    # Standard error is the square root of the diagonal of the inverse Fisher information
+    stderr = np.sqrt(np.linalg.pinv(firth_fit.log_likelihood.I).diagonal()[-1])
+    return Series({'tvalue': tvalue, 'pvalue': pvalue, 'effect': effect, 'stderr': stderr})
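A hedged sketch of how these pieces compose on a toy dataset (all names and sizes below are illustrative; `C` carries the intercept in its last column to match the `b0_null_fit[-1]` convention used above):

import numpy as np
from glow.gwas.approx_firth_correction import (create_approx_firth_state,
                                               correct_approx_firth)

rng = np.random.default_rng(0)
n = 200
C = np.column_stack([rng.random(n), np.ones(n)])  # one covariate + intercept (last column)
x = rng.integers(0, 3, n).astype(np.float64)      # genotype dosages in {0, 1, 2}
logits = 0.5 * C[:, 0] + 0.3 * x
Y = (rng.random(n) < 1 / (1 + np.exp(-logits))).astype(np.float64)[:, None]
Y_mask = np.ones_like(Y)

# Null fit: per-phenotype deviance plus a logit offset folding in covariate effects
state = create_approx_firth_state(Y, None, C, Y_mask, fit_intercept=True)

# SNP fit: in the real pipeline the genotype and phenotype arrive residualized
# from the score test; raw vectors are passed here only to exercise the API
snp_fit = correct_approx_firth(x, Y[:, 0], state.logit_offset[:, 0],
                               state.null_model_deviance[0])
if snp_fit is not None:
    print(snp_fit['effect'], snp_fit['pvalue'])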
diff --git a/python/glow/gwas/log_reg.py b/python/glow/gwas/log_reg.py
index 3c040ab52..bd3919390 100644
--- a/python/glow/gwas/log_reg.py
+++ b/python/glow/gwas/log_reg.py
@@ -11,10 +11,12 @@
 import opt_einsum as oe
 from . import functions as gwas_fx
 from .functions import _VALUES_COLUMN_NAME
+from .approx_firth_correction import ApproxFirthState, create_approx_firth_state, correct_approx_firth
 
 __all__ = ['logistic_regression']
 
-fallback_none = 'none'
+correction_none = 'none'
+correction_approx_firth = 'approx-firth'
 
 
 @typechecked
@@ -23,14 +25,14 @@ def logistic_regression(
         phenotype_df: pd.DataFrame,
         covariate_df: pd.DataFrame = pd.DataFrame({}),
         offset_df: pd.DataFrame = pd.DataFrame({}),
-        correction: str = 'none',  # TODO: Make approx-firth default
+        correction: str = correction_approx_firth,
         pvalue_threshold: float = 0.05,
         fit_intercept: bool = True,
         values_column: str = 'values',
         dt: type = np.float64) -> DataFrame:
     '''
     Uses logistic regression to test for association between genotypes and one or more binary
-    phenotypes.  This is a distributed version of the method from regenie:
+    phenotypes. This is a distributed version of the method from regenie:
     https://www.biorxiv.org/content/10.1101/2020.06.19.162354v2
 
     On the driver node, we fit a logistic regression model based on the covariates for each
@@ -94,14 +96,14 @@ def logistic_regression(
     state = gwas_fx._loco_make_state(
         Y, phenotype_df, offset_df,
-        lambda y, pdf, odf: _create_log_reg_state(y, pdf, odf, C, Y_mask))
+        lambda y, pdf, odf: _create_log_reg_state(y, pdf, odf, C, Y_mask, correction, fit_intercept))
 
     phenotype_names = phenotype_df.columns.to_series().astype('str')
 
     def map_func(pdf_iterator):
         for pdf in pdf_iterator:
-            yield gwas_fx._loco_dispatch(pdf, state, _logistic_regression_inner,
-                                         C, Y_mask, correction, phenotype_names)
+            yield gwas_fx._loco_dispatch(pdf, state, _logistic_regression_inner, C, Y_mask,
+                                         correction, pvalue_threshold, phenotype_names)
 
     return genotype_df.mapInPandas(map_func, result_struct)
@@ -135,6 +137,7 @@ class LogRegState:
     inv_CtGammaC: NDArray[(Any, Any), Float]
     gamma: NDArray[(Any, Any), Float]
     Y_res: NDArray[(Any, Any), Float]
+    approx_firth_state: Optional[ApproxFirthState]
 
 
 @typechecked
@@ -143,7 +146,9 @@ def _create_log_reg_state(
         phenotype_df: pd.DataFrame,  # Unused, only to share code with lin_reg.py
         offset_df: Optional[pd.DataFrame],
         C: NDArray[(Any, Any), Float],
-        Y_mask: NDArray[(Any, Any), Float]) -> LogRegState:
+        Y_mask: NDArray[(Any, Any), Float],
+        correction: str,
+        fit_intercept: bool) -> LogRegState:
     Y_pred = np.row_stack([
         _logistic_null_model_predictions(
             Y[:, i], C, Y_mask[:, i],
@@ -153,7 +158,13 @@ def _create_log_reg_state(
     gamma = Y_pred * (1 - Y_pred)
     CtGammaC = C.T @ (gamma[:, :, None] * C)
     inv_CtGammaC = np.linalg.inv(CtGammaC)
-    return LogRegState(inv_CtGammaC, gamma, (Y - Y_pred.T) * Y_mask)
+
+    if correction == correction_approx_firth:
+        approx_firth_state = create_approx_firth_state(Y, offset_df, C, Y_mask, fit_intercept)
+    else:
+        approx_firth_state = None
+
+    return LogRegState(inv_CtGammaC, gamma, (Y - Y_pred.T) * Y_mask, approx_firth_state)
 
 
 def _logistic_residualize(X: NDArray[(Any, Any), Float], C: NDArray[(Any, Any), Float],
@@ -169,7 +180,7 @@ def _logistic_residualize(X: NDArray[(Any, Any), Float], C: NDArray[(Any, Any),
 def _logistic_regression_inner(genotype_pdf: pd.DataFrame, log_reg_state: LogRegState,
                                C: NDArray[(Any, Any), Float], Y_mask: NDArray[(Any, Any), Float],
-                               fallback_method: str, phenotype_names: pd.Series) -> pd.DataFrame:
+                               correction: str, pvalue_threshold: float, phenotype_names: pd.Series) -> pd.DataFrame:
     '''
     Tests a block of genotypes for association with binary traits. We first residualize
     the genotypes based on the null model fit, then perform a fast score test to check for
@@ -189,13 +200,31 @@ def _logistic_regression_inner(genotype_pdf: pd.DataFrame, log_reg_state: LogReg
     t_values = np.ravel(num / denom)
     p_values = stats.chi2.sf(t_values, 1)
 
-    if fallback_method != fallback_none:
-        # TODO: Call approx firth here
-        ()
-
     del genotype_pdf[_VALUES_COLUMN_NAME]
-    out_df = pd.concat([genotype_pdf] * log_reg_state.Y_res.shape[1])
+    out_df = pd.concat([genotype_pdf] * log_reg_state.Y_res.shape[1], ignore_index=True)
     out_df['tvalue'] = list(np.ravel(t_values))
     out_df['pvalue'] = list(np.ravel(p_values))
     out_df['phenotype'] = phenotype_names.repeat(genotype_pdf.shape[0]).tolist()
+
+    if correction != correction_none:
+        correction_indices = out_df.index[out_df['pvalue'] < pvalue_threshold]
+        if correction == correction_approx_firth:
+            for correction_idx in correction_indices:
+                # rows are phenotype-major: each phenotype's block of SNP rows is contiguous
+                snp_index = correction_idx % genotype_pdf.shape[0]
+                phenotype_index = correction_idx // genotype_pdf.shape[0]
+                approx_firth_snp_fit = correct_approx_firth(
+                    X_res[:, snp_index, phenotype_index],
+                    log_reg_state.Y_res[:, phenotype_index],
+                    log_reg_state.approx_firth_state.logit_offset[:, phenotype_index],
+                    log_reg_state.approx_firth_state.null_model_deviance[phenotype_index],
+                )
+                if approx_firth_snp_fit is not None:
+                    out_df.loc[correction_idx, 'tvalue'] = approx_firth_snp_fit.tvalue
+                    out_df.loc[correction_idx, 'pvalue'] = approx_firth_snp_fit.pvalue
+                else:
+                    print(f"Could not correct {out_df.loc[correction_idx]}")
+        else:
+            raise ValueError(f"Only supported correction method is '{correction_approx_firth}', got '{correction}'")
+
     return out_df
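Since the flat row index in `out_df` encodes both the SNP and the phenotype, the layout is worth pinning down; a small self-contained check of the phenotype-major mapping used by the correction loop (variable names are illustrative):

import numpy as np
import pandas as pd

n_snps, n_phenos = 4, 3
genotype_pdf = pd.DataFrame({'snp': range(n_snps)})
# Mirrors the concat in _logistic_regression_inner: one SNP block per phenotype
out_df = pd.concat([genotype_pdf] * n_phenos, ignore_index=True)
out_df['phenotype'] = np.repeat(range(n_phenos), n_snps)

for row in out_df.index:
    snp_index = row % n_snps          # position within the phenotype's SNP block
    phenotype_index = row // n_snps   # which phenotype block the row belongs to
    assert out_df.loc[row, 'snp'] == snp_index
    assert out_df.loc[row, 'phenotype'] == phenotype_index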
diff --git a/python/glow/gwas/tests/test_lin_reg.py b/python/glow/gwas/tests/test_lin_reg.py
index 04abd7f12..852b2d91b 100644
--- a/python/glow/gwas/tests/test_lin_reg.py
+++ b/python/glow/gwas/tests/test_lin_reg.py
@@ -169,55 +169,55 @@ def test_r_glm_covariates():
     assert_glow_equals_golden(genotype_df, phenotype_df, covariate_df, fit_intercept=False)
 
 
-def test_multiple():
+def test_multiple(rg):
     num_samples = 100
-    genotype_df = pd.DataFrame(np.random.random((num_samples, 10)))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, 25)))
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 5)))
+    genotype_df = pd.DataFrame(rg.random((num_samples, 10)))
+    phenotype_df = pd.DataFrame(rg.random((num_samples, 25)))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 5)))
     assert_glow_equals_golden(genotype_df, phenotype_df, covariate_df)
 
 
-def test_missing():
+def test_missing(rg):
     num_samples = 10
-    genotype_df = pd.DataFrame(np.random.random((num_samples, 1)))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, 1)))
+    genotype_df = pd.DataFrame(rg.random((num_samples, 1)))
+    phenotype_df = pd.DataFrame(rg.random((num_samples, 1)))
     phenotype_df.loc[0, 0] = np.nan
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 3)))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 3)))
     glow = run_linear_regression(genotype_df, phenotype_df, covariate_df)
     baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df)
     assert regression_results_equal(glow, baseline)
 
 
 @pytest.mark.min_spark('3')
-def test_missing_spark(spark):
+def test_missing_spark(spark, rg):
     num_samples = 10
-    genotype_df = pd.DataFrame(np.random.random((num_samples, 1)))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, 3)))
+    genotype_df = pd.DataFrame(rg.random((num_samples, 1)))
+    phenotype_df = pd.DataFrame(rg.random((num_samples, 3)))
     phenotype_df.loc[0, 0] = np.nan
     phenotype_df.loc[[1, 3, 5], 1] = np.nan
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 3)))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 3)))
     glow = run_linear_regression_spark(spark, genotype_df, phenotype_df, covariate_df)
     baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df)
     assert regression_results_equal(glow, baseline)
 
 
 @pytest.mark.min_spark('3')
-def test_multiple_spark(spark):
+def test_multiple_spark(spark, rg):
     num_samples = 100
-    genotype_df = pd.DataFrame(np.random.random((num_samples, 10)))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, 25)))
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 5)))
+    genotype_df = pd.DataFrame(rg.random((num_samples, 10)))
+    phenotype_df = pd.DataFrame(rg.random((num_samples, 25)))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 5)))
     baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df)
     results = run_linear_regression_spark(spark, genotype_df, phenotype_df, covariate_df)
     assert regression_results_equal(baseline, results)
 
 
 @pytest.mark.min_spark('3')
-def test_propagate_extra_cols(spark):
+def test_propagate_extra_cols(spark, rg):
     num_samples = 10
-    genotype_df = pd.DataFrame(np.random.random((num_samples, 3)))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, 5)))
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 2)))
+    genotype_df = pd.DataFrame(rg.random((num_samples, 3)))
+    phenotype_df = pd.DataFrame(rg.random((num_samples, 5)))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 2)))
     extra_cols = pd.DataFrame({'genotype_idx': range(3), 'animal': 'monkey'})
     results = run_linear_regression_spark(spark, genotype_df, phenotype_df, covariate_df,
                                           extra_cols)
@@ -229,11 +229,11 @@ def test_propagate_extra_cols(spark):
 
 
 @pytest.mark.min_spark('3')
-def test_different_values_column(spark):
+def test_different_values_column(spark, rg):
     num_samples = 10
-    genotype_df = pd.DataFrame(np.random.random((num_samples, 3)))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, 5)))
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 2)))
+    genotype_df = pd.DataFrame(rg.random((num_samples, 3)))
+    phenotype_df = pd.DataFrame(rg.random((num_samples, 5)))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 2)))
     results = run_linear_regression_spark(spark,
                                           genotype_df,
                                           phenotype_df,
@@ -243,52 +243,52 @@ def test_different_values_column(spark):
 
 
 @pytest.mark.min_spark('3')
-def test_intercept_no_covariates(spark):
+def test_intercept_no_covariates(spark, rg):
     num_samples = 10
-    genotype_df = pd.DataFrame(np.random.random((num_samples, 10)))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, 25)))
+    genotype_df = pd.DataFrame(rg.random((num_samples, 10)))
+    phenotype_df = pd.DataFrame(rg.random((num_samples, 25)))
     # No error
     run_linear_regression_spark(spark, genotype_df, phenotype_df, pd.DataFrame({}))
 
 
 @pytest.mark.min_spark('3')
-def test_validates_missing_covariates(spark):
+def test_validates_missing_covariates(spark, rg):
     num_samples = 10
-    genotype_df = pd.DataFrame(np.random.random((num_samples, 3)))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, 5)))
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 2)))
+    genotype_df = pd.DataFrame(rg.random((num_samples, 3)))
+    phenotype_df = pd.DataFrame(rg.random((num_samples, 5)))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 2)))
     covariate_df.loc[0, 0] = np.nan
     with pytest.raises(ValueError):
         run_linear_regression_spark(spark, genotype_df, phenotype_df, covariate_df)
 
 
 @pytest.mark.min_spark('3')
-def test_validate_same_number_of_rows(spark):
-    genotype_df = pd.DataFrame(np.random.random((4, 3)))
-    phenotype_df = pd.DataFrame(np.random.random((4, 5)))
-    covariate_df = pd.DataFrame(np.random.random((5, 2)))
+def test_validate_same_number_of_rows(spark, rg):
+    genotype_df = pd.DataFrame(rg.random((4, 3)))
+    phenotype_df = pd.DataFrame(rg.random((4, 5)))
+    covariate_df = pd.DataFrame(rg.random((5, 2)))
     with pytest.raises(ValueError):
         run_linear_regression_spark(spark, genotype_df, phenotype_df, covariate_df)
 
 
-def test_error_for_old_spark(spark):
+def test_error_for_old_spark(spark, rg):
     if spark.version.startswith('2'):
         num_samples = 10
-        genotype_df = pd.DataFrame(np.random.random((num_samples, 10)))
-        phenotype_df = pd.DataFrame(np.random.random((num_samples, 25)))
+        genotype_df = pd.DataFrame(rg.random((num_samples, 10)))
+        phenotype_df = pd.DataFrame(rg.random((num_samples, 25)))
         with pytest.raises(AttributeError):
             run_linear_regression_spark(spark, genotype_df, phenotype_df, pd.DataFrame({}))
 
 
 @pytest.mark.min_spark('3')
-def test_simple_offset(spark):
+def test_simple_offset(spark, rg):
     num_samples = 25
     num_pheno = 6
     num_geno = 10
-    genotype_df = pd.DataFrame(np.random.random((num_samples, num_geno)))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, num_pheno)))
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 2)))
-    offset_df = pd.DataFrame(np.random.random((num_samples, num_pheno)))
+    genotype_df = pd.DataFrame(rg.random((num_samples, num_geno)))
+    phenotype_df = pd.DataFrame(rg.random((num_samples, num_pheno)))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 2)))
+    offset_df = pd.DataFrame(rg.random((num_samples, num_pheno)))
     results = run_linear_regression_spark(spark,
                                           genotype_df,
                                           phenotype_df,
@@ -299,15 +299,15 @@ def test_simple_offset(spark):
 
 
 @pytest.mark.min_spark('3')
-def test_multi_offset(spark):
+def test_multi_offset(spark, rg):
     num_samples = 25
     num_pheno = 25
     num_geno = 10
-    genotype_df = pd.DataFrame(np.random.random((num_samples, num_geno)))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, num_pheno)))
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 10)))
+    genotype_df = pd.DataFrame(rg.random((num_samples, num_geno)))
+    phenotype_df = pd.DataFrame(rg.random((num_samples, num_pheno)))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 10)))
     offset_index = pd.MultiIndex.from_product([phenotype_df.index, ['chr1', 'chr2']])
-    offset_df = pd.DataFrame(np.random.random((num_samples * 2, num_pheno)), index=offset_index)
+    offset_df = pd.DataFrame(rg.random((num_samples * 2, num_pheno)), index=offset_index)
     extra_cols = pd.DataFrame({'contigName': ['chr1', 'chr2'] * 5})
     results = run_linear_regression_spark(spark,
                                           genotype_df,
@@ -322,20 +322,19 @@ def test_multi_offset(spark):
 
 
 @pytest.mark.min_spark('3')
-def test_multi_offset_with_missing(spark):
+def test_multi_offset_with_missing(spark, rg):
     num_samples = 25
     num_pheno = 24
     num_geno = 18
     contigs = ['chr1', 'chr2', 'chr3']
-    genotype_df = pd.DataFrame(np.random.random((num_samples, num_geno)))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, num_pheno)))
+    genotype_df = pd.DataFrame(rg.random((num_samples, num_geno)))
+    phenotype_df = pd.DataFrame(rg.random((num_samples, num_pheno)))
     missing = np.triu(np.ones(phenotype_df.shape))
     missing[:, -1] = 0
     phenotype_df[missing.astype('bool')] = np.nan
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 10)))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 10)))
     offset_index = pd.MultiIndex.from_product([phenotype_df.index, contigs])
-    offset_df = pd.DataFrame(np.random.random((num_samples * len(contigs), num_pheno)),
-                             index=offset_index)
+    offset_df = pd.DataFrame(rg.random((num_samples * len(contigs), num_pheno)), index=offset_index)
     extra_cols = pd.DataFrame({'contigName': contigs * 6})
     results = run_linear_regression_spark(spark,
                                           genotype_df,
@@ -349,50 +348,50 @@ def test_multi_offset_with_missing(spark):
 
 
 @pytest.mark.min_spark('3')
-def test_offset_wrong_columns(spark):
+def test_offset_wrong_columns(spark, rg):
     num_samples = 25
     num_pheno = 10
-    genotype_df = pd.DataFrame(np.random.random((num_samples, 10)))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, num_pheno)))
-    offset_df = pd.DataFrame(np.random.random((num_samples, num_pheno)), columns=range(10, 20))
+    genotype_df = pd.DataFrame(rg.random((num_samples, 10)))
+    phenotype_df = pd.DataFrame(rg.random((num_samples, num_pheno)))
+    offset_df = pd.DataFrame(rg.random((num_samples, num_pheno)), columns=range(10, 20))
     with pytest.raises(ValueError):
         run_linear_regression_spark(spark, genotype_df, phenotype_df, offset_df=offset_df)
 
 
 @pytest.mark.min_spark('3')
-def test_offset_wrong_index(spark):
+def test_offset_wrong_index(spark, rg):
     num_samples = 25
     num_pheno = 10
-    genotype_df = pd.DataFrame(np.random.random((num_samples, 10)))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, num_pheno)))
-    offset_df = pd.DataFrame(np.random.random((num_samples, num_pheno)), index=range(1, 26))
+    genotype_df = pd.DataFrame(rg.random((num_samples, 10)))
+    phenotype_df = pd.DataFrame(rg.random((num_samples, num_pheno)))
+    offset_df = pd.DataFrame(rg.random((num_samples, num_pheno)), index=range(1, 26))
     with pytest.raises(ValueError):
         run_linear_regression_spark(spark, genotype_df, phenotype_df, offset_df=offset_df)
 
 
 @pytest.mark.min_spark('3')
-def test_offset_wrong_multi_index(spark):
+def test_offset_wrong_multi_index(spark, rg):
     num_samples = 25
     num_pheno = 10
     contigs = ['chr1', 'chr2']
-    genotype_df = pd.DataFrame(np.random.random((num_samples, 10)))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, num_pheno)))
-    offset_df = pd.DataFrame(np.random.random((num_samples * len(contigs), num_pheno)),
+    genotype_df = pd.DataFrame(rg.random((num_samples, 10)))
+    phenotype_df = pd.DataFrame(rg.random((num_samples, num_pheno)))
+    offset_df = pd.DataFrame(rg.random((num_samples * len(contigs), num_pheno)),
                              pd.MultiIndex.from_product([range(1, 26), contigs]))
     with pytest.raises(ValueError):
         run_linear_regression_spark(spark, genotype_df, phenotype_df, offset_df=offset_df)
 
 
 @pytest.mark.min_spark('3')
-def test_offset_different_index_order(spark):
+def test_offset_different_index_order(spark, rg):
     num_samples = 25
     num_pheno = 6
     num_geno = 10
-    genotype_df = pd.DataFrame(np.random.random((num_samples, num_geno)))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, num_pheno)))
+    genotype_df = pd.DataFrame(rg.random((num_samples, num_geno)))
+    phenotype_df = pd.DataFrame(rg.random((num_samples, num_pheno)))
     phenotype_df.columns = phenotype_df.columns.astype('str')
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 2)))
-    offset_df = pd.DataFrame(np.random.random((num_samples, num_pheno)))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 2)))
+    offset_df = pd.DataFrame(rg.random((num_samples, num_pheno)))
     offset_df.columns = offset_df.columns.astype('str')
     baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df,
                                     [offset_df] * num_geno)
@@ -409,11 +408,11 @@ def test_offset_different_index_order(spark):
 
 
 @pytest.mark.min_spark('3')
-def test_cast_genotypes(spark):
+def test_cast_genotypes(spark, rg):
     num_samples = 10
-    genotype_df = pd.DataFrame(np.random.randint(0, 10, (num_samples, 10)))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, 5)))
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 5)))
+    genotype_df = pd.DataFrame(rg.integers(0, 10, (num_samples, 10)))
+    phenotype_df = pd.DataFrame(rg.random((num_samples, 5)))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 5)))
     baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df)
     results = run_linear_regression_spark(spark, genotype_df, phenotype_df, covariate_df)
     assert results['effect'].dtype == np.float64
@@ -421,11 +420,11 @@ def test_cast_genotypes(spark):
 
 
 @pytest.mark.min_spark('3')
-def test_cast_genotypes_float32(spark):
+def test_cast_genotypes_float32(spark, rg):
     num_samples = 10
-    genotype_df = pd.DataFrame(np.random.randint(0, 10, (num_samples, 10)))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, 5)))
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 5)))
+    genotype_df = pd.DataFrame(rg.integers(0, 10, (num_samples, 10)))
+    phenotype_df = pd.DataFrame(rg.random((num_samples, 5)))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 5)))
     baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df)
     results = run_linear_regression_spark(spark,
                                           genotype_df,
@@ -437,21 +436,21 @@ def test_cast_genotypes_float32(spark):
 
 
 @pytest.mark.min_spark('3')
-def test_bad_datatype(spark):
+def test_bad_datatype(spark, rg):
     num_samples = 10
-    genotype_df = pd.DataFrame(np.random.random((num_samples, 10)))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, 5)))
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 5)))
+    genotype_df = pd.DataFrame(rg.random((num_samples, 10)))
+    phenotype_df = pd.DataFrame(rg.random((num_samples, 5)))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 5)))
     with pytest.raises(ValueError):
         run_linear_regression_spark(spark, genotype_df, phenotype_df, covariate_df, dt=np.int32)
 
 
 @pytest.mark.min_spark('3')
-def test_bad_column_name(spark):
+def test_bad_column_name(spark, rg):
     num_samples = 10
-    genotype_df = pd.DataFrame(np.random.random((num_samples, 10)))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, 5)))
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 5)))
+    genotype_df = pd.DataFrame(rg.random((num_samples, 10)))
+    phenotype_df = pd.DataFrame(rg.random((num_samples, 5)))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 5)))
     with pytest.raises(ValueError):
         run_linear_regression_spark(spark,
                                     genotype_df,
@@ -461,12 +460,12 @@ def test_bad_column_name(spark):
 
 
 @pytest.mark.min_spark('3')
-def test_values_expr(spark):
+def test_values_expr(spark, rg):
     from pyspark.sql.functions import array, lit
     num_samples = 5
     genotype_df = spark.range(1).withColumn('genotypes', lit(42))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, 5)))
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 2)))
+    phenotype_df = pd.DataFrame(rg.random((num_samples, 5)))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 2)))
     array_vals = [lit(i) for i in range(num_samples)]
     results = lr.linear_regression(genotype_df,
                                    phenotype_df,
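The test_lin_reg.py changes above are a mechanical migration from the legacy `np.random` module-level API to the injected Generator; for reference, the mapping (assuming NumPy >= 1.17, where `default_rng` was introduced):

import numpy as np

rg = np.random.default_rng(42)

# np.random.random(shape)          -> rg.random(shape)            uniform floats in [0, 1)
draws = rg.random((3, 2))
# np.random.randint(lo, hi, shape) -> rg.integers(lo, hi, shape)  hi exclusive in both APIs
ints = rg.integers(0, 10, (3, 2))
# np.random.permutation(x)         -> rg.permutation(x)           returns a shuffled copy
perm = rg.permutation(np.arange(5))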
diff --git a/python/glow/gwas/tests/test_log_reg.py b/python/glow/gwas/tests/test_log_reg.py
index d0e9c187a..a7e99010a 100644
--- a/python/glow/gwas/tests/test_log_reg.py
+++ b/python/glow/gwas/tests/test_log_reg.py
@@ -13,9 +13,9 @@ def run_score_test(genotype_df, phenotype_df, covariate_df, fit_intercept=True):
     Y = phenotype_df.to_numpy(copy=True)
     Y_mask = ~np.isnan(Y)
     Y[~Y_mask] = 0
-    state = lr._create_log_reg_state(Y, pd.DataFrame(), None, C, Y_mask)
+    state = lr._create_log_reg_state(Y, pd.DataFrame(), None, C, Y_mask, lr.correction_none, fit_intercept)
     values_df = pd.DataFrame({gwas_fx._VALUES_COLUMN_NAME: list(genotype_df.to_numpy().T)})
-    return lr._logistic_regression_inner(values_df, state, C, Y_mask, lr.fallback_none,
+    return lr._logistic_regression_inner(values_df, state, C, Y_mask, lr.correction_none, 0.05,
                                          phenotype_df.columns.to_series().astype('str'))
 
 
@@ -91,8 +91,8 @@ def assert_glow_equals_golden(genotype_df, phenotype_df, covariate_df, fit_inter
     assert regression_results_equal(glow, golden)
 
 
-def random_phenotypes(shape):
-    return np.random.randint(low=0, high=2, size=shape).astype(np.float64)
+def random_phenotypes(shape, rg):
+    return rg.integers(low=0, high=2, size=shape).astype(np.float64)
 
 
 def test_spector_non_missing():
@@ -121,76 +121,76 @@ def test_spector_no_intercept():
     assert_glow_equals_golden(genotype_df, phenotype_df, covariate_df, fit_intercept=False)
 
 
-def test_multiple():
+def test_multiple(rg):
     n_sample = 50
     n_cov = 10
     n_pheno = 25
-    phenotype_df = pd.DataFrame(random_phenotypes((n_sample, n_pheno)))
-    covariate_df = pd.DataFrame(np.random.random((n_sample, n_cov)))
-    genotype_df = pd.DataFrame(np.random.random((n_sample, 1)))
+    phenotype_df = pd.DataFrame(random_phenotypes((n_sample, n_pheno), rg))
+    covariate_df = pd.DataFrame(rg.random((n_sample, n_cov)))
+    genotype_df = pd.DataFrame(rg.random((n_sample, 1)))
     assert_glow_equals_golden(genotype_df, phenotype_df, covariate_df)
 
 
-def test_multiple_missing():
+def test_multiple_missing(rg):
     n_sample = 50
     n_cov = 2
     n_pheno = 31
-    phenotype_df = pd.DataFrame(random_phenotypes((n_sample, n_pheno)))
+    phenotype_df = pd.DataFrame(random_phenotypes((n_sample, n_pheno), rg))
     Y = phenotype_df.to_numpy()
     Y[np.tril_indices_from(Y, k=-20)] = np.nan
     assert phenotype_df.isna().sum().sum() > 0
-    covariate_df = pd.DataFrame(np.random.random((n_sample, n_cov)))
-    genotype_df = pd.DataFrame(np.random.random((n_sample, 1)))
+    covariate_df = pd.DataFrame(rg.random((n_sample, n_cov)))
+    genotype_df = pd.DataFrame(rg.random((n_sample, 1)))
     assert_glow_equals_golden(genotype_df, phenotype_df, covariate_df)
 
 
 @pytest.mark.min_spark('3')
-def test_multiple_spark(spark):
+def test_multiple_spark(spark, rg):
     n_sample = 40
     n_cov = 5
     n_pheno = 5
-    phenotype_df = pd.DataFrame(random_phenotypes((n_sample, n_pheno)))
-    covariate_df = pd.DataFrame(np.random.random((n_sample, n_cov)))
-    genotype_df = pd.DataFrame(np.random.random((n_sample, 1)))
+    phenotype_df = pd.DataFrame(random_phenotypes((n_sample, n_pheno), rg))
+    covariate_df = pd.DataFrame(rg.random((n_sample, n_cov)))
+    genotype_df = pd.DataFrame(rg.random((n_sample, 1)))
     run_score_test(genotype_df, phenotype_df, covariate_df)
     glow = run_logistic_regression_spark(spark, genotype_df, phenotype_df, covariate_df)
     golden = statsmodels_baseline(genotype_df, phenotype_df, covariate_df)
     assert regression_results_equal(glow, golden)
 
 
-def random_mask(size, missing_per_column):
+def random_mask(size, missing_per_column, rg):
     base = np.ones(size[0], dtype=bool)
     base[:missing_per_column] = False
-    return np.column_stack([np.random.permutation(base) for _ in range(size[1])])
+    return np.column_stack([rg.permutation(base) for _ in range(size[1])])
 
 
 @pytest.mark.min_spark('3')
-def test_multiple_spark_missing(spark):
+def test_multiple_spark_missing(spark, rg):
     n_sample = 50
     n_cov = 5
     n_pheno = 5
-    phenotype_df = pd.DataFrame(random_phenotypes((n_sample, n_pheno)))
+    phenotype_df = pd.DataFrame(random_phenotypes((n_sample, n_pheno), rg))
     Y = phenotype_df.to_numpy()
-    Y[~random_mask(Y.shape, 10)] = np.nan
+    Y[~random_mask(Y.shape, 10, rg)] = np.nan
     assert phenotype_df.isna().sum().sum() > 0
-    covariate_df = pd.DataFrame(np.random.random((n_sample, n_cov)))
-    genotype_df = pd.DataFrame(np.random.random((n_sample, 1)))
+    covariate_df = pd.DataFrame(rg.random((n_sample, n_cov)))
+    genotype_df = pd.DataFrame(rg.random((n_sample, 1)))
     glow = run_logistic_regression_spark(spark, genotype_df, phenotype_df, covariate_df)
     golden = statsmodels_baseline(genotype_df, phenotype_df, covariate_df)
     assert regression_results_equal(glow, golden)
 
 
 @pytest.mark.min_spark('3')
-def test_spark_no_intercept(spark):
+def test_spark_no_intercept(spark, rg):
     n_sample = 50
     n_cov = 5
     n_pheno = 5
-    phenotype_df = pd.DataFrame(random_phenotypes((n_sample, n_pheno)))
+    phenotype_df = pd.DataFrame(random_phenotypes((n_sample, n_pheno), rg))
     Y = phenotype_df.to_numpy()
-    Y[~random_mask(Y.shape, 15)] = np.nan
+    Y[~random_mask(Y.shape, 15, rg)] = np.nan
     assert phenotype_df.isna().sum().sum() > 0
-    covariate_df = pd.DataFrame(np.random.random((n_sample, n_cov)))
-    genotype_df = pd.DataFrame(np.random.random((n_sample, 1)))
+    covariate_df = pd.DataFrame(rg.random((n_sample, n_cov)))
+    genotype_df = pd.DataFrame(rg.random((n_sample, 1)))
     glow = run_logistic_regression_spark(spark,
                                          genotype_df,
                                          phenotype_df,
@@ -201,14 +201,14 @@ def test_spark_no_intercept(spark):
 
 
 @pytest.mark.min_spark('3')
-def test_simple_offset(spark):
+def test_simple_offset(spark, rg):
     num_samples = 25
     num_pheno = 6
     num_geno = 10
-    genotype_df = pd.DataFrame(np.random.random((num_samples, num_geno)))
-    phenotype_df = pd.DataFrame(random_phenotypes((num_samples, num_pheno)))
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 2)))
-    offset_df = pd.DataFrame(np.random.random((num_samples, num_pheno)))
+    genotype_df = pd.DataFrame(rg.random((num_samples, num_geno)))
+    phenotype_df = pd.DataFrame(random_phenotypes((num_samples, num_pheno), rg))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 2)))
+    offset_df = pd.DataFrame(rg.random((num_samples, num_pheno)))
     results = run_logistic_regression_spark(spark,
                                             genotype_df,
                                             phenotype_df,
@@ -219,15 +219,15 @@ def test_simple_offset(spark):
 
 
 @pytest.mark.min_spark('3')
-def test_multi_offset(spark):
+def test_multi_offset(spark, rg):
     num_samples = 50
     num_pheno = 25
     num_geno = 10
-    genotype_df = pd.DataFrame(np.random.random((num_samples, num_geno)))
-    phenotype_df = pd.DataFrame(random_phenotypes((num_samples, num_pheno)))
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 10)))
+    genotype_df = pd.DataFrame(rg.random((num_samples, num_geno)))
+    phenotype_df = pd.DataFrame(random_phenotypes((num_samples, num_pheno), rg))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 10)))
     offset_index = pd.MultiIndex.from_product([phenotype_df.index, ['chr1', 'chr2']])
-    offset_df = pd.DataFrame(np.random.random((num_samples * 2, num_pheno)), index=offset_index)
+    offset_df = pd.DataFrame(rg.random((num_samples * 2, num_pheno)), index=offset_index)
     extra_cols = pd.DataFrame({'contigName': ['chr1', 'chr2'] * 5})
     results = run_logistic_regression_spark(spark,
                                             genotype_df,
@@ -242,11 +242,11 @@ def test_multi_offset(spark):
 
 
 @pytest.mark.min_spark('3')
-def test_cast_genotypes_float32(spark):
+def test_cast_genotypes_float32(spark, rg):
     num_samples = 50
-    genotype_df = pd.DataFrame(np.random.randint(0, 10, (num_samples, 10)))
-    phenotype_df = pd.DataFrame(random_phenotypes((num_samples, 5)))
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 5)))
+    genotype_df = pd.DataFrame(rg.integers(0, 10, (num_samples, 10)))
+    phenotype_df = pd.DataFrame(random_phenotypes((num_samples, 5), rg))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 5)))
     baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df)
     results = run_logistic_regression_spark(spark,
                                             genotype_df,
@@ -258,19 +258,18 @@ def test_cast_genotypes_float32(spark):
 
 
 @pytest.mark.min_spark('3')
-def test_multi_offset_with_missing(spark):
+def test_multi_offset_with_missing(spark, rg):
     num_samples = 25
     num_pheno = 24
     num_geno = 18
     contigs = ['chr1', 'chr2', 'chr3']
-    genotype_df = pd.DataFrame(np.random.random((num_samples, num_geno)))
-    phenotype_df = pd.DataFrame(random_phenotypes((num_samples, num_pheno)))
+    genotype_df = pd.DataFrame(rg.random((num_samples, num_geno)))
+    phenotype_df = pd.DataFrame(random_phenotypes((num_samples, num_pheno), rg))
     phenotype_df.iloc[0, 0] = np.nan
     phenotype_df.iloc[1, 0] = np.nan
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 2)))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 2)))
     offset_index = pd.MultiIndex.from_product([phenotype_df.index, contigs])
-    offset_df = pd.DataFrame(np.random.random((num_samples * len(contigs), num_pheno)),
-                             index=offset_index)
+    offset_df = pd.DataFrame(rg.random((num_samples * len(contigs), num_pheno)), index=offset_index)
     extra_cols = pd.DataFrame({'contigName': contigs * 6})
     results = run_logistic_regression_spark(spark,
                                             genotype_df,
@@ -283,30 +282,30 @@ def test_multi_offset_with_missing(spark):
     assert regression_results_equal(results, baseline)
 
 
-def test_error_for_old_spark(spark):
+def test_error_for_old_spark(spark, rg):
     if spark.version.startswith('2'):
         num_samples = 10
-        genotype_df = pd.DataFrame(np.random.random((num_samples, 10)))
-        phenotype_df = pd.DataFrame(random_phenotypes((num_samples, 25)))
+        genotype_df = pd.DataFrame(rg.random((num_samples, 10)))
+        phenotype_df = pd.DataFrame(random_phenotypes((num_samples, 25), rg))
         with pytest.raises(AttributeError):
             run_logistic_regression_spark(spark, genotype_df, phenotype_df, pd.DataFrame({}))
 
 
 @pytest.mark.min_spark('3')
-def test_intercept_no_covariates(spark):
+def test_intercept_no_covariates(spark, rg):
     num_samples = 10
-    genotype_df = pd.DataFrame(np.random.random((num_samples, 10)))
-    phenotype_df = pd.DataFrame(random_phenotypes((num_samples, 2)))
+    genotype_df = pd.DataFrame(rg.random((num_samples, 10)))
+    phenotype_df = pd.DataFrame(random_phenotypes((num_samples, 2), rg))
     # No error
     run_logistic_regression_spark(spark, genotype_df, phenotype_df, pd.DataFrame({}))
 
 
 @pytest.mark.min_spark('3')
-def test_propagate_extra_cols(spark):
+def test_propagate_extra_cols(spark, rg):
     num_samples = 50
-    genotype_df = pd.DataFrame(np.random.random((num_samples, 3)))
-    phenotype_df = pd.DataFrame(random_phenotypes((num_samples, 5)))
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 2)))
+    genotype_df = pd.DataFrame(rg.random((num_samples, 3)))
+    phenotype_df = pd.DataFrame(random_phenotypes((num_samples, 5), rg))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 2)))
     extra_cols = pd.DataFrame({'genotype_idx': range(3), 'animal': 'monkey'})
     results = run_logistic_regression_spark(spark, genotype_df, phenotype_df, covariate_df,
                                             extra_cols)
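Finally, a sketch of a follow-up test that could live in test_log_reg.py to cover the corrected path end to end. It leans on the module's existing `run_logistic_regression_spark` and `random_phenotypes` helpers and assumes the helper forwards the new default `correction='approx-firth'`:

import numpy as np
import pandas as pd
import pytest

@pytest.mark.min_spark('3')
def test_approx_firth_correction(spark, rg):
    num_samples = 100
    genotype_df = pd.DataFrame(rg.random((num_samples, 10)))
    phenotype_df = pd.DataFrame(random_phenotypes((num_samples, 5), rg))
    covariate_df = pd.DataFrame(rg.random((num_samples, 2)))
    # With the default correction, any hit below pvalue_threshold is refit with
    # approximate Firth; corrected p-values must still be valid probabilities
    results = run_logistic_regression_spark(spark, genotype_df, phenotype_df, covariate_df)
    assert ((results['pvalue'] >= 0) & (results['pvalue'] <= 1)).all()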