2 changes: 1 addition & 1 deletion core/src/main/scala/io/projectglow/gff/GffDataSource.scala
@@ -227,7 +227,7 @@ object GffDataSource {
}

/**
- * Infers the schema by reading the gff fileusing csv datasource and parsing the attributes fields
+ * Infers the schema by reading the gff file using csv data source and parsing the attributes fields
* to get official and unofficial attribute columns. Drops the original "attributes" column. Names,
* types and ordering of columns will be as follows:
* Names: All attribute fields will have names exactly as in tags.
42 changes: 42 additions & 0 deletions docs/source/_static/notebooks/etl/gff.html

Large diffs are not rendered by default.

129 changes: 129 additions & 0 deletions docs/source/etl/gff.rst
@@ -0,0 +1,129 @@
.. _gff:

===================================================
Read Genome Annotations (GFF3) as a Spark DataFrame
===================================================

.. invisible-code-block: python

from pyspark.sql import Row
from pyspark.sql.types import *

import glow
glow.register(spark)

`GFF3 (Generic Feature Format Version 3) <https://github.com/The-Sequence-Ontology/Specifications/blob/master/gff3.md>`_ is a 9-column tab-separated text file format commonly used to store genomic annotations.
Typically, the majority of annotation data in this format appears in the ninth column, called ``attributes``, as a semicolon-separated list of ``<tag>=<value>`` entries. If Spark's standard ``csv`` data source is used to read GFF3 files, the whole list of attribute tag-value pairs is read as a single string-typed column, making queries on these tags and values cumbersome.

To address this issue, Glow provides the ``gff`` data source. In addition to loading the first 8 columns of GFF3 as properly typed columns, the ``gff`` data source is able to parse all attribute tag-value pairs in the ninth column of GFF3 and create an appropriately typed column for each tag. In each row, the column corresponding to a tag will contain the tag's value in that row (or ``null`` if the tag does not appear in the row).
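Conceptually, the tag-value parsing that the ``gff`` data source applies to the ``attributes`` column resembles the following pure-Python sketch. This is an illustration only, not Glow's actual implementation (which is written in Scala and additionally infers types, splits multi-valued tags into arrays, and resolves letter-case collisions):

```python
def parse_attributes(attributes: str) -> dict:
    """Split a GFF3 attributes string into a {tag: value} dict.

    Illustrative only: keeps every value as a plain string.
    """
    pairs = {}
    for entry in attributes.split(";"):
        if not entry.strip():
            continue  # tolerate empty entries, e.g. a trailing semicolon
        tag, _, value = entry.partition("=")
        pairs[tag.strip()] = value.strip()
    return pairs

attrs = parse_attributes("ID=NC_000001.11:1..248956422;Dbxref=taxon:9606;chromosome=1")
print(attrs["Dbxref"])  # taxon:9606
```

In the real data source, each tag found this way becomes its own typed column rather than a dictionary entry.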

Like any Spark data source, reading GFF3 files using the ``gff`` data source can be done in a single line of code:

.. invisible-code-block: python

path = "test-data/gff/test_gff_with_fasta.gff"

.. code-block:: python

df = spark.read.format("gff").load(path)

.. invisible-code-block: python

assert_rows_equal(
df.head(),
Row(**{'seqId':'NC_000001.11', 'source':'RefSeq', 'type':'region', 'start':0, 'end':248956422, 'score':None, 'strand':'+', 'phase':1, 'ID':'NC_000001.11:1..248956422', 'Name':'1', 'Parent':None, 'Dbxref':['taxon:9606','test'], 'Is_circular':False, 'chromosome':'1', 'description':None, 'gbkey':'Src', 'gene':None, 'gene_biotype':None, 'gene_synonym':None, 'genome':'chromosome', 'mol_type':'genomic DNA', 'product':None, 'pseudo':None, 'test space':None, 'transcript_id':None})
)

The ``gff`` data source supports all compression formats supported by Spark's ``csv`` data source, including ``.gz`` and ``.bgz`` files. It also supports reading globs of files in one command.

.. note::
The ``gff`` data source ignores any comment and directive lines (lines starting with ``#``) in the GFF3 file as well as any FASTA lines that may appear at the end of the file.

Schema
======

1. Inferred schema
~~~~~~~~~~~~~~~~~~

If no user-specified schema is provided (as in the example above), the data source infers the schema as follows:

- The first 8 fields of the schema ("base" fields) correspond to the first 8 columns of the GFF3 file. Their names, types and order will be as shown below:

.. _base_fields:
.. code-block::

|-- seqId: string (nullable = true)
|-- source: string (nullable = true)
|-- type: string (nullable = true)
|-- start: long (nullable = true)
|-- end: long (nullable = true)
|-- score: double (nullable = true)
|-- strand: string (nullable = true)
|-- phase: integer (nullable = true)

.. note:: Although the ``start`` column in the GFF3 file is 1-based, the ``start`` field in the DataFrame will be 0-based to match the general practice in Glow.
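The coordinate shift described in the note above amounts to subtracting one from the GFF3 ``start`` while leaving ``end`` unchanged, which matches the example row earlier in this page (``start`` 0, ``end`` 248956422 for the interval ``1..248956422``). A minimal sketch, with a hypothetical helper name:

```python
def gff3_to_glow_interval(gff3_start: int, gff3_end: int) -> tuple:
    """Convert a 1-based, inclusive GFF3 interval to Glow's 0-based start convention.

    Only the start coordinate shifts; the end coordinate is unchanged.
    """
    return (gff3_start - 1, gff3_end)

print(gff3_to_glow_interval(1, 248956422))  # (0, 248956422)
```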

- The next fields in the inferred schema will be created as the result of parsing the ``attributes`` column of the GFF3 file. Each tag will have its own field in the schema. Fields corresponding to any "official" tag (those referred to as `tags with pre-defined meaning <https://github.com/The-Sequence-Ontology/Specifications/blob/master/gff3.md>`_) come first, followed by fields corresponding to any other tag ("unofficial" tags).

The complete list of official fields, their data types, and their order is shown below:

.. code-block::

|-- ID: string (nullable = true)
|-- Name: string (nullable = true)
|-- Alias: string (nullable = true)
|-- Parent: array (nullable = true)
| |-- element: string (containsNull = true)
|-- Target: string (nullable = true)
|-- Gap: string (nullable = true)
|-- DerivesFrom: string (nullable = true)
|-- Note: array (nullable = true)
| |-- element: string (containsNull = true)
|-- Dbxref: array (nullable = true)
| |-- element: string (containsNull = true)
|-- OntologyTerm: array (nullable = true)
| |-- element: string (containsNull = true)
|-- Is_circular: boolean (nullable = true)


The unofficial fields will be of ``string`` type.

.. note::

- If any of the official tags does not appear in any row of the GFF3 file, the corresponding field is excluded from the inferred schema.
- Each official or unofficial field name appears exactly as the corresponding tag does in the GFF3 file (including letter case).
- The parser is insensitive to the letter case of the tag, e.g., if the ``attributes`` column in the GFF3 file contains both ``note`` and ``Note`` tags, they will both be mapped to the same column in the DataFrame. The name of the column in this case will be either ``note`` or ``Note``, chosen arbitrarily.
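The case-insensitive merging described in the last point can be sketched as follows. This is an illustration, not Glow's implementation: here the first-seen spelling wins, whereas Glow's choice between the spellings is arbitrary:

```python
def merge_tags_case_insensitively(tags):
    """Map tags that differ only by letter case to a single column name."""
    columns = {}  # lowercased tag -> chosen column name
    for tag in tags:
        columns.setdefault(tag.lower(), tag)
    return list(columns.values())

print(merge_tags_case_insensitively(["note", "Note", "ID"]))  # ['note', 'ID']
```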

2. User-specified schema
~~~~~~~~~~~~~~~~~~~~~~~~

As with any Spark data source, the user can specify a schema for the ``gff`` data source with the ``.schema`` method. The user-specified schema can contain any subset of the base, official, and unofficial fields. The data source reads only the specified base fields and parses out only the specified official and unofficial fields from the ``attributes`` column of the GFF3 file. Here is an example of specifying some base, official, and unofficial fields while reading the GFF3 file:

.. code-block:: python

mySchema = StructType(
[StructField('seqId', StringType()), # Base field
StructField('start', LongType()), # Base field
StructField('end', LongType()), # Base field
StructField('ID', StringType()), # Official field
StructField('Dbxref', ArrayType(StringType())), # Official field
StructField('mol_type', StringType())] # Unofficial field
)

df_user_specified = spark.read.format("gff").schema(mySchema).load(path)

.. invisible-code-block: python

assert_rows_equal(
df_user_specified.head(),
Row(**{'seqId':'NC_000001.11', 'start':0, 'end':248956422, 'ID':'NC_000001.11:1..248956422', 'Dbxref':['taxon:9606','test'], 'mol_type':'genomic DNA'})
)

.. note::

- The base field names in the user-specified schema must match the names in the :ref:`list above <base_fields>` in a case-sensitive manner.
- The official and unofficial fields will be matched with their corresponding tags in the GFF3 file in a case-and-underscore-insensitive manner. For example, if the GFF3 file contains the official tag ``db_xref``, a user-specified schema field with the name ``dbxref``, ``Db_Xref``, or any other case-and-underscore-insensitive match will correspond to that tag.
- The user can also include the original ``attributes`` column of the GFF3 file as a string field by including ``StructField('attributes', StringType())`` in the schema.
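The case-and-underscore-insensitive matching described above amounts to comparing normalized names. A minimal sketch with hypothetical helper names (not part of Glow's API):

```python
def normalize(name: str) -> str:
    """Normalize a field or tag name: drop underscores, lowercase."""
    return name.replace("_", "").lower()

def matches_tag(schema_field: str, gff3_tag: str) -> bool:
    """True if a user-specified schema field corresponds to a GFF3 tag."""
    return normalize(schema_field) == normalize(gff3_tag)

print(matches_tag("dbxref", "Db_Xref"))   # True
print(matches_tag("Db_Xref", "dbxref"))   # True
print(matches_tag("mol_type", "gbkey"))   # False
```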


.. notebook:: .. etl/gff.html
1 change: 1 addition & 0 deletions docs/source/etl/index.rst
@@ -12,6 +12,7 @@ enabling seamless manipulation, filtering, quality control and transformation be
:maxdepth: 2

variant-data
gff
vcf2delta
variant-qc
sample-qc
15 changes: 8 additions & 7 deletions python/glow/conversions.py
@@ -6,7 +6,8 @@

def _is_numpy_double_array(object, dimensions: int) -> bool:
assert check_argument_types()
- output = isinstance(object, np.ndarray) and len(object.shape) == dimensions and object.dtype.type == np.double
+ output = isinstance(object, np.ndarray) and len(
+     object.shape) == dimensions and object.dtype.type == np.double
assert check_return_type(output)
return output

@@ -44,9 +45,8 @@ class OneDimensionalDoubleNumpyArrayConverter(object):
>>> df.withColumn("array", lit(ndarray)).collect()
[Row(value='a', array=[1.0, 2.1, 3.2]), Row(value='b', array=[1.0, 2.1, 3.2])]
"""

def can_convert(self, object):
- return _is_numpy_double_array(object, dimensions = 1)
+ return _is_numpy_double_array(object, dimensions=1)

def convert(self, object, gateway_client):
sc = SparkContext._active_spark_context
@@ -70,17 +70,18 @@ class TwoDimensionalDoubleNumpyArrayConverter(object):
>>> df.withColumn("matrix", lit(ndarray)).collect()
[Row(value='a', matrix=DenseMatrix(2, 3, [1.0, 2.1, 3.2, 4.3, 5.4, 6.5], False)), Row(value='b', matrix=DenseMatrix(2, 3, [1.0, 2.1, 3.2, 4.3, 5.4, 6.5], False))]
"""

def can_convert(self, object):
- return _is_numpy_double_array(object, dimensions = 2)
+ return _is_numpy_double_array(object, dimensions=2)

def convert(self, object, gateway_client):
sc = SparkContext._active_spark_context
flat_arr = object.ravel()
java_arr = _convert_numpy_to_java_array(flat_arr)
- dense_matrix = sc._jvm.org.apache.spark.ml.linalg.DenseMatrix(object.shape[0], object.shape[1], java_arr)
+ dense_matrix = sc._jvm.org.apache.spark.ml.linalg.DenseMatrix(object.shape[0],
+     object.shape[1], java_arr)
matrix_udt = sc._jvm.org.apache.spark.ml.linalg.MatrixUDT()
- converter = sc._jvm.org.apache.spark.sql.catalyst.CatalystTypeConverters.createToCatalystConverter(matrix_udt)
+ converter = sc._jvm.org.apache.spark.sql.catalyst.CatalystTypeConverters.createToCatalystConverter(
+     matrix_udt)
literal_matrix = sc._jvm.org.apache.spark.sql.catalyst.expressions.Literal.create(
converter.apply(dense_matrix), matrix_udt)
return literal_matrix
7 changes: 5 additions & 2 deletions python/glow/glow.py
@@ -58,8 +58,11 @@ def register(session: SparkSession):
assert check_argument_types()
session._jvm.io.projectglow.Glow.register(session._jsparkSession)


# Register input converters in idempotent fashion
- glow_input_converters = [OneDimensionalDoubleNumpyArrayConverter, TwoDimensionalDoubleNumpyArrayConverter]
+ glow_input_converters = [
+     OneDimensionalDoubleNumpyArrayConverter, TwoDimensionalDoubleNumpyArrayConverter
+ ]
for gic in glow_input_converters:
if not any(type(pic) is gic for pic in protocol.INPUT_CONVERTER):
- register_input_converter(gic(), prepend = True)
+ register_input_converter(gic(), prepend=True)
12 changes: 6 additions & 6 deletions python/glow/tests/test_conversions.py
@@ -14,8 +14,8 @@ def test_convert_matrix(spark):
ndarray = np.array([[1.0, 2.1, 3.2], [4.3, 5.4, 6.5]])
output_rows = df.withColumn("matrix", lit(ndarray)).collect()
expected_matrix = DenseMatrix(2, 3, [1.0, 2.1, 3.2, 4.3, 5.4, 6.5])
- assert(output_rows[0].matrix == expected_matrix)
- assert(output_rows[1].matrix == expected_matrix)
+ assert (output_rows[0].matrix == expected_matrix)
+ assert (output_rows[1].matrix == expected_matrix)


def test_convert_array(spark):
@@ -24,8 +24,8 @@ def test_convert_array(spark):
ndarray = np.array([1.0, 2.1, 3.2])
output_rows = df.withColumn("array", lit(ndarray)).collect()
expected_array = [1.0, 2.1, 3.2]
- assert(output_rows[0].array == expected_array)
- assert(output_rows[1].array == expected_array)
+ assert (output_rows[0].array == expected_array)
+ assert (output_rows[1].array == expected_array)


def test_convert_checks_dimension(spark):
@@ -58,5 +58,5 @@ def test_register_converters_idempotent(spark):
one_d_converters += 1
if type(c) is TwoDimensionalDoubleNumpyArrayConverter:
two_d_converters += 1
- assert(one_d_converters == 1)
- assert(two_d_converters == 1)
+ assert (one_d_converters == 1)
+ assert (two_d_converters == 1)