Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
513b8be
Add Leland's demo notebook
henrydavidge May 15, 2020
1955d38
Merge pull request #3 from henrydavidge/add-nb
henrydavidge May 19, 2020
41d8fba
block_variants_and_samples Transformer to create genotype DataFrame f…
kianfar77 May 19, 2020
27e400e
Merge branch 'master' of https://github.com/projectglow/glow
karenfeng May 20, 2020
dfa6c08
Merge branch 'master' of https://github.com/projectglow/glow
karenfeng May 21, 2020
f5424ee
feat: ridge models for wgr added (#1)
LelandBarnard May 22, 2020
b065560
[HLS-539] Fix compatibility between blocked GT transformer and WGR (#6)
karenfeng May 29, 2020
9778381
Merge branch 'master' of github.com:projectglow/glow
henrydavidge May 29, 2020
35a2383
Merge branch 'master' of https://github.com/projectglow/glow
karenfeng Jun 1, 2020
86fab65
Merge branch 'master' of https://github.com/projectglow/glow
karenfeng Jun 2, 2020
265370f
Simplify ordering logic in levels code (#7)
henrydavidge Jun 2, 2020
1f32506
Limit Spark memory conf in tests (#9)
karenfeng Jun 2, 2020
f6f00d4
Merge branch 'master' of https://github.com/projectglow/glow
karenfeng Jun 3, 2020
cfc08e6
Improve partitioning in block_variants_and_samples transformer (#11)
kianfar77 Jun 5, 2020
f2f30c0
Remove unnecessary header_block grouping (#10)
karenfeng Jun 5, 2020
bcbadd6
Merge branch 'master' of https://github.com/projectglow/glow
karenfeng Jun 5, 2020
5bbad57
Merge branch 'master' of https://github.com/projectglow/glow
karenfeng Jun 8, 2020
1686138
Create sample ID blocking helper functions (#12)
karenfeng Jun 10, 2020
6bfad34
Add type-checking to WGR APIs (#14)
karenfeng Jun 12, 2020
afaa6df
Add covariate support (#13)
LelandBarnard Jun 12, 2020
cd6c6a1
Flatten estimated phenotypes (#15)
karenfeng Jun 15, 2020
d558115
Add fit_transform function to models (#17)
karenfeng Jun 17, 2020
79e0eea
Merge branch 'master' of https://github.com/projectglow/glow
karenfeng Jun 19, 2020
e920d06
Rename levels (#20)
karenfeng Jun 22, 2020
939e9bb
Add license headers (#21)
henrydavidge Jun 22, 2020
5037c18
add header to template
henrydavidge Jun 22, 2020
57ee7dd
add header to template
henrydavidge Jun 22, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ jobs:
sbt coverage core/test coverageReport exit
- run:
name: Run Python tests
no_output_timeout: 30m
environment:
command: |
export PATH=$HOME/conda/envs/glow/bin:$PATH
Expand Down Expand Up @@ -133,6 +134,7 @@ jobs:
sbt core/test exit
- run:
name: Run Python tests
no_output_timeout: 30m
environment:
command: |
export PATH=$HOME/conda/envs/glow/bin:$PATH
Expand Down Expand Up @@ -175,6 +177,7 @@ jobs:
sbt core/test exit
- run:
name: Run Python tests
no_output_timeout: 30m
environment:
command: |
export PATH=$HOME/conda/envs/glow/bin:$PATH
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ maven-repo/
**/__pycache__
*.pyc

# Jupyter notebook checkpoints
.ipynb_checkpoints/

# Sphinx documentation
docs/build

Expand Down
2 changes: 1 addition & 1 deletion bin/spark-submit
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

# A simple wrapper around the SparkSubmit main class that allows us to run
# PySpark unit tests with the same classpath as our Java tests.
HEAPSIZE=${SPARK_MEMORY:-2g}
HEAPSIZE=${SPARK_MEMORY:-1024m}
java -Xmx"$HEAPSIZE" -cp "$SPARK_CLASSPATH" org.apache.spark.deploy.SparkSubmit "$@"
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ lazy val pythonSettings = Seq(
val env = if (majorMinorVersion(sparkVersion) >= "3.0") {
baseEnv :+ "PYSPARK_ROW_FIELD_SORTING_ENABLED" -> "true"
} else {
baseEnv
baseEnv :+ "ARROW_PRE_0_15_IPC_FORMAT" -> "1"
}
val ret = Process(
Seq("pytest") ++ args,
Expand Down
17 changes: 17 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,28 @@
# Copyright 2019 The Glow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from pyspark.sql import SparkSession
import pytest

# Module-scoped fixture: each test module gets its own Spark session object.
@pytest.fixture(scope="module")
def spark():
    """Provide a Spark session for the tests of one module.

    A base session is obtained with ``getOrCreate()`` (local master with two
    threads, the Glow BGZF codec class set as the Hadoop compression codec,
    Spark UI disabled), and the fixture hands out ``newSession()`` of that
    base session.
    """
    print("set up new spark session")
    base_session = (
        SparkSession.builder
        .master("local[2]")
        .config("spark.hadoop.io.compression.codecs", "io.projectglow.sql.util.BGZFCodec")
        .config("spark.ui.enabled", "false")
        .getOrCreate()
    )
    return base_session.newSession()
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
io.projectglow.transformers.LiftOverVariantsTransformer
io.projectglow.transformers.blockvariantsandsamples.BlockVariantsAndSamplesTransformer
io.projectglow.transformers.normalizevariants.NormalizeVariantsTransformer
io.projectglow.transformers.splitmultiallelics.SplitMultiallelicsTransformer
io.projectglow.transformers.pipe.PipeTransformer
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/scala/io/projectglow/common/schemas.scala
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,16 @@ object VariantSchemas {
def plinkSchema(hasSampleIds: Boolean): StructType = {
StructType(plinkBaseSchema :+ plinkGenotypeSchema(hasSampleIds))
}

// BlockedGT Fields
// Columns of the blocked genotype DataFrame produced by VariantSampleBlockMaker.

// Variant label: contig name, start, reference allele and alternate alleles joined with ":".
val headerField = StructField("header", StringType)
// Number of entries in `values` for the row, measured after the array was sliced per sample block.
val sizeField = StructField("size", IntegerType)
// Array of doubles; the block maker slices it so each output row holds one sample block's part.
val valuesField = StructField("values", ArrayType(DoubleType))
// Variant block label of the form chr_<contigName>_block_<index>.
val headerBlockIdField = StructField("header_block", StringType)
// Sample block label: the string form of an index running from 1 to the sample block count.
val sampleBlockIdField = StructField("sample_block", StringType)
// Derived from the variant's start position.
// NOTE(review): the block maker casts start to IntegerType, while this field is declared
// LongType -- confirm which type downstream consumers expect.
val sortKeyField = StructField("sort_key", LongType)
// `mean` of array_summary_stats over the full (unsliced) values array.
val meanField = StructField("mu", DoubleType)
// `stdDev` of array_summary_stats over the full (unsliced) values array.
val stdDevField = StructField("sig", DoubleType)
}

object FeatureSchemas {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2019 The Glow Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.projectglow.transformers.blockvariantsandsamples

import io.projectglow.DataFrameTransformer
import io.projectglow.common.logging.HlsUsageLogging

import org.apache.spark.sql.DataFrame

/**
 * DataFrameTransformer that turns a DataFrame of variants into the Blocked GT DataFrame
 * consumed by WGR.
 *
 * Two integer options are required: `variants_per_block` and `sample_block_count`. Both are
 * parsed with `validateIntegerOption`, which throws an IllegalArgumentException when an option
 * is missing or is not an integer. The blocking itself is delegated to
 * [[VariantSampleBlockMaker.makeVariantAndSampleBlocks]].
 */
class BlockVariantsAndSamplesTransformer extends DataFrameTransformer with HlsUsageLogging {

  import BlockVariantsAndSamplesTransformer._

  /** Name under which this transformer is registered. */
  override def name: String = TRANSFORMER_NAME

  /**
   * @param df DataFrame of variants to block
   * @param options transformer options; must contain both integer options described above
   * @return the blocked genotype DataFrame
   */
  override def transform(df: DataFrame, options: Map[String, String]): DataFrame =
    VariantSampleBlockMaker.makeVariantAndSampleBlocks(
      df,
      variantsPerBlock = validateIntegerOption(options, VARIANTS_PER_BLOCK),
      sampleBlockCount = validateIntegerOption(options, SAMPLE_BLOCK_COUNT)
    )
}

/**
 * Companion holding the transformer's registered name, its option keys, and option parsing.
 */
object BlockVariantsAndSamplesTransformer {
  val TRANSFORMER_NAME = "block_variants_and_samples"
  val VARIANTS_PER_BLOCK = "variants_per_block"
  val SAMPLE_BLOCK_COUNT = "sample_block_count"

  /**
   * Looks up `optionName` in `options` and parses its value as an Int.
   *
   * @param options user-supplied transformer options
   * @param optionName key of the option to read
   * @return the option's value as an Int
   * @throws IllegalArgumentException if the option is absent or its value is not a valid integer
   */
  def validateIntegerOption(options: Map[String, String], optionName: String): Int = {
    import scala.util.Try

    // Try intercepts only non-fatal exceptions (e.g. NumberFormatException), so fatal JVM
    // errors propagate untouched instead of being reported as a malformed option.
    options
      .get(optionName)
      .flatMap(raw => Try(raw.toInt).toOption)
      .getOrElse(
        throw new IllegalArgumentException(
          s"$optionName is not provided or cannot be cast as an integer!"
        )
      )
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright 2019 The Glow Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.projectglow.transformers.blockvariantsandsamples

import io.projectglow.common.GlowLogging
import io.projectglow.common.VariantSchemas._
import io.projectglow.functions._

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType}

/**
 * Builds the blocked genotype DataFrame: every variant row is split into one row per sample
 * block and labelled with the variant block (header block) it belongs to.
 */
private[projectglow] object VariantSampleBlockMaker extends GlowLogging {

  /**
   * Splits the values array of every row into `sampleBlockCount` contiguous slices and emits
   * one output row per slice.
   *
   * For sample block `i` (1-based) and `f = size(values) / sampleBlockCount`, the slice starts
   * at element `round((i - 1) * f) + 1` and contains `round(i * f) - round((i - 1) * f)`
   * elements. The sample block column holds `i` as a string. The helper column
   * `fractionalSampleBlockSize` is left on the returned DataFrame.
   *
   * @param df DataFrame containing the values array column
   * @param sampleBlockCount number of sample blocks to create; must be positive
   * @return `df` exploded to one row per (input row, sample block)
   * @throws IllegalArgumentException if `sampleBlockCount` is not positive
   */
  def makeSampleBlocks(df: DataFrame, sampleBlockCount: Int): DataFrame = {
    // sequence(1, n) counts downward when n < 1, and a zero count divides by zero below;
    // reject such input instead of silently producing malformed blocks.
    require(
      sampleBlockCount > 0,
      s"sampleBlockCount must be positive, but got $sampleBlockCount"
    )

    df.withColumn(
        "fractionalSampleBlockSize",
        size(col(valuesField.name)) / sampleBlockCount
      )
      .withColumn(
        sampleBlockIdField.name,
        // One output row per sample block id, stored as a string
        explode(
          sequence(
            lit(1),
            lit(sampleBlockCount)
          ).cast(ArrayType(StringType))
        )
      )
      .withColumn(
        valuesField.name,
        expr(
          s"""slice(
             | ${valuesField.name},
             | round((${sampleBlockIdField.name} - 1) * fractionalSampleBlockSize) + 1,
             | round(${sampleBlockIdField.name} * fractionalSampleBlockSize) - round((${sampleBlockIdField.name} - 1) * fractionalSampleBlockSize)
             |)""".stripMargin
        )
      )
  }

  /**
   * Creates the blocked genotype DataFrame from a DataFrame of variants.
   *
   * Each variant row receives a sort key (start cast to integer), a header
   * (`contigName:start:ref:alt`), and the `mean` and `stdDev` of `array_summary_stats` over its
   * full values array. Rows are then split into sample blocks via [[makeSampleBlocks]]. Within
   * each (contig, sample block) partition, variants ordered by start, reference allele and
   * alternate alleles are grouped `variantsPerBlock` at a time into header blocks named
   * `chr_<contigName>_block_<index>`.
   *
   * @param variantDf DataFrame of variants with contig name, start, alleles and values columns
   * @param variantsPerBlock number of consecutive variants per header block; must be positive
   * @param sampleBlockCount number of sample blocks; must be positive
   * @return DataFrame with the header, size, values, header block, sample block, sort key,
   *         mean and standard deviation columns
   * @throws IllegalArgumentException if `variantsPerBlock` or `sampleBlockCount` is not positive
   */
  def makeVariantAndSampleBlocks(
      variantDf: DataFrame,
      variantsPerBlock: Int,
      sampleBlockCount: Int): DataFrame = {
    // The block index divides by variantsPerBlock, so zero or negative sizes cannot form blocks
    require(
      variantsPerBlock > 0,
      s"variantsPerBlock must be positive, but got $variantsPerBlock"
    )

    val windowSpec = Window
      .partitionBy(contigNameField.name, sampleBlockIdField.name)
      .orderBy(startField.name, refAlleleField.name, alternateAllelesField.name)

    // Per-variant columns are computed before sample blocking, so the statistics describe the
    // full values array rather than a single sample block's slice.
    val baseDf = variantDf
      .withColumn(
        sortKeyField.name,
        col(startField.name).cast(IntegerType)
      )
      .withColumn(
        headerField.name,
        concat_ws(
          ":",
          col(contigNameField.name),
          col(startField.name),
          col(refAlleleField.name),
          col(alternateAllelesField.name)
        )
      )
      .withColumn(
        "stats",
        subset_struct(
          array_summary_stats(
            col(valuesField.name)
          ),
          "mean",
          "stdDev"
        )
      )
      .withColumn(
        meanField.name,
        col("stats.mean")
      )
      .withColumn(
        stdDevField.name,
        col("stats.stdDev")
      )

    makeSampleBlocks(baseDf, sampleBlockCount)
      .withColumn(
        // Size of the slice belonging to this sample block
        sizeField.name,
        size(col(valuesField.name))
      )
      .withColumn(
        headerBlockIdField.name,
        concat_ws(
          "_",
          lit("chr"),
          col(contigNameField.name),
          lit("block"),
          // Zero-based block index: integer part of (row number - 1) / variantsPerBlock
          ((row_number().over(windowSpec) - 1) / variantsPerBlock).cast(IntegerType)
        )
      )
      .select(
        col(headerField.name),
        col(sizeField.name),
        col(valuesField.name),
        col(headerBlockIdField.name),
        col(sampleBlockIdField.name),
        col(sortKeyField.name),
        col(meanField.name),
        col(stdDevField.name)
      )
  }
}
Loading