From d6c18c41af01c5b9aa88485d61a2ec12de551850 Mon Sep 17 00:00:00 2001 From: Sally Grindstaff Date: Fri, 29 Sep 2023 09:45:44 -0700 Subject: [PATCH] Update KP2023 ETL to reflect new information from KP --- lib/seattleflu/id3c/cli/command/clinical.py | 60 +++++++++++-------- .../id3c/cli/command/etl/clinical.py | 2 +- .../command/etl/clinical_retrospectives.py | 3 + 3 files changed, 40 insertions(+), 25 deletions(-) diff --git a/lib/seattleflu/id3c/cli/command/clinical.py b/lib/seattleflu/id3c/cli/command/clinical.py index a897a53b..cfe2ff3d 100644 --- a/lib/seattleflu/id3c/cli/command/clinical.py +++ b/lib/seattleflu/id3c/cli/command/clinical.py @@ -886,7 +886,7 @@ def parse_kp2023(kp2023_filename: str) -> None: column_map = { 'marshfield_lab_id': 'collection_id', # will be mapped to lims barcode during id3c clinical match-kp2023 'hispaniclatino': 'ethnicity', - 'assignedsex': 'sex', + 'assigned_sex': 'sex', 'symptom__throat': 'symptom_throat', # fix extra underscore in column name 'censustract': 'census_tract', 'type_of_visit': 'patient_class', @@ -898,7 +898,7 @@ def parse_kp2023(kp2023_filename: str) -> None: # check for missing or duplicated barcodes #barcode_quality_control(clinical_records) - # map high risk codes to ICD-10 codes, and collapse into one columns 'icd10' + # map high risk codes to ICD-10 codes, and collapse into one column 'icd10' clinical_records = map_icd10_codes(clinical_records) # collapse race and symptom columns @@ -915,6 +915,9 @@ def parse_kp2023(kp2023_filename: str) -> None: # map ethnicity map_ethnicity_kp2023(clinical_records, 'ethnicity') + # map sex + clinical_records['sex'] = clinical_records['sex'].map({0: 'female', 1: 'male'}) + # insert site # ideally site would be 'KP', but currently KP2023's ETL is set up to be processed with FHIR # during id3c etl clinical, while the KP ETL is not processed with FHIR. @@ -932,23 +935,22 @@ def parse_kp2023(kp2023_filename: str) -> None: # apply age ceiling, hash individual id clinical_records['age'] = clinical_records['age'].apply(age_ceiling) - clinical_records['individual'] = clinical_records['individual'].apply(generate_hash) # create hashed encounter id + # although KP provides an 'individual' column, we ignore it because there is exactly 1 individual value per collection id (Marshfield lab ID) + # so we ignore the individual column and instead treat collection id as an individual id + # therefore, we use the collection id and the encounter date to create a hashed encounter id clinical_records['identifier'] = clinical_records.apply( lambda row: generate_hash( - f"{row['individual']}{row['encountered']}".lower() + f"{row['collection_id']}{row['encountered']}".lower() ), axis=1 ) - # during development, create a placeholder column for sex until we get that data from KP - if 'sex' not in clinical_records.columns: - clinical_records['sex'] = None - - # after development, throw error if no sex column. And maybe drop records that don't have sex, since that will be necessary to create FHIR patient resource? - - # drop records with missing values: barcode/collectionid (not sure which variable this maps to yet), encounter date, sex + # drop records with missing values: barcode/collectionid (not sure which variable this maps to yet), encounter date + # also require sex, which is necessary for creating patient resource clinical_records = drop_missing_rows(clinical_records, 'encountered') + clinical_records = drop_missing_rows(clinical_records, 'collection_id') + clinical_records = drop_missing_rows(clinical_records, 'sex') # Incoming `encountered` value is typically just date but is cast to datetime with timezone in postgres. Timezone is # being specified here to ensure values are set to midnight local time instead of UTC. @@ -963,7 +965,6 @@ def parse_kp2023(kp2023_filename: str) -> None: # ensure there are no unintended columns being kept columns_to_keep = [ '_provenance', - 'individual', 'identifier', 'site', 'sex', @@ -1170,22 +1171,27 @@ def map_ethnicity_kp2023(df: pd.DataFrame, column: str) -> None: @click.argument("kp2023_manifest_filename", metavar = "", type = click.Path(exists= True, dir_okay=False)) @click.argument("kp2023_manifest_matched_filename", metavar = "", - type = click.Path(exists=True, dir_okay=False)) -@click.argument("kp2023_manifest_unmatched_output_filename", metavar = "", type = click.Path(dir_okay=False)) +@click.argument("kp2023_manifest_unmatched_output_filename", metavar = "", + type = click.Path(dir_okay=False)) + def match_kp2023(kp2023_manifest_filename: str, kp2023_manifest_matched_filename: str, kp2023_manifest_unmatched_output_filename: str) -> None: """ Match clinical data from KP2023 with identifiers from the LIMS. Given a which has records to be matched, - and a which has records that have + and, a which has records that have already been matched to LIMS identifiers, attempts to match the records in to LIMS data and adds any newly matched records to the matched file. Removes any matches from - before writing it to . + before writing it to . - does not have to exist, and + does not have to be an existing file, + but a filename must be provided. If the file does not exist, the newly matched records + will be output to stdout without consolidating with previously matched records. + + does not have to exist, and if a file with this path exists, it will be overwritten. PII is removed by this function. @@ -1196,8 +1202,13 @@ def match_kp2023(kp2023_manifest_filename: str, kp2023_manifest_matched_filename You will likely want to redirect stdout to a file. """ clinical_records = pd.read_json(kp2023_manifest_filename, orient='records', dtype={'census_tract': 'string', 'age': 'int64'}, lines=True) - matched_clinical_records = pd.read_json(kp2023_manifest_matched_filename, orient='records', dtype={'census_tract': 'string', 'age': 'int64'}, lines=True) - + # if the file exists, read the file into a df; otherwise, create an empty df + if os.path.exists(kp2023_manifest_matched_filename): + matched_clinical_records = pd.read_json(kp2023_manifest_matched_filename, orient='records', dtype={'census_tract': 'string', 'age': 'int64'}, lines=True) + else: + LOG.debug("No previously matched data was provided.") + matched_clinical_records = pd.DataFrame() + LOG.info(f"Attempting to match {len(clinical_records)} identifiers to LIMS data") # identifier_pairs is a dict where keys are clinical identifiers and values are corresponding matrixIds from LIMS identifier_pairs = match_lims_identifiers(clinical_records, KP2023_IDENTIFIERS) @@ -1213,10 +1224,11 @@ def match_kp2023(kp2023_manifest_filename: str, kp2023_manifest_matched_filename newly_matched_clinical_records = clinical_records.loc[~clinical_records['barcode'].isna()] unmatched_clinical_records = clinical_records[clinical_records['barcode'].isna()] - # drop any PII - newly_matched_clinical_records = newly_matched_clinical_records.drop( - columns=list(KP2023_IDENTIFIERS.keys()) - ) + # keep clinical identifier (collection_id) to use as individual id for FHIR bundle + # in this case, the clinical identifier is probably a 'safe' external ID, but we treat it as PII out of an abundance of caution + # therefore, hash the clinical identifier in newly_matched_clinical_records + for clinical_identifier in KP2023_IDENTIFIERS.keys(): + newly_matched_clinical_records[clinical_identifier] = newly_matched_clinical_records[clinical_identifier].apply(generate_hash) # newly matched barcodes shouldn't be in our previously matched data. # any barcodes that show up in a previous parse might contain refreshed data, @@ -1228,7 +1240,7 @@ def match_kp2023(kp2023_manifest_filename: str, kp2023_manifest_matched_filename # then it will not contain any old matched records, so we want to store all old matched records # and add to that manifest each time this command is run. if newly_matched_clinical_records.empty: - LOG.debug(f"No new records were matched to LIMS data") + LOG.debug("No new records were matched to LIMS data") elif matched_clinical_records.empty: LOG.debug(f"{len(newly_matched_clinical_records)} were matched. No previously matched data to consolidate") else: diff --git a/lib/seattleflu/id3c/cli/command/etl/clinical.py b/lib/seattleflu/id3c/cli/command/etl/clinical.py index 4d37ad5f..1f52b764 100644 --- a/lib/seattleflu/id3c/cli/command/etl/clinical.py +++ b/lib/seattleflu/id3c/cli/command/etl/clinical.py @@ -109,7 +109,7 @@ def etl_clinical(*, db: DatabaseSession): details = {"type": "retrospective"}) - # PHSKC and KP2023 will be handled differently that other clinical records, converted + # PHSKC and KP2023 will be handled differently than other clinical records, converted # to FHIR format and inserted into receiving.fhir table to be processed # by the FHIR ETL. When time allows, SCH and KP should follow suit. if site.identifier in ('RetrospectivePHSKC', 'KaiserPermanente2023'): diff --git a/lib/seattleflu/id3c/cli/command/etl/clinical_retrospectives.py b/lib/seattleflu/id3c/cli/command/etl/clinical_retrospectives.py index 54ec31e3..ad25b00b 100644 --- a/lib/seattleflu/id3c/cli/command/etl/clinical_retrospectives.py +++ b/lib/seattleflu/id3c/cli/command/etl/clinical_retrospectives.py @@ -177,6 +177,9 @@ def create_patient(record: dict) -> Optional[tuple]: # uw retro samples elif record.get("personid", None): patient_id = generate_hash(record["personid"].lower()) + # kp2023 samples + elif record.get("collection_id", None): + patient_id = record["collection_id"] # has already been hashed in clinical match-kp2023 function else: return None, None