Skip to content

Commit

Permalink
chore: Run ruff format
Browse files: browse the repository at this point in the history
  • Loading branch information
jpmckinney committed Oct 12, 2024
1 parent: 8fe430d; commit: 08e9cd5
Showing 1 changed file with 69 additions and 66 deletions.
135 changes: 69 additions & 66 deletions manage.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -18,16 +18,16 @@ def parse(response):


def warn(message):
click.secho(message, err=True, fg='yellow')
click.secho(message, err=True, fg="yellow")


@contextmanager
def csv_dump(filename, fieldnames):
"""
Writes CSV headers to the given filename, and yields a ``csv.writer``.
"""
f = (basedir / 'codelists' / filename).open('w')
writer = csv.writer(f, lineterminator='\n')
f = (basedir / "codelists" / filename).open("w")
writer = csv.writer(f, lineterminator="\n")
writer.writerow(fieldnames)
try:
yield writer
Expand All @@ -39,18 +39,21 @@ def csv_dump(filename, fieldnames):
def edqm(email, password, url):
with requests.Session() as session:
# Get the CSRF token.
response = session.get('https://standardterms.edqm.eu/user/login')
response = session.get("https://standardterms.edqm.eu/user/login")
response.raise_for_status()

formkey = parse(response).xpath('//input[@name="_formkey"]/@value')[0]

# https://stackoverflow.com/a/12385661/244258
response = session.post('https://standardterms.edqm.eu', files={
'email': (None, email),
'password': (None, password),
'_formkey': (None, formkey),
'_formname': (None, 'login'),
})
response = session.post(
"https://standardterms.edqm.eu",
files={
"email": (None, email),
"password": (None, password),
"_formkey": (None, formkey),
"_formname": (None, "login"),
},
)
response.raise_for_status()

# The "export" links do not include definitions, so we scrape the page.
Expand All @@ -59,64 +62,64 @@ def edqm(email, password, url):

writer = csv.writer(sys.stdout)
for status in parse(response).xpath('//span[starts-with(@id, "status_0_")]'):
if status.xpath('./span/text()')[0] != 'Current':
if status.xpath("./span/text()")[0] != "Current":
continue

response = session.post(f"https://standardterms.edqm.eu/browse/get_details/{status.attrib['id'][9:]}/en")
response.raise_for_status()

document = parse(response)
keys = document.xpath('.//strong/text()')
keys = document.xpath(".//strong/text()")
values = [value.strip() for value in document.xpath('.//span[@class="span6"]/text()')]
properties = dict(zip(keys, values))

if properties['Domain'] != 'Veterinary only':
writer.writerow([properties['Term'], properties['Definition']])
if properties["Domain"] != "Veterinary only":
writer.writerow([properties["Term"], properties["Definition"]])


def hl7(codelist):
response = requests.get(f'https://terminology.hl7.org/CodeSystem-v3-{codelist}.json')
response = requests.get(f"https://terminology.hl7.org/CodeSystem-v3-{codelist}.json")
response.raise_for_status()

data = response.json()

multi_value_properties = ('subsumedBy', 'synonymCode')
multi_value_properties = ("subsumedBy", "synonymCode")
properties = set()

# Transform the list of dicts into a dict.
for code in data['concept']:
code['properties'] = {}
for code in data["concept"]:
code["properties"] = {}
for prop in multi_value_properties:
code['properties'][prop] = set()
for prop in code['property']:
properties.add(prop['code'])
code["properties"][prop] = set()
for prop in code["property"]:
properties.add(prop["code"])
name, value = prop.values()
if name in multi_value_properties:
code['properties'][name].add(value)
elif name in code['properties']:
code["properties"][name].add(value)
elif name in code["properties"]:
raise Exception(f"{name} set to {code['properties'][name]}, not {value}")
else:
code['properties'][name] = value
code["properties"][name] = value

not_selectable = {code['code'] for code in data['concept'] if code['properties'].get('notSelectable')}
not_selectable = {code["code"] for code in data["concept"] if code["properties"].get("notSelectable")}

if codelist == 'RouteOfAdministration':
expected = {'internalId', 'notSelectable', 'status', 'subsumedBy', 'synonymCode'}
elif codelist == 'orderableDrugForm':
expected = {'internalId', 'notSelectable', 'status', 'subsumedBy'}
if codelist == "RouteOfAdministration":
expected = {"internalId", "notSelectable", "status", "subsumedBy", "synonymCode"}
elif codelist == "orderableDrugForm":
expected = {"internalId", "notSelectable", "status", "subsumedBy"}
else:
expected = set()

difference = properties - expected
if difference:
warn(f'{codelist}: unexpected new properties: {sorted(difference)}')
warn(f"{codelist}: unexpected new properties: {sorted(difference)}")

codes = []
for code in data['concept']:
for code in data["concept"]:
if (
not code['properties'].get('notSelectable')
and code['properties']['status'] == 'active'
and any(parent in not_selectable for parent in code['properties']['subsumedBy'])
not code["properties"].get("notSelectable")
and code["properties"]["status"] == "active"
and any(parent in not_selectable for parent in code["properties"]["subsumedBy"])
):
codes.append(code)

Expand All @@ -135,19 +138,19 @@ def update_container():
"""
# Retain the descriptions from EDQM.
descriptions = {}
with (basedir / 'codelists' / 'immediateContainer.csv').open() as f:
with (basedir / "codelists" / "immediateContainer.csv").open() as f:
reader = csv.DictReader(f)
for row in reader:
descriptions[row['Code']] = row['Description']
descriptions[row["Code"]] = row["Description"]

# https://terminology.hl7.org/CodeSystem/medicationknowledge-package-type/
response = requests.get('https://terminology.hl7.org/CodeSystem-medicationknowledge-package-type.json')
response = requests.get("https://terminology.hl7.org/CodeSystem-medicationknowledge-package-type.json")
response.raise_for_status()

data = response.json()

with csv_dump('immediateContainer.csv', ['Code', 'Title', 'Description']) as writer:
writer.writerows([[code['code'], code['display'], descriptions[code['code']]] for code in data['concept']])
with csv_dump("immediateContainer.csv", ["Code", "Title", "Description"]) as writer:
writer.writerows([[code["code"], code["display"], descriptions[code["code"]]] for code in data["concept"]])


@cli.command()
Expand All @@ -156,7 +159,7 @@ def update_administration_route():
Update schema/codelists/administrationRoute.csv.
"""
# https://terminology.hl7.org/CodeSystem/v3-RouteOfAdministration/
codes, not_selectable = hl7('RouteOfAdministration')
codes, not_selectable = hl7("RouteOfAdministration")

# "definition" is not used for Description, because it is the same as the "display", except for:
#
Expand All @@ -165,15 +168,15 @@ def update_administration_route():
# - "instillation, urethral", "Instillation, urethral" (lettercase change)
# - "Topical application, vaginal", "Insertion, vaginal" (typographical error)

with csv_dump('administrationRoute.csv', ['Code', 'Title']) as writer:
with csv_dump("administrationRoute.csv", ["Code", "Title"]) as writer:
for code in codes:
if code['properties']['synonymCode']:
if code["properties"]["synonymCode"]:
# Prefer IPINHL to its synonyms.
if code['code'] in ('ORINHL', 'RESPINHL'):
if code["code"] in ("ORINHL", "RESPINHL"):
continue
elif code['code'] != 'IPINHL':
warn(f'RouteOfAdministration: unexpected synonymous code: {code}')
writer.writerow([code['code'], code['display'][0].upper() + code['display'][1:]])
elif code["code"] != "IPINHL":
warn(f"RouteOfAdministration: unexpected synonymous code: {code}")
writer.writerow([code["code"], code["display"][0].upper() + code["display"][1:]])


@cli.command()
Expand All @@ -182,13 +185,13 @@ def update_dosage_form():
Update schema/codelists/dosageForm.csv from HL7.
"""
# https://terminology.hl7.org/CodeSystem/v3-orderableDrugForm/
codes, not_selectable = hl7('orderableDrugForm')
codes, not_selectable = hl7("orderableDrugForm")

with csv_dump('dosageForm.csv', ['Code', 'Title', 'Description']) as writer:
with csv_dump("dosageForm.csv", ["Code", "Title", "Description"]) as writer:
for code in codes:
if 'SPRY' in code['code'] and code['code'] != 'SPRY':
if "SPRY" in code["code"] and code["code"] != "SPRY":
continue
writer.writerow([code['code'], code['display'], code.get('definition')])
writer.writerow([code["code"], code["display"], code.get("definition")])


@cli.command()
Expand All @@ -203,45 +206,45 @@ def update(ctx):


@cli.command()
@click.argument('email')
@click.argument('password')
@click.argument("email")
@click.argument("password")
def print_edqm_container(email, password):
edqm(email, password, 'https://standardterms.edqm.eu/browse/get_back_links/en/PAC_PAC/786')
edqm(email, password, "https://standardterms.edqm.eu/browse/get_back_links/en/PAC_PAC/786")


@cli.command()
@click.argument('email')
@click.argument('password')
@click.argument("email")
@click.argument("password")
def print_edqm_administration_route(email, password):
edqm(email, password, 'https://standardterms.edqm.eu/browse/get_concepts/ROA')
edqm(email, password, "https://standardterms.edqm.eu/browse/get_concepts/ROA")


@cli.command()
def download_inn_lists():
os.makedirs('inn', exist_ok=True)
os.makedirs("inn", exist_ok=True)

response = requests.get('https://www.who.int/teams/health-product-and-policy-standards/inn/inn-lists')
response = requests.get("https://www.who.int/teams/health-product-and-policy-standards/inn/inn-lists")
response.raise_for_status()

# Note: PDFs are scans before RL46 (September 2001) and PL86 (March 2002).
document = parse(response)
base_url = 'https://cdn.who.int/media/docs/default-source/international-nonproprietary-names-(inn)/'
for column, prefix in (('PageContent_C021_Col00', 'pl'), ('PageContent_C021_Col01', 'rl')):
base_url = "https://cdn.who.int/media/docs/default-source/international-nonproprietary-names-(inn)/"
for column, prefix in (("PageContent_C021_Col00", "pl"), ("PageContent_C021_Col01", "rl")):
for href in document.xpath(f'//div[@id="{column}"]//@href'):
# Handle exceptions like:
# https://www.who.int/publications/m/item/inn-proposed-list-57
# https://www.who.int/publications/m/item/inn-pl-125-covid
suffix = re.search(r'\d+.*', href.lower()).group(0)
basename = f'{prefix}{suffix}.pdf'
filename = os.path.join('inn', basename)
suffix = re.search(r"\d+.*", href.lower()).group(0)
basename = f"{prefix}{suffix}.pdf"
filename = os.path.join("inn", basename)
if not os.path.exists(filename):
click.echo(f'INFO - Downloading {basename}')
click.echo(f"INFO - Downloading {basename}")
response = requests.get(base_url + basename)
response.raise_for_status()

with open(filename, 'wb') as f:
with open(filename, "wb") as f:
f.write(response.content)


if __name__ == '__main__':
if __name__ == "__main__":
cli()

0 comments on commit 08e9cd5

Please sign in to comment.