Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.


import numpy as np
from pyspark.sql import SparkSession
import pytest
import os
Expand Down Expand Up @@ -50,6 +51,16 @@ def spark(spark_builder):
sess = spark_builder.getOrCreate()
return sess.newSession()

def pytest_addoption(parser):
parser.addoption('--random-seed', action='store', type=int, help='Seed to use for random number generator')

@pytest.fixture(scope="function")
def rg(pytestconfig):
seed = pytestconfig.getoption('random_seed')
seed_seq = np.random.SeedSequence(seed)
print(f'Creating random number generator with seed {seed_seq.entropy}')
return np.random.default_rng(seed_seq)

def pytest_runtest_setup(item):
min_spark_version = next((mark.args[0] for mark in item.iter_markers(name='min_spark')), None)
if min_spark_version:
Expand Down
1 change: 1 addition & 0 deletions docs/source/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ dependencies:
- python=3.7
- nptyping=1.3.0
- numpy>=1.17.4
- opt_einsum>=3.2.0
- pandas>=0.25.3
- pip
- scikit-learn=0.22.1
Expand Down
3 changes: 2 additions & 1 deletion python/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ dependencies:
- click=7.1.1 # Docs notebook source generation
- databricks-cli=0.9.1 # Docs notebook source generation
- jinja2
- jupyter
- jupyterlab
- nomkl # Skip MKL for local development
- nptyping=1.3.0
- numpy=1.17.5
- opt_einsum>=3.2.0
- pandas=0.25.3
- pip
- pyarrow=0.15.1
Expand Down
117 changes: 117 additions & 0 deletions python/glow/gwas/functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from pyspark.sql import SparkSession, functions as fx
from pyspark.sql.types import StructField, StructType, FloatType, DoubleType, ArrayType
from typing import Any, Callable, Dict, List, TypeVar, Union
import numpy as np
import pandas as pd
from nptyping import Float, NDArray
from typeguard import typechecked
import opt_einsum as oe
from ..wgr.linear_model.functions import __assert_all_present, __check_binary

_VALUES_COLUMN_NAME = '_glow_regression_values'
_GENOTYPES_COLUMN_NAME = 'genotypes'


def _check_spark_version(spark: SparkSession) -> bool:
if int(spark.version.split('.')[0]) < 3:
raise AttributeError(
'Pandas based regression tests are only supported on Spark 3.0 or greater')


def _output_schema(input_fields: List[StructField], result_fields: List[StructField]) -> StructType:

fields = [field for field in input_fields if field.name != _VALUES_COLUMN_NAME] + result_fields
return StructType(fields)


def _validate_covariates_and_phenotypes(covariate_df, phenotype_df, is_binary):
for col in covariate_df:
__assert_all_present(covariate_df, col, 'covariate')
if not covariate_df.empty:
if phenotype_df.shape[0] != covariate_df.shape[0]:
raise ValueError(
f'phenotype_df and covariate_df must have the same number of rows ({phenotype_df.shape[0]} != {covariate_df.shape[0]}'
)
if is_binary:
__check_binary(phenotype_df)


def _regression_sql_type(dt):
if dt == np.float32:
return FloatType()
elif dt == np.float64:
return DoubleType()
else:
raise ValueError('dt must be np.float32 or np.float64')


def _prepare_genotype_df(genotype_df, values_column, sql_type):
if isinstance(values_column, str):
if values_column == _GENOTYPES_COLUMN_NAME:
raise ValueError(f'The values column should not be called "{_GENOTYPES_COLUMN_NAME}"')
out = (genotype_df.withColumn(_VALUES_COLUMN_NAME,
fx.col(values_column).cast(
ArrayType(sql_type))).drop(values_column))
else:
out = genotype_df.withColumn(_VALUES_COLUMN_NAME, values_column.cast(ArrayType(sql_type)))

if _GENOTYPES_COLUMN_NAME in [field.name for field in genotype_df.schema]:
out = out.drop(_GENOTYPES_COLUMN_NAME)
return out


@typechecked
def _add_intercept(C: NDArray[(Any, Any), Float], num_samples: int) -> NDArray[(Any, Any), Float]:
intercept = np.ones((num_samples, 1))
return np.hstack((intercept, C)) if C.size else intercept


def _einsum(subscripts: str, *operands: NDArray) -> NDArray:
'''
A wrapper around np.einsum to ensure uniform options.
'''
return oe.contract(subscripts, *operands, casting='no', optimize='dp', memory_limit='max_input')


def _have_same_elements(idx1: pd.Index, idx2: pd.Index) -> bool:
return idx1.sort_values().equals(idx2.sort_values())


T = TypeVar('T')


def _loco_dispatch(genotype_pdf: pd.DataFrame, state: Union[T, Dict[str, T]], f: Callable, *args):
'''
Given a pandas DataFrame, dispatch into one or more calls of the linear regression kernel
depending whether we have one Y matrix or one Y matrix per contig.
'''
if isinstance(state, dict):
return genotype_pdf.groupby('contigName', sort=False, as_index=False)\
.apply(lambda pdf: f(pdf, state[pdf['contigName'].iloc[0]], *args))
else:
return f(genotype_pdf, state, *args)


@typechecked
def _loco_make_state(Y: NDArray[(Any, Any), Float], phenotype_df: pd.DataFrame,
offset_df: pd.DataFrame, f: Callable[..., T], *args) -> Union[T, Dict[str, T]]:
if not offset_df.empty:
if not _have_same_elements(phenotype_df.columns, offset_df.columns):
raise ValueError(f'phenotype_df and offset_df should have the same column names.')
if offset_df.index.nlevels == 1: # Indexed by sample id
if not _have_same_elements(phenotype_df.index, offset_df.index):
raise ValueError(f'phenotype_df and offset_df should have the same index.')
return f(Y, phenotype_df, offset_df, *args)
elif offset_df.index.nlevels == 2: # Indexed by sample id and contig
all_contigs = offset_df.index.get_level_values(1).unique()
state_dict = {}
for contig in all_contigs:
offset_for_contig = offset_df.xs(contig, level=1)
if not _have_same_elements(phenotype_df.index, offset_for_contig.index):
raise ValueError(
'When using a multi-indexed offset_df, the offsets for each contig '
'should have the same index as phenotype_df')
state_dict[contig] = f(Y, phenotype_df, offset_for_contig, *args)
return state_dict
else:
return f(Y, phenotype_df, None, *args)
127 changes: 23 additions & 104 deletions python/glow/gwas/lin_reg.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@
from scipy import stats
from typeguard import typechecked
from ..wgr.linear_model.functions import __assert_all_present
from . import functions as gwas_fx
from .functions import _VALUES_COLUMN_NAME, _GENOTYPES_COLUMN_NAME

__all__ = ['linear_regression']

_VALUES_COLUMN_NAME = '_linreg_values'
_GENOTYPES_COLUMN_NAME = 'genotypes'


@typechecked
def linear_regression(genotype_df: DataFrame,
Expand Down Expand Up @@ -84,38 +83,14 @@ def linear_regression(genotype_df: DataFrame,
- ``pvalue``: P value estimated from a two sided T-test
- ``phenotype``: The phenotype name as determined by the column names of ``phenotype_df``
'''
if int(genotype_df.sql_ctx.sparkSession.version.split('.')[0]) < 3:
raise AttributeError(
'Pandas based linear regression is only supported on Spark 3.0 or greater')

# Validate input
for col in covariate_df:
__assert_all_present(covariate_df, col, 'covariate')
if not covariate_df.empty:
if phenotype_df.shape[0] != covariate_df.shape[0]:
raise ValueError(
f'phenotype_df and covariate_df must have the same number of rows ({phenotype_df.shape[0]} != {covariate_df.shape[0]}'
)

if dt == np.float32:
sql_type = FloatType()
elif dt == np.float64:
sql_type = DoubleType()
else:
raise ValueError('dt must be np.float32 or np.float64')

if isinstance(values_column, str):
if values_column == _GENOTYPES_COLUMN_NAME:
raise ValueError(f'The values column should not be called "{_GENOTYPES_COLUMN_NAME}"')
genotype_df = (genotype_df.withColumn(_VALUES_COLUMN_NAME,
fx.col(values_column).cast(
ArrayType(sql_type))).drop(values_column))
else:
genotype_df = genotype_df.withColumn(_VALUES_COLUMN_NAME,
values_column.cast(ArrayType(sql_type)))

if _GENOTYPES_COLUMN_NAME in [field.name for field in genotype_df.schema]:
genotype_df = genotype_df.drop(_GENOTYPES_COLUMN_NAME)

gwas_fx._check_spark_version(genotype_df.sql_ctx.sparkSession)

gwas_fx._validate_covariates_and_phenotypes(covariate_df, phenotype_df, is_binary=False)

sql_type = gwas_fx._regression_sql_type(dt)

genotype_df = gwas_fx._prepare_genotype_df(genotype_df, values_column, sql_type)

# Construct output schema
result_fields = [
Expand All @@ -125,13 +100,11 @@ def linear_regression(genotype_df: DataFrame,
StructField('pvalue', sql_type),
StructField('phenotype', StringType())
]
fields = [field for field in genotype_df.schema.fields if field.name != _VALUES_COLUMN_NAME
] + result_fields
result_struct = StructType(fields)
result_struct = gwas_fx._output_schema(genotype_df.schema.fields, result_fields)

C = covariate_df.to_numpy(dt, copy=True)
if fit_intercept:
C = _add_intercept(C, phenotype_df.shape[0])
C = gwas_fx._add_intercept(C, phenotype_df.shape[0])

# Prepare covariate basis and phenotype residuals
Q = np.linalg.qr(C)[0]
Expand All @@ -140,54 +113,19 @@ def linear_regression(genotype_df: DataFrame,
np.nan_to_num(Y, copy=False)
_residualize_in_place(Y, Q)

if not offset_df.empty:
if not _have_same_elements(phenotype_df.columns, offset_df.columns):
raise ValueError(f'phenotype_df and offset_df should have the same column names.')
if offset_df.index.nlevels == 1: # Indexed by sample id
if not _have_same_elements(phenotype_df.index, offset_df.index):
raise ValueError(f'phenotype_df and offset_df should have the same index.')
Y_state = _create_YState_from_offset(Y, Y_mask, phenotype_df, offset_df, dt)
elif offset_df.index.nlevels == 2: # Indexed by sample id and contig
all_contigs = offset_df.index.get_level_values(1).unique()
Y_state = {}
for contig in all_contigs:
offset_for_contig = offset_df.xs(contig, level=1)
if not _have_same_elements(phenotype_df.index, offset_for_contig.index):
raise ValueError(
'When using a multi-indexed offset_df, the offsets for each contig '
'should have the same index as phenotype_df')
Y_state[contig] = _create_YState_from_offset(Y, Y_mask, phenotype_df,
offset_for_contig, dt)
else:
Y_state = _create_YState(Y, Y_mask)
Y_state = gwas_fx._loco_make_state(Y, phenotype_df, offset_df,
lambda y, pdf, odf: _create_YState(y, pdf, odf, Y_mask, dt))

dof = C.shape[0] - C.shape[1] - 1

def map_func(pdf_iterator):
for pdf in pdf_iterator:
yield _linear_regression_dispatch(pdf, Y_state, Y_mask, Q, dof,
phenotype_df.columns.to_series().astype('str'))
yield gwas_fx._loco_dispatch(pdf, Y_state, _linear_regression_inner, Y_mask, Q, dof,
phenotype_df.columns.to_series().astype('str'))

return genotype_df.mapInPandas(map_func, result_struct)


def _have_same_elements(idx1: pd.Index, idx2: pd.Index) -> bool:
return idx1.sort_values().equals(idx2.sort_values())


def _create_YState_from_offset(Y: NDArray[(Any, Any), Float], Y_mask: NDArray[(Any, Any), Float],
phenotype_df: pd.DataFrame, offset_df: pd.DataFrame,
dt) -> NDArray[(Any, Any), Float]:
Y = (pd.DataFrame(Y, phenotype_df.index, phenotype_df.columns) - offset_df).to_numpy(dt)
return _create_YState(Y, Y_mask)


def _create_YState(Y: NDArray[(Any, Any), Float],
Y_mask: NDArray[(Any, Any), Float]) -> NDArray[(Any, Any), Float]:
Y *= Y_mask
return YState(Y, np.sum(Y * Y, axis=0))


@dataclass
class YState:
'''
Expand All @@ -197,18 +135,12 @@ class YState:
YdotY: NDArray[(Any), Float]


@typechecked
def _add_intercept(C: NDArray[(Any, Any), Float], num_samples: int) -> NDArray[(Any, Any), Float]:
intercept = np.ones((num_samples, 1))
return np.hstack((intercept, C)) if C.size else intercept


@typechecked
def _einsum(subscripts: str, *operands: NDArray) -> NDArray:
'''
A wrapper around np.einsum to ensure uniform options.
'''
return np.einsum(subscripts, *operands, casting='no')
def _create_YState(Y: NDArray[(Any, Any), Float], phenotype_df: pd.DataFrame,
offset_df: pd.DataFrame, Y_mask: NDArray[(Any, Any), Float], dt) -> YState:
if offset_df is not None:
Y = (pd.DataFrame(Y, phenotype_df.index, phenotype_df.columns) - offset_df).to_numpy(dt)
Y *= Y_mask
return YState(Y, np.sum(Y * Y, axis=0))


@typechecked
Expand All @@ -222,19 +154,6 @@ def _residualize_in_place(M: NDArray[(Any, Any), Float],
return M


def _linear_regression_dispatch(genotype_pdf: pd.DataFrame,
Y_state: Union[YState, Dict[str, YState]], *args) -> pd.DataFrame:
'''
Given a pandas DataFrame, dispatch into one or more calls of the linear regression kernel
depending whether we have one Y matrix or one Y matrix per contig.
'''
if isinstance(Y_state, dict):
return genotype_pdf.groupby('contigName', sort=False, as_index=False)\
.apply(lambda pdf: _linear_regression_inner(pdf, Y_state[pdf['contigName'].iloc[0]], *args))
else:
return _linear_regression_inner(genotype_pdf, Y_state, *args)


@typechecked
def _linear_regression_inner(genotype_pdf: pd.DataFrame, Y_state: YState,
Y_mask: NDArray[(Any, Any), Float], Q: NDArray[(Any, Any), Float],
Expand All @@ -255,7 +174,7 @@ def _linear_regression_inner(genotype_pdf: pd.DataFrame, Y_state: YState,
'''
X = _residualize_in_place(np.column_stack(genotype_pdf[_VALUES_COLUMN_NAME].array), Q)
XdotY = Y_state.Y.T @ X
XdotX_reciprocal = 1 / _einsum('sp,sg,sg->pg', Y_mask, X, X)
XdotX_reciprocal = 1 / gwas_fx._einsum('sp,sg,sg->pg', Y_mask, X, X)
betas = XdotY * XdotX_reciprocal
standard_error = np.sqrt((Y_state.YdotY[:, None] * XdotX_reciprocal - betas * betas) / dof)
T = betas / standard_error
Expand Down
Loading