Merged
18 changes: 9 additions & 9 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ install_conda_deps: &install_conda_deps


install_pyspark3: &install_pyspark3
run: # Evict PySpark 2.4.3 in favor of PySpark 3.0.0.dev2
run: # Evict PySpark 2.4.3 in favor of PySpark 3.0
name: Install PySpark 3
command: |
export PATH=$HOME/conda/bin:$PATH
if [ ! -e "pyspark-3.0.0.dev2.tar.gz" ]; then
source activate glow
conda remove -y pyspark
wget https://archive.apache.org/dist/spark/spark-3.0.0-preview2/pyspark-3.0.0.dev2.tar.gz
pip install pyspark-3.0.0.dev2.tar.gz
else
echo "Conda environment for Spark 3 already installed"
fi
source activate glow
conda remove -y pyspark
git clone --depth 1 --branch branch-3.0 https://github.com/apache/spark.git
cd spark/python
mv setup.py old-setup.py
mv ~/glow/pyspark-setup.py setup.py
python setup.py sdist
pip install dist/pyspark-3.0.*.tar.gz

check_clean_repo: &check_clean_repo
run:
Expand Down
14 changes: 11 additions & 3 deletions build.sbt
Expand Up @@ -197,12 +197,20 @@ lazy val pythonSettings = Seq(
publish / skip := true,
pytest := {
val args = spaceDelimited("<arg>").parsed
val ret = Process(
Seq("pytest") ++ args,
None,
val baseEnv = Seq(
"SPARK_CLASSPATH" -> sparkClasspath.value,
"SPARK_HOME" -> sparkHome.value,
"PYTHONPATH" -> pythonPath.value
)
val env = if (majorMinorVersion(sparkVersion) >= "3.0") {
baseEnv :+ "PYSPARK_ROW_FIELD_SORTING_ENABLED" -> "true"
} else {
baseEnv
}
val ret = Process(
Seq("pytest") ++ args,
None,
env: _*
).!
require(ret == 0, "Python tests failed")
}
Expand Down
Expand Up @@ -194,4 +194,4 @@ Furthermore, the new release supports Scala 2.12 in addition to Scala 2.11. The

Try It!
~~~~~~~
Try Glow 0.3.0 and its new features `here <https://projectglow.io/>`_.
Try Glow 0.3.0 and its new features `here <https://projectglow.io/>`_.
109 changes: 86 additions & 23 deletions docs/source/tertiary/regression-tests.rst
Expand Up @@ -7,11 +7,18 @@ Genome-wide Association Study Regression Tests
import glow
glow.register(spark)

path = 'test-data/1000G.phase3.broad.withGenotypes.chr20.10100000.vcf'
genotypes_vcf = 'test-data/gwas/genotypes.vcf.gz'
covariates_csv = 'test-data/gwas/covariates.csv.gz'
continuous_phenotypes_csv = 'test-data/gwas/continuous-phenotypes.csv.gz'
binary_phenotypes_csv = 'test-data/gwas/binary-phenotypes.csv.gz'

Glow contains functions for performing simple regression analyses used in
genome-wide association studies (GWAS).

.. tip::
   Glow automatically converts literal one-dimensional and two-dimensional ``numpy`` ``ndarray``\ s of ``double``\ s
   to ``array<double>`` and ``spark.ml`` ``DenseMatrix``, respectively.
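As a minimal illustration of the tip above (no Spark session required), the ``ndarray`` shapes that map to each Spark type can be sketched in plain ``numpy``; the sample size of 10 is arbitrary:

```python
import numpy as np

# A 1-D ndarray of doubles: Glow would convert this literal to array<double>
phenotype_literal = np.random.random(10)

# A 2-D ndarray of doubles: Glow would convert this literal to a
# spark.ml DenseMatrix (here, an intercept-only covariate matrix)
covariate_literal = np.ones((10, 1))
```

These arrays would typically be passed into a Glow function via ``pyspark.sql.functions.lit``.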

.. _linear-regression:

Linear regression
Expand All @@ -25,34 +32,56 @@ Example

.. code-block:: python

import pandas as pd
from pyspark.ml.linalg import DenseMatrix
import pyspark.sql.functions as fx
from pyspark.sql import Row
from pyspark.sql.functions import col, lit
import numpy as np

# Read in VCF file
df = glow.transform("split_multiallelics", spark.read.format('vcf').load(path)).cache()
variants = spark.read.format('vcf').load(genotypes_vcf)

# genotype_states returns the number of alt alleles for each sample
# mean_substitute replaces any missing genotype states with the mean of the non-missing states
genotypes = glow.transform('split_multiallelics', variants) \
.withColumn('gt', glow.mean_substitute(glow.genotype_states(col('genotypes')))) \
.cache()

# Read covariates from a CSV file and add an intercept
covariates = pd.read_csv(covariates_csv, index_col=0)
covariates['intercept'] = 1.

# Generate random phenotypes and an intercept-only covariate matrix
n_samples = df.select(fx.size('genotypes')).first()[0]
covariates = DenseMatrix(n_samples, 1, np.ones(n_samples))
np.random.seed(500)
phenotypes = np.random.random(n_samples).tolist()
covariates_and_phenotypes = spark.createDataFrame([[covariates, phenotypes]],
['covariates', 'phenotypes'])
# Read phenotypes from a CSV file
pd_phenotypes = pd.read_csv(continuous_phenotypes_csv, index_col=0).T
pd_phenotypes['pt'] = pd_phenotypes.values.tolist()
pd_phenotypes['trait'] = pd_phenotypes.index
phenotypes = spark.createDataFrame(pd_phenotypes[['trait', 'pt']])

# Run linear regression test
lin_reg_df = df.crossJoin(covariates_and_phenotypes).selectExpr(
lin_reg_df = genotypes.crossJoin(phenotypes).select(
'contigName',
'start',
'names',
# genotype_states returns the number of alt alleles for each sample
'expand_struct(linear_regression_gwas(genotype_states(genotypes), phenotypes, covariates))')
'trait',
glow.expand_struct(glow.linear_regression_gwas(
col('gt'),
col('pt'),
lit(covariates.to_numpy())
))
)

.. invisible-code-block: python

from pyspark.sql import Row
assert_rows_equal(lin_reg_df.head(), Row(contigName='20', start=10000053, names=[], beta=-0.012268942487586866, standardError=0.03986890589124242, pValue=0.7583114855349732))

expected_lin_reg_row = Row(
contigName='22',
start=16050114,
names=['rs587755077'],
trait='Continuous_Trait_1',
beta=0.13672636157787335,
standardError=0.1783963733160434,
pValue=0.44349953631952943
)
assert_rows_equal(lin_reg_df.head(), expected_lin_reg_row)
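The pandas reshaping used above (transpose the samples-by-traits table, then pack each trait's values into a list column) can be sketched with a toy stand-in for ``continuous-phenotypes.csv.gz``; the sample IDs and values below are invented for illustration:

```python
import pandas as pd

# Toy stand-in for continuous-phenotypes.csv.gz: rows are samples, columns are traits
df = pd.DataFrame(
    {'Continuous_Trait_1': [0.1, -0.2, 0.3],
     'Continuous_Trait_2': [1.0, 0.5, -0.4]},
    index=['HG00096', 'HG00097', 'HG00099'])

# Transpose so each row is a trait, then collect that trait's per-sample
# values into a single list column ('pt') alongside the trait name
pd_phenotypes = df.T
pd_phenotypes['pt'] = pd_phenotypes.values.tolist()
pd_phenotypes['trait'] = pd_phenotypes.index
```

Each row of ``pd_phenotypes[['trait', 'pt']]`` then carries one phenotype vector, which is the shape ``spark.createDataFrame`` expects in the example above.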

Parameters
----------
Expand Down Expand Up @@ -121,25 +150,59 @@ Example

.. code-block:: python

# Read a single phenotype from a CSV file
trait = 'Binary_Trait_1'
phenotype = np.hstack(pd.read_csv(binary_phenotypes_csv, index_col=0)[[trait]].to_numpy()).astype('double')

# Likelihood ratio test
log_reg_df = df.crossJoin(covariates_and_phenotypes).selectExpr(
lrt_log_reg_df = genotypes.select(
'contigName',
'start',
'names',
'expand_struct(logistic_regression_gwas(genotype_states(genotypes), phenotypes, covariates, \'LRT\'))')
glow.expand_struct(glow.logistic_regression_gwas(
col('gt'),
lit(phenotype),
lit(covariates.to_numpy()),
'LRT'
))
)

# Firth test
firth_log_reg_df = df.crossJoin(covariates_and_phenotypes).selectExpr(
firth_log_reg_df = genotypes.select(
'contigName',
'start',
'names',
'expand_struct(logistic_regression_gwas(genotype_states(genotypes), phenotypes, covariates, \'Firth\'))')
glow.expand_struct(glow.logistic_regression_gwas(
col('gt'),
lit(phenotype),
lit(covariates.to_numpy()),
'Firth'
))
)

.. invisible-code-block: python

assert_rows_equal(log_reg_df.head(), Row(contigName='20', start=10000053, names=[], beta=-0.04909334516505058, oddsRatio=0.9520922523419953, waldConfidenceInterval=[0.5523036168612923, 1.6412705426792646], pValue=0.8161087491239676))
assert_rows_equal(firth_log_reg_df.head(), Row(contigName='20', start=10000053, names=[], beta=-0.04737592899383216, oddsRatio=0.9537287958835796, waldConfidenceInterval=[0.5532645977026418, 1.644057147112848], pValue=0.8205226692490032))

expected_lrt_log_reg_row = Row(
contigName='22',
start=16050114,
names=['rs587755077'],
beta=0.4655549084480197,
oddsRatio=1.5928978561634963,
waldConfidenceInterval=[0.7813704896767115, 3.247273366082802],
pValue=0.19572327843236637
)
assert_rows_equal(lrt_log_reg_df.head(), expected_lrt_log_reg_row)

expected_firth_log_reg_row = Row(
contigName='22',
start=16050114,
names=['rs587755077'],
beta=0.45253994775257755,
oddsRatio=1.5723006796401617,
waldConfidenceInterval=[0.7719062301156017, 3.2026291934794795],
pValue=0.20086839802280376
)
assert_rows_equal(firth_log_reg_df.head(), expected_firth_log_reg_row)
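The single-phenotype extraction at the top of this example (select one trait column, flatten it to a 1-D array of doubles) can be sketched with a toy stand-in for ``binary-phenotypes.csv.gz``; the sample IDs and 0/1 values below are invented:

```python
import numpy as np
import pandas as pd

# Toy stand-in for binary-phenotypes.csv.gz: rows are samples, columns are binary traits
df = pd.DataFrame(
    {'Binary_Trait_1': [0, 1, 1],
     'Binary_Trait_2': [1, 0, 0]},
    index=['HG00096', 'HG00097', 'HG00099'])

# Select one trait and flatten it to a 1-D array of doubles, the shape
# expected for the phenotype argument of logistic_regression_gwas
trait = 'Binary_Trait_1'
phenotype = np.hstack(df[[trait]].to_numpy()).astype('double')
```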

Parameters
----------
Expand Down
26 changes: 26 additions & 0 deletions pyspark-setup.py
@@ -0,0 +1,26 @@
# Minimal setup.py for PySpark for Python and docs testing
from setuptools import setup
import sys

try:
    exec(open('pyspark/version.py').read())
except IOError:
    print("Failed to load PySpark version file for packaging. You must be in Spark's python dir.",
          file=sys.stderr)
    sys.exit(-1)
VERSION = __version__  # noqa

setup(name='pyspark',
      version=VERSION,
      packages=['pyspark',
                'pyspark.mllib',
                'pyspark.mllib.linalg',
                'pyspark.mllib.stat',
                'pyspark.ml',
                'pyspark.ml.linalg',
                'pyspark.ml.param',
                'pyspark.sql',
                'pyspark.sql.avro',
                'pyspark.sql.pandas',
                'pyspark.streaming'],
      install_requires=['py4j==0.10.9'])
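The ``exec``-based version lookup in ``pyspark-setup.py`` can be demonstrated in isolation. This sketch writes a hypothetical ``version.py`` to a temporary directory; it execs into an explicit namespace dict for safety, whereas the setup script execs into its own globals:

```python
import os
import tempfile

# Stand-in for Spark's pyspark/version.py
with tempfile.TemporaryDirectory() as d:
    version_file = os.path.join(d, 'version.py')
    with open(version_file, 'w') as f:
        f.write("__version__ = '3.0.0.dev2'\n")

    # Execute the file and read the __version__ binding from its namespace
    ns = {}
    exec(open(version_file).read(), ns)
    VERSION = ns['__version__']
```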
29 changes: 29 additions & 0 deletions test-data/gwas/README
@@ -0,0 +1,29 @@
The genotypes are sampled from the Thousand Genomes Project Phase 3 release chr22 VCF
(ALL.chr22.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.vcf).

The covariates and continuous phenotypes are simulated with PhenotypeSimulator
(https://github.com/HannahVMeyer/PhenotypeSimulator) and the 1KG chr22 PLINK files as follows.

Rscript -e "PhenotypeSimulator::simulatePhenotypes()" \
--args \
--genotypefile=ALL.chr22.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes \
--format=plink \
--NrSamples=2504 \
--NrPhenotypes=10 \
--genVar=0.4 \
--h2s=0.025 \
--phi=0.6 \
--delta=0.3 \
--pcorr=0.8 \
--NrFixedEffects=4 \
--NrConfounders=1,2,1,2 \
--pIndependentConfounders=0,1,1,0.5 \
--distConfounders=bin,cat_norm,cat_unif,norm \
--probConfounders=0.2 \
--catConfounders=3,4 \
--directory=/pheno-sim \
--subdirectory=test_simulation \
--saveTable

The binary phenotypes are created by binarizing the continuous phenotypes: all phenotype values < 0 are set to 0,
and all values >= 0 are set to 1.
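The binarization rule above can be sketched in a few lines of numpy; the continuous values below are hypothetical:

```python
import numpy as np

# Hypothetical continuous phenotype values
continuous = np.array([-1.3, -0.01, 0.0, 0.42, 2.7])

# Binarize: values < 0 become 0, values >= 0 become 1
binary = np.where(continuous < 0, 0, 1)
```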
Binary file added test-data/gwas/binary-phenotypes.csv.gz
Binary file not shown.
Binary file added test-data/gwas/continuous-phenotypes.csv.gz
Binary file not shown.
Binary file added test-data/gwas/covariates.csv.gz
Binary file not shown.
Binary file added test-data/gwas/genotypes.vcf.gz
Binary file not shown.