Skip to content

Commit

Permalink
Update KP2023 ETL to reflect new information from KP
Browse files Browse the repository at this point in the history
  • Loading branch information
sallybg committed Oct 16, 2023
1 parent 2d8bd87 commit d6c18c4
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 25 deletions.
60 changes: 36 additions & 24 deletions lib/seattleflu/id3c/cli/command/clinical.py
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,7 @@ def parse_kp2023(kp2023_filename: str) -> None:
column_map = {
'marshfield_lab_id': 'collection_id', # will be mapped to lims barcode during id3c clinical match-kp2023
'hispaniclatino': 'ethnicity',
'assignedsex': 'sex',
'assigned_sex': 'sex',
'symptom__throat': 'symptom_throat', # fix extra underscore in column name
'censustract': 'census_tract',
'type_of_visit': 'patient_class',
Expand All @@ -898,7 +898,7 @@ def parse_kp2023(kp2023_filename: str) -> None:
# check for missing or duplicated barcodes
#barcode_quality_control(clinical_records)

# map high risk codes to ICD-10 codes, and collapse into one columns 'icd10'
# map high risk codes to ICD-10 codes, and collapse into one column 'icd10'
clinical_records = map_icd10_codes(clinical_records)

# collapse race and symptom columns
Expand All @@ -915,6 +915,9 @@ def parse_kp2023(kp2023_filename: str) -> None:
# map ethnicity
map_ethnicity_kp2023(clinical_records, 'ethnicity')

# map sex
clinical_records['sex'] = clinical_records['sex'].map({0: 'female', 1: 'male'})

# insert site
# ideally site would be 'KP', but currently KP2023's ETL is set up to be processed with FHIR
# during id3c etl clinical, while the KP ETL is not processed with FHIR.
Expand All @@ -932,23 +935,22 @@ def parse_kp2023(kp2023_filename: str) -> None:

# apply age ceiling, hash individual id
clinical_records['age'] = clinical_records['age'].apply(age_ceiling)
clinical_records['individual'] = clinical_records['individual'].apply(generate_hash)

# create hashed encounter id
# although KP provides an 'individual' column, we ignore it because there is exactly 1 individual value per collection id (Marshfield lab ID)
# so we ignore the individual column and instead treat collection id as an individual id
# therefore, we use the collection id and the encounter date to create a hashed encounter id
clinical_records['identifier'] = clinical_records.apply(
lambda row: generate_hash(
f"{row['individual']}{row['encountered']}".lower()
f"{row['collection_id']}{row['encountered']}".lower()
), axis=1
)

# during development, create a placeholder column for sex until we get that data from KP
if 'sex' not in clinical_records.columns:
clinical_records['sex'] = None

# after development, throw error if no sex column. And maybe drop records that don't have sex, since that will be necessary to create FHIR patient resource?

# drop records with missing values: barcode/collectionid (not sure which variable this maps to yet), encounter date, sex
# drop records with missing values: barcode/collectionid (not sure which variable this maps to yet), encounter date
# also require sex, which is necessary for creating patient resource
clinical_records = drop_missing_rows(clinical_records, 'encountered')
clinical_records = drop_missing_rows(clinical_records, 'collection_id')
clinical_records = drop_missing_rows(clinical_records, 'sex')

# Incoming `encountered` value is typically just date but is cast to datetime with timezone in postgres. Timezone is
# being specified here to ensure values are set to midnight local time instead of UTC.
Expand All @@ -963,7 +965,6 @@ def parse_kp2023(kp2023_filename: str) -> None:
# ensure there are no unintended columns being kept
columns_to_keep = [
'_provenance',
'individual',
'identifier',
'site',
'sex',
Expand Down Expand Up @@ -1170,22 +1171,27 @@ def map_ethnicity_kp2023(df: pd.DataFrame, column: str) -> None:
@click.argument("kp2023_manifest_filename", metavar = "<KP2023 Clinical Manifest filename>",
type = click.Path(exists= True, dir_okay=False))
@click.argument("kp2023_manifest_matched_filename", metavar = "<KP2023 Clinical Manifest Matched Data filename>",
type = click.Path(exists=True, dir_okay=False))
@click.argument("kp2023_manifest_unmatched_output_filename", metavar = "<KP2023 Clinical Manifest Data output filename>",
type = click.Path(dir_okay=False))
@click.argument("kp2023_manifest_unmatched_output_filename", metavar = "<KP2023 Clinical Manifest Unmatched Data output filename>",
type = click.Path(dir_okay=False))


def match_kp2023(kp2023_manifest_filename: str, kp2023_manifest_matched_filename: str, kp2023_manifest_unmatched_output_filename: str) -> None:
"""
Match clinical data from KP2023 with identifiers from the LIMS.
Given a <KP2023 Clinical Manifest filename> which has records to be matched,
and a <PHSKC Clinical Manifest Matched Data filename> which has records that have
and, a <KP2023 Clinical Manifest Matched Data filename> which has records that have
already been matched to LIMS identifiers, attempts to match the records in
<KP2023 Clinical Manifest filename> to LIMS data and adds any newly matched
records to the matched file. Removes any matches from <KP2023 Clinical Manifest name>
before writing it to <KP2023 Clinical Manifest Data output filename>.
before writing it to <KP2023 Clinical Manifest Unmatched Data output filename>.
<KP2023 Clinical Manifest Data output filename> does not have to exist, and
<KP2023 Clinical Manifest Matched Date filename> does not have to be an existing file,
but a filename must be provided. If the file does not exist, the newly matched records
will be output to stdout without consolidating with previously matched records.
<KP2023 Clinical Manifest Unmatched Data output filename> does not have to exist, and
if a file with this path exists, it will be overwritten.
PII is removed by this function.
Expand All @@ -1196,8 +1202,13 @@ def match_kp2023(kp2023_manifest_filename: str, kp2023_manifest_matched_filename
You will likely want to redirect stdout to a file.
"""
clinical_records = pd.read_json(kp2023_manifest_filename, orient='records', dtype={'census_tract': 'string', 'age': 'int64'}, lines=True)
matched_clinical_records = pd.read_json(kp2023_manifest_matched_filename, orient='records', dtype={'census_tract': 'string', 'age': 'int64'}, lines=True)

# if the <KP2023 Clinical Manifest Matched Data filename> file exists, read the file into a df; otherwise, create an empty df
if os.path.exists(kp2023_manifest_matched_filename):
matched_clinical_records = pd.read_json(kp2023_manifest_matched_filename, orient='records', dtype={'census_tract': 'string', 'age': 'int64'}, lines=True)
else:
LOG.debug("No previously matched data was provided.")
matched_clinical_records = pd.DataFrame()

LOG.info(f"Attempting to match {len(clinical_records)} identifiers to LIMS data")
# identifier_pairs is a dict where keys are clinical identifiers and values are corresponding matrixIds from LIMS
identifier_pairs = match_lims_identifiers(clinical_records, KP2023_IDENTIFIERS)
Expand All @@ -1213,10 +1224,11 @@ def match_kp2023(kp2023_manifest_filename: str, kp2023_manifest_matched_filename
newly_matched_clinical_records = clinical_records.loc[~clinical_records['barcode'].isna()]
unmatched_clinical_records = clinical_records[clinical_records['barcode'].isna()]

# drop any PII
newly_matched_clinical_records = newly_matched_clinical_records.drop(
columns=list(KP2023_IDENTIFIERS.keys())
)
# keep clinical identifier (collection_id) to use as individual id for FHIR bundle
# in this case, the clinical identifier is probably a 'safe' external ID, but we treat it as PII out of an abundance of caution
# therefore, hash the clinical identifier in newly_matched_clinical_records
for clinical_identifier in KP2023_IDENTIFIERS.keys():
newly_matched_clinical_records[clinical_identifier] = newly_matched_clinical_records[clinical_identifier].apply(generate_hash)

# newly matched barcodes shouldn't be in our previously matched data.
# any barcodes that show up in a previous parse might contain refreshed data,
Expand All @@ -1228,7 +1240,7 @@ def match_kp2023(kp2023_manifest_filename: str, kp2023_manifest_matched_filename
# then it will not contain any old matched records, so we want to store all old matched records
# and add to that manifest each time this command is run.
if newly_matched_clinical_records.empty:
LOG.debug(f"No new records were matched to LIMS data")
LOG.debug("No new records were matched to LIMS data")
elif matched_clinical_records.empty:
LOG.debug(f"{len(newly_matched_clinical_records)} were matched. No previously matched data to consolidate")
else:
Expand Down
2 changes: 1 addition & 1 deletion lib/seattleflu/id3c/cli/command/etl/clinical.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def etl_clinical(*, db: DatabaseSession):
details = {"type": "retrospective"})


# PHSKC and KP2023 will be handled differently that other clinical records, converted
# PHSKC and KP2023 will be handled differently than other clinical records, converted
# to FHIR format and inserted into receiving.fhir table to be processed
# by the FHIR ETL. When time allows, SCH and KP should follow suit.
if site.identifier in ('RetrospectivePHSKC', 'KaiserPermanente2023'):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ def create_patient(record: dict) -> Optional[tuple]:
# uw retro samples
elif record.get("personid", None):
patient_id = generate_hash(record["personid"].lower())
# kp2023 samples
elif record.get("collection_id", None):
patient_id = record["collection_id"] # has already been hashed in clinical match-kp2023 function
else:
return None, None

Expand Down

0 comments on commit d6c18c4

Please sign in to comment.