diff --git a/lib/seattleflu/id3c/cli/command/clinical.py b/lib/seattleflu/id3c/cli/command/clinical.py
index d0ce300a..354c3db2 100644
--- a/lib/seattleflu/id3c/cli/command/clinical.py
+++ b/lib/seattleflu/id3c/cli/command/clinical.py
@@ -56,6 +56,113 @@ def clinical():
     pass
 
 
+# Parse sequencing accessions subcommand
+@clinical.command("parse-sequencing")
+@click.argument("accession_ids_filename", metavar = "<accession_ids filename>")
+@click.argument("record_type", metavar = "<record type>", type=click.Choice(['hcov19', 'rsv-a', 'rsv-b', 'flu-a', 'flu-b']))
+@click.argument("segment_accession_ids_filename", nargs=-1, metavar = "<segment_accession_ids filename>")
+@click.option("-o", "--output", metavar="<output filename>",
+    help="The filename for the output of missing barcodes")
+
+def parse_sequencing_accessions(accession_ids_filename, record_type, segment_accession_ids_filename, output):
+    """
+    Process a sequencing accession IDs file.
+
+    Given a <accession_ids filename> of a TSV or Excel file, selects specific
+    columns of interest and reformats the queried data into a stream of JSON
+    documents suitable for the "upload" sibling command.
+
+    A <segment_accession_ids filename> file is required for flu sequences.
+
+    <output filename> is the desired filepath of the output CSV of problematic
+    barcodes encountered while parsing. If not provided, problematic
+    barcodes are printed to the log.
+
+    All parsed records are output to stdout as newline-delimited JSON
+    records. You will likely want to redirect stdout to a file.
+    """
+    if accession_ids_filename.endswith('.tsv'):
+        read = partial(pd.read_csv, sep='\t')
+    else:
+        read = pd.read_excel
+
+    read_accessions = partial(
+        read,
+        na_values = ['NA', '', 'Unknown', 'NULL']
+    )
+
+    clinical_records = (
+        read_accessions(accession_ids_filename)
+            .pipe(trim_whitespace)
+            .pipe(add_provenance, accession_ids_filename))
+
+    # Only keep submitted records
+    clinical_records = clinical_records[clinical_records.status == 'submitted']
+
+    column_map = {
+        'sequence_identifier': 'sequence_identifier',
+        'sfs_sample_barcode': 'barcode',
+        'strain_name': 'strain_name',
+        'nwgc_id': 'nwgc_id',
+        'gisaid_accession': 'gisaid_accession',
+        'genbank_accession': 'genbank_accession',
+        'pathogen': 'pathogen',
+        '_provenance': '_provenance'
+    }
+
+    if record_type in ['flu-a', 'flu-b']:
+        assert segment_accession_ids_filename and len(segment_accession_ids_filename) == 1, 'Error: Missing required segment accession IDs file.'
+        segment_accession_ids_filename = segment_accession_ids_filename[0]
+
+        clinical_records = clinical_records[clinical_records['pathogen'] == record_type]
+        column_map['subtype'] = 'subtype'
+        column_map['segment'] = 'segment'
+
+        if segment_accession_ids_filename.endswith('.tsv'):
+            read = partial(pd.read_csv, sep='\t')
+        else:
+            read = pd.read_excel
+
+        # Excluding "NA" from the NA values because it is used as the neuraminidase segment code.
+        read_segment_accessions = partial(
+            read,
+            na_values = ['', 'Unknown', 'NULL'],
+            keep_default_na = False
+        )
+
+        clinical_records_segments = (
+            read_segment_accessions(segment_accession_ids_filename)
+                .pipe(trim_whitespace)
+                .pipe(add_provenance, segment_accession_ids_filename))
+
+        clinical_records_segments = clinical_records_segments[['strain_name', 'genbank_accession', 'sequence_id', 'segment', '_provenance']]
+
+        # Drop overlapping columns prior to merging
+        clinical_records.drop(columns=['genbank_accession', '_provenance'], inplace=True)
+
+        clinical_records = clinical_records.merge(clinical_records_segments, on='strain_name')
+
+    if record_type in ['rsv-a', 'rsv-b']:
+        assert 'subtype' not in clinical_records.columns, 'Error: unexpected column `subtype` in sequence records.'
+        clinical_records = clinical_records[clinical_records['pathogen'] == record_type]
+    elif record_type == 'hcov19':
+        assert 'pathogen' not in clinical_records.columns, 'Error: unexpected column `pathogen` in sequence records.'
+        clinical_records['pathogen'] = 'hcov19'
+
+    if clinical_records.empty:
+        dump_ndjson(clinical_records)
+    else:
+        clinical_records['sequence_identifier'] = clinical_records.apply(
+            lambda row: generate_hash(row['strain_name']) + '-' + row['pathogen'].upper().replace('-', ''), axis=1
+        )
+
+        clinical_records = clinical_records[clinical_records['sfs_sample_barcode'].notnull() & (clinical_records.status == 'submitted')].rename(columns=column_map)
+
+        # Flu data is per segment, so skip the barcode uniqueness check
+        if record_type not in ['flu-a', 'flu-b']:
+            barcode_quality_control(clinical_records, output)
+
+        # Drop columns we're not tracking
+        clinical_records = clinical_records[column_map.values()]
+
+        dump_ndjson(clinical_records)
+
+
 # UW Clinical subcommand
 @clinical.command("parse-uw")
 @click.argument("uw_filename", metavar = "<UW Clinical Data filename>")
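
Reviewer note on the flu path above: merging on strain_name against the per-segment accession file fans each strain-level record out into one output row per segment, which is why the barcode uniqueness check is skipped for flu record types. A minimal pandas sketch of that fan-out, with hypothetical values (only the column names come from the diff):

import pandas as pd

# One strain-level record, as read from <accession_ids filename> (values hypothetical).
strains = pd.DataFrame({
    'strain_name': ['A/Washington/1/2024'],
    'sfs_sample_barcode': ['barcode-1'],
    'pathogen': ['flu-a'],
    'subtype': ['h1n1'],
})

# Two segment-level records, as read from <segment_accession_ids filename>.
segments = pd.DataFrame({
    'strain_name': ['A/Washington/1/2024', 'A/Washington/1/2024'],
    'segment': ['HA', 'NA'],
    'genbank_accession': ['XX000001', 'XX000002'],
})

# Mirrors clinical_records.merge(clinical_records_segments, on='strain_name'):
# the single strain row fans out into one row per segment, so one barcode
# legitimately appears on multiple output records.
merged = strains.merge(segments, on='strain_name')
assert len(merged) == 2
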
diff --git a/lib/seattleflu/id3c/cli/command/etl/clinical.py b/lib/seattleflu/id3c/cli/command/etl/clinical.py
index b068af42..cdce04f6 100644
--- a/lib/seattleflu/id3c/cli/command/etl/clinical.py
+++ b/lib/seattleflu/id3c/cli/command/etl/clinical.py
@@ -11,7 +11,7 @@
 from id3c.db.session import DatabaseSession
 from id3c.db.datatypes import Json
 from id3c.cli.command.etl.redcap_det import insert_fhir_bundle
-
+from id3c.db.types import MinimalSampleRecord, GenomeRecord, OrganismRecord
 from id3c.cli.command.etl import (
     etl,
@@ -39,6 +39,7 @@
 from . import race, ethnicity
 from .fhir import *
 from .clinical_retrospectives import *
+from id3c.cli.command.etl.consensus_genome import find_organism
 from .redcap_map import map_symptom
@@ -104,9 +105,45 @@ def etl_clinical(*, db: DatabaseSession):
         # Most of the time we expect to see existing sites so a
         # select-first approach makes the most sense to avoid useless
         # updates.
-        site = find_or_create_site(db,
-            identifier = site_identifier(record.document["site"]),
-            details = {"type": "retrospective"})
+        if record.document.get("site"):
+            site = find_or_create_site(db,
+                identifier = site_identifier(record.document["site"]),
+                details = {"type": "retrospective"})
+        else:
+            site = None
+
+        # Sequencing accession IDs are loaded into the clinical receiving table and are
+        # processed differently than other records, populating only the warehouse.consensus_genome
+        # and warehouse.genomic_sequence tables with the relevant data.
+        if record.document.get('genbank_accession') or record.document.get('gisaid_accession'):
+            if record.document['pathogen'] == 'flu-a':
+                record.document['organism'] = record.document['pathogen'] + '::' + record.document['subtype']
+            else:
+                record.document['organism'] = record.document['pathogen']
+
+            # Find the matching organism within the warehouse for the reference organism
+            organism_name_map = {
+                'rsv-a': 'RSV.A',
+                'rsv-b': 'RSV.B',
+                'hcov19': 'Human_coronavirus.2019',
+                'flu-a::h1n1': 'Influenza.A.H1N1',
+                'flu-a::h3n2': 'Influenza.A.H3N2',
+                'flu-b': 'Influenza.B'
+            }
+            organism = find_organism(db, organism_name_map[record.document['organism']])
+
+            assert organism, f"No organism found with name «{record.document['organism']}»"
+
+            # Most of the time we expect to see new sequences, so an
+            # insert-first approach makes the most sense to avoid useless
+            # queries.
+            genome = upsert_genome(db,
+                sample = sample,
+                organism = organism)
+
+            genomic_sequence = upsert_genomic_sequence(db,
+                genome = genome,
+                details = record.document)
+
         # PHSKC and KP2023 will be handled differently than other clinical records, converted
@@ -114,7 +151,7 @@ def etl_clinical(*, db: DatabaseSession):
         # by the FHIR ETL. When time allows, SCH and KP should follow suit.
         # Since KP2023 and KP samples both have KaiserPermanente as their site in id3c,
         # use the ndjson document's site to distinguish KP vs KP2023 samples
-        if site.identifier == 'RetrospectivePHSKC' or record.document["site"].upper() == 'KP2023':
+        elif site and (site.identifier == 'RetrospectivePHSKC' or record.document["site"].upper() == 'KP2023'):
             fhir_bundle = generate_fhir_bundle(db, record.document, site.identifier)
             insert_fhir_bundle(db, fhir_bundle)
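
Reviewer note on the organism lookup: flu A is the only record type whose map key splices in the subtype, so the mapping is easy to misread. A self-contained sketch, where organism_key is a hypothetical helper mirroring the branch above. It assumes subtype arrives lowercase ('h1n1'/'h3n2'); an uppercase value would miss the map and raise a KeyError:

# Copied from the diff: warehouse organism lineages keyed by pathogen
# (plus subtype for flu A).
organism_name_map = {
    'rsv-a': 'RSV.A',
    'rsv-b': 'RSV.B',
    'hcov19': 'Human_coronavirus.2019',
    'flu-a::h1n1': 'Influenza.A.H1N1',
    'flu-a::h3n2': 'Influenza.A.H3N2',
    'flu-b': 'Influenza.B',
}

def organism_key(document: dict) -> str:
    # Hypothetical helper mirroring the branch in etl_clinical: only flu A
    # needs the subtype to distinguish the H1N1 and H3N2 lineages.
    if document['pathogen'] == 'flu-a':
        return document['pathogen'] + '::' + document['subtype']
    return document['pathogen']

assert organism_name_map[organism_key({'pathogen': 'flu-a', 'subtype': 'h1n1'})] == 'Influenza.A.H1N1'
assert organism_name_map[organism_key({'pathogen': 'hcov19'})] == 'Human_coronavirus.2019'
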
@@ -159,6 +196,82 @@ def etl_clinical(*, db: DatabaseSession):
             LOG.info(f"Finished processing clinical record {record.id}")
 
 
+def upsert_genome(db: DatabaseSession, sample: MinimalSampleRecord, organism: OrganismRecord) -> GenomeRecord:
+    """
+    Upsert a consensus genome for the given *sample* and *organism*.
+    """
+    LOG.debug(f"""
+        Upserting genome with sample_id {sample.id},
+        organism {organism.id} «{organism.lineage}»""")
+
+    data = {
+        "sample_id": sample.id,
+        "organism_id": organism.id
+    }
+
+    genome: GenomeRecord = db.fetch_row("""
+        insert into warehouse.consensus_genome (sample_id, organism_id)
+        values (%(sample_id)s, %(organism_id)s)
+
+        on conflict (sample_id, organism_id) where sequence_read_set_id is null do update
+            set sample_id = excluded.sample_id,
+                organism_id = excluded.organism_id
+
+        returning consensus_genome_id as id, sample_id, organism_id
+        """, data)
+
+    assert genome.id, "Upsert affected no rows!"
+
+    LOG.info(f"""
+        Upserted genome {genome.id} with sample ID «{genome.sample_id}»
+        and organism ID «{genome.organism_id}»
+        """)
+
+    return genome
+
+
+def upsert_genomic_sequence(db: DatabaseSession, genome: GenomeRecord, details: dict) -> Any:
+    """
+    Upsert a genomic sequence given a *genome* record and *details*.
+    """
+    # Flu sequences are per segment; append the segment code so identifiers stay unique.
+    if details.get('segment'):
+        sequence_identifier = details['sequence_identifier'] + '-' + details['segment']
+    else:
+        sequence_identifier = details['sequence_identifier']
+
+    LOG.info(f"Upserting genomic sequence «{sequence_identifier}»")
+
+    data = {
+        "identifier": sequence_identifier,
+        "segment": details.get('segment', ''),
+        "seq": "",
+        "genome_id": genome.id,
+        "additional_details": Json({
+            k: v for k, v in details.items() if k in [
+                'nwgc_id',
+                'strain_name',
+                'genbank_accession',
+                'gisaid_accession',
+                '_provenance'
+            ]
+        })
+    }
+
+    genomic_sequence = db.fetch_row("""
+        insert into warehouse.genomic_sequence (identifier, segment, seq, consensus_genome_id, details)
+        values (%(identifier)s, %(segment)s, %(seq)s, %(genome_id)s, %(additional_details)s)
+
+        on conflict (identifier) do update
+            set seq = excluded.seq,
+                segment = excluded.segment,
+                details = excluded.details
+
+        returning genomic_sequence_id as id, identifier, segment, seq, consensus_genome_id
+        """, data)
+
+    # assert genomic_sequence.consensus_genome_id == genome.id, \
+    #     "Provided sequence identifier was not unique, matched a sequence linked to another consensus genome!"
+    assert genomic_sequence.id, "Upsert affected no rows!"
+
+    LOG.info(f"Upserted genomic sequence «{genomic_sequence.id}»")
+
+    return genomic_sequence
+
+
 def create_encounter(db: DatabaseSession,
     record: dict,
     patient_reference: dict,
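
Reviewer note on the conflict target in upsert_genome: the "where sequence_read_set_id is null" clause matches a partial unique index, so this ETL path only ever collides with (and updates) genomes that have no associated sequence read set; genomes built from sequence read sets are left untouched. A self-contained sketch of those semantics, using sqlite3 in place of the Postgres warehouse (assumes SQLite 3.35+, which supports the same partial-index conflict targets and RETURNING; the table is a trimmed-down stand-in for warehouse.consensus_genome):

import sqlite3

db = sqlite3.connect(':memory:')
db.executescript("""
    create table consensus_genome (
        consensus_genome_id integer primary key,
        sample_id integer not null,
        organism_id integer not null,
        sequence_read_set_id integer
    );
    -- Stand-in for the warehouse's partial unique index.
    create unique index uniq_genome
        on consensus_genome (sample_id, organism_id)
        where sequence_read_set_id is null;
""")

upsert = """
    insert into consensus_genome (sample_id, organism_id)
    values (?, ?)
    on conflict (sample_id, organism_id) where sequence_read_set_id is null
        do update set sample_id = excluded.sample_id,
                      organism_id = excluded.organism_id
    returning consensus_genome_id
"""

# Upserting the same (sample, organism) twice yields the same row: updated, not duplicated.
first = db.execute(upsert, (1, 42)).fetchone()
second = db.execute(upsert, (1, 42)).fetchone()
assert first == second

# A genome already linked to a sequence read set sits outside the partial index,
# so the same (sample, organism) pair inserts a brand-new row instead of updating it.
db.execute("insert into consensus_genome (sample_id, organism_id, sequence_read_set_id) values (2, 42, 7)")
third = db.execute(upsert, (2, 42)).fetchone()
assert third != first
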