diff --git a/core/src/main/scala/io/projectglow/gff/GffDataSource.scala b/core/src/main/scala/io/projectglow/gff/GffDataSource.scala index 340391b2c..f78685a8e 100644 --- a/core/src/main/scala/io/projectglow/gff/GffDataSource.scala +++ b/core/src/main/scala/io/projectglow/gff/GffDataSource.scala @@ -227,7 +227,7 @@ object GffDataSource { } /** - * Infers the schema by reading the gff fileusing csv datasource and parsing the attributes fields + * Infers the schema by reading the gff file using csv data source and parsing the attributes fields * to get official and unofficial attribute columns. Drops the original "attributes" column. Names, * types and ordering of columns will be as follows: * Names: All attribute fields will have names exactly as in tags. diff --git a/docs/source/_static/notebooks/etl/gff.html b/docs/source/_static/notebooks/etl/gff.html new file mode 100644 index 000000000..988cc343b --- /dev/null +++ b/docs/source/_static/notebooks/etl/gff.html @@ -0,0 +1,42 @@ + + + + +gff - Databricks + + + + + + + + + + + + + + + + + + + diff --git a/docs/source/etl/gff.rst b/docs/source/etl/gff.rst new file mode 100644 index 000000000..50be232a0 --- /dev/null +++ b/docs/source/etl/gff.rst @@ -0,0 +1,129 @@ +.. _gff: + +=================================================== +Read Genome Annotations (GFF3) as a Spark DataFrame +=================================================== + +.. invisible-code-block: python + + from pyspark.sql import Row + from pyspark.sql.types import * + + import glow + glow.register(spark) + +`GFF3 (Generic Feature Format Version 3) `_ is a 9-column tab-separated text file format commonly used to store genomic annotations. +Typically, the majority of annotation data in this format appears in the ninth column, called ``attributes``, as a semi-colon-separated list of ``=`` entries. If Spark's standard ``csv`` data source is used to read GFF3 files, the whole list of attribute tag-value pairs will be read as a single string-typed column, making queries on these tags/values cumbersome. + +To address this issue, Glow provides the ``gff`` data source. In addition to loading the first 8 columns of GFF3 as properly typed columns, the ``gff`` data source is able to parse all attribute tag-value pairs in the ninth column of GFF3 and create an appropriately typed column for each tag. In each row, the column corresponding to a tag will contain the tag's value in that row (or ``null`` if the tag does not appear in the row). + +Like any Spark data source, reading GFF3 files using the ``gff`` data source can be done in a single line of code: + +.. invisible-code-block: python + + path = "test-data/gff/test_gff_with_fasta.gff" + +.. code-block:: python + + df = spark.read.format("gff").load(path) + +.. invisible-code-block: python + + assert_rows_equal( + df.head(), + Row(**{'seqId':'NC_000001.11', 'source':'RefSeq', 'type':'region', 'start':0, 'end':248956422, 'score':None, 'strand':'+', 'phase':1, 'ID':'NC_000001.11:1..248956422', 'Name':'1', 'Parent':None, 'Dbxref':['taxon:9606','test'], 'Is_circular':False, 'chromosome':'1', 'description':None, 'gbkey':'Src', 'gene':None, 'gene_biotype':None, 'gene_synonym':None, 'genome':'chromosome', 'mol_type':'genomic DNA', 'product':None, 'pseudo':None, 'test space':None, 'transcript_id':None}) + ) + +The ``gff`` data source supports all compression formats supported by Spark's ``csv`` data source, including ``.gz`` and ``.bgz`` files. It also supports reading globs of files in one command. + +.. note:: + The ``gff`` data source ignores any comment and directive lines (lines starting with ``#``) in the GFF3 file as well as any FASTA lines that may appear at the end of the file. + +Schema +====== + +1. Inferred schema +~~~~~~~~~~~~~~~~~~ + +If no user-specified schema is provided (as in the example above), the data source infers the schema as follows: + +- The first 8 fields of the schema ("base" fields) correspond to the first 8 columns of the GFF3 file. Their names, types and order will be as shown below: + + .. _base_fields: + .. code-block:: + + |-- seqId: string (nullable = true) + |-- source: string (nullable = true) + |-- type: string (nullable = true) + |-- start: long (nullable = true) + |-- end: long (nullable = true) + |-- score: double (nullable = true) + |-- strand: string (nullable = true) + |-- phase: integer (nullable = true) + + .. note:: Although the ``start`` column in the GFF3 file is 1-based, the ``start`` field in the DataFrame will be 0-based to match the general practice in Glow. + +- The next fields in the inferred schema will be created as the result of parsing the ``attributes`` column of the GFF3 file. Each tag will have its own field in the schema. Fields corresponding to any "official" tag (those referred to as `tags with pre-defined meaning `_) come first, followed by fields corresponding to any other tag ("unofficial" tags). + + The complete list of official fields, their data types, and order are as shown below: + + .. code-block:: + + |-- ID: string (nullable = true) + |-- Name: string (nullable = true) + |-- Alias: string (nullable = true) + |-- Parent: array (nullable = true) + | |-- element: string (containsNull = true) + |-- Target: string (nullable = true) + |-- Gap: string (nullable = true) + |-- DerivesFrom: string (nullable = true) + |-- Note: array (nullable = true) + | |-- element: string (containsNull = true) + |-- Dbxref: array (nullable = true) + | |-- element: string (containsNull = true) + |-- OntologyTerm: array (nullable = true) + | |-- element: string (containsNull = true) + |-- Is_circular: boolean (nullable = true) + + + The unofficial fields will be of ``string`` type. + + .. note:: + + - If any of official tags does not appear in any row of the GFF3 file, the corresponding field will be excluded from the inferred schema. + - The official/unofficial field name will be exactly as the corresponding tag appears in the GFF3 file (in terms of letter case). + - The parser is insensitive to the letter case of the tag, e.g., if the ``attributes`` column in the GFF3 file contains both ``note`` and ``Note`` tags, they will be both mapped to the same column in the DataFrame. The name of the column in this case will be either ``note`` or ``Note``, chosen randomly. + +2. User-specified schema +~~~~~~~~~~~~~~~~~~~~~~~~ + +As with any Spark data source, the user can specify the schema while using the ``gff`` data source using the ``.schema`` command. The user-specified schema can have any subset of the base, official, and unofficial fields. The data source is able to read only the specified base fields and parse out only the specified official and unofficial fields from the ``attributes`` column of the GFF3 file. Here is an example of how the user can specify some base, official, and unofficial fields while reading the GFF3 file: + +.. code-block:: python + + mySchema = StructType( + [StructField('seqId', StringType()), # Base field + StructField('start', LongType()), # Base field + StructField('end', LongType()), # Base field + StructField('ID', StringType()), # Official field + StructField('Dbxref', ArrayType(StringType())), # Official field + StructField('mol_type', StringType())] # Unofficial field + ) + + df_user_specified = spark.read.format("gff").schema(mySchema).load(path) + +.. invisible-code-block: python + + assert_rows_equal( + df_user_specified.head(), + Row(**{'seqId':'NC_000001.11', 'start':0, 'end':248956422, 'ID':'NC_000001.11:1..248956422', 'Dbxref':['taxon:9606','test'], 'mol_type':'genomic DNA'}) + ) + +.. note:: + + - The base field names in the user-specified schema must match the names in the :ref:`list above ` in a case-sensitive manner. + - The official and unofficial fields will be matched with their corresponding tags in the GFF3 file in a case-and-underscore-insensitive manner. For example, if the GFF3 file contains the official tag ``db_xref``, a user-specified schema field with the name ``dbxref``, ``Db_Xref``, or any other case-and-underscore-insensitive match will correspond to that tag. + - The user can also include the original ``attributes`` column of the GFF3 file as a string field by including ``StructField('attributes', StringType())`` in the schema. + + +.. notebook:: .. etl/gff.html diff --git a/docs/source/etl/index.rst b/docs/source/etl/index.rst index 81ef6be76..28eca68f0 100644 --- a/docs/source/etl/index.rst +++ b/docs/source/etl/index.rst @@ -12,6 +12,7 @@ enabling seamless manipulation, filtering, quality control and transformation be :maxdepth: 2 variant-data + gff vcf2delta variant-qc sample-qc diff --git a/python/glow/conversions.py b/python/glow/conversions.py index cd3c20484..077d45c61 100644 --- a/python/glow/conversions.py +++ b/python/glow/conversions.py @@ -6,7 +6,8 @@ def _is_numpy_double_array(object, dimensions: int) -> bool: assert check_argument_types() - output = isinstance(object, np.ndarray) and len(object.shape) == dimensions and object.dtype.type == np.double + output = isinstance(object, np.ndarray) and len( + object.shape) == dimensions and object.dtype.type == np.double assert check_return_type(output) return output @@ -44,9 +45,8 @@ class OneDimensionalDoubleNumpyArrayConverter(object): >>> df.withColumn("array", lit(ndarray)).collect() [Row(value='a', array=[1.0, 2.1, 3.2]), Row(value='b', array=[1.0, 2.1, 3.2])] """ - def can_convert(self, object): - return _is_numpy_double_array(object, dimensions = 1) + return _is_numpy_double_array(object, dimensions=1) def convert(self, object, gateway_client): sc = SparkContext._active_spark_context @@ -70,17 +70,18 @@ class TwoDimensionalDoubleNumpyArrayConverter(object): >>> df.withColumn("matrix", lit(ndarray)).collect() [Row(value='a', matrix=DenseMatrix(2, 3, [1.0, 2.1, 3.2, 4.3, 5.4, 6.5], False)), Row(value='b', matrix=DenseMatrix(2, 3, [1.0, 2.1, 3.2, 4.3, 5.4, 6.5], False))] """ - def can_convert(self, object): - return _is_numpy_double_array(object, dimensions = 2) + return _is_numpy_double_array(object, dimensions=2) def convert(self, object, gateway_client): sc = SparkContext._active_spark_context flat_arr = object.ravel() java_arr = _convert_numpy_to_java_array(flat_arr) - dense_matrix = sc._jvm.org.apache.spark.ml.linalg.DenseMatrix(object.shape[0], object.shape[1], java_arr) + dense_matrix = sc._jvm.org.apache.spark.ml.linalg.DenseMatrix(object.shape[0], + object.shape[1], java_arr) matrix_udt = sc._jvm.org.apache.spark.ml.linalg.MatrixUDT() - converter = sc._jvm.org.apache.spark.sql.catalyst.CatalystTypeConverters.createToCatalystConverter(matrix_udt) + converter = sc._jvm.org.apache.spark.sql.catalyst.CatalystTypeConverters.createToCatalystConverter( + matrix_udt) literal_matrix = sc._jvm.org.apache.spark.sql.catalyst.expressions.Literal.create( converter.apply(dense_matrix), matrix_udt) return literal_matrix diff --git a/python/glow/glow.py b/python/glow/glow.py index c3274d597..a6c6baa91 100644 --- a/python/glow/glow.py +++ b/python/glow/glow.py @@ -58,8 +58,11 @@ def register(session: SparkSession): assert check_argument_types() session._jvm.io.projectglow.Glow.register(session._jsparkSession) + # Register input converters in idempotent fashion -glow_input_converters = [OneDimensionalDoubleNumpyArrayConverter, TwoDimensionalDoubleNumpyArrayConverter] +glow_input_converters = [ + OneDimensionalDoubleNumpyArrayConverter, TwoDimensionalDoubleNumpyArrayConverter +] for gic in glow_input_converters: if not any(type(pic) is gic for pic in protocol.INPUT_CONVERTER): - register_input_converter(gic(), prepend = True) + register_input_converter(gic(), prepend=True) diff --git a/python/glow/tests/test_conversions.py b/python/glow/tests/test_conversions.py index a3eaa6ebe..3a8457fbf 100644 --- a/python/glow/tests/test_conversions.py +++ b/python/glow/tests/test_conversions.py @@ -14,8 +14,8 @@ def test_convert_matrix(spark): ndarray = np.array([[1.0, 2.1, 3.2], [4.3, 5.4, 6.5]]) output_rows = df.withColumn("matrix", lit(ndarray)).collect() expected_matrix = DenseMatrix(2, 3, [1.0, 2.1, 3.2, 4.3, 5.4, 6.5]) - assert(output_rows[0].matrix == expected_matrix) - assert(output_rows[1].matrix == expected_matrix) + assert (output_rows[0].matrix == expected_matrix) + assert (output_rows[1].matrix == expected_matrix) def test_convert_array(spark): @@ -24,8 +24,8 @@ def test_convert_array(spark): ndarray = np.array([1.0, 2.1, 3.2]) output_rows = df.withColumn("array", lit(ndarray)).collect() expected_array = [1.0, 2.1, 3.2] - assert(output_rows[0].array == expected_array) - assert(output_rows[1].array == expected_array) + assert (output_rows[0].array == expected_array) + assert (output_rows[1].array == expected_array) def test_convert_checks_dimension(spark): @@ -58,5 +58,5 @@ def test_register_converters_idempotent(spark): one_d_converters += 1 if type(c) is TwoDimensionalDoubleNumpyArrayConverter: two_d_converters += 1 - assert(one_d_converters == 1) - assert(two_d_converters == 1) + assert (one_d_converters == 1) + assert (two_d_converters == 1)