From 65b196a0fdbc2490e352311ed2b41316c345464e Mon Sep 17 00:00:00 2001 From: Karen Feng Date: Tue, 15 Dec 2020 10:29:24 -0800 Subject: [PATCH 1/9] WIP Signed-off-by: Karen Feng --- python/glow/gwas/approx_firth_correction.py | 130 ++++++++++++++++++++ python/glow/gwas/logistic_regression.py | 35 ++++-- 2 files changed, 153 insertions(+), 12 deletions(-) create mode 100644 python/glow/gwas/approx_firth_correction.py diff --git a/python/glow/gwas/approx_firth_correction.py b/python/glow/gwas/approx_firth_correction.py new file mode 100644 index 000000000..62f59e9a9 --- /dev/null +++ b/python/glow/gwas/approx_firth_correction.py @@ -0,0 +1,130 @@ +from pyspark.sql.types import StringType, StructField +from typing import Any, Optional +import pandas as pd +import numpy as np +from pyspark.sql import DataFrame +import statsmodels.api as sm +from dataclasses import dataclass +from typeguard import typechecked +from nptyping import Float, NDArray +from scipy import stats +import opt_einsum as oe +from . import functions as gwas_fx +from .functions import _VALUES_COLUMN_NAME + +def calculate_log_likelihood(beta, X, y, offset): + pi = 1 - 1 / (np.exp(X @ beta + offset) + 1) + G = np.diagflat(pi * (1-pi)) + I = np.atleast_2d(X.T @ G @ X) # fisher information matrix + LL_matrix = np.atleast_2d(y @ np.log(pi) + (1-y) @ np.log(1-pi)) + _, logdet = np.linalg.slogdet(I) + penalized_LL = np.sum(LL_matrix) + 0.5 * logdet + return (pi, G, I, LL_matrix, penalized_LL) + + +def fit_firth_logistic(beta_init, X, y, offset, tolerance=1e-5, max_iter=250, max_step_size=5, max_half_steps=25): + n_iter = 0 + beta = beta_init + pi, G, I, LL_matrix, penalized_LL = calculate_log_likelihood(beta, X, y, offset) + while n_iter < max_iter: + # inverse of the fisher information matrix + invI = np.linalg.pinv(I) + + # build hat matrix + rootG_X = np.sqrt(G) @ X + H = rootG_X @ invI @ rootG_X.T + h = np.diagonal(H) + + # penalised score + U = X.T @ (y - pi + h * (0.5 - pi)) + if np.amax(np.abs(U)) < tolerance: + break + + # f' / f'' + delta = invI @ U + + # force absolute step size to be less than max_step_size for each entry of beta + mx = np.amax(np.abs(delta)) / max_step_size + if mx > 1: + delta = delta / mx + + new_beta = beta + delta + pi, G, I, new_LL_matrix, new_penalized_LL = calculate_log_likelihood(new_beta, X, y, offset) + + # if the penalized log likelihood decreased, recompute with step-halving + n_half_steps = 0 + while new_penalized_LL < penalized_LL: + if n_half_steps == max_half_steps: + raise ValueError("Too many half-steps!") + delta /= 2 + new_beta = beta + delta + pi, G, I, new_LL_matrix, new_penalized_LL = calculate_log_likelihood(new_beta, X, y, offset) + n_half_steps += 1 + + beta = new_beta + LL_matrix = new_LL_matrix + penalized_LL = new_penalized_LL + + n_iter += 1 + + if n_iter == max_iter: + raise ValueError("Too many iterations!") + + return beta, LL_matrix, penalized_LL + + +# Null fit +@dataclass +class ApproxFirthState: + penalized_LL_null_fit: NDArray[(Any), Float] + logit_offset: NDArray[(Any), Float] + + +@typechecked +def _assemble_approx_firth_state( + Y: NDArray[(Any, Any), Float], + phenotype_df: pd.DataFrame, # Unused, only to share code with lin_reg.py + offset_df: Optional[pd.DataFrame], + C: NDArray[(Any, Any), Float], + Y_mask: NDArray[(Any, Any), Float]) -> ApproxFirthState: + + num_Y = Y.shape[1] + penalized_LL_null_fit = np.zeros(num_Y) + logit_offset = np.zeros(num_Y) + + for i in range(num_Y): + y = Y[:, i] + y_mask = Y_mask[:, i] + offset = offset_df.iloc[:, i].to_numpy() if 
offset_df is not None else np.zeros(y.shape) + b0_null_fit = np.zeros(1 + C.shape(0)) + b0_null_fit[0] = (0.5 + y.sum()) / (y_mask.sum() + 1) + b0_null_fit[0] = np.log(b0_null_fit[0] / (1 - b0_null_fit[0])) - offset.mean() + b_null_fit, _, penalized_LL_null_fit[i] = fit_firth_logistic(b0_null_fit, C, y, offset) + logit_offset[i] = offset + (C.values * b_null_fit).sum(axis=1) + + return ApproxFirthState(penalized_LL_null_fit, logit_offset) + + +def _correct_approx_firth_row(r, X_res, approx_firth_state, log_reg_state) -> Series: + b_snp_fit, snp_LL_matrix, snp_penalized_LL = fit_firth_logistic( + np.zeros(1), + X_res[r.snp], + log_reg_state.Y_res[r.phenotype], + approx_firth_state.logit_offset[r.phenotype] + ) + r.tval = 2 * (snp_penalized_LL - approx_firth_state.penalized_LL_null_fit) + r.pvalue = stats.chi2.sf(r.tval, 1) + r.effect = b_snp_fit.item() + r.stderr = np.linalg.pinv(snp_LL_matrix).item() + return r + + +@typechecked +def _correct_approx_firth_inner(genotype_pdf: pd.DataFrame, approx_firth_state: ApproxFirthState, + C: NDArray[(Any, Any), Float], Y_mask: NDArray[(Any, Any), Float], + log_reg_state: LogRegState, uncorrected_pdf: pd.DataFrame, p_threshold: double) -> pd.DataFrame: + X = np.column_stack(genotype_pdf[_VALUES_COLUMN_NAME].array) + X_res = _logistic_residualize(X, C, Y_mask, log_reg_state.gamma, log_reg_state.inv_CtGammaC) + out_df = uncorrected_pdf.apply(lambda r: _correct_approx_firth_row(r, X_res, approx_firth_state, log_reg_state) if r.pvalue < p_threshold else r, axis=0) + del genotype_pdf[_VALUES_COLUMN_NAME] + return out_df diff --git a/python/glow/gwas/logistic_regression.py b/python/glow/gwas/logistic_regression.py index efe3cfafe..2f3c673c1 100644 --- a/python/glow/gwas/logistic_regression.py +++ b/python/glow/gwas/logistic_regression.py @@ -14,7 +14,8 @@ __all__ = ['logistic_regression'] -fallback_none = 'none' +correction_none = 'none' +correction_approx_firth = 'approx-firth' @typechecked @@ -23,8 +24,8 @@ def logistic_regression( phenotype_df: pd.DataFrame, covariate_df: pd.DataFrame = pd.DataFrame({}), offset_df: pd.DataFrame = pd.DataFrame({}), - # TODO: fallback is probably not the best name - fallback: str = 'none', # TODO: Make approx-firth default + correction: str = correction_approx_firth, + p_threshold: double = 0.05, fit_intercept: bool = True, values_column: str = 'values', dt: type = np.float64) -> DataFrame: @@ -50,12 +51,13 @@ def logistic_regression( If two levels, the level 0 index should be the same as the `phenotype_df`, and the level 1 index should be the contig name. The two level index scheme allows for per-contig offsets like LOCO predictions from GloWGR. - fallback : Which test to use for variants that meet a significance threshold for the score test + correction : Which correction method to use for variants that meet a significance threshold for the score test + p_threshold : The significance threshold at which correction should be applied fit_intercept : Whether or not to add an intercept column to the covariate DataFrame - values_column : A column name or column expression to test with linear regression. If a column name is provided, + values_column : A column name or column expression to test with logistic regression. If a column name is provided, `genotype_df` should have a column with this name and a numeric array type. If a column expression is provided, the expression should return a numeric array type. - dt : The numpy datatype to use in the linear regression test. Must be `np.float32` or `np.float64`. 
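For readers tracking the math: calculate_log_likelihood above computes Firth's penalized log-likelihood, l*(beta) = l(beta) + 0.5 * log|I(beta)|, where I(beta) = X'GX is the Fisher information and G = diag(pi(1 - pi)); the expression 1 - 1/(exp(z) + 1) is simply the sigmoid of z. A minimal NumPy-only sketch of the same quantity (the function name and toy data are illustrative, not part of the patch):

import numpy as np

def penalized_log_likelihood(beta, X, y, offset, eps=1e-15):
    pi = 1 / (1 + np.exp(-(X @ beta + offset)))   # sigmoid, as in the patch
    ll = y @ np.log(pi + eps) + (1 - y) @ np.log(1 - pi + eps)  # binomial log-likelihood
    g = pi * (1 - pi)
    fisher = X.T @ (g[:, None] * X)               # I(beta) = X' diag(g) X
    _, logdet = np.linalg.slogdet(np.atleast_2d(fisher))
    return ll + 0.5 * logdet                      # Firth penalty: + 0.5 * log|I(beta)|

# Toy check: 10 samples, intercept-only design
rng = np.random.default_rng(0)
X = np.ones((10, 1))
y = (rng.random(10) < 0.3).astype(np.float64)
print(penalized_log_likelihood(np.zeros(1), X, y, np.zeros(10)))

The half log-determinant penalty removes the first-order bias of the maximum-likelihood estimate and keeps coefficients finite under separation, which is why Firth's method is the standard fallback for rare variants and unbalanced case/control ratios.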
+ dt : The numpy datatype to use in the logistic regression test. Must be `np.float32` or `np.float64`. ''' gwas_fx._check_spark_version(genotype_df.sql_ctx.sparkSession) @@ -84,9 +86,13 @@ def logistic_regression( def map_func(pdf_iterator): for pdf in pdf_iterator: yield gwas_fx._loco_dispatch(pdf, state, _logistic_regression_inner, C, - Y_mask.astype(np.float64), fallback, + Y_mask.astype(np.float64), correction, p_threshold, phenotype_df.columns.to_series().astype('str')) + firth_state = gwas_fx._loco_make_state( + Y, phenotype_df, offset_df, + lambda y, pdf, odf: _assemble_approx_firth_state(y, pdf, odf, C, Y_mask)) + return genotype_df.mapInPandas(map_func, result_struct) @@ -153,7 +159,7 @@ def _logistic_residualize(X: NDArray[(Any, Any), Float], C: NDArray[(Any, Any), def _logistic_regression_inner(genotype_pdf: pd.DataFrame, log_reg_state: LogRegState, C: NDArray[(Any, Any), Float], Y_mask: NDArray[(Any, Any), Float], - fallback_method: str, phenotype_names: pd.Series) -> pd.DataFrame: + phenotype_names: pd.Series) -> pd.DataFrame: ''' Tests a block of genotypes for association with binary traits. We first residualize the genotypes based on the null model fit, then perform a fast score test to check for @@ -173,13 +179,18 @@ def _logistic_regression_inner(genotype_pdf: pd.DataFrame, log_reg_state: LogReg t_values = np.ravel(num / denom) p_values = stats.chi2.sf(t_values, 1) - if fallback_method != fallback_none: - # TODO: Call approx firth here - () - del genotype_pdf[_VALUES_COLUMN_NAME] out_df = pd.concat([genotype_pdf] * log_reg_state.Y_res.shape[1]) out_df['tvalue'] = list(np.ravel(t_values)) out_df['pvalue'] = list(np.ravel(p_values)) out_df['phenotype'] = phenotype_names.repeat(genotype_pdf.shape[0]).tolist() return out_df + + +def _get_correction_fn(correction: str): + if correction != correction_none: + sites_to_correct = p_values.map(lambda p: p < p_threshold) + if correction == correction_approx_firth: + t_values, p_values, effect_size, standard_error = + else: + raise ValueError(f"Supported correction methods: {correction_approx_firth}.") From 71406714153f967a054abf796b703506c6e8c2b2 Mon Sep 17 00:00:00 2001 From: Karen Feng Date: Tue, 15 Dec 2020 15:46:35 -0800 Subject: [PATCH 2/9] More work Signed-off-by: Karen Feng --- python/glow/gwas/approx_firth_correction.py | 55 +++++++-------------- python/glow/gwas/logistic_regression.py | 44 +++++++++++------ 2 files changed, 47 insertions(+), 52 deletions(-) diff --git a/python/glow/gwas/approx_firth_correction.py b/python/glow/gwas/approx_firth_correction.py index 62f59e9a9..d8fcd35d3 100644 --- a/python/glow/gwas/approx_firth_correction.py +++ b/python/glow/gwas/approx_firth_correction.py @@ -1,18 +1,14 @@ -from pyspark.sql.types import StringType, StructField from typing import Any, Optional import pandas as pd +from pandas import Series import numpy as np -from pyspark.sql import DataFrame -import statsmodels.api as sm from dataclasses import dataclass from typeguard import typechecked from nptyping import Float, NDArray from scipy import stats -import opt_einsum as oe -from . 
import functions as gwas_fx -from .functions import _VALUES_COLUMN_NAME -def calculate_log_likelihood(beta, X, y, offset): + +def _calculate_log_likelihood(beta, X, y, offset): pi = 1 - 1 / (np.exp(X @ beta + offset) + 1) G = np.diagflat(pi * (1-pi)) I = np.atleast_2d(X.T @ G @ X) # fisher information matrix @@ -22,10 +18,10 @@ def calculate_log_likelihood(beta, X, y, offset): return (pi, G, I, LL_matrix, penalized_LL) -def fit_firth_logistic(beta_init, X, y, offset, tolerance=1e-5, max_iter=250, max_step_size=5, max_half_steps=25): +def _fit_firth_logistic(beta_init, X, y, offset, tolerance=1e-5, max_iter=250, max_step_size=5, max_half_steps=25): n_iter = 0 beta = beta_init - pi, G, I, LL_matrix, penalized_LL = calculate_log_likelihood(beta, X, y, offset) + pi, G, I, LL_matrix, penalized_LL = _calculate_log_likelihood(beta, X, y, offset) while n_iter < max_iter: # inverse of the fisher information matrix invI = np.linalg.pinv(I) @@ -49,7 +45,7 @@ def fit_firth_logistic(beta_init, X, y, offset, tolerance=1e-5, max_iter=250, ma delta = delta / mx new_beta = beta + delta - pi, G, I, new_LL_matrix, new_penalized_LL = calculate_log_likelihood(new_beta, X, y, offset) + pi, G, I, new_LL_matrix, new_penalized_LL = _calculate_log_likelihood(new_beta, X, y, offset) # if the penalized log likelihood decreased, recompute with step-halving n_half_steps = 0 @@ -58,7 +54,7 @@ def fit_firth_logistic(beta_init, X, y, offset, tolerance=1e-5, max_iter=250, ma raise ValueError("Too many half-steps!") delta /= 2 new_beta = beta + delta - pi, G, I, new_LL_matrix, new_penalized_LL = calculate_log_likelihood(new_beta, X, y, offset) + pi, G, I, new_LL_matrix, new_penalized_LL = _calculate_log_likelihood(new_beta, X, y, offset) n_half_steps += 1 beta = new_beta @@ -73,7 +69,6 @@ def fit_firth_logistic(beta_init, X, y, offset, tolerance=1e-5, max_iter=250, ma return beta, LL_matrix, penalized_LL -# Null fit @dataclass class ApproxFirthState: penalized_LL_null_fit: NDArray[(Any), Float] @@ -81,9 +76,8 @@ class ApproxFirthState: @typechecked -def _assemble_approx_firth_state( +def assemble_approx_firth_state( Y: NDArray[(Any, Any), Float], - phenotype_df: pd.DataFrame, # Unused, only to share code with lin_reg.py offset_df: Optional[pd.DataFrame], C: NDArray[(Any, Any), Float], Y_mask: NDArray[(Any, Any), Float]) -> ApproxFirthState: @@ -99,32 +93,21 @@ def _assemble_approx_firth_state( b0_null_fit = np.zeros(1 + C.shape(0)) b0_null_fit[0] = (0.5 + y.sum()) / (y_mask.sum() + 1) b0_null_fit[0] = np.log(b0_null_fit[0] / (1 - b0_null_fit[0])) - offset.mean() - b_null_fit, _, penalized_LL_null_fit[i] = fit_firth_logistic(b0_null_fit, C, y, offset) + b_null_fit, _, penalized_LL_null_fit[i] = _fit_firth_logistic(b0_null_fit, C, y, offset) logit_offset[i] = offset + (C.values * b_null_fit).sum(axis=1) return ApproxFirthState(penalized_LL_null_fit, logit_offset) -def _correct_approx_firth_row(r, X_res, approx_firth_state, log_reg_state) -> Series: - b_snp_fit, snp_LL_matrix, snp_penalized_LL = fit_firth_logistic( +def correct_approx_firth(x_res, y_res, logit_offset, penalized_LL_null_fit) -> Series: + b_snp_fit, snp_LL_matrix, snp_penalized_LL = _fit_firth_logistic( np.zeros(1), - X_res[r.snp], - log_reg_state.Y_res[r.phenotype], - approx_firth_state.logit_offset[r.phenotype] + x_res, + y_res, + logit_offset ) - r.tval = 2 * (snp_penalized_LL - approx_firth_state.penalized_LL_null_fit) - r.pvalue = stats.chi2.sf(r.tval, 1) - r.effect = b_snp_fit.item() - r.stderr = np.linalg.pinv(snp_LL_matrix).item() - return r - - 
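The correction and p_threshold parameters threaded through the previous hunks implement regenie's two-stage scheme: every variant gets the cheap score test, and only variants whose score-test p-value crosses the threshold are refit with the approximate Firth likelihood-ratio test. A sketch of that selection step, assuming a result frame with a pvalue column and a hypothetical firth_refit callable that returns a corrected (tvalue, pvalue) pair:

import pandas as pd

def apply_firth_correction(out_df: pd.DataFrame, firth_refit, p_threshold: float = 0.05):
    # Only rows that look significant under the score test pay for the
    # more expensive penalized-likelihood refit
    to_correct = out_df.index[out_df['pvalue'] < p_threshold]
    for idx in to_correct:
        out_df.loc[idx, ['tvalue', 'pvalue']] = firth_refit(idx)
    return out_df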
-@typechecked -def _correct_approx_firth_inner(genotype_pdf: pd.DataFrame, approx_firth_state: ApproxFirthState, - C: NDArray[(Any, Any), Float], Y_mask: NDArray[(Any, Any), Float], - log_reg_state: LogRegState, uncorrected_pdf: pd.DataFrame, p_threshold: double) -> pd.DataFrame: - X = np.column_stack(genotype_pdf[_VALUES_COLUMN_NAME].array) - X_res = _logistic_residualize(X, C, Y_mask, log_reg_state.gamma, log_reg_state.inv_CtGammaC) - out_df = uncorrected_pdf.apply(lambda r: _correct_approx_firth_row(r, X_res, approx_firth_state, log_reg_state) if r.pvalue < p_threshold else r, axis=0) - del genotype_pdf[_VALUES_COLUMN_NAME] - return out_df + tvalue = 2 * (snp_penalized_LL - penalized_LL_null_fit) + pvalue = stats.chi2.sf(tvalue, 1) + effect = b_snp_fit.item() + stderr = np.linalg.pinv(snp_LL_matrix).item() + return Series({'tvalue': tvalue, 'pvalue': pvalue, 'effect': effect, 'stderr': stderr}) diff --git a/python/glow/gwas/logistic_regression.py b/python/glow/gwas/logistic_regression.py index 2f3c673c1..c757d503b 100644 --- a/python/glow/gwas/logistic_regression.py +++ b/python/glow/gwas/logistic_regression.py @@ -11,6 +11,7 @@ import opt_einsum as oe from . import functions as gwas_fx from .functions import _VALUES_COLUMN_NAME +from .approx_firth_correction import * __all__ = ['logistic_regression'] @@ -79,20 +80,16 @@ def logistic_regression( Y_mask = ~(np.isnan(Y)) np.nan_to_num(Y, copy=False) - state = gwas_fx._loco_make_state( + log_reg_state = gwas_fx._loco_make_state( Y, phenotype_df, offset_df, - lambda y, pdf, odf: _assemble_log_reg_state(y, pdf, odf, C, Y_mask)) + lambda y, pdf, odf: _assemble_log_reg_state(y, pdf, odf, C, Y_mask, correction)) def map_func(pdf_iterator): for pdf in pdf_iterator: - yield gwas_fx._loco_dispatch(pdf, state, _logistic_regression_inner, C, + yield gwas_fx._loco_dispatch(pdf, log_reg_state, _logistic_regression_inner, C, Y_mask.astype(np.float64), correction, p_threshold, phenotype_df.columns.to_series().astype('str')) - firth_state = gwas_fx._loco_make_state( - Y, phenotype_df, offset_df, - lambda y, pdf, odf: _assemble_approx_firth_state(y, pdf, odf, C, Y_mask)) - return genotype_df.mapInPandas(map_func, result_struct) @@ -125,6 +122,7 @@ class LogRegState: inv_CtGammaC: NDArray[(Any, Any), Float] gamma: NDArray[(Any, Any), Float] Y_res: NDArray[(Any, Any), Float] + approx_firth_state: Optional[ApproxFirthState] @typechecked @@ -133,7 +131,8 @@ def _assemble_log_reg_state( phenotype_df: pd.DataFrame, # Unused, only to share code with lin_reg.py offset_df: Optional[pd.DataFrame], C: NDArray[(Any, Any), Float], - Y_mask: NDArray[(Any, Any), Float]) -> LogRegState: + Y_mask: NDArray[(Any, Any), Float], + correction: str) -> LogRegState: Y_pred = np.row_stack([ _logistic_null_model_predictions( Y[:, i], C, Y_mask[:, i], @@ -143,7 +142,13 @@ def _assemble_log_reg_state( gamma = Y_pred * (1 - Y_pred) CtGammaC = C.T @ (gamma[:, :, None] * C) CtGammaC_inv = np.linalg.inv(CtGammaC) - return LogRegState(CtGammaC_inv, gamma, (Y - Y_pred.T) * Y_mask) + + if correction == correction_approx_firth: + approx_firth_state = assemble_approx_firth_state(Y, offset_df, C, Y_mask) + else: + approx_firth_state = None + + return LogRegState(CtGammaC_inv, gamma, (Y - Y_pred.T) * Y_mask, approx_firth_state) def _logistic_residualize(X: NDArray[(Any, Any), Float], C: NDArray[(Any, Any), Float], @@ -159,7 +164,7 @@ def _logistic_residualize(X: NDArray[(Any, Any), Float], C: NDArray[(Any, Any), def _logistic_regression_inner(genotype_pdf: pd.DataFrame, log_reg_state: 
LogRegState, C: NDArray[(Any, Any), Float], Y_mask: NDArray[(Any, Any), Float], - phenotype_names: pd.Series) -> pd.DataFrame: + correction: str, p_threshold: double, phenotype_names: pd.Series) -> pd.DataFrame: ''' Tests a block of genotypes for association with binary traits. We first residualize the genotypes based on the null model fit, then perform a fast score test to check for @@ -184,13 +189,20 @@ def _logistic_regression_inner(genotype_pdf: pd.DataFrame, log_reg_state: LogReg out_df['tvalue'] = list(np.ravel(t_values)) out_df['pvalue'] = list(np.ravel(p_values)) out_df['phenotype'] = phenotype_names.repeat(genotype_pdf.shape[0]).tolist() - return out_df - -def _get_correction_fn(correction: str): if correction != correction_none: - sites_to_correct = p_values.map(lambda p: p < p_threshold) + correction_indices = out_df.index[out_df['pvalue'] < p_threshold] if correction == correction_approx_firth: - t_values, p_values, effect_size, standard_error = + for correction_idx in correction_indices: + snp_index = correction_idx % genotype_pdf.shape[0] + phenotype_index = correction_idx / phenotype_names.size + out_df.iloc[correction_idx] = correct_approx_firth( + X_res[snp_index, phenotype_index], + log_reg_state.Y_res[phenotype_index], + log_reg_state.approx_firth_state.logit_offset[phenotype_index], + log_reg_state.approx_firth_state.penalized_LL_null_fit[phenotype_index], + ) else: - raise ValueError(f"Supported correction methods: {correction_approx_firth}.") + raise ValueError(f"Only supported correction method is {correction_approx_firth}") + + return out_df From 96315d9a487ffed4cf0f8f2f2af8e464c7396da3 Mon Sep 17 00:00:00 2001 From: Karen Feng Date: Mon, 21 Dec 2020 14:14:20 -0800 Subject: [PATCH 3/9] Correct param names Signed-off-by: Karen Feng --- python/glow/gwas/log_reg.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/glow/gwas/log_reg.py b/python/glow/gwas/log_reg.py index 2f5077dca..5c3519c01 100644 --- a/python/glow/gwas/log_reg.py +++ b/python/glow/gwas/log_reg.py @@ -178,7 +178,7 @@ def _logistic_residualize(X: NDArray[(Any, Any), Float], C: NDArray[(Any, Any), def _logistic_regression_inner(genotype_pdf: pd.DataFrame, log_reg_state: LogRegState, C: NDArray[(Any, Any), Float], Y_mask: NDArray[(Any, Any), Float], - correction: str, p_threshold: double, phenotype_names: pd.Series) -> pd.DataFrame: + correction: str, pvalue_threshold: double, phenotype_names: pd.Series) -> pd.DataFrame: ''' Tests a block of genotypes for association with binary traits. 
We first residualize the genotypes based on the null model fit, then perform a fast score test to check for @@ -205,7 +205,7 @@ def _logistic_regression_inner(genotype_pdf: pd.DataFrame, log_reg_state: LogReg out_df['phenotype'] = phenotype_names.repeat(genotype_pdf.shape[0]).tolist() if correction != correction_none: - correction_indices = out_df.index[out_df['pvalue'] < p_threshold] + correction_indices = out_df.index[out_df['pvalue'] < pvalue_threshold] if correction == correction_approx_firth: for correction_idx in correction_indices: snp_index = correction_idx % genotype_pdf.shape[0] From 53073f4e62c591c0ac854efb01a1e643538e9d21 Mon Sep 17 00:00:00 2001 From: Karen Feng Date: Mon, 21 Dec 2020 17:29:41 -0800 Subject: [PATCH 4/9] Get it running Signed-off-by: Karen Feng --- python/glow/gwas/approx_firth_correction.py | 49 +++++++++++++++------ python/glow/gwas/log_reg.py | 15 ++++--- python/glow/gwas/tests/test_log_reg.py | 4 +- 3 files changed, 45 insertions(+), 23 deletions(-) diff --git a/python/glow/gwas/approx_firth_correction.py b/python/glow/gwas/approx_firth_correction.py index d8fcd35d3..30d1c5210 100644 --- a/python/glow/gwas/approx_firth_correction.py +++ b/python/glow/gwas/approx_firth_correction.py @@ -8,7 +8,12 @@ from scipy import stats -def _calculate_log_likelihood(beta, X, y, offset): +def _calculate_log_likelihood( + beta: NDArray[(Any,), Float], + X: NDArray[(Any, Any), Float], + y: NDArray[(Any,), Float], + offset: NDArray[(Any,), Float]): + pi = 1 - 1 / (np.exp(X @ beta + offset) + 1) G = np.diagflat(pi * (1-pi)) I = np.atleast_2d(X.T @ G @ X) # fisher information matrix @@ -18,7 +23,16 @@ def _calculate_log_likelihood(beta, X, y, offset): return (pi, G, I, LL_matrix, penalized_LL) -def _fit_firth_logistic(beta_init, X, y, offset, tolerance=1e-5, max_iter=250, max_step_size=5, max_half_steps=25): +def _fit_firth_logistic( + beta_init: NDArray[(Any,), Float], + X: NDArray[(Any, Any), Float], + y: NDArray[(Any,), Float], + offset: NDArray[(Any,), Float], + tolerance=1e-5, + max_iter=250, + max_step_size=5, + max_half_steps=25): + n_iter = 0 beta = beta_init pi, G, I, LL_matrix, penalized_LL = _calculate_log_likelihood(beta, X, y, offset) @@ -71,38 +85,45 @@ def _fit_firth_logistic(beta_init, X, y, offset, tolerance=1e-5, max_iter=250, m @dataclass class ApproxFirthState: - penalized_LL_null_fit: NDArray[(Any), Float] - logit_offset: NDArray[(Any), Float] + logit_offset: NDArray[(Any, Any), Float] + penalized_LL_null_fit: NDArray[(Any,), Float] @typechecked -def assemble_approx_firth_state( +def create_approx_firth_state( Y: NDArray[(Any, Any), Float], offset_df: Optional[pd.DataFrame], C: NDArray[(Any, Any), Float], - Y_mask: NDArray[(Any, Any), Float]) -> ApproxFirthState: + Y_mask: NDArray[(Any, Any), Float], + fit_intercept: bool) -> ApproxFirthState: num_Y = Y.shape[1] penalized_LL_null_fit = np.zeros(num_Y) - logit_offset = np.zeros(num_Y) + logit_offset = np.zeros(Y.shape) for i in range(num_Y): y = Y[:, i] y_mask = Y_mask[:, i] offset = offset_df.iloc[:, i].to_numpy() if offset_df is not None else np.zeros(y.shape) - b0_null_fit = np.zeros(1 + C.shape(0)) - b0_null_fit[0] = (0.5 + y.sum()) / (y_mask.sum() + 1) - b0_null_fit[0] = np.log(b0_null_fit[0] / (1 - b0_null_fit[0])) - offset.mean() + b0_null_fit = np.zeros(C.shape[1]) + if fit_intercept: + b0_null_fit[-1] = (0.5 + y.sum()) / (y_mask.sum() + 1) + b0_null_fit[-1] = np.log(b0_null_fit[-1] / (1 - b0_null_fit[-1])) - offset.mean() b_null_fit, _, penalized_LL_null_fit[i] = _fit_firth_logistic(b0_null_fit, C, 
y, offset) - logit_offset[i] = offset + (C.values * b_null_fit).sum(axis=1) + logit_offset[:, i] = offset + (C @ b_null_fit) + + return ApproxFirthState(logit_offset, penalized_LL_null_fit) - return ApproxFirthState(penalized_LL_null_fit, logit_offset) +def correct_approx_firth( + x_res: NDArray[(Any,), Float], + y_res: NDArray[(Any,), Float], + logit_offset: NDArray[(Any,), Float], + penalized_LL_null_fit: Float) -> Series: -def correct_approx_firth(x_res, y_res, logit_offset, penalized_LL_null_fit) -> Series: b_snp_fit, snp_LL_matrix, snp_penalized_LL = _fit_firth_logistic( np.zeros(1), - x_res, + np.expand_dims(x_res, axis=1), y_res, logit_offset ) diff --git a/python/glow/gwas/log_reg.py b/python/glow/gwas/log_reg.py index 5c3519c01..21cbbbc6d 100644 --- a/python/glow/gwas/log_reg.py +++ b/python/glow/gwas/log_reg.py @@ -32,7 +32,7 @@ def logistic_regression( dt: type = np.float64) -> DataFrame: ''' Uses logistic regression to test for association between genotypes and one or more binary - phenotypes. This is a distributed version of the method from regenie: + phenotypes. This is a distributed version of the method from regenie: https://www.biorxiv.org/content/10.1101/2020.06.19.162354v2 On the driver node, we fit a logistic regression model based on the covariates for each @@ -96,7 +96,7 @@ def logistic_regression( log_reg_state = gwas_fx._loco_make_state( Y, phenotype_df, offset_df, - lambda y, pdf, odf: _create_log_reg_state(y, pdf, odf, C, Y_mask, correction)) + lambda y, pdf, odf: _create_log_reg_state(y, pdf, odf, C, Y_mask, correction, fit_intercept)) def map_func(pdf_iterator): for pdf in pdf_iterator: @@ -146,7 +146,8 @@ def _create_log_reg_state( offset_df: Optional[pd.DataFrame], C: NDArray[(Any, Any), Float], Y_mask: NDArray[(Any, Any), Float], - correction: str) -> LogRegState: + correction: str, + fit_intercept: bool) -> LogRegState: Y_pred = np.row_stack([ _logistic_null_model_predictions( Y[:, i], C, Y_mask[:, i], @@ -158,7 +159,7 @@ def _create_log_reg_state( inv_CtGammaC = np.linalg.inv(CtGammaC) if correction == correction_approx_firth: - approx_firth_state = assemble_approx_firth_state(Y, offset_df, C, Y_mask) + approx_firth_state = create_approx_firth_state(Y, offset_df, C, Y_mask, fit_intercept) else: approx_firth_state = None @@ -178,7 +179,7 @@ def _logistic_residualize(X: NDArray[(Any, Any), Float], C: NDArray[(Any, Any), def _logistic_regression_inner(genotype_pdf: pd.DataFrame, log_reg_state: LogRegState, C: NDArray[(Any, Any), Float], Y_mask: NDArray[(Any, Any), Float], - correction: str, pvalue_threshold: double, phenotype_names: pd.Series) -> pd.DataFrame: + correction: str, pvalue_threshold: float, phenotype_names: pd.Series) -> pd.DataFrame: ''' Tests a block of genotypes for association with binary traits. 
We first residualize the genotypes based on the null model fit, then perform a fast score test to check for @@ -209,9 +210,9 @@ def _logistic_regression_inner(genotype_pdf: pd.DataFrame, log_reg_state: LogReg if correction == correction_approx_firth: for correction_idx in correction_indices: snp_index = correction_idx % genotype_pdf.shape[0] - phenotype_index = correction_idx / phenotype_names.size + phenotype_index = int(correction_idx / phenotype_names.size) out_df.iloc[correction_idx] = correct_approx_firth( - X_res[snp_index, phenotype_index], + X_res[snp_index][phenotype_index], log_reg_state.Y_res[phenotype_index], log_reg_state.approx_firth_state.logit_offset[phenotype_index], log_reg_state.approx_firth_state.penalized_LL_null_fit[phenotype_index], diff --git a/python/glow/gwas/tests/test_log_reg.py b/python/glow/gwas/tests/test_log_reg.py index b30da3538..d1a4e8c38 100644 --- a/python/glow/gwas/tests/test_log_reg.py +++ b/python/glow/gwas/tests/test_log_reg.py @@ -14,9 +14,9 @@ def run_score_test(genotype_df, phenotype_df, covariate_df, fit_intercept=True): Y = phenotype_df.to_numpy(copy=True) Y_mask = ~np.isnan(Y) Y[~Y_mask] = 0 - state = lr._create_log_reg_state(Y, pd.DataFrame(), None, C, Y_mask) + state = lr._create_log_reg_state(Y, pd.DataFrame(), None, C, Y_mask, lr.correction_none, fit_intercept) values_df = pd.DataFrame({gwas_fx._VALUES_COLUMN_NAME: list(genotype_df.to_numpy().T)}) - return lr._logistic_regression_inner(values_df, state, C, Y_mask, lr.fallback_none, + return lr._logistic_regression_inner(values_df, state, C, Y_mask, lr.correction_none, 0.05, phenotype_df.columns.to_series().astype('str')) From 6223aea7280ca686edff739b4cbb955cf595b7de Mon Sep 17 00:00:00 2001 From: Karen Feng Date: Mon, 21 Dec 2020 18:10:39 -0800 Subject: [PATCH 5/9] Some clean up Signed-off-by: Karen Feng --- python/glow/gwas/approx_firth_correction.py | 42 ++++++++++++++------- python/glow/gwas/log_reg.py | 7 +++- 2 files changed, 35 insertions(+), 14 deletions(-) diff --git a/python/glow/gwas/approx_firth_correction.py b/python/glow/gwas/approx_firth_correction.py index 30d1c5210..0367404e6 100644 --- a/python/glow/gwas/approx_firth_correction.py +++ b/python/glow/gwas/approx_firth_correction.py @@ -17,12 +17,20 @@ def _calculate_log_likelihood( pi = 1 - 1 / (np.exp(X @ beta + offset) + 1) G = np.diagflat(pi * (1-pi)) I = np.atleast_2d(X.T @ G @ X) # fisher information matrix - LL_matrix = np.atleast_2d(y @ np.log(pi) + (1-y) @ np.log(1-pi)) + eps = 1e-15 # Avoid log underflow + LL_matrix = np.atleast_2d(y @ np.log(pi + eps) + (1-y) @ np.log(1-pi + eps)) _, logdet = np.linalg.slogdet(I) penalized_LL = np.sum(LL_matrix) + 0.5 * logdet return (pi, G, I, LL_matrix, penalized_LL) +@dataclass +class FirthFitResults: + beta: NDArray[(Any,), Float] + LL_matrix: NDArray[(Any, Any), Float] + penalized_LL: Float + + def _fit_firth_logistic( beta_init: NDArray[(Any,), Float], X: NDArray[(Any, Any), Float], @@ -30,8 +38,8 @@ def _fit_firth_logistic( offset: NDArray[(Any,), Float], tolerance=1e-5, max_iter=250, - max_step_size=5, - max_half_steps=25): + max_step_size=25, + max_half_steps=25) -> Optional[FirthFitResults]: n_iter = 0 beta = beta_init @@ -65,7 +73,8 @@ def _fit_firth_logistic( n_half_steps = 0 while new_penalized_LL < penalized_LL: if n_half_steps == max_half_steps: - raise ValueError("Too many half-steps!") + print("Too many half-steps!") + return None delta /= 2 new_beta = beta + delta pi, G, I, new_LL_matrix, new_penalized_LL = _calculate_log_likelihood(new_beta, X, y, offset) @@ 
-78,9 +87,10 @@ def _fit_firth_logistic( n_iter += 1 if n_iter == max_iter: - raise ValueError("Too many iterations!") + print("Too many iterations!") + return None - return beta, LL_matrix, penalized_LL + return FirthFitResults(beta, LL_matrix, penalized_LL) @dataclass @@ -109,8 +119,12 @@ def create_approx_firth_state( if fit_intercept: b0_null_fit[-1] = (0.5 + y.sum()) / (y_mask.sum() + 1) b0_null_fit[-1] = np.log(b0_null_fit[-1] / (1 - b0_null_fit[-1])) - offset.mean() - b_null_fit, _, penalized_LL_null_fit[i] = _fit_firth_logistic(b0_null_fit, C, y, offset) - logit_offset[:, i] = offset + (C @ b_null_fit) + firthFitResult = _fit_firth_logistic( + b0_null_fit, C, y, offset, max_step_size=5, max_iter=5000) + if firthFitResult is None: + raise ValueError("Null fit failed!") + penalized_LL_null_fit[i] = firthFitResult.penalized_LL + logit_offset[:, i] = offset + (C @ firthFitResult.beta) return ApproxFirthState(logit_offset, penalized_LL_null_fit) @@ -119,16 +133,18 @@ def correct_approx_firth( x_res: NDArray[(Any,), Float], y_res: NDArray[(Any,), Float], logit_offset: NDArray[(Any,), Float], - penalized_LL_null_fit: Float) -> Series: + penalized_LL_null_fit: Float) -> Optional[Series]: - b_snp_fit, snp_LL_matrix, snp_penalized_LL = _fit_firth_logistic( + firthFitResult = _fit_firth_logistic( np.zeros(1), np.expand_dims(x_res, axis=1), y_res, logit_offset ) - tvalue = 2 * (snp_penalized_LL - penalized_LL_null_fit) + if firthFitResult is None: + return None + tvalue = 2 * (firthFitResult.penalized_LL - penalized_LL_null_fit) pvalue = stats.chi2.sf(tvalue, 1) - effect = b_snp_fit.item() - stderr = np.linalg.pinv(snp_LL_matrix).item() + effect = firthFitResult.beta.item() + stderr = np.linalg.pinv(firthFitResult.LL_matrix).item() return Series({'tvalue': tvalue, 'pvalue': pvalue, 'effect': effect, 'stderr': stderr}) diff --git a/python/glow/gwas/log_reg.py b/python/glow/gwas/log_reg.py index 21cbbbc6d..b9e584666 100644 --- a/python/glow/gwas/log_reg.py +++ b/python/glow/gwas/log_reg.py @@ -211,12 +211,17 @@ def _logistic_regression_inner(genotype_pdf: pd.DataFrame, log_reg_state: LogReg for correction_idx in correction_indices: snp_index = correction_idx % genotype_pdf.shape[0] phenotype_index = int(correction_idx / phenotype_names.size) - out_df.iloc[correction_idx] = correct_approx_firth( + approx_firth_results = correct_approx_firth( X_res[snp_index][phenotype_index], log_reg_state.Y_res[phenotype_index], log_reg_state.approx_firth_state.logit_offset[phenotype_index], log_reg_state.approx_firth_state.penalized_LL_null_fit[phenotype_index], ) + if approx_firth_results is not None: + out_df.iloc[correction_idx]['tvalue'] = approx_firth_results.tvalue + out_df.iloc[correction_idx]['pvalue'] = approx_firth_results.pvalue + else: + print(f"Could not correct {out_df.iloc[correction_idx]}") else: raise ValueError(f"Only supported correction method is {correction_approx_firth}") From 77ff9d5528e766618c849ea1d4113c7e605f7378 Mon Sep 17 00:00:00 2001 From: Karen Feng Date: Tue, 22 Dec 2020 16:44:04 -0800 Subject: [PATCH 6/9] Clean up Signed-off-by: Karen Feng --- python/glow/gwas/approx_firth_correction.py | 140 ++++++++++++-------- python/glow/gwas/log_reg.py | 14 +- 2 files changed, 95 insertions(+), 59 deletions(-) diff --git a/python/glow/gwas/approx_firth_correction.py b/python/glow/gwas/approx_firth_correction.py index 0367404e6..671b2d77e 100644 --- a/python/glow/gwas/approx_firth_correction.py +++ b/python/glow/gwas/approx_firth_correction.py @@ -8,95 +8,118 @@ from scipy import stats 
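The fitting loop above damps each Newton update twice: the proposed step delta = inv(I) @ U is rescaled so that no entry exceeds max_step_size, and if the penalized log-likelihood still decreases, the step is halved up to max_half_steps times before the fit is abandoned. The same pattern in isolation, as a generic sketch for a concave objective f (the function and 1-D example are illustrative, not the patch's code):

import numpy as np

def damped_newton_step(beta, delta, f, f_old, max_half_steps=25, tol=1e-6):
    # Halve the proposed step until the objective is no longer worse
    for _ in range(max_half_steps + 1):
        f_new = f(beta + delta)
        if f_new >= f_old - tol:    # accept: objective did not decrease
            return beta + delta, f_new
        delta = delta / 2           # reject: retry with half the step
    raise RuntimeError('step-halving failed to find an acceptable step')

# Example: maximize f(x) = -(x - 3)^2 from x = 0 with an over-long step of 10
f = lambda x: -(x - 3.0) ** 2
beta, val = damped_newton_step(np.array(0.0), np.array(10.0), f, f(0.0))
print(beta, val)    # the step is halved to 5.0, where f improves

Returning None on a failed fit, rather than raising as the first patch did, lets the caller skip a variant that will not converge instead of failing the whole block.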
+@dataclass +class Intermediates: + pi: NDArray[(Any,), Float] + G: NDArray[(Any, Any), Float] # diag(pi(1-pi)) + I: NDArray[(Any, Any), Float] # Fisher information matrix, X'GX + + +@dataclass +class LogLikelihood: + intermediates: Intermediates + LL_matrix: NDArray[(Any, Any), Float] + penalized_LL: Float + + +@dataclass +class FirthFit: + beta: NDArray[(Any,), Float] + log_likelihood: LogLikelihood + + +@dataclass +class ApproxFirthState: + logit_offset: NDArray[(Any, Any), Float] + penalized_LL_null_fit: NDArray[(Any,), Float] + + +@typechecked def _calculate_log_likelihood( beta: NDArray[(Any,), Float], X: NDArray[(Any, Any), Float], y: NDArray[(Any,), Float], - offset: NDArray[(Any,), Float]): + offset: NDArray[(Any,), Float], + eps: float = 1e-15) -> LogLikelihood: pi = 1 - 1 / (np.exp(X @ beta + offset) + 1) G = np.diagflat(pi * (1-pi)) - I = np.atleast_2d(X.T @ G @ X) # fisher information matrix - eps = 1e-15 # Avoid log underflow + I = np.atleast_2d(X.T @ G @ X) LL_matrix = np.atleast_2d(y @ np.log(pi + eps) + (1-y) @ np.log(1-pi + eps)) _, logdet = np.linalg.slogdet(I) penalized_LL = np.sum(LL_matrix) + 0.5 * logdet - return (pi, G, I, LL_matrix, penalized_LL) + return LogLikelihood(Intermediates(pi, G, I), LL_matrix, penalized_LL) -@dataclass -class FirthFitResults: - beta: NDArray[(Any,), Float] - LL_matrix: NDArray[(Any, Any), Float] - penalized_LL: Float - - -def _fit_firth_logistic( +@typechecked +def _fit_firth( beta_init: NDArray[(Any,), Float], X: NDArray[(Any, Any), Float], y: NDArray[(Any,), Float], offset: NDArray[(Any,), Float], - tolerance=1e-5, - max_iter=250, - max_step_size=25, - max_half_steps=25) -> Optional[FirthFitResults]: + convergence_limit: float = 1e-5, + likelihood_tolerance: float = 1e-6, + max_iter: int = 250, + max_step_size: int = 25, + max_half_steps: int = 25) -> Optional[FirthFit]: + ''' + Firth’s bias-Reduced penalized-likelihood logistic regression. 
+ + :param beta_init: Initial beta values + :param X: Independent variable (covariate for null fit, genotype for SNP fit) + :param y: Dependent variable (phenotype) + :param offset: Offset (phenotype offset only for null fit, also with covariate offset for SNP fit) + :param convergence_limit: Convergence is reached if all entries of the penalized score have smaller magnitude + :param likelihood_tolerance: Non-inferiority margin of penalized likelihood when halving step size + :param max_iter: Maximum number of Firth iterations + :param max_step_size: Maximum step size during a Firth iteration + :param max_half_steps: Maximum number of half-steps during a Firth iteration + :return: None if the fit failed + ''' n_iter = 0 beta = beta_init - pi, G, I, LL_matrix, penalized_LL = _calculate_log_likelihood(beta, X, y, offset) + log_likelihood = _calculate_log_likelihood(beta, X, y, offset) while n_iter < max_iter: - # inverse of the fisher information matrix - invI = np.linalg.pinv(I) + invI = np.linalg.pinv(log_likelihood.intermediates.I) # build hat matrix - rootG_X = np.sqrt(G) @ X - H = rootG_X @ invI @ rootG_X.T - h = np.diagonal(H) + rootG_X = np.sqrt(log_likelihood.intermediates.G) @ X + h = np.diagonal(rootG_X @ invI @ rootG_X.T) - # penalised score - U = X.T @ (y - pi + h * (0.5 - pi)) - if np.amax(np.abs(U)) < tolerance: + U = X.T @ (y - log_likelihood.intermediates.pi + h * (0.5 - log_likelihood.intermediates.pi)) + if np.linalg.norm(U, np.inf) < convergence_limit: break # f' / f'' delta = invI @ U # force absolute step size to be less than max_step_size for each entry of beta - mx = np.amax(np.abs(delta)) / max_step_size + mx = np.linalg.norm(delta, np.inf) / max_step_size if mx > 1: delta = delta / mx - new_beta = beta + delta - pi, G, I, new_LL_matrix, new_penalized_LL = _calculate_log_likelihood(new_beta, X, y, offset) + new_log_likelihood = _calculate_log_likelihood(beta + delta, X, y, offset) # if the penalized log likelihood decreased, recompute with step-halving n_half_steps = 0 - while new_penalized_LL < penalized_LL: + while new_log_likelihood.penalized_LL < (log_likelihood.penalized_LL + likelihood_tolerance): if n_half_steps == max_half_steps: print("Too many half-steps!") return None delta /= 2 - new_beta = beta + delta - pi, G, I, new_LL_matrix, new_penalized_LL = _calculate_log_likelihood(new_beta, X, y, offset) + new_log_likelihood = _calculate_log_likelihood(beta + delta, X, y, offset) n_half_steps += 1 - beta = new_beta - LL_matrix = new_LL_matrix - penalized_LL = new_penalized_LL - + beta = beta + delta + log_likelihood = new_log_likelihood n_iter += 1 if n_iter == max_iter: print("Too many iterations!") return None - return FirthFitResults(beta, LL_matrix, penalized_LL) - - -@dataclass -class ApproxFirthState: - logit_offset: NDArray[(Any, Any), Float] - penalized_LL_null_fit: NDArray[(Any,), Float] + return FirthFit(beta, log_likelihood) @typechecked @@ -106,6 +129,11 @@ def create_approx_firth_state( C: NDArray[(Any, Any), Float], Y_mask: NDArray[(Any, Any), Float], fit_intercept: bool) -> ApproxFirthState: + ''' + Performs the null fit for approximate Firth. 
+ + :return: Penalized log-likelihood of null fit and offset with covariate effects for SNP fit + ''' num_Y = Y.shape[1] penalized_LL_null_fit = np.zeros(num_Y) @@ -119,32 +147,40 @@ def create_approx_firth_state( if fit_intercept: b0_null_fit[-1] = (0.5 + y.sum()) / (y_mask.sum() + 1) b0_null_fit[-1] = np.log(b0_null_fit[-1] / (1 - b0_null_fit[-1])) - offset.mean() - firthFitResult = _fit_firth_logistic( - b0_null_fit, C, y, offset, max_step_size=5, max_iter=5000) - if firthFitResult is None: + # In regenie, this may retry with max_step_size=5, max_iter=5000 + firth_fit_result = _fit_firth(b0_null_fit, C, y, offset) + if firth_fit_result is None: raise ValueError("Null fit failed!") - penalized_LL_null_fit[i] = firthFitResult.penalized_LL - logit_offset[:, i] = offset + (C @ firthFitResult.beta) + penalized_LL_null_fit[i] = firth_fit_result.log_likelihood.penalized_LL + logit_offset[:, i] = offset + (C @ firth_fit_result.beta) return ApproxFirthState(logit_offset, penalized_LL_null_fit) +@typechecked def correct_approx_firth( x_res: NDArray[(Any,), Float], y_res: NDArray[(Any,), Float], logit_offset: NDArray[(Any,), Float], penalized_LL_null_fit: Float) -> Optional[Series]: + ''' + Calculate LRT statistics for a SNP using the approximate Firth method. + + :return: None if the Firth fit did not converge, LRT statistics otherwise + ''' - firthFitResult = _fit_firth_logistic( + firth_fit = _fit_firth( np.zeros(1), np.expand_dims(x_res, axis=1), y_res, logit_offset ) - if firthFitResult is None: + if firth_fit is None: return None - tvalue = 2 * (firthFitResult.penalized_LL - penalized_LL_null_fit) + # Likelihood-ratio test + tvalue = 2 * (firth_fit.penalized_LL - penalized_LL_null_fit) pvalue = stats.chi2.sf(tvalue, 1) - effect = firthFitResult.beta.item() - stderr = np.linalg.pinv(firthFitResult.LL_matrix).item() + effect = firth_fit.beta.item() + # Hessian of the unpenalized log-likelihood + stderr = np.linalg.pinv(firth_fit.log_likelihood.LL_matrix).item() return Series({'tvalue': tvalue, 'pvalue': pvalue, 'effect': effect, 'stderr': stderr}) diff --git a/python/glow/gwas/log_reg.py b/python/glow/gwas/log_reg.py index 644d9e518..9fb38918d 100644 --- a/python/glow/gwas/log_reg.py +++ b/python/glow/gwas/log_reg.py @@ -94,7 +94,7 @@ def logistic_regression( Y_mask = ~(np.isnan(Y)) np.nan_to_num(Y, copy=False) - log_reg_state = gwas_fx._loco_make_state( + state = gwas_fx._loco_make_state( Y, phenotype_df, offset_df, lambda y, pdf, odf: _create_log_reg_state(y, pdf, odf, C, Y_mask, correction, fit_intercept)) @@ -103,13 +103,13 @@ def logistic_regression( bc_C = sc.broadcast(C) bc_Y_mask = sc.broadcast(Y_mask.astype(dt)) - map_func = make_map_func(bc_state, bc_C, bc_Y_mask, correction, + map_func = make_map_func(bc_state, bc_C, bc_Y_mask, correction, pvalue_threshold, phenotype_df.columns.to_series().astype('str')) return genotype_df.mapInPandas(map_func, result_struct) -def make_map_func(bc_state, bc_C, bc_Y_mask, correction, column_names): +def make_map_func(bc_state, bc_C, bc_Y_mask, correction, pvalue_threshold, column_names): def map_func(pdf_iterator): for pdf in pdf_iterator: yield gwas_fx._loco_dispatch(pdf, bc_state.value, _logistic_regression_inner, @@ -223,15 +223,15 @@ def _logistic_regression_inner(genotype_pdf: pd.DataFrame, log_reg_state: LogReg for correction_idx in correction_indices: snp_index = correction_idx % genotype_pdf.shape[0] phenotype_index = int(correction_idx / phenotype_names.size) - approx_firth_results = correct_approx_firth( + approx_firth_snp_fit = 
correct_approx_firth( X_res[snp_index][phenotype_index], log_reg_state.Y_res[phenotype_index], log_reg_state.approx_firth_state.logit_offset[phenotype_index], log_reg_state.approx_firth_state.penalized_LL_null_fit[phenotype_index], ) - if approx_firth_results is not None: - out_df.iloc[correction_idx]['tvalue'] = approx_firth_results.tvalue - out_df.iloc[correction_idx]['pvalue'] = approx_firth_results.pvalue + if approx_firth_snp_fit is not None: + out_df.iloc[correction_idx]['tvalue'] = approx_firth_snp_fit.tvalue + out_df.iloc[correction_idx]['pvalue'] = approx_firth_snp_fit.pvalue else: print(f"Could not correct {out_df.iloc[correction_idx]}") else: From e0c065a17edc567143aa7698e9f481638ae2421f Mon Sep 17 00:00:00 2001 From: Karen Feng Date: Tue, 22 Dec 2020 17:07:13 -0800 Subject: [PATCH 7/9] Replace penalized LL with deviance Signed-off-by: Karen Feng --- python/glow/gwas/approx_firth_correction.py | 37 +++++++++++---------- python/glow/gwas/log_reg.py | 2 +- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/python/glow/gwas/approx_firth_correction.py b/python/glow/gwas/approx_firth_correction.py index 671b2d77e..f7b418338 100644 --- a/python/glow/gwas/approx_firth_correction.py +++ b/python/glow/gwas/approx_firth_correction.py @@ -19,7 +19,7 @@ class Intermediates: class LogLikelihood: intermediates: Intermediates LL_matrix: NDArray[(Any, Any), Float] - penalized_LL: Float + deviance: Float @dataclass @@ -31,7 +31,7 @@ class FirthFit: @dataclass class ApproxFirthState: logit_offset: NDArray[(Any, Any), Float] - penalized_LL_null_fit: NDArray[(Any,), Float] + null_fit_deviance: NDArray[(Any,), Float] @typechecked @@ -47,8 +47,9 @@ def _calculate_log_likelihood( I = np.atleast_2d(X.T @ G @ X) LL_matrix = np.atleast_2d(y @ np.log(pi + eps) + (1-y) @ np.log(1-pi + eps)) _, logdet = np.linalg.slogdet(I) - penalized_LL = np.sum(LL_matrix) + 0.5 * logdet - return LogLikelihood(Intermediates(pi, G, I), LL_matrix, penalized_LL) + penalty = 0.5 * logdet + deviance = -2 * (np.sum(LL_matrix) + penalty) + return LogLikelihood(Intermediates(pi, G, I), LL_matrix, deviance) @typechecked @@ -58,7 +59,7 @@ def _fit_firth( y: NDArray[(Any,), Float], offset: NDArray[(Any,), Float], convergence_limit: float = 1e-5, - likelihood_tolerance: float = 1e-6, + deviance_tolerance: float = 1e-6, max_iter: int = 250, max_step_size: int = 25, max_half_steps: int = 25) -> Optional[FirthFit]: @@ -70,7 +71,7 @@ def _fit_firth( :param y: Dependent variable (phenotype) :param offset: Offset (phenotype offset only for null fit, also with covariate offset for SNP fit) :param convergence_limit: Convergence is reached if all entries of the penalized score have smaller magnitude - :param likelihood_tolerance: Non-inferiority margin of penalized likelihood when halving step size + :param deviance_tolerance: Non-inferiority margin when halving step size :param max_iter: Maximum number of Firth iterations :param max_step_size: Maximum step size during a Firth iteration :param max_half_steps: Maximum number of half-steps during a Firth iteration @@ -86,16 +87,14 @@ def _fit_firth( # build hat matrix rootG_X = np.sqrt(log_likelihood.intermediates.G) @ X h = np.diagonal(rootG_X @ invI @ rootG_X.T) - U = X.T @ (y - log_likelihood.intermediates.pi + h * (0.5 - log_likelihood.intermediates.pi)) - if np.linalg.norm(U, np.inf) < convergence_limit: - break # f' / f'' delta = invI @ U # force absolute step size to be less than max_step_size for each entry of beta - mx = np.linalg.norm(delta, np.inf) / max_step_size + 
step_size = np.linalg.norm(delta, np.inf) + mx = step_size / max_step_size if mx > 1: delta = delta / mx @@ -103,9 +102,9 @@ def _fit_firth( # if the penalized log likelihood decreased, recompute with step-halving n_half_steps = 0 - while new_log_likelihood.penalized_LL < (log_likelihood.penalized_LL + likelihood_tolerance): + while new_log_likelihood.deviance >= log_likelihood.deviance + deviance_tolerance: if n_half_steps == max_half_steps: - print("Too many half-steps!") + print(f"Too many half-steps! {new_log_likelihood.deviance} vs {log_likelihood.deviance}") return None delta /= 2 new_log_likelihood = _calculate_log_likelihood(beta + delta, X, y, offset) @@ -113,6 +112,10 @@ def _fit_firth( beta = beta + delta log_likelihood = new_log_likelihood + + if np.linalg.norm(U, np.inf) < convergence_limit: + break + n_iter += 1 if n_iter == max_iter: @@ -136,7 +139,7 @@ def create_approx_firth_state( ''' num_Y = Y.shape[1] - penalized_LL_null_fit = np.zeros(num_Y) + null_fit_deviance = np.zeros(num_Y) logit_offset = np.zeros(Y.shape) for i in range(num_Y): @@ -151,10 +154,10 @@ def create_approx_firth_state( firth_fit_result = _fit_firth(b0_null_fit, C, y, offset) if firth_fit_result is None: raise ValueError("Null fit failed!") - penalized_LL_null_fit[i] = firth_fit_result.log_likelihood.penalized_LL + null_fit_deviance[i] = firth_fit_result.log_likelihood.deviance logit_offset[:, i] = offset + (C @ firth_fit_result.beta) - return ApproxFirthState(logit_offset, penalized_LL_null_fit) + return ApproxFirthState(logit_offset, null_fit_deviance) @typechecked @@ -162,7 +165,7 @@ def correct_approx_firth( x_res: NDArray[(Any,), Float], y_res: NDArray[(Any,), Float], logit_offset: NDArray[(Any,), Float], - penalized_LL_null_fit: Float) -> Optional[Series]: + deviance: Float) -> Optional[Series]: ''' Calculate LRT statistics for a SNP using the approximate Firth method. 
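With deviance defined as D = -2 * (log-likelihood + penalty), the likelihood-ratio statistic in correct_approx_firth reduces to a difference of deviances, D_null - D_full, referred to a chi-squared distribution with one degree of freedom, since the SNP effect is the single extra parameter in the full model. A worked example with made-up deviances:

from scipy import stats

null_deviance = 120.4   # hypothetical null-model deviance (covariates only)
full_deviance = 109.6   # hypothetical deviance with the SNP included
lrt = null_deviance - full_deviance   # equals 2 * (LL_full - LL_null)
pvalue = stats.chi2.sf(lrt, df=1)     # one extra parameter: the SNP beta
print(lrt, pvalue)                    # ~10.8, p ~0.001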
@@ -178,7 +181,7 @@ def correct_approx_firth( if firth_fit is None: return None # Likelihood-ratio test - tvalue = 2 * (firth_fit.penalized_LL - penalized_LL_null_fit) + tvalue = -1 * (firth_fit.log_likelihood.deviance - deviance) pvalue = stats.chi2.sf(tvalue, 1) effect = firth_fit.beta.item() # Hessian of the unpenalized log-likelihood diff --git a/python/glow/gwas/log_reg.py b/python/glow/gwas/log_reg.py index fbab87476..0f7344b8c 100644 --- a/python/glow/gwas/log_reg.py +++ b/python/glow/gwas/log_reg.py @@ -216,7 +216,7 @@ def _logistic_regression_inner(genotype_pdf: pd.DataFrame, log_reg_state: LogReg X_res[snp_index][phenotype_index], log_reg_state.Y_res[phenotype_index], log_reg_state.approx_firth_state.logit_offset[phenotype_index], - log_reg_state.approx_firth_state.penalized_LL_null_fit[phenotype_index], + log_reg_state.approx_firth_state.null_fit_deviance[phenotype_index], ) if approx_firth_snp_fit is not None: out_df.iloc[correction_idx]['tvalue'] = approx_firth_snp_fit.tvalue From ed77ea27e326c43959fda6d570cc3ff1e1633702 Mon Sep 17 00:00:00 2001 From: Karen Feng Date: Tue, 22 Dec 2020 17:28:27 -0800 Subject: [PATCH 8/9] log likelihood is a single num Signed-off-by: Karen Feng --- python/glow/gwas/approx_firth_correction.py | 24 ++++++++++----------- python/glow/gwas/log_reg.py | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/python/glow/gwas/approx_firth_correction.py b/python/glow/gwas/approx_firth_correction.py index f7b418338..dacba9ac1 100644 --- a/python/glow/gwas/approx_firth_correction.py +++ b/python/glow/gwas/approx_firth_correction.py @@ -18,8 +18,8 @@ class Intermediates: @dataclass class LogLikelihood: intermediates: Intermediates - LL_matrix: NDArray[(Any, Any), Float] - deviance: Float + unpenalized_log_likelihood: Float + deviance: Float # 2 * penalized log likelihood @dataclass @@ -31,7 +31,7 @@ class FirthFit: @dataclass class ApproxFirthState: logit_offset: NDArray[(Any, Any), Float] - null_fit_deviance: NDArray[(Any,), Float] + null_model_deviance: NDArray[(Any,), Float] @typechecked @@ -45,11 +45,11 @@ def _calculate_log_likelihood( pi = 1 - 1 / (np.exp(X @ beta + offset) + 1) G = np.diagflat(pi * (1-pi)) I = np.atleast_2d(X.T @ G @ X) - LL_matrix = np.atleast_2d(y @ np.log(pi + eps) + (1-y) @ np.log(1-pi + eps)) + unpenalized_log_likelihood = y @ np.log(pi + eps) + (1-y) @ np.log(1-pi + eps) _, logdet = np.linalg.slogdet(I) penalty = 0.5 * logdet - deviance = -2 * (np.sum(LL_matrix) + penalty) - return LogLikelihood(Intermediates(pi, G, I), LL_matrix, deviance) + deviance = -2 * (unpenalized_log_likelihood + penalty) + return LogLikelihood(Intermediates(pi, G, I), unpenalized_log_likelihood, deviance) @typechecked @@ -139,7 +139,7 @@ def create_approx_firth_state( ''' num_Y = Y.shape[1] - null_fit_deviance = np.zeros(num_Y) + null_model_deviance = np.zeros(num_Y) logit_offset = np.zeros(Y.shape) for i in range(num_Y): @@ -154,10 +154,10 @@ def create_approx_firth_state( firth_fit_result = _fit_firth(b0_null_fit, C, y, offset) if firth_fit_result is None: raise ValueError("Null fit failed!") - null_fit_deviance[i] = firth_fit_result.log_likelihood.deviance + null_model_deviance[i] = firth_fit_result.log_likelihood.deviance logit_offset[:, i] = offset + (C @ firth_fit_result.beta) - return ApproxFirthState(logit_offset, null_fit_deviance) + return ApproxFirthState(logit_offset, null_model_deviance) @typechecked @@ -165,7 +165,7 @@ def correct_approx_firth( x_res: NDArray[(Any,), Float], y_res: NDArray[(Any,), Float], logit_offset: 
NDArray[(Any,), Float],
-        deviance: Float) -> Optional[Series]:
+        null_model_deviance: Float) -> Optional[Series]:
     '''
     Calculate LRT statistics for a SNP using the approximate Firth method.

@@ -181,9 +181,9 @@ def correct_approx_firth(
     if firth_fit is None:
         return None
     # Likelihood-ratio test
-    tvalue = -1 * (firth_fit.log_likelihood.deviance - deviance)
+    tvalue = -1 * (firth_fit.log_likelihood.deviance - null_model_deviance)
     pvalue = stats.chi2.sf(tvalue, 1)
     effect = firth_fit.beta.item()
     # Hessian of the unpenalized log-likelihood
-    stderr = np.linalg.pinv(firth_fit.log_likelihood.LL_matrix).item()
+    stderr = 1 / firth_fit.log_likelihood.unpenalized_log_likelihood
     return Series({'tvalue': tvalue, 'pvalue': pvalue, 'effect': effect, 'stderr': stderr})
diff --git a/python/glow/gwas/log_reg.py b/python/glow/gwas/log_reg.py
index 0f7344b8c..8c7026fe7 100644
--- a/python/glow/gwas/log_reg.py
+++ b/python/glow/gwas/log_reg.py
@@ -216,7 +216,7 @@ def _logistic_regression_inner(genotype_pdf: pd.DataFrame, log_reg_state: LogReg
                     X_res[snp_index][phenotype_index],
                     log_reg_state.Y_res[phenotype_index],
                     log_reg_state.approx_firth_state.logit_offset[phenotype_index],
-                    log_reg_state.approx_firth_state.null_fit_deviance[phenotype_index],
+                    log_reg_state.approx_firth_state.null_model_deviance[phenotype_index],
                 )
                 if approx_firth_snp_fit is not None:
                     out_df.iloc[correction_idx]['tvalue'] = approx_firth_snp_fit.tvalue

From e1b52e4e4c5775cd7de0b3916cddef22a653b1f0 Mon Sep 17 00:00:00 2001
From: Henry Davidge
Date: Wed, 23 Dec 2020 13:26:29 -0500
Subject: [PATCH 9/9] Pandas based logistic regression (#316)

* initial work

Signed-off-by: Henry D

* add file

Signed-off-by: Henry D

* working score test

Signed-off-by: Henry D

* seems to work

Signed-off-by: Henry D

* continue

Signed-off-by: Henry D

* offset support; more tests

Signed-off-by: Henry D

* delete lin_reg.py

Signed-off-by: Henry D

* add docs, few more tests

Signed-off-by: Henry D

* add test file

Signed-off-by: Henry D

* fix last test

Signed-off-by: Henry D

* Fix docs, tests

Signed-off-by: Henry D

* memory limit

Signed-off-by: Henry D

* try explicitly broadcasting

Signed-off-by: Henry D

* update environment

Signed-off-by: Henry D

* undo explicit broadcast

Signed-off-by: Henry D

* fix typo

Signed-off-by: Henry D

* formatting; karen's comment

Signed-off-by: Henry D
---
 conftest.py                                   |  11 +
 docs/source/environment.yml                   |   1 +
 python/environment.yml                        |   3 +-
 python/glow/gwas/functions.py                 | 117 +++++++
 python/glow/gwas/lin_reg.py                   | 127 ++-----
 python/glow/gwas/log_reg.py                   | 201 +++++++++++
 ...t_linear_regression.py => test_lin_reg.py} | 187 +++++------
 python/glow/gwas/tests/test_log_reg.py        | 314 ++++++++++++++++++
 python/setup.py                               |   2 +
 9 files changed, 765 insertions(+), 198 deletions(-)
 create mode 100644 python/glow/gwas/functions.py
 create mode 100644 python/glow/gwas/log_reg.py
 rename python/glow/gwas/tests/{test_linear_regression.py => test_lin_reg.py} (72%)
 create mode 100644 python/glow/gwas/tests/test_log_reg.py

diff --git a/conftest.py b/conftest.py
index e72525fff..5abc80fd2 100644
--- a/conftest.py
+++ b/conftest.py
@@ -13,6 +13,7 @@
 # limitations under the License.
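The conftest.py diff that begins here adds a seeded NumPy generator fixture for the new randomized tests: an optional --random-seed flag feeds a SeedSequence, and the fixture prints the resulting entropy so that a failing run can be replayed exactly. Used from a test, the fixture would look roughly like this (the test body is hypothetical):

# run as: pytest --random-seed 42 python/glow/gwas/tests/
def test_with_reproducible_data(rg):
    # rg is a numpy.random.Generator; re-running with the printed seed
    # regenerates identical inputs
    genotypes = rg.random((100, 10))
    assert genotypes.shape == (100, 10)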
+import numpy as np from pyspark.sql import SparkSession import pytest import os @@ -50,6 +51,16 @@ def spark(spark_builder): sess = spark_builder.getOrCreate() return sess.newSession() +def pytest_addoption(parser): + parser.addoption('--random-seed', action='store', type=int, help='Seed to use for random number generator') + +@pytest.fixture(scope="function") +def rg(pytestconfig): + seed = pytestconfig.getoption('random_seed') + seed_seq = np.random.SeedSequence(seed) + print(f'Creating random number generator with seed {seed_seq.entropy}') + return np.random.default_rng(seed_seq) + def pytest_runtest_setup(item): min_spark_version = next((mark.args[0] for mark in item.iter_markers(name='min_spark')), None) if min_spark_version: diff --git a/docs/source/environment.yml b/docs/source/environment.yml index c56ffd3ff..0c4fe1bcb 100644 --- a/docs/source/environment.yml +++ b/docs/source/environment.yml @@ -6,6 +6,7 @@ dependencies: - python=3.7 - nptyping=1.3.0 - numpy>=1.17.4 + - opt_einsum>=3.2.0 - pandas>=0.25.3 - pip - scikit-learn=0.22.1 diff --git a/python/environment.yml b/python/environment.yml index 144f31da1..939ea0f58 100644 --- a/python/environment.yml +++ b/python/environment.yml @@ -8,10 +8,11 @@ dependencies: - click=7.1.1 # Docs notebook source generation - databricks-cli=0.9.1 # Docs notebook source generation - jinja2 - - jupyter + - jupyterlab - nomkl # Skip MKL for local development - nptyping=1.3.0 - numpy=1.17.5 + - opt_einsum>=3.2.0 - pandas=0.25.3 - pip - pyarrow=0.15.1 diff --git a/python/glow/gwas/functions.py b/python/glow/gwas/functions.py new file mode 100644 index 000000000..073668552 --- /dev/null +++ b/python/glow/gwas/functions.py @@ -0,0 +1,117 @@ +from pyspark.sql import SparkSession, functions as fx +from pyspark.sql.types import StructField, StructType, FloatType, DoubleType, ArrayType +from typing import Any, Callable, Dict, List, TypeVar, Union +import numpy as np +import pandas as pd +from nptyping import Float, NDArray +from typeguard import typechecked +import opt_einsum as oe +from ..wgr.linear_model.functions import __assert_all_present, __check_binary + +_VALUES_COLUMN_NAME = '_glow_regression_values' +_GENOTYPES_COLUMN_NAME = 'genotypes' + + +def _check_spark_version(spark: SparkSession) -> bool: + if int(spark.version.split('.')[0]) < 3: + raise AttributeError( + 'Pandas based regression tests are only supported on Spark 3.0 or greater') + + +def _output_schema(input_fields: List[StructField], result_fields: List[StructField]) -> StructType: + + fields = [field for field in input_fields if field.name != _VALUES_COLUMN_NAME] + result_fields + return StructType(fields) + + +def _validate_covariates_and_phenotypes(covariate_df, phenotype_df, is_binary): + for col in covariate_df: + __assert_all_present(covariate_df, col, 'covariate') + if not covariate_df.empty: + if phenotype_df.shape[0] != covariate_df.shape[0]: + raise ValueError( + f'phenotype_df and covariate_df must have the same number of rows ({phenotype_df.shape[0]} != {covariate_df.shape[0]}' + ) + if is_binary: + __check_binary(phenotype_df) + + +def _regression_sql_type(dt): + if dt == np.float32: + return FloatType() + elif dt == np.float64: + return DoubleType() + else: + raise ValueError('dt must be np.float32 or np.float64') + + +def _prepare_genotype_df(genotype_df, values_column, sql_type): + if isinstance(values_column, str): + if values_column == _GENOTYPES_COLUMN_NAME: + raise ValueError(f'The values column should not be called "{_GENOTYPES_COLUMN_NAME}"') + out = 
diff --git a/docs/source/environment.yml b/docs/source/environment.yml
index c56ffd3ff..0c4fe1bcb 100644
--- a/docs/source/environment.yml
+++ b/docs/source/environment.yml
@@ -6,6 +6,7 @@ dependencies:
   - python=3.7
   - nptyping=1.3.0
   - numpy>=1.17.4
+  - opt_einsum>=3.2.0
   - pandas>=0.25.3
   - pip
   - scikit-learn=0.22.1
diff --git a/python/environment.yml b/python/environment.yml
index 144f31da1..939ea0f58 100644
--- a/python/environment.yml
+++ b/python/environment.yml
@@ -8,10 +8,11 @@ dependencies:
   - click=7.1.1 # Docs notebook source generation
   - databricks-cli=0.9.1 # Docs notebook source generation
   - jinja2
-  - jupyter
+  - jupyterlab
   - nomkl # Skip MKL for local development
   - nptyping=1.3.0
   - numpy=1.17.5
+  - opt_einsum>=3.2.0
   - pandas=0.25.3
   - pip
   - pyarrow=0.15.1
diff --git a/python/glow/gwas/functions.py b/python/glow/gwas/functions.py
new file mode 100644
index 000000000..073668552
--- /dev/null
+++ b/python/glow/gwas/functions.py
@@ -0,0 +1,117 @@
+from pyspark.sql import SparkSession, functions as fx
+from pyspark.sql.types import StructField, StructType, FloatType, DoubleType, ArrayType
+from typing import Any, Callable, Dict, List, TypeVar, Union
+import numpy as np
+import pandas as pd
+from nptyping import Float, NDArray
+from typeguard import typechecked
+import opt_einsum as oe
+from ..wgr.linear_model.functions import __assert_all_present, __check_binary
+
+_VALUES_COLUMN_NAME = '_glow_regression_values'
+_GENOTYPES_COLUMN_NAME = 'genotypes'
+
+
+def _check_spark_version(spark: SparkSession) -> None:
+    if int(spark.version.split('.')[0]) < 3:
+        raise AttributeError(
+            'Pandas based regression tests are only supported on Spark 3.0 or greater')
+
+
+def _output_schema(input_fields: List[StructField], result_fields: List[StructField]) -> StructType:
+
+    fields = [field for field in input_fields if field.name != _VALUES_COLUMN_NAME] + result_fields
+    return StructType(fields)
+
+
+def _validate_covariates_and_phenotypes(covariate_df, phenotype_df, is_binary):
+    for col in covariate_df:
+        __assert_all_present(covariate_df, col, 'covariate')
+    if not covariate_df.empty:
+        if phenotype_df.shape[0] != covariate_df.shape[0]:
+            raise ValueError(
+                f'phenotype_df and covariate_df must have the same number of rows ({phenotype_df.shape[0]} != {covariate_df.shape[0]})'
+            )
+    if is_binary:
+        __check_binary(phenotype_df)
+
+
+def _regression_sql_type(dt):
+    if dt == np.float32:
+        return FloatType()
+    elif dt == np.float64:
+        return DoubleType()
+    else:
+        raise ValueError('dt must be np.float32 or np.float64')
+
+
+def _prepare_genotype_df(genotype_df, values_column, sql_type):
+    if isinstance(values_column, str):
+        if values_column == _GENOTYPES_COLUMN_NAME:
+            raise ValueError(f'The values column should not be called "{_GENOTYPES_COLUMN_NAME}"')
+        out = (genotype_df.withColumn(_VALUES_COLUMN_NAME,
+                                      fx.col(values_column).cast(
+                                          ArrayType(sql_type))).drop(values_column))
+    else:
+        out = genotype_df.withColumn(_VALUES_COLUMN_NAME, values_column.cast(ArrayType(sql_type)))
+
+    if _GENOTYPES_COLUMN_NAME in [field.name for field in genotype_df.schema]:
+        out = out.drop(_GENOTYPES_COLUMN_NAME)
+    return out
+
+
+@typechecked
+def _add_intercept(C: NDArray[(Any, Any), Float], num_samples: int) -> NDArray[(Any, Any), Float]:
+    intercept = np.ones((num_samples, 1))
+    return np.hstack((intercept, C)) if C.size else intercept
+
+
+def _einsum(subscripts: str, *operands: NDArray) -> NDArray:
+    '''
+    A wrapper around np.einsum to ensure uniform options.
+    '''
+    return oe.contract(subscripts, *operands, casting='no', optimize='dp', memory_limit='max_input')
+
+
+def _have_same_elements(idx1: pd.Index, idx2: pd.Index) -> bool:
+    return idx1.sort_values().equals(idx2.sort_values())
+
+
+T = TypeVar('T')
+
+
+def _loco_dispatch(genotype_pdf: pd.DataFrame, state: Union[T, Dict[str, T]], f: Callable, *args):
+    '''
+    Given a pandas DataFrame, dispatch into one or more calls of the regression kernel
+    depending on whether we have one Y matrix or one Y matrix per contig.
+    '''
+    if isinstance(state, dict):
+        return genotype_pdf.groupby('contigName', sort=False, as_index=False)\
+            .apply(lambda pdf: f(pdf, state[pdf['contigName'].iloc[0]], *args))
+    else:
+        return f(genotype_pdf, state, *args)
+
+
+@typechecked
+def _loco_make_state(Y: NDArray[(Any, Any), Float], phenotype_df: pd.DataFrame,
+                     offset_df: pd.DataFrame, f: Callable[..., T], *args) -> Union[T, Dict[str, T]]:
+    if not offset_df.empty:
+        if not _have_same_elements(phenotype_df.columns, offset_df.columns):
+            raise ValueError(f'phenotype_df and offset_df should have the same column names.')
+        if offset_df.index.nlevels == 1:  # Indexed by sample id
+            if not _have_same_elements(phenotype_df.index, offset_df.index):
+                raise ValueError(f'phenotype_df and offset_df should have the same index.')
+            return f(Y, phenotype_df, offset_df, *args)
+        elif offset_df.index.nlevels == 2:  # Indexed by sample id and contig
+            all_contigs = offset_df.index.get_level_values(1).unique()
+            state_dict = {}
+            for contig in all_contigs:
+                offset_for_contig = offset_df.xs(contig, level=1)
+                if not _have_same_elements(phenotype_df.index, offset_for_contig.index):
+                    raise ValueError(
+                        'When using a multi-indexed offset_df, the offsets for each contig '
+                        'should have the same index as phenotype_df')
+                state_dict[contig] = f(Y, phenotype_df, offset_for_contig, *args)
+            return state_dict
+    else:
+        return f(Y, phenotype_df, None, *args)
diff --git a/python/glow/gwas/lin_reg.py b/python/glow/gwas/lin_reg.py
index b9057f2e5..1b6808f36 100644
--- a/python/glow/gwas/lin_reg.py
+++ b/python/glow/gwas/lin_reg.py
@@ -8,12 +8,11 @@
 from scipy import stats
 from typeguard import typechecked
 from ..wgr.linear_model.functions import __assert_all_present
+from .
import functions as gwas_fx +from .functions import _VALUES_COLUMN_NAME, _GENOTYPES_COLUMN_NAME __all__ = ['linear_regression'] -_VALUES_COLUMN_NAME = '_linreg_values' -_GENOTYPES_COLUMN_NAME = 'genotypes' - @typechecked def linear_regression(genotype_df: DataFrame, @@ -84,38 +83,14 @@ def linear_regression(genotype_df: DataFrame, - ``pvalue``: P value estimated from a two sided T-test - ``phenotype``: The phenotype name as determined by the column names of ``phenotype_df`` ''' - if int(genotype_df.sql_ctx.sparkSession.version.split('.')[0]) < 3: - raise AttributeError( - 'Pandas based linear regression is only supported on Spark 3.0 or greater') - - # Validate input - for col in covariate_df: - __assert_all_present(covariate_df, col, 'covariate') - if not covariate_df.empty: - if phenotype_df.shape[0] != covariate_df.shape[0]: - raise ValueError( - f'phenotype_df and covariate_df must have the same number of rows ({phenotype_df.shape[0]} != {covariate_df.shape[0]}' - ) - - if dt == np.float32: - sql_type = FloatType() - elif dt == np.float64: - sql_type = DoubleType() - else: - raise ValueError('dt must be np.float32 or np.float64') - - if isinstance(values_column, str): - if values_column == _GENOTYPES_COLUMN_NAME: - raise ValueError(f'The values column should not be called "{_GENOTYPES_COLUMN_NAME}"') - genotype_df = (genotype_df.withColumn(_VALUES_COLUMN_NAME, - fx.col(values_column).cast( - ArrayType(sql_type))).drop(values_column)) - else: - genotype_df = genotype_df.withColumn(_VALUES_COLUMN_NAME, - values_column.cast(ArrayType(sql_type))) - - if _GENOTYPES_COLUMN_NAME in [field.name for field in genotype_df.schema]: - genotype_df = genotype_df.drop(_GENOTYPES_COLUMN_NAME) + + gwas_fx._check_spark_version(genotype_df.sql_ctx.sparkSession) + + gwas_fx._validate_covariates_and_phenotypes(covariate_df, phenotype_df, is_binary=False) + + sql_type = gwas_fx._regression_sql_type(dt) + + genotype_df = gwas_fx._prepare_genotype_df(genotype_df, values_column, sql_type) # Construct output schema result_fields = [ @@ -125,13 +100,11 @@ def linear_regression(genotype_df: DataFrame, StructField('pvalue', sql_type), StructField('phenotype', StringType()) ] - fields = [field for field in genotype_df.schema.fields if field.name != _VALUES_COLUMN_NAME - ] + result_fields - result_struct = StructType(fields) + result_struct = gwas_fx._output_schema(genotype_df.schema.fields, result_fields) C = covariate_df.to_numpy(dt, copy=True) if fit_intercept: - C = _add_intercept(C, phenotype_df.shape[0]) + C = gwas_fx._add_intercept(C, phenotype_df.shape[0]) # Prepare covariate basis and phenotype residuals Q = np.linalg.qr(C)[0] @@ -140,54 +113,19 @@ def linear_regression(genotype_df: DataFrame, np.nan_to_num(Y, copy=False) _residualize_in_place(Y, Q) - if not offset_df.empty: - if not _have_same_elements(phenotype_df.columns, offset_df.columns): - raise ValueError(f'phenotype_df and offset_df should have the same column names.') - if offset_df.index.nlevels == 1: # Indexed by sample id - if not _have_same_elements(phenotype_df.index, offset_df.index): - raise ValueError(f'phenotype_df and offset_df should have the same index.') - Y_state = _create_YState_from_offset(Y, Y_mask, phenotype_df, offset_df, dt) - elif offset_df.index.nlevels == 2: # Indexed by sample id and contig - all_contigs = offset_df.index.get_level_values(1).unique() - Y_state = {} - for contig in all_contigs: - offset_for_contig = offset_df.xs(contig, level=1) - if not _have_same_elements(phenotype_df.index, offset_for_contig.index): - 
raise ValueError( - 'When using a multi-indexed offset_df, the offsets for each contig ' - 'should have the same index as phenotype_df') - Y_state[contig] = _create_YState_from_offset(Y, Y_mask, phenotype_df, - offset_for_contig, dt) - else: - Y_state = _create_YState(Y, Y_mask) + Y_state = gwas_fx._loco_make_state(Y, phenotype_df, offset_df, + lambda y, pdf, odf: _create_YState(y, pdf, odf, Y_mask, dt)) dof = C.shape[0] - C.shape[1] - 1 def map_func(pdf_iterator): for pdf in pdf_iterator: - yield _linear_regression_dispatch(pdf, Y_state, Y_mask, Q, dof, - phenotype_df.columns.to_series().astype('str')) + yield gwas_fx._loco_dispatch(pdf, Y_state, _linear_regression_inner, Y_mask, Q, dof, + phenotype_df.columns.to_series().astype('str')) return genotype_df.mapInPandas(map_func, result_struct) -def _have_same_elements(idx1: pd.Index, idx2: pd.Index) -> bool: - return idx1.sort_values().equals(idx2.sort_values()) - - -def _create_YState_from_offset(Y: NDArray[(Any, Any), Float], Y_mask: NDArray[(Any, Any), Float], - phenotype_df: pd.DataFrame, offset_df: pd.DataFrame, - dt) -> NDArray[(Any, Any), Float]: - Y = (pd.DataFrame(Y, phenotype_df.index, phenotype_df.columns) - offset_df).to_numpy(dt) - return _create_YState(Y, Y_mask) - - -def _create_YState(Y: NDArray[(Any, Any), Float], - Y_mask: NDArray[(Any, Any), Float]) -> NDArray[(Any, Any), Float]: - Y *= Y_mask - return YState(Y, np.sum(Y * Y, axis=0)) - - @dataclass class YState: ''' @@ -197,18 +135,12 @@ class YState: YdotY: NDArray[(Any), Float] -@typechecked -def _add_intercept(C: NDArray[(Any, Any), Float], num_samples: int) -> NDArray[(Any, Any), Float]: - intercept = np.ones((num_samples, 1)) - return np.hstack((intercept, C)) if C.size else intercept - - -@typechecked -def _einsum(subscripts: str, *operands: NDArray) -> NDArray: - ''' - A wrapper around np.einsum to ensure uniform options. - ''' - return np.einsum(subscripts, *operands, casting='no') +def _create_YState(Y: NDArray[(Any, Any), Float], phenotype_df: pd.DataFrame, + offset_df: pd.DataFrame, Y_mask: NDArray[(Any, Any), Float], dt) -> YState: + if offset_df is not None: + Y = (pd.DataFrame(Y, phenotype_df.index, phenotype_df.columns) - offset_df).to_numpy(dt) + Y *= Y_mask + return YState(Y, np.sum(Y * Y, axis=0)) @typechecked @@ -222,19 +154,6 @@ def _residualize_in_place(M: NDArray[(Any, Any), Float], return M -def _linear_regression_dispatch(genotype_pdf: pd.DataFrame, - Y_state: Union[YState, Dict[str, YState]], *args) -> pd.DataFrame: - ''' - Given a pandas DataFrame, dispatch into one or more calls of the linear regression kernel - depending whether we have one Y matrix or one Y matrix per contig. 
-    '''
-    if isinstance(Y_state, dict):
-        return genotype_pdf.groupby('contigName', sort=False, as_index=False)\
-            .apply(lambda pdf: _linear_regression_inner(pdf, Y_state[pdf['contigName'].iloc[0]], *args))
-    else:
-        return _linear_regression_inner(genotype_pdf, Y_state, *args)
-
-
 @typechecked
 def _linear_regression_inner(genotype_pdf: pd.DataFrame, Y_state: YState,
                              Y_mask: NDArray[(Any, Any), Float], Q: NDArray[(Any, Any), Float],
@@ -255,7 +174,7 @@ def _linear_regression_inner(genotype_pdf: pd.DataFrame, Y_state: YState,
     '''
     X = _residualize_in_place(np.column_stack(genotype_pdf[_VALUES_COLUMN_NAME].array), Q)
     XdotY = Y_state.Y.T @ X
-    XdotX_reciprocal = 1 / _einsum('sp,sg,sg->pg', Y_mask, X, X)
+    XdotX_reciprocal = 1 / gwas_fx._einsum('sp,sg,sg->pg', Y_mask, X, X)
     betas = XdotY * XdotX_reciprocal
     standard_error = np.sqrt((Y_state.YdotY[:, None] * XdotX_reciprocal - betas * betas) / dof)
     T = betas / standard_error
diff --git a/python/glow/gwas/log_reg.py b/python/glow/gwas/log_reg.py
new file mode 100644
index 000000000..b765aafce
--- /dev/null
+++ b/python/glow/gwas/log_reg.py
@@ -0,0 +1,201 @@
+from pyspark.sql.types import StringType, StructField
+from typing import Any, Optional
+import pandas as pd
+import numpy as np
+from pyspark.sql import DataFrame
+import statsmodels.api as sm
+from dataclasses import dataclass
+from typeguard import typechecked
+from nptyping import Float, NDArray
+from scipy import stats
+import opt_einsum as oe
+from . import functions as gwas_fx
+from .functions import _VALUES_COLUMN_NAME
+
+__all__ = ['logistic_regression']
+
+fallback_none = 'none'
+
+
+@typechecked
+def logistic_regression(
+        genotype_df: DataFrame,
+        phenotype_df: pd.DataFrame,
+        covariate_df: pd.DataFrame = pd.DataFrame({}),
+        offset_df: pd.DataFrame = pd.DataFrame({}),
+        correction: str = 'none',  # TODO: Make approx-firth default
+        pvalue_threshold: float = 0.05,
+        fit_intercept: bool = True,
+        values_column: str = 'values',
+        dt: type = np.float64) -> DataFrame:
+    '''
+    Uses logistic regression to test for association between genotypes and one or more binary
+    phenotypes. This is a distributed version of the method from regenie:
+    https://www.biorxiv.org/content/10.1101/2020.06.19.162354v2
+
+    On the driver node, we fit a logistic regression model based on the covariates for each
+    phenotype:
+
+    logit(y) ~ C
+
+    where y is a phenotype vector and C is the covariate matrix.
+
+    We compute the probability predictions y_hat and broadcast the residuals (y - y_hat), gamma vectors
+    (where gamma is defined as y_hat * (1 - y_hat)), and (C.T gamma C)^-1 matrices. In each task,
+    we then adjust the new genotypes based on the null fit, perform a score test as a fast scan
+    for potentially significant variants, and then test variants with p values below a threshold
+    using a more selective, more expensive test.
+
+    Args:
+        genotype_df : Spark DataFrame containing genomic data
+        phenotype_df : Pandas DataFrame containing phenotypic data
+        covariate_df : An optional Pandas DataFrame containing covariates
+        offset_df : An optional Pandas DataFrame containing the phenotype offset. This value will be used
+                    as an offset in the covariate only and per variant logistic regression models. The ``offset_df`` may
+                    have one or two levels of indexing. If one level, the index should be the same as the ``phenotype_df``.
+                    If two levels, the level 0 index should be the same as the ``phenotype_df``, and the level 1 index
+                    should be the contig name. The two level index scheme allows for per-contig offsets like
+                    LOCO predictions from GloWGR.
+        correction : Which test to use for variants that meet a significance threshold for the score test
+        pvalue_threshold : Variants with a pvalue below this threshold will be tested using the ``correction`` method.
+        fit_intercept : Whether or not to add an intercept column to the covariate DataFrame
+        values_column : A column name or column expression to test with logistic regression. If a column name is provided,
+                        ``genotype_df`` should have a column with this name and a numeric array type. If a column expression
+                        is provided, the expression should return a numeric array type.
+        dt : The numpy datatype to use in the logistic regression test. Must be ``np.float32`` or ``np.float64``.
+
+    Returns:
+        A Spark DataFrame that contains
+
+        - All columns from ``genotype_df`` except the ``values_column`` and the ``genotypes`` column if one exists
+        - ``tvalue``: The chi squared test statistic according to the score test or the correction method
+        - ``pvalue``: P value estimated from the test statistic
+        - ``phenotype``: The phenotype name as determined by the column names of ``phenotype_df``
+    '''
+
+    gwas_fx._check_spark_version(genotype_df.sql_ctx.sparkSession)
+    gwas_fx._validate_covariates_and_phenotypes(covariate_df, phenotype_df, is_binary=True)
+    sql_type = gwas_fx._regression_sql_type(dt)
+    genotype_df = gwas_fx._prepare_genotype_df(genotype_df, values_column, sql_type)
+    result_fields = [
+        # TODO: Probably want to put effect size and stderr here for approx-firth
+        StructField('tvalue', sql_type),
+        StructField('pvalue', sql_type),
+        StructField('phenotype', StringType())
+    ]
+
+    result_struct = gwas_fx._output_schema(genotype_df.schema.fields, result_fields)
+    C = covariate_df.to_numpy(dt, copy=True)
+    if fit_intercept:
+        C = gwas_fx._add_intercept(C, phenotype_df.shape[0])
+    Y = phenotype_df.to_numpy(dt, copy=True)
+    Y_mask = ~(np.isnan(Y))
+    np.nan_to_num(Y, copy=False)
+
+    state = gwas_fx._loco_make_state(
+        Y, phenotype_df, offset_df,
+        lambda y, pdf, odf: _create_log_reg_state(y, pdf, odf, C, Y_mask))
+
+    phenotype_names = phenotype_df.columns.to_series().astype('str')
+
+    def map_func(pdf_iterator):
+        for pdf in pdf_iterator:
+            yield gwas_fx._loco_dispatch(pdf, state, _logistic_regression_inner, C, Y_mask,
+                                         correction, phenotype_names)
+
+    return genotype_df.mapInPandas(map_func, result_struct)
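Since this is a new public API, a hedged usage sketch may help; it mirrors the tests further down. `spark` is assumed to be an active SparkSession on Spark 3.0+, and every shape, seed, and column value here is illustrative rather than taken from the patch:

    import numpy as np
    import pandas as pd
    import glow.gwas.log_reg as lr

    rng = np.random.default_rng(42)
    num_samples, num_pheno, num_geno = 20, 2, 4
    phenotype_df = pd.DataFrame(rng.integers(0, 2, (num_samples, num_pheno)).astype(np.float64))
    covariate_df = pd.DataFrame(rng.random((num_samples, 3)))
    # One row per variant; 'values' holds that variant's per-sample values, and
    # 'contigName' lets _loco_dispatch pick the matching per-contig null model
    genotype_pdf = pd.DataFrame({'values': rng.random((num_geno, num_samples)).tolist(),
                                 'contigName': ['chr1', 'chr2'] * 2})
    # Two-level (sample, contig) offsets, e.g. LOCO predictions from GloWGR
    offset_index = pd.MultiIndex.from_product([phenotype_df.index, ['chr1', 'chr2']])
    offset_df = pd.DataFrame(rng.random((num_samples * 2, num_pheno)), index=offset_index)

    results = lr.logistic_regression(spark.createDataFrame(genotype_pdf),
                                     phenotype_df,
                                     covariate_df,
                                     offset_df=offset_df).toPandas()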
+
+
+@typechecked
+def _logistic_null_model_predictions(
+        y: NDArray[Any, Float],
+        X: NDArray[Any, Float],
+        y_mask: NDArray[Any, bool],
+        # nptyping can't handle optional NDArrays, so don't verify the type for offset
+        offset
+) -> NDArray[Any, Float]:
+    if offset is not None:
+        offset = offset[y_mask]
+    model = sm.GLM(y[y_mask],
+                   X[y_mask],
+                   family=sm.families.Binomial(),
+                   offset=offset,
+                   missing='ignore')
+    fit_result = model.fit()
+    predictions = model.predict(fit_result.params)
+
+    # Store 0 as prediction for samples with missing phenotypes
+    remapped_predictions = np.zeros(y.shape)
+    remapped_predictions[y_mask] = predictions
+    return remapped_predictions
+
+
+@dataclass
+class LogRegState:
+    inv_CtGammaC: NDArray[(Any, Any), Float]
+    gamma: NDArray[(Any, Any), Float]
+    Y_res: NDArray[(Any, Any), Float]
+
+
+@typechecked
+def _create_log_reg_state(
+        Y: NDArray[(Any, Any), Float],
+        phenotype_df: pd.DataFrame,  # Unused, only to share code with lin_reg.py
+        offset_df: Optional[pd.DataFrame],
+        C: NDArray[(Any, Any), Float],
+        Y_mask: NDArray[(Any, Any), Float]) -> LogRegState:
+    Y_pred = np.row_stack([
+        _logistic_null_model_predictions(
+            Y[:, i], C, Y_mask[:, i],
+            offset_df.iloc[:, i].to_numpy() if offset_df is not None else None)
+        for i in range(Y.shape[1])
+    ])
+    gamma = Y_pred * (1 - Y_pred)
+    CtGammaC = C.T @ (gamma[:, :, None] * C)
+    inv_CtGammaC = np.linalg.inv(CtGammaC)
+    return LogRegState(inv_CtGammaC, gamma, (Y - Y_pred.T) * Y_mask)
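The broadcasting above packs one null fit per phenotype: with n samples, p phenotypes, and c covariates, Y_pred and gamma are (p, n), and gamma[:, :, None] * C broadcasts to (p, n, c), so the matmul with C.T yields a stack of p (c, c) matrices. A minimal shape sketch with illustrative values:

    import numpy as np

    n, p, c = 6, 3, 2
    C = np.ones((n, c))
    Y_pred = np.full((p, n), 0.5)                # stacked null-model probabilities
    gamma = Y_pred * (1 - Y_pred)                # (p, n) binomial variances
    CtGammaC = C.T @ (gamma[:, :, None] * C)     # (p, c, c): one C.T gamma C per phenotype
    assert CtGammaC.shape == (p, c, c)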
+
+
+def _logistic_residualize(X: NDArray[(Any, Any), Float], C: NDArray[(Any, Any), Float],
+                          Y_mask: NDArray[(Any, Any), Float], gamma: NDArray[(Any, Any), Float],
+                          inv_CtGammaC: NDArray[(Any, Any), Float]) -> NDArray[(Any, Any), Float]:
+    '''
+    Residualize the genotype vectors given the null model predictions.
+    X_res = X - C(C.T gamma C)^-1 C.T gamma X
+    '''
+    X_hat = gwas_fx._einsum('ic,pcd,ds,ps,sg,sp->igp', C, inv_CtGammaC, C.T, gamma, X, Y_mask)
+    return X[:, :, None] - X_hat
+
+
+def _logistic_regression_inner(genotype_pdf: pd.DataFrame, log_reg_state: LogRegState,
+                               C: NDArray[(Any, Any), Float], Y_mask: NDArray[(Any, Any), Float],
+                               fallback_method: str, phenotype_names: pd.Series) -> pd.DataFrame:
+    '''
+    Tests a block of genotypes for association with binary traits. We first residualize
+    the genotypes based on the null model fit, then perform a fast score test to check for
+    possible significance.
+
+    We use semantic indices for the einsum expressions:
+    s, i: sample (or individual)
+    g: genotype
+    p: phenotype
+    c, d: covariate
+    '''
+    X = np.column_stack(genotype_pdf[_VALUES_COLUMN_NAME].array)
+    with oe.shared_intermediates():
+        X_res = _logistic_residualize(X, C, Y_mask, log_reg_state.gamma, log_reg_state.inv_CtGammaC)
+        num = gwas_fx._einsum('sgp,sp->pg', X_res, log_reg_state.Y_res)**2
+        denom = gwas_fx._einsum('sgp,sgp,ps->pg', X_res, X_res, log_reg_state.gamma)
+    t_values = np.ravel(num / denom)
+    p_values = stats.chi2.sf(t_values, 1)
+
+    if fallback_method != fallback_none:
+        # TODO: Call approx firth here
+        pass
+
+    del genotype_pdf[_VALUES_COLUMN_NAME]
+    out_df = pd.concat([genotype_pdf] * log_reg_state.Y_res.shape[1])
+    out_df['tvalue'] = list(np.ravel(t_values))
+    out_df['pvalue'] = list(np.ravel(p_values))
+    out_df['phenotype'] = phenotype_names.repeat(genotype_pdf.shape[0]).tolist()
+    return out_df
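For reference, a hypothetical single-variant, single-phenotype version of the score test that _logistic_regression_inner vectorizes with einsum; the function name and the use of statsmodels for the null fit are illustrative, and C is assumed to already include an intercept column if one is wanted:

    import numpy as np
    import statsmodels.api as sm
    from scipy import stats

    def score_test_1d(x, y, C):
        # Null model: phenotype regressed on covariates only
        y_hat = sm.GLM(y, C, family=sm.families.Binomial()).fit().fittedvalues
        gamma = y_hat * (1 - y_hat)
        # Residualize the genotype vector: x_res = x - C (C.T gamma C)^-1 C.T gamma x
        inv_CtGammaC = np.linalg.inv(C.T @ (gamma[:, None] * C))
        x_res = x - C @ (inv_CtGammaC @ (C.T @ (gamma * x)))
        # Score statistic: (x_res . (y - y_hat))^2 / sum(gamma * x_res^2), chi2 with 1 dof
        tvalue = (x_res @ (y - y_hat))**2 / (gamma @ (x_res * x_res))
        return tvalue, stats.chi2.sf(tvalue, 1)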
diff --git a/python/glow/gwas/tests/test_linear_regression.py b/python/glow/gwas/tests/test_lin_reg.py
similarity index 72%
rename from python/glow/gwas/tests/test_linear_regression.py
rename to python/glow/gwas/tests/test_lin_reg.py
index de9586a97..852b2d91b 100644
--- a/python/glow/gwas/tests/test_linear_regression.py
+++ b/python/glow/gwas/tests/test_lin_reg.py
@@ -1,6 +1,7 @@
 import numpy as np
 import statsmodels.api as sm
 import pandas as pd
+import glow.gwas.functions as gwas_fx
 import glow.gwas.lin_reg as lr
 import pytest
 import glow
@@ -10,7 +11,7 @@ def run_linear_regression(genotype_df, phenotype_df, covariate_df, fit_intercept
     phenotype_names = phenotype_df.columns.astype('str').to_series()
     C = covariate_df.to_numpy('float64', copy=True)
     if fit_intercept:
-        C = lr._add_intercept(C, genotype_df.shape[0])
+        C = gwas_fx._add_intercept(C, genotype_df.shape[0])
     if not C.size:
         C = np.zeros((genotype_df.shape[0], 1))
     Y = phenotype_df.to_numpy('float64', copy=True)
@@ -18,7 +19,7 @@
     Y[~Y_mask] = 0
     Q = np.linalg.qr(C)[0]
     Y = lr._residualize_in_place(Y, Q)
-    Y_state = lr._create_YState(Y, Y_mask)
+    Y_state = lr._create_YState(Y, phenotype_df, None, Y_mask, np.float64)
     dof = C.shape[0] - C.shape[1] - 1
     pdf = pd.DataFrame({lr._VALUES_COLUMN_NAME: list(genotype_df.to_numpy('float64').T)})
@@ -64,7 +65,7 @@ def statsmodels_baseline(genotype_df,
     C = covariate_df.to_numpy('float64')
     num_samples = C.shape[0] if C.size else genotype_df.shape[0]
     if fit_intercept:
-        C = lr._add_intercept(C, num_samples)
+        C = gwas_fx._add_intercept(C, num_samples)
     Y = phenotype_df.to_numpy('float64')
     X = genotype_df.to_numpy('float64')
     phenotype_df.columns = phenotype_df.columns.astype('str')
@@ -168,55 +169,55 @@ def test_r_glm_covariates():
     assert_glow_equals_golden(genotype_df, phenotype_df, covariate_df, fit_intercept=False)


-def test_multiple():
+def test_multiple(rg):
     num_samples = 100
-    genotype_df = pd.DataFrame(np.random.random((num_samples, 10)))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, 25)))
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 5)))
+    genotype_df = pd.DataFrame(rg.random((num_samples, 10)))
+    phenotype_df = pd.DataFrame(rg.random((num_samples, 25)))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 5)))
     assert_glow_equals_golden(genotype_df, phenotype_df, covariate_df)


-def test_missing():
+def test_missing(rg):
     num_samples = 10
-    genotype_df = pd.DataFrame(np.random.random((num_samples, 1)))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, 1)))
+    genotype_df = pd.DataFrame(rg.random((num_samples, 1)))
+    phenotype_df = pd.DataFrame(rg.random((num_samples, 1)))
     phenotype_df.loc[0, 0] = np.nan
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 3)))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 3)))
     glow = run_linear_regression(genotype_df, phenotype_df, covariate_df)
     baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df)
     assert regression_results_equal(glow, baseline)


 @pytest.mark.min_spark('3')
-def test_missing_spark(spark):
+def test_missing_spark(spark, rg):
     num_samples = 10
-    genotype_df = pd.DataFrame(np.random.random((num_samples, 1)))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, 3)))
+    genotype_df = pd.DataFrame(rg.random((num_samples, 1)))
+    phenotype_df = pd.DataFrame(rg.random((num_samples, 3)))
     phenotype_df.loc[0, 0] = np.nan
     phenotype_df.loc[[1, 3, 5], 1] = np.nan
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 3)))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 3)))
     glow = run_linear_regression_spark(spark, genotype_df, phenotype_df, covariate_df)
     baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df)
     assert regression_results_equal(glow, baseline)


 @pytest.mark.min_spark('3')
-def test_multiple_spark(spark):
+def test_multiple_spark(spark, rg):
     num_samples = 100
-    genotype_df = pd.DataFrame(np.random.random((num_samples, 10)))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, 25)))
-    covariate_df = pd.DataFrame(np.random.random((num_samples, 5)))
+    genotype_df = pd.DataFrame(rg.random((num_samples, 10)))
+    phenotype_df = pd.DataFrame(rg.random((num_samples, 25)))
+    covariate_df = pd.DataFrame(rg.random((num_samples, 5)))
     baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df)
     results = run_linear_regression_spark(spark, genotype_df, phenotype_df, covariate_df)
     assert regression_results_equal(baseline, results)


 @pytest.mark.min_spark('3')
-def test_propagate_extra_cols(spark):
+def test_propagate_extra_cols(spark, rg):
     num_samples = 10
-    genotype_df = pd.DataFrame(np.random.random((num_samples, 3)))
-    phenotype_df = pd.DataFrame(np.random.random((num_samples, 5)))
-    covariate_df =
pd.DataFrame(np.random.random((num_samples, 2))) + genotype_df = pd.DataFrame(rg.random((num_samples, 3))) + phenotype_df = pd.DataFrame(rg.random((num_samples, 5))) + covariate_df = pd.DataFrame(rg.random((num_samples, 2))) extra_cols = pd.DataFrame({'genotype_idx': range(3), 'animal': 'monkey'}) results = run_linear_regression_spark(spark, genotype_df, phenotype_df, covariate_df, extra_cols) @@ -228,11 +229,11 @@ def test_propagate_extra_cols(spark): @pytest.mark.min_spark('3') -def test_different_values_column(spark): +def test_different_values_column(spark, rg): num_samples = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, 3))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, 5))) - covariate_df = pd.DataFrame(np.random.random((num_samples, 2))) + genotype_df = pd.DataFrame(rg.random((num_samples, 3))) + phenotype_df = pd.DataFrame(rg.random((num_samples, 5))) + covariate_df = pd.DataFrame(rg.random((num_samples, 2))) results = run_linear_regression_spark(spark, genotype_df, phenotype_df, @@ -242,52 +243,52 @@ def test_different_values_column(spark): @pytest.mark.min_spark('3') -def test_intercept_no_covariates(spark): +def test_intercept_no_covariates(spark, rg): num_samples = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, 10))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, 25))) + genotype_df = pd.DataFrame(rg.random((num_samples, 10))) + phenotype_df = pd.DataFrame(rg.random((num_samples, 25))) # No error run_linear_regression_spark(spark, genotype_df, phenotype_df, pd.DataFrame({})) @pytest.mark.min_spark('3') -def test_validates_missing_covariates(spark): +def test_validates_missing_covariates(spark, rg): num_samples = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, 3))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, 5))) - covariate_df = pd.DataFrame(np.random.random((num_samples, 2))) + genotype_df = pd.DataFrame(rg.random((num_samples, 3))) + phenotype_df = pd.DataFrame(rg.random((num_samples, 5))) + covariate_df = pd.DataFrame(rg.random((num_samples, 2))) covariate_df.loc[0, 0] = np.nan with pytest.raises(ValueError): run_linear_regression_spark(spark, genotype_df, phenotype_df, covariate_df) @pytest.mark.min_spark('3') -def test_validate_same_number_of_rows(spark): - genotype_df = pd.DataFrame(np.random.random((4, 3))) - phenotype_df = pd.DataFrame(np.random.random((4, 5))) - covariate_df = pd.DataFrame(np.random.random((5, 2))) +def test_validate_same_number_of_rows(spark, rg): + genotype_df = pd.DataFrame(rg.random((4, 3))) + phenotype_df = pd.DataFrame(rg.random((4, 5))) + covariate_df = pd.DataFrame(rg.random((5, 2))) with pytest.raises(ValueError): run_linear_regression_spark(spark, genotype_df, phenotype_df, covariate_df) -def test_error_for_old_spark(spark): +def test_error_for_old_spark(spark, rg): if spark.version.startswith('2'): num_samples = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, 10))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, 25))) + genotype_df = pd.DataFrame(rg.random((num_samples, 10))) + phenotype_df = pd.DataFrame(rg.random((num_samples, 25))) with pytest.raises(AttributeError): run_linear_regression_spark(spark, genotype_df, phenotype_df, pd.DataFrame({})) @pytest.mark.min_spark('3') -def test_simple_offset(spark): +def test_simple_offset(spark, rg): num_samples = 25 num_pheno = 6 num_geno = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, num_geno))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, 
num_pheno))) - covariate_df = pd.DataFrame(np.random.random((num_samples, 2))) - offset_df = pd.DataFrame(np.random.random((num_samples, num_pheno))) + genotype_df = pd.DataFrame(rg.random((num_samples, num_geno))) + phenotype_df = pd.DataFrame(rg.random((num_samples, num_pheno))) + covariate_df = pd.DataFrame(rg.random((num_samples, 2))) + offset_df = pd.DataFrame(rg.random((num_samples, num_pheno))) results = run_linear_regression_spark(spark, genotype_df, phenotype_df, @@ -298,15 +299,15 @@ def test_simple_offset(spark): @pytest.mark.min_spark('3') -def test_multi_offset(spark): +def test_multi_offset(spark, rg): num_samples = 25 num_pheno = 25 num_geno = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, num_geno))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, num_pheno))) - covariate_df = pd.DataFrame(np.random.random((num_samples, 10))) + genotype_df = pd.DataFrame(rg.random((num_samples, num_geno))) + phenotype_df = pd.DataFrame(rg.random((num_samples, num_pheno))) + covariate_df = pd.DataFrame(rg.random((num_samples, 10))) offset_index = pd.MultiIndex.from_product([phenotype_df.index, ['chr1', 'chr2']]) - offset_df = pd.DataFrame(np.random.random((num_samples * 2, num_pheno)), index=offset_index) + offset_df = pd.DataFrame(rg.random((num_samples * 2, num_pheno)), index=offset_index) extra_cols = pd.DataFrame({'contigName': ['chr1', 'chr2'] * 5}) results = run_linear_regression_spark(spark, genotype_df, @@ -321,20 +322,19 @@ def test_multi_offset(spark): @pytest.mark.min_spark('3') -def test_multi_offset_with_missing(spark): +def test_multi_offset_with_missing(spark, rg): num_samples = 25 num_pheno = 24 num_geno = 18 contigs = ['chr1', 'chr2', 'chr3'] - genotype_df = pd.DataFrame(np.random.random((num_samples, num_geno))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, num_pheno))) + genotype_df = pd.DataFrame(rg.random((num_samples, num_geno))) + phenotype_df = pd.DataFrame(rg.random((num_samples, num_pheno))) missing = np.triu(np.ones(phenotype_df.shape)) missing[:, -1] = 0 phenotype_df[missing.astype('bool')] = np.nan - covariate_df = pd.DataFrame(np.random.random((num_samples, 10))) + covariate_df = pd.DataFrame(rg.random((num_samples, 10))) offset_index = pd.MultiIndex.from_product([phenotype_df.index, contigs]) - offset_df = pd.DataFrame(np.random.random((num_samples * len(contigs), num_pheno)), - index=offset_index) + offset_df = pd.DataFrame(rg.random((num_samples * len(contigs), num_pheno)), index=offset_index) extra_cols = pd.DataFrame({'contigName': contigs * 6}) results = run_linear_regression_spark(spark, genotype_df, @@ -344,53 +344,54 @@ def test_multi_offset_with_missing(spark): extra_cols=extra_cols) baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df, [offset_df.xs(contig, level=1) for contig in contigs] * 6) + assert regression_results_equal(results, baseline) @pytest.mark.min_spark('3') -def test_offset_wrong_columns(spark): +def test_offset_wrong_columns(spark, rg): num_samples = 25 num_pheno = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, 10))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, num_pheno))) - offset_df = pd.DataFrame(np.random.random((num_samples, num_pheno)), columns=range(10, 20)) + genotype_df = pd.DataFrame(rg.random((num_samples, 10))) + phenotype_df = pd.DataFrame(rg.random((num_samples, num_pheno))) + offset_df = pd.DataFrame(rg.random((num_samples, num_pheno)), columns=range(10, 20)) with pytest.raises(ValueError): run_linear_regression_spark(spark, 
genotype_df, phenotype_df, offset_df=offset_df) @pytest.mark.min_spark('3') -def test_offset_wrong_index(spark): +def test_offset_wrong_index(spark, rg): num_samples = 25 num_pheno = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, 10))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, num_pheno))) - offset_df = pd.DataFrame(np.random.random((num_samples, num_pheno)), index=range(1, 26)) + genotype_df = pd.DataFrame(rg.random((num_samples, 10))) + phenotype_df = pd.DataFrame(rg.random((num_samples, num_pheno))) + offset_df = pd.DataFrame(rg.random((num_samples, num_pheno)), index=range(1, 26)) with pytest.raises(ValueError): run_linear_regression_spark(spark, genotype_df, phenotype_df, offset_df=offset_df) @pytest.mark.min_spark('3') -def test_offset_wrong_multi_index(spark): +def test_offset_wrong_multi_index(spark, rg): num_samples = 25 num_pheno = 10 contigs = ['chr1', 'chr2'] - genotype_df = pd.DataFrame(np.random.random((num_samples, 10))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, num_pheno))) - offset_df = pd.DataFrame(np.random.random((num_samples * len(contigs), num_pheno)), + genotype_df = pd.DataFrame(rg.random((num_samples, 10))) + phenotype_df = pd.DataFrame(rg.random((num_samples, num_pheno))) + offset_df = pd.DataFrame(rg.random((num_samples * len(contigs), num_pheno)), pd.MultiIndex.from_product([range(1, 26), contigs])) with pytest.raises(ValueError): run_linear_regression_spark(spark, genotype_df, phenotype_df, offset_df=offset_df) @pytest.mark.min_spark('3') -def test_offset_different_index_order(spark): +def test_offset_different_index_order(spark, rg): num_samples = 25 num_pheno = 6 num_geno = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, num_geno))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, num_pheno))) + genotype_df = pd.DataFrame(rg.random((num_samples, num_geno))) + phenotype_df = pd.DataFrame(rg.random((num_samples, num_pheno))) phenotype_df.columns = phenotype_df.columns.astype('str') - covariate_df = pd.DataFrame(np.random.random((num_samples, 2))) - offset_df = pd.DataFrame(np.random.random((num_samples, num_pheno))) + covariate_df = pd.DataFrame(rg.random((num_samples, 2))) + offset_df = pd.DataFrame(rg.random((num_samples, num_pheno))) offset_df.columns = offset_df.columns.astype('str') baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df, [offset_df] * num_geno) @@ -407,11 +408,11 @@ def test_offset_different_index_order(spark): @pytest.mark.min_spark('3') -def test_cast_genotypes(spark): +def test_cast_genotypes(spark, rg): num_samples = 10 - genotype_df = pd.DataFrame(np.random.randint(0, 10, (num_samples, 10))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, 5))) - covariate_df = pd.DataFrame(np.random.random((num_samples, 5))) + genotype_df = pd.DataFrame(rg.integers(0, 10, (num_samples, 10))) + phenotype_df = pd.DataFrame(rg.random((num_samples, 5))) + covariate_df = pd.DataFrame(rg.random((num_samples, 5))) baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df) results = run_linear_regression_spark(spark, genotype_df, phenotype_df, covariate_df) assert results['effect'].dtype == np.float64 @@ -419,11 +420,11 @@ def test_cast_genotypes(spark): @pytest.mark.min_spark('3') -def test_cast_genotypes_float32(spark): +def test_cast_genotypes_float32(spark, rg): num_samples = 10 - genotype_df = pd.DataFrame(np.random.randint(0, 10, (num_samples, 10))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, 5))) - covariate_df = 
pd.DataFrame(np.random.random((num_samples, 5))) + genotype_df = pd.DataFrame(rg.integers(0, 10, (num_samples, 10))) + phenotype_df = pd.DataFrame(rg.random((num_samples, 5))) + covariate_df = pd.DataFrame(rg.random((num_samples, 5))) baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df) results = run_linear_regression_spark(spark, genotype_df, @@ -435,21 +436,21 @@ def test_cast_genotypes_float32(spark): @pytest.mark.min_spark('3') -def test_bad_datatype(spark): +def test_bad_datatype(spark, rg): num_samples = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, 10))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, 5))) - covariate_df = pd.DataFrame(np.random.random((num_samples, 5))) + genotype_df = pd.DataFrame(rg.random((num_samples, 10))) + phenotype_df = pd.DataFrame(rg.random((num_samples, 5))) + covariate_df = pd.DataFrame(rg.random((num_samples, 5))) with pytest.raises(ValueError): run_linear_regression_spark(spark, genotype_df, phenotype_df, covariate_df, dt=np.int32) @pytest.mark.min_spark('3') -def test_bad_column_name(spark): +def test_bad_column_name(spark, rg): num_samples = 10 - genotype_df = pd.DataFrame(np.random.random((num_samples, 10))) - phenotype_df = pd.DataFrame(np.random.random((num_samples, 5))) - covariate_df = pd.DataFrame(np.random.random((num_samples, 5))) + genotype_df = pd.DataFrame(rg.random((num_samples, 10))) + phenotype_df = pd.DataFrame(rg.random((num_samples, 5))) + covariate_df = pd.DataFrame(rg.random((num_samples, 5))) with pytest.raises(ValueError): run_linear_regression_spark(spark, genotype_df, @@ -459,12 +460,12 @@ def test_bad_column_name(spark): @pytest.mark.min_spark('3') -def test_values_expr(spark): +def test_values_expr(spark, rg): from pyspark.sql.functions import array, lit num_samples = 5 genotype_df = spark.range(1).withColumn('genotypes', lit(42)) - phenotype_df = pd.DataFrame(np.random.random((num_samples, 5))) - covariate_df = pd.DataFrame(np.random.random((num_samples, 2))) + phenotype_df = pd.DataFrame(rg.random((num_samples, 5))) + covariate_df = pd.DataFrame(rg.random((num_samples, 2))) array_vals = [lit(i) for i in range(num_samples)] results = lr.linear_regression(genotype_df, phenotype_df, diff --git a/python/glow/gwas/tests/test_log_reg.py b/python/glow/gwas/tests/test_log_reg.py new file mode 100644 index 000000000..080834755 --- /dev/null +++ b/python/glow/gwas/tests/test_log_reg.py @@ -0,0 +1,314 @@ +import glow.gwas.log_reg as lr +import glow.gwas.functions as gwas_fx +import statsmodels.api as sm +import pandas as pd +import numpy as np +import pytest + + +def run_score_test(genotype_df, phenotype_df, covariate_df, fit_intercept=True): + C = covariate_df.to_numpy(copy=True) + if fit_intercept: + C = gwas_fx._add_intercept(C, phenotype_df.shape[0]) + Y = phenotype_df.to_numpy(copy=True) + Y_mask = ~np.isnan(Y) + Y[~Y_mask] = 0 + state = lr._create_log_reg_state(Y, pd.DataFrame(), None, C, Y_mask) + values_df = pd.DataFrame({gwas_fx._VALUES_COLUMN_NAME: list(genotype_df.to_numpy().T)}) + return lr._logistic_regression_inner(values_df, state, C, Y_mask, lr.fallback_none, + phenotype_df.columns.to_series().astype('str')) + + +def statsmodels_baseline(genotype_df, + phenotype_df, + covariate_df, + offset_dfs=None, + fit_intercept=True): + if fit_intercept: + covariate_df = sm.add_constant(covariate_df) + p_values = [] + t_values = [] + for phenotype in phenotype_df: + for genotype_idx in range(genotype_df.shape[1]): + mask = ~np.isnan(phenotype_df[phenotype].to_numpy()) + if 
offset_dfs is not None: + offset = offset_dfs[genotype_idx][phenotype] + else: + offset = None + model = sm.GLM(phenotype_df[phenotype], + covariate_df, + offset=offset, + family=sm.families.Binomial(), + missing='drop') + params = model.fit().params + results = model.score_test(params, + exog_extra=genotype_df.iloc[:, genotype_idx].array[mask]) + t_values.append(results[0]) + p_values.append(results[1]) + return pd.DataFrame({ + 'tvalue': np.concatenate(t_values), + 'pvalue': np.concatenate(p_values), + 'phenotype': phenotype_df.columns.to_series().astype('str').repeat(genotype_df.shape[1]) + }) + + +def run_logistic_regression_spark(spark, + genotype_df, + phenotype_df, + covariate_df=pd.DataFrame({}), + extra_cols=pd.DataFrame({}), + values_column='values', + **kwargs): + pdf = pd.DataFrame({values_column: genotype_df.to_numpy().T.tolist()}) + if not extra_cols.empty: + pdf = pd.concat([pdf, extra_cols], axis=1) + pdf['idx'] = pdf.index + results = (lr.logistic_regression(spark.createDataFrame(pdf), + phenotype_df, + covariate_df, + values_column=values_column, + **kwargs).toPandas().sort_values(['phenotype', + 'idx']).drop('idx', axis=1)) + return results + + +def regression_results_equal(df1, df2, rtol=1e-5): + df1 = df1.sort_values('phenotype', kind='mergesort') + df2 = df2.sort_values('phenotype', kind='mergesort') + strings_equal = np.array_equal(df1.phenotype.array, df2.phenotype.array) + numerics_equal = np.allclose(df1.select_dtypes(exclude=['object']), + df2.select_dtypes(exclude=['object']), + rtol=rtol) + return strings_equal and numerics_equal + + +def assert_glow_equals_golden(genotype_df, phenotype_df, covariate_df, fit_intercept=True): + glow = run_score_test(genotype_df, phenotype_df, covariate_df, fit_intercept=fit_intercept) + golden = statsmodels_baseline(genotype_df, + phenotype_df, + covariate_df, + fit_intercept=fit_intercept) + assert regression_results_equal(glow, golden) + + +def random_phenotypes(shape, rg): + return rg.integers(low=0, high=2, size=shape).astype(np.float64) + + +def test_spector_non_missing(): + ds = sm.datasets.spector.load_pandas() + phenotype_df = pd.DataFrame({ds.endog.name: ds.endog}) + genotype_df = ds.exog.loc[:, ['GPA']] + covariate_df = ds.exog.drop('GPA', axis=1) + assert_glow_equals_golden(genotype_df, phenotype_df, covariate_df) + + +def test_spector_missing(): + ds = sm.datasets.spector.load_pandas() + phenotype_df = pd.DataFrame({ds.endog.name: ds.endog}) + phenotype_df.iloc[[0, 3, 10, 25], 0] = np.nan + genotype_df = ds.exog.loc[:, ['GPA']] + covariate_df = ds.exog.drop('GPA', axis=1) + assert_glow_equals_golden(genotype_df, phenotype_df, covariate_df) + + +def test_spector_no_intercept(): + ds = sm.datasets.spector.load_pandas() + phenotype_df = pd.DataFrame({ds.endog.name: ds.endog}) + phenotype_df.iloc[[0, 3, 10, 25], 0] = np.nan + genotype_df = ds.exog.loc[:, ['GPA']] + covariate_df = ds.exog.drop('GPA', axis=1) + assert_glow_equals_golden(genotype_df, phenotype_df, covariate_df, fit_intercept=False) + + +def test_multiple(rg): + n_sample = 50 + n_cov = 10 + n_pheno = 25 + phenotype_df = pd.DataFrame(random_phenotypes((n_sample, n_pheno), rg)) + covariate_df = pd.DataFrame(rg.random((n_sample, n_cov))) + genotype_df = pd.DataFrame(rg.random((n_sample, 1))) + assert_glow_equals_golden(genotype_df, phenotype_df, covariate_df) + + +def test_multiple_missing(rg): + n_sample = 50 + n_cov = 2 + n_pheno = 31 + phenotype_df = pd.DataFrame(random_phenotypes((n_sample, n_pheno), rg)) + Y = phenotype_df.to_numpy() + 
Y[np.tril_indices_from(Y, k=-20)] = np.nan + assert phenotype_df.isna().sum().sum() > 0 + covariate_df = pd.DataFrame(rg.random((n_sample, n_cov))) + genotype_df = pd.DataFrame(rg.random((n_sample, 1))) + assert_glow_equals_golden(genotype_df, phenotype_df, covariate_df) + + +@pytest.mark.min_spark('3') +def test_multiple_spark(spark, rg): + n_sample = 40 + n_cov = 5 + n_pheno = 5 + phenotype_df = pd.DataFrame(random_phenotypes((n_sample, n_pheno), rg)) + covariate_df = pd.DataFrame(rg.random((n_sample, n_cov))) + genotype_df = pd.DataFrame(rg.random((n_sample, 1))) + run_score_test(genotype_df, phenotype_df, covariate_df) + glow = run_logistic_regression_spark(spark, genotype_df, phenotype_df, covariate_df) + golden = statsmodels_baseline(genotype_df, phenotype_df, covariate_df) + assert regression_results_equal(glow, golden) + + +def random_mask(size, missing_per_column, rg): + base = np.ones(size[0], dtype=bool) + base[:missing_per_column] = False + return np.column_stack([rg.permutation(base) for _ in range(size[1])]) + + +@pytest.mark.min_spark('3') +def test_multiple_spark_missing(spark, rg): + n_sample = 50 + n_cov = 5 + n_pheno = 5 + phenotype_df = pd.DataFrame(random_phenotypes((n_sample, n_pheno), rg)) + Y = phenotype_df.to_numpy() + Y[~random_mask(Y.shape, 10, rg)] = np.nan + assert phenotype_df.isna().sum().sum() > 0 + covariate_df = pd.DataFrame(rg.random((n_sample, n_cov))) + genotype_df = pd.DataFrame(rg.random((n_sample, 1))) + glow = run_logistic_regression_spark(spark, genotype_df, phenotype_df, covariate_df) + golden = statsmodels_baseline(genotype_df, phenotype_df, covariate_df) + assert regression_results_equal(glow, golden) + + +@pytest.mark.min_spark('3') +def test_spark_no_intercept(spark, rg): + n_sample = 50 + n_cov = 5 + n_pheno = 5 + phenotype_df = pd.DataFrame(random_phenotypes((n_sample, n_pheno), rg)) + Y = phenotype_df.to_numpy() + Y[~random_mask(Y.shape, 15, rg)] = np.nan + assert phenotype_df.isna().sum().sum() > 0 + covariate_df = pd.DataFrame(rg.random((n_sample, n_cov))) + genotype_df = pd.DataFrame(rg.random((n_sample, 1))) + glow = run_logistic_regression_spark(spark, + genotype_df, + phenotype_df, + covariate_df, + fit_intercept=False) + golden = statsmodels_baseline(genotype_df, phenotype_df, covariate_df, fit_intercept=False) + assert regression_results_equal(glow, golden) + + +@pytest.mark.min_spark('3') +def test_simple_offset(spark, rg): + num_samples = 25 + num_pheno = 6 + num_geno = 10 + genotype_df = pd.DataFrame(rg.random((num_samples, num_geno))) + phenotype_df = pd.DataFrame(random_phenotypes((num_samples, num_pheno), rg)) + covariate_df = pd.DataFrame(rg.random((num_samples, 2))) + offset_df = pd.DataFrame(rg.random((num_samples, num_pheno))) + results = run_logistic_regression_spark(spark, + genotype_df, + phenotype_df, + covariate_df, + offset_df=offset_df) + baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df, [offset_df] * num_geno) + assert regression_results_equal(results, baseline) + + +@pytest.mark.min_spark('3') +def test_multi_offset(spark, rg): + num_samples = 50 + num_pheno = 25 + num_geno = 10 + genotype_df = pd.DataFrame(rg.random((num_samples, num_geno))) + phenotype_df = pd.DataFrame(random_phenotypes((num_samples, num_pheno), rg)) + covariate_df = pd.DataFrame(rg.random((num_samples, 10))) + offset_index = pd.MultiIndex.from_product([phenotype_df.index, ['chr1', 'chr2']]) + offset_df = pd.DataFrame(rg.random((num_samples * 2, num_pheno)), index=offset_index) + extra_cols = pd.DataFrame({'contigName': 
['chr1', 'chr2'] * 5}) + results = run_logistic_regression_spark(spark, + genotype_df, + phenotype_df, + covariate_df, + offset_df=offset_df, + extra_cols=extra_cols) + baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df, + [offset_df.xs('chr1', level=1), + offset_df.xs('chr2', level=1)] * 5) + assert regression_results_equal(results, baseline) + + +@pytest.mark.min_spark('3') +def test_cast_genotypes_float32(spark, rg): + num_samples = 50 + genotype_df = pd.DataFrame(rg.integers(0, 10, (num_samples, 10))) + phenotype_df = pd.DataFrame(random_phenotypes((num_samples, 5), rg)) + covariate_df = pd.DataFrame(rg.random((num_samples, 5))) + baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df) + results = run_logistic_regression_spark(spark, + genotype_df, + phenotype_df, + covariate_df, + dt=np.float32) + assert results['pvalue'].dtype == np.float32 + assert regression_results_equal(baseline, results, rtol=1e-4) # Higher rtol for float32 + + +@pytest.mark.min_spark('3') +def test_multi_offset_with_missing(spark, rg): + num_samples = 25 + num_pheno = 24 + num_geno = 18 + contigs = ['chr1', 'chr2', 'chr3'] + genotype_df = pd.DataFrame(rg.random((num_samples, num_geno))) + phenotype_df = pd.DataFrame(random_phenotypes((num_samples, num_pheno), rg)) + phenotype_df.iloc[0, 0] = np.nan + phenotype_df.iloc[1, 0] = np.nan + covariate_df = pd.DataFrame(rg.random((num_samples, 2))) + offset_index = pd.MultiIndex.from_product([phenotype_df.index, contigs]) + offset_df = pd.DataFrame(rg.random((num_samples * len(contigs), num_pheno)), index=offset_index) + extra_cols = pd.DataFrame({'contigName': contigs * 6}) + results = run_logistic_regression_spark(spark, + genotype_df, + phenotype_df, + covariate_df, + offset_df=offset_df, + extra_cols=extra_cols) + baseline = statsmodels_baseline(genotype_df, phenotype_df, covariate_df, + [offset_df.xs(contig, level=1) for contig in contigs] * 6) + assert regression_results_equal(results, baseline) + + +def test_error_for_old_spark(spark, rg): + if spark.version.startswith('2'): + num_samples = 10 + genotype_df = pd.DataFrame(rg.random((num_samples, 10))) + phenotype_df = pd.DataFrame(random_phenotypes((num_samples, 25), rg)) + with pytest.raises(AttributeError): + run_logistic_regression_spark(spark, genotype_df, phenotype_df, pd.DataFrame({})) + + +@pytest.mark.min_spark('3') +def test_intercept_no_covariates(spark, rg): + num_samples = 10 + genotype_df = pd.DataFrame(rg.random((num_samples, 10))) + phenotype_df = pd.DataFrame(random_phenotypes((num_samples, 2), rg)) + # No error + run_logistic_regression_spark(spark, genotype_df, phenotype_df, pd.DataFrame({})) + + +@pytest.mark.min_spark('3') +def test_propagate_extra_cols(spark, rg): + num_samples = 50 + genotype_df = pd.DataFrame(rg.random((num_samples, 3))) + phenotype_df = pd.DataFrame(random_phenotypes((num_samples, 5), rg)) + covariate_df = pd.DataFrame(rg.random((num_samples, 2))) + extra_cols = pd.DataFrame({'genotype_idx': range(3), 'animal': 'monkey'}) + results = run_logistic_regression_spark(spark, genotype_df, phenotype_df, covariate_df, + extra_cols) + assert sorted(results['genotype_idx'].tolist()) == [0] * 5 + [1] * 5 + [2] * 5 + assert results.animal[results.animal == 'monkey'].all() + assert results.columns.tolist() == ['genotype_idx', 'animal', 'tvalue', 'pvalue', 'phenotype'] diff --git a/python/setup.py b/python/setup.py index 1297af50e..e79b10166 100644 --- a/python/setup.py +++ b/python/setup.py @@ -23,7 +23,9 @@ install_requires=[ 'nptyping==1.3.0', 
'numpy>=1.17.4', + 'opt_einsum>=3.2.0', 'pandas>=0.25.3', + 'statsmodels>=0.10.0', 'typeguard==2.9.1', ], author='The Glow Authors',