Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge: Support sequences without cross-checking #1631

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ jobs:
- name: Install dependencies from Conda
uses: mamba-org/setup-micromamba@v2
with:
create-args: mafft raxml fasttree iqtree vcftools sqlite tsv-utils biopython=${{ matrix.biopython-version }} python=${{ matrix.python-version }}
create-args: mafft raxml fasttree iqtree vcftools seqkit sqlite tsv-utils biopython=${{ matrix.biopython-version }} python=${{ matrix.python-version }}
condarc: |
channels:
- conda-forge
Expand Down
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## __NEXT__

### Features

* merge: Support merging of sequence files with `--sequences`. [#1579][] (@victorlin)

[#1579]: https://github.com/nextstrain/augur/issues/1579

## 28.0.0 (30 January 2025)

Expand Down
225 changes: 204 additions & 21 deletions augur/merge.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
"""
Merge two or more metadata tables into one.
Merge two or more datasets into one.

Tables must be given unique names to identify them in the output and are
merged in the order given.
Datasets can consist of metadata and/or sequence files. If both are provided,
the order and file contents are used independently.

**Metadata**

Metadata tables must be given unique names to identify them in the output and
are merged in the order given.

Rows are joined by id (e.g. "strain" or "name" or other
--metadata-id-columns), and ids must be unique within an input table (i.e.
Expand All @@ -29,11 +34,21 @@
transient disk space required is approximately the sum of the uncompressed size
of the inputs.

SQLite is used behind the scenes to implement the merge, but, at least for now,
this should be considered an implementation detail that may change in the
future. The SQLite 3 CLI, sqlite3, must be available. If it's not on PATH (or
you want to use a version different from what's on PATH), set the SQLITE3
environment variable to path of the desired sqlite3 executable.
SQLite is used behind the scenes to implement the merge, but this should be
considered an implementation detail that may change in the future. The SQLite 3
CLI, sqlite3, must be available. If it's not on PATH (or you want to use a
version different from what's on PATH), set the SQLITE3 environment variable to
path of the desired sqlite3 executable.

**Sequences**

Sequence files are merged in the order given.

SeqKit is used behind the scenes to implement the merge, but this should be
considered an implementation detail that may change in the future. The CLI
program seqkit must be available. If it's not on PATH (or you want to use a
version different from what's on PATH), set the SEQKIT environment variable to
path of the desired seqkit executable.
"""
import os
import re
Expand All @@ -43,7 +58,7 @@
from itertools import starmap
from shlex import quote as shquote
from shutil import which
from tempfile import mkstemp
from tempfile import mkstemp, NamedTemporaryFile
from textwrap import dedent
from typing import Iterable, Tuple, TypeVar

Expand All @@ -57,6 +72,18 @@
T = TypeVar('T')


print_info = print_err


# Locate how to re-invoke ourselves (_this_ specific Augur).
if sys.executable:
augur = f"{shquote(sys.executable)} -m augur"
else:
# A bit unusual we don't know our own Python executable, but assume we
# can access ourselves as the ``augur`` command.
augur = f"augur"


class NamedMetadata(Metadata):
name: str
"""User-provided descriptive name for this metadata file."""
Expand All @@ -77,23 +104,55 @@ def register_parser(parent_subparsers):
parser = parent_subparsers.add_parser("merge", help=first_line(__doc__))

input_group = parser.add_argument_group("inputs", "options related to input")
input_group.add_argument("--metadata", nargs="+", action=ExtendOverwriteDefault, required=True, metavar="NAME=FILE", help="Required. Metadata table names and file paths. Names are arbitrary monikers used solely for referring to the associated input file in other arguments and in output column names. Paths must be to seekable files, not unseekable streams. Compressed files are supported." + SKIP_AUTO_DEFAULT_IN_HELP)
input_group.add_argument("--metadata", nargs="+", action=ExtendOverwriteDefault, metavar="NAME=FILE", help="Metadata table names and file paths. Names are arbitrary monikers used solely for referring to the associated input file in other arguments and in output column names. Paths must be to seekable files, not unseekable streams. Compressed files are supported." + SKIP_AUTO_DEFAULT_IN_HELP)

input_group.add_argument("--metadata-id-columns", default=DEFAULT_ID_COLUMNS, nargs="+", action=ExtendOverwriteDefault, metavar="[TABLE=]COLUMN", help=f"Possible metadata column names containing identifiers, considered in the order given. Columns will be considered for all metadata tables by default. Table-specific column names may be given using the same names assigned in --metadata. Only one ID column will be inferred for each table. (default: {' '.join(map(shquote_humanized, DEFAULT_ID_COLUMNS))})" + SKIP_AUTO_DEFAULT_IN_HELP)
input_group.add_argument("--metadata-delimiters", default=DEFAULT_DELIMITERS, nargs="+", action=ExtendOverwriteDefault, metavar="[TABLE=]CHARACTER", help=f"Possible field delimiters to use for reading metadata tables, considered in the order given. Delimiters will be considered for all metadata tables by default. Table-specific delimiters may be given using the same names assigned in --metadata. Only one delimiter will be inferred for each table. (default: {' '.join(map(shquote_humanized, DEFAULT_DELIMITERS))})" + SKIP_AUTO_DEFAULT_IN_HELP)

input_group.add_argument("--sequences", nargs="+", action=ExtendOverwriteDefault, metavar="FILE", help="Sequence files. Compressed files are supported." + SKIP_AUTO_DEFAULT_IN_HELP)
input_group.add_argument("--skip-input-sequences-validation", action="store_true", default=False, help="Skip validation of --sequences (checking for no duplicates) to improve run time. Note that this may result in unexpected behavior in cases where validation would fail.")

output_group = parser.add_argument_group("outputs", "options related to output")
output_group.add_argument('--output-metadata', required=True, metavar="FILE", help="Required. Merged metadata as TSV. Compressed files are supported." + SKIP_AUTO_DEFAULT_IN_HELP)
output_group.add_argument('--output-metadata', metavar="FILE", help="Merged metadata as TSV. Compressed files are supported." + SKIP_AUTO_DEFAULT_IN_HELP)
output_group.add_argument('--source-columns', metavar="TEMPLATE", help=f"Template with which to generate names for the columns (described above) identifying the source of each row's data. Must contain a literal placeholder, {{NAME}}, which stands in for the metadata table names assigned in --metadata. (default: disabled)" + SKIP_AUTO_DEFAULT_IN_HELP)
output_group.add_argument('--no-source-columns', dest="source_columns", action="store_const", const=None, help=f"Suppress generated columns (described above) identifying the source of each row's data. This is the default behaviour, but it may be made explicit or used to override a previous --source-columns." + SKIP_AUTO_DEFAULT_IN_HELP)

output_group.add_argument('--output-sequences', metavar="FILE", help="Merged sequences as FASTA. Compressed files are supported." + SKIP_AUTO_DEFAULT_IN_HELP)

output_group.add_argument('--quiet', action="store_true", default=False, help="Suppress informational and warning messages normally written to stderr. (default: disabled)" + SKIP_AUTO_DEFAULT_IN_HELP)

return parser


def validate_arguments(args):
if not any((args.metadata, args.sequences)):
raise AugurError("Either --metadata or --sequences must be specified.")
if not any((args.output_metadata, args.output_sequences)):
raise AugurError("Either --output-metadata or --sequences must be specified.")

if args.output_metadata and not args.metadata:
raise AugurError("--output-metadata requires --metadata.")
if args.output_sequences and not args.sequences:
raise AugurError("--output-sequences requires --sequences.")


def run(args):
print_info = print_err if not args.quiet else lambda *_: None
global print_info

if args.quiet:
print_info = lambda *_: None

# Catch user errors early to avoid unnecessary computation.
validate_arguments(args)

if args.metadata:
merge_metadata(args)

if args.sequences:
merge_sequences(args)


def merge_metadata(args):
# Parse --metadata arguments
if not len(args.metadata) >= 2:
raise AugurError(f"At least two metadata inputs are required for merging.")
Expand Down Expand Up @@ -169,15 +228,6 @@ def run(args):
for name, path in metadata]


# Locate how to re-invoke ourselves (_this_ specific Augur).
if sys.executable:
augur = f"{shquote(sys.executable)} -m augur"
else:
# A bit unusual we don't know our own Python executable, but assume we
# can access ourselves as the ``augur`` command.
augur = f"augur"


# Work with a temporary, on-disk SQLite database under a name we control so
# we can access it from multiple (serial) processes.
db_fd, db_path = mkstemp(prefix="augur-merge-", suffix=".sqlite")
Expand Down Expand Up @@ -329,6 +379,57 @@ def run(args):
print_info(f"WARNING: Skipped deletion of {db_path} due to error, but you may want to clean it up yourself (e.g. if it's large).")


def merge_sequences(args):
if not args.skip_input_sequences_validation:
for s in args.sequences:
print_info(f"Validating {s!r}…")

with NamedTemporaryFile() as dup_num_file:
command = f"""
{augur} read-file {shquote(s)} |
{seqkit()} rmdup --dup-num-file {shquote(dup_num_file.name)}
"""

returncode = run_command(command, print_stdout=False)

if returncode != 0:
raise AugurError(f"Validation failed for {s!r}.")

dup_num_file.seek(0)

if duplicates := list(parse_seqkit_dup_num_file(dup_num_file.read().decode().strip())):
raise AugurError(f"File {s} has duplicate IDs: {', '.join(repr(d) for d in duplicates)}")

print_info(f"Merging sequences and writing to {args.output_sequences!r}…")

# Reversed because seqkit rmdup keeps the first entry but this command
# should keep the last entry.
# "echo;" adds a newline character to support FASTA files that are missing one at the end.
# This is fine because extraneous newline characters are stripped by seqkit.
command = f"""
(for f in {" ".join(shquote(s) for s in reversed(args.sequences))}; do
{augur} read-file "$f";
echo;
done) |
{seqkit()} rmdup |
{augur} write-file {shquote(args.output_sequences)}
"""

returncode = run_command(command, print_stdout=True)

if returncode != 0:
# Ideally, read-file, seqkit, and write-file errors would all be handled
# separately. That is not possible because everything is done in one
# child process with one return code.
# FIXME: also clean up metadata merge files, and vice versa?
if args.output_sequences == "-":
raise AugurError(f"Merging failed, see error(s) above. The command may have already written data to stdout. You may want to clean up any partial outputs.")
else:
# Remove the partial output file.
os.remove(args.output_sequences)
raise AugurError(f"Merging failed, see error(s) above.")


def sqlite3(*args, **kwargs):
"""
Internal helper for invoking ``sqlite3``, the SQLite CLI program.
Expand Down Expand Up @@ -478,3 +579,85 @@ def quote(s):
# …and instead quote a final empty string down here if we're still empty
# after joining all our parts together.
return quoted if quoted else shquote('')


def run_command(command: str, print_stdout: bool):
"""Run a command, clearly marking any messages that resemble SeqKit output."""
process = subprocess.Popen(
['bash', '-euo', 'pipefail', '-c', command],
stdout=subprocess.PIPE if print_stdout else subprocess.DEVNULL,
stderr=subprocess.PIPE,
text=True,
)

if print_stdout:
for line in iter(process.stdout.readline, ''):
print(line.rstrip())

for line in iter(process.stderr.readline, ''):
# Remove ANSI escape sequences common in SeqKit output.
line = re.sub(r'\x1b\[[\d;]+m', '', line)

# Detect messages from seqkit and append a prefix.
if line.startswith("[INFO]"):
print_debug(f"[SeqKit] {line.rstrip()}")
elif line.startswith("[ERRO]"):
print_info(f"[SeqKit] {line.rstrip()}")
else:
print_info(line.rstrip())

# Wait for the seqkit process to finish.
# FIXME: address potential deadlock issue?
# <https://docs.python.org/3/library/subprocess.html#subprocess.Popen.wait>
# I think this is safe since process.stdout and process.stderr are iterated manually.
return process.wait()


def seqkit():
"""
Internal helper for invoking ``seqkit``, the SeqKit CLI program.

Unlike ``sqlite3()``, this function is not a wrapper around subprocess.run.
It is meant to be called without any parameters and only returns the
location of the executable. This is due to differences in the way the two
programs are invoked.
"""
seqkit = shquote(os.environ.get("SEQKIT", which("seqkit")))
if not seqkit:
raise AugurError(dedent(f"""\
Unable to find the program `seqkit`. Is it installed?
In order to use `augur merge`, the SeqKit CLI must be installed
separately. It is typically provided by a Nextstrain runtime.
"""))
return seqkit


def parse_seqkit_dup_num_file(contents: str):
"""Extract duplicated IDs from the output of seqkit rmdup --dup-num-file.

There is one line per duplicated ID, in the format of a number n followed by
tab character followed by the ID duplicated n times, separated by ", ".

Example FASTA headers

> id, 1
> id, 2
> id, 3
> id1
> id1

will produce a --dup-num-file output file

3\tid,, id,, id,
2\tid1, id1

which parsed by this function will yield

id,
id1

"""
if contents == "":
return
for line in contents.split('\n'):
yield line[line.rfind(", ")+2:]
8 changes: 4 additions & 4 deletions docs/installation/non-python-dependencies.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Augur uses some external bioinformatics programs that are not available on PyPI:
- `RAxML <https://cme.h-its.org/exelixis/web/software/raxml/>`__ (optional alternative)
- `FastTree <http://www.microbesonline.org/fasttree/>`__ (optional alternative)

- ``augur merge`` requires ``sqlite3``, the `SQLite <https://sqlite.org>`__ CLI (version ≥3.39).
- ``augur merge`` requires ``sqlite3`` (the `SQLite <https://sqlite.org>`__ CLI (version ≥3.39)) for metadata and ``seqkit`` for sequences.

- Bacterial data (or any VCF usage) requires `vcftools <https://vcftools.github.io/>`__

Expand All @@ -20,19 +20,19 @@ If you use Conda, you can install them in an active environment:

.. code:: bash

conda install -c conda-forge -c bioconda mafft raxml fasttree iqtree vcftools sqlite --yes
conda install -c conda-forge -c bioconda mafft raxml fasttree iqtree vcftools seqkit sqlite --yes

On macOS using `Homebrew <https://brew.sh/>`__:

.. code:: bash

brew tap brewsci/bio
brew install mafft iqtree raxml fasttree vcftools sqlite
brew install mafft iqtree raxml fasttree vcftools seqkit sqlite

On Debian/Ubuntu:

.. code:: bash

sudo apt install mafft iqtree raxml fasttree vcftools sqlite3
sudo apt install mafft iqtree raxml fasttree vcftools seqkit sqlite3

Other Linux distributions will likely have the same packages available, although the names may differ slightly.
Loading
Loading