-
Notifications
You must be signed in to change notification settings - Fork 128
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #972: utils: Restore read_metadata and supporting file
- Loading branch information
Showing
4 changed files
with
248 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
import functools | ||
import pandas | ||
import sys | ||
|
||
|
||
class MetadataFile:
    """
    Represents a CSV or TSV file containing metadata

    The file must contain a column named `strain` or `name`, which is used to
    match metadata with samples. If both columns are present, `strain` is used
    and a warning is printed to stderr.
    """

    def __init__(self, fname, query=None, as_data_frame=False):
        """
        Store the read options and determine the key column.

        fname: location of the metadata file, handed to `pandas.read_csv`
        query: optional pandas query string used to filter rows
        as_data_frame: if true, `read()` returns a DataFrame instead of a dict

        Raises ValueError if the file has neither a `strain` nor a `name`
        column. Errors raised while parsing the file propagate unchanged.
        """
        self.fname = fname
        self.query = query
        self.as_data_frame = as_data_frame

        # find_key_type() inspects the columns, which parses the file, so an
        # unreadable file or a missing key column is reported here.
        self.key_type = self.find_key_type()

    def read(self):
        """
        Validate the metadata and return it keyed by `strain`/`name`.

        Returns a `(metadata, columns)` tuple. `metadata` is a DataFrame
        indexed by the key column when `as_data_frame` is set, otherwise a
        dict mapping each key to a dict of that row's column values.
        `columns` is the list of column names found in the file.

        Raises ValueError if the key column contains duplicates or if the
        query cannot be applied.
        """
        self.check_metadata_duplicates()

        # augur assumes the metadata dict will contain either "strain" or "name" (the
        # indexed column), but DataFrame.to_dict("index") does not place the indexed
        # column in the dict. So let's make a copy of the indexed column so that the
        # original "strain"/"name" remains in the output.
        #
        # assign() returns a new DataFrame, so the frame cached by
        # `self.metadata` (and possibly shared with `parse_file()`) is never
        # modified by reading.
        cached = self.metadata
        metadata = cached.assign(_index=cached[self.key_type]).set_index("_index")

        if self.as_data_frame:
            return metadata, self.columns

        return metadata.to_dict("index"), self.columns

    # NOTE: lru_cache on a method keys the cache on `self`, so the cache keeps
    # a reference to each instance until its entry is evicted.
    @property
    @functools.lru_cache()
    def metadata(self):
        """
        Return a DataFrame representing the metadata in the file.
        If a query was supplied, apply it.

        Raises ValueError if applying the query fails for any reason; the
        underlying exception is attached as the cause.
        """
        metadata = self.parse_file()

        if self.query:
            try:
                # copy() detaches the filtered rows from the cached parse result
                metadata = metadata.query(self.query).copy()
            except Exception as e:
                raise ValueError(
                    f"Error applying pandas query to metadata: `{self.query}` ({e})"
                ) from e

        return metadata

    def check_metadata_duplicates(self):
        """
        Raise ValueError if a key value occurs more than once in the
        (query-filtered) metadata.
        """
        keys = self.metadata[self.key_type]
        duplicate_rows = keys.duplicated()
        if duplicate_rows.any():
            duplicates = keys[duplicate_rows].values
            raise ValueError(
                f"Duplicated {self.key_type} in metadata: {', '.join(duplicates)}"
            )

    @property
    @functools.lru_cache()
    def columns(self):
        """Return the list of column names in the file, unaffected by the query."""
        return list(self.parse_file().columns)

    def find_key_type(self):
        """
        Return the name of the column used as the key: `strain` if present,
        otherwise `name`.

        Prints a warning to stderr when both columns exist. Raises ValueError
        when neither exists.
        """
        columns = self.columns
        has_strain = "strain" in columns
        has_name = "name" in columns

        if not has_strain and not has_name:
            raise ValueError(
                f"Metadata file {self.fname} does not contain `name` or `strain`"
            )

        if has_strain and has_name:
            print(
                f"WARNING: Metadata file {self.fname} contains both `name` and `strain`. Using `strain`.",
                file=sys.stderr,
            )

        return "strain" if has_strain else "name"

    @functools.lru_cache()
    def parse_file(self):
        """
        Parse the file into a DataFrame, with missing values replaced by "".

        The `strain` and `name` columns are read as strings so that
        numeric-looking identifiers are not converted to numbers.
        """
        return pandas.read_csv(
            self.fname,
            sep=None,  # csv.Sniffer will automatically detect sep
            engine="python",
            skipinitialspace=True,
            dtype={"strain": "string", "name": "string"},
        ).fillna("")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
import re | ||
|
||
from augur.util_support.metadata_file import MetadataFile | ||
|
||
import pytest | ||
|
||
|
||
@pytest.fixture()
def prepare_file(tmpdir):
    """Provide a helper that writes metadata text to `{tmpdir}/metadata.txt`."""

    def _prepare_file(contents):
        target = f"{tmpdir}/metadata.txt"
        with open(target, "w") as handle:
            # The pattern is anchored without re.MULTILINE, so only the
            # whitespace at the very start of the text is removed; later
            # lines are written as given.
            leading_ws_removed = re.sub(r"^\s*", "", contents)
            handle.write(leading_ws_removed)

    return _prepare_file
|
||
|
||
class TestMetadataFile:
    """Tests for MetadataFile, run against small files written to `tmpdir`."""

    @pytest.mark.parametrize("fname", ["", "/does/not/exist.txt"])
    def test_read_metadata_file_not_found(self, fname):
        """An empty or non-existent path is expected to raise FileNotFoundError."""
        with pytest.raises(FileNotFoundError):
            MetadataFile(fname, None).read()

    @pytest.mark.parametrize(
        "query, expected_strains",
        [
            ("location=='colorado'", ["strainA", "strainB"]),
            ("quality=='good' & location=='colorado'", ["strainA"]),
            (None, ["strainA", "strainB", "strainC"]),
        ],
    )
    def test_read_metadata_query(self, tmpdir, prepare_file, query, expected_strains):
        """A query filters the returned records; the reported columns are unchanged."""
        prepare_file(
            """
            strain,location,quality
            strainA,colorado,good
            strainB,colorado,bad
            strainC,nevada,good
            """
        )

        records, columns = MetadataFile(f"{tmpdir}/metadata.txt", query).read()
        assert list(records.keys()) == expected_strains
        assert list(columns) == ["strain", "location", "quality"]

    def test_read_metadata_bad_query(self, tmpdir, prepare_file):
        """A query that cannot be applied is reported as a ValueError."""
        prepare_file(
            """
            strain,location,quality
            strainA,colorado,good
            strainB,colorado,bad
            strainC,nevada,good
            """
        )

        with pytest.raises(ValueError, match="Error applying pandas query"):
            MetadataFile(f"{tmpdir}/metadata.txt", "age=5").read()

    def test_read_metadata_duplicate_strain(self, tmpdir, prepare_file):
        """Duplicate values in the `strain` column raise a ValueError naming the duplicate."""
        prepare_file(
            """
            strain,quality
            strainA,good
            strainA,good
            """
        )

        with pytest.raises(ValueError, match="Duplicated strain in metadata: strainA"):
            MetadataFile(f"{tmpdir}/metadata.txt", None).read()

    def test_read_metadata_duplicate_name(self, tmpdir, prepare_file):
        """Duplicate values in the `name` column raise a ValueError naming the duplicate."""
        prepare_file(
            """
            name,quality
            nameA,good
            nameA,good
            """
        )

        with pytest.raises(ValueError, match="Duplicated name in metadata: nameA"):
            MetadataFile(f"{tmpdir}/metadata.txt", None).read()

    def test_read_metadata_strain_and_name(self, tmpdir, prepare_file):
        """When both `strain` and `name` columns exist, `strain` is the key."""
        prepare_file(
            """
            strain,name,quality
            strainA,nameA,good
            strainB,nameB,good
            """
        )

        assert MetadataFile(f"{tmpdir}/metadata.txt", None).find_key_type() == "strain"

    def test_read_metadata_no_strain_or_name(self, tmpdir, prepare_file):
        """A file with neither a `strain` nor a `name` column raises a ValueError."""
        prepare_file(
            """
            location,quality
            colorado,good
            nevada,good
            """
        )

        with pytest.raises(ValueError, match="does not contain"):
            MetadataFile(f"{tmpdir}/metadata.txt", None).read()

    def test_metadata_delimiter_autodetect(self, tmpdir, prepare_file):
        """Tab-delimited input is parsed without specifying a delimiter."""
        prepare_file(
            """
            strain\tlocation\tquality
            strainA\tcolorado\tgood
            strainB\tnevada\tgood
            """
        )

        records, columns = MetadataFile(f"{tmpdir}/metadata.txt").read()
        assert records == {
            "strainA": {"strain": "strainA", "location": "colorado", "quality": "good"},
            "strainB": {"strain": "strainB", "location": "nevada", "quality": "good"},
        }
        assert list(columns) == ["strain", "location", "quality"]

    def test_metadata_strain_type(self, tmpdir, prepare_file):
        """Numeric-looking strain values are kept as strings in the record keys."""
        prepare_file(
            """
            strain\tlocation
            1\tWashington
            2\tOregon
            """
        )

        records, columns = MetadataFile(f"{tmpdir}/metadata.txt").read()
        assert "1" in records