forked from turicas/covid19-br
-
Notifications
You must be signed in to change notification settings - Fork 0
/
full.py
112 lines (90 loc) · 4.08 KB
/
full.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from collections import Counter, defaultdict
from functools import lru_cache
from operator import attrgetter
from pathlib import Path
import rows
from rows.utils import load_schema
import demographics
# Both directories live next to this module; paths are built from the module's
# own location rather than from the current working directory.
_MODULE_DIR = Path(__file__).parent
DATA_PATH = _MODULE_DIR / "data"
SCHEMA_PATH = _MODULE_DIR / "schema"
def read_cases(input_filename, order_by=None):
    """Import the cases CSV as a ``rows`` table typed by the ``caso.csv`` schema.

    Args:
        input_filename: CSV file handed to ``rows.import_from_csv``.
        order_by: Optional ordering key. When truthy it is passed to the
            table's ``order_by`` method after the import.

    Returns:
        The table produced by ``rows.import_from_csv``.
    """
    schema_filename = str(SCHEMA_PATH / "caso.csv")
    field_types = load_schema(schema_filename)
    table = rows.import_from_csv(input_filename, force_types=field_types)
    if not order_by:
        return table
    table.order_by(order_by)
    return table
@lru_cache()
def read_epidemiological_week():
    """Return a dict mapping each date to its epidemiological week.

    The table is loaded from ``epidemiological-week.csv`` inside ``DATA_PATH``,
    which is anchored at this module's directory, so the lookup works no
    matter which directory the process was started from. The result is
    memoized by ``lru_cache``, so repeated calls reuse the parsed mapping.

    Returns:
        dict keyed by each row's ``date`` attribute, with the row's
        ``epidemiological_week`` attribute as the value.
    """
    # str() mirrors how the schema path is passed to the rows helpers elsewhere
    # in this module.
    filename = str(DATA_PATH / "epidemiological-week.csv")
    table = rows.import_from_csv(filename)
    return {row.date: row.epidemiological_week for row in table}
@lru_cache(maxsize=6000)
def epidemiological_week(date):
    """Look up the epidemiological week registered for ``date``.

    Args:
        date: Key into the mapping returned by ``read_epidemiological_week``.

    Returns:
        The value stored for ``date`` in that mapping.

    Raises:
        KeyError: If ``date`` is not present in the mapping.
    """
    week_by_date = read_epidemiological_week()
    return week_by_date[date]
def row_key(row):
    """Build the ``(place_type, state, city)`` tuple identifying a row's place.

    A falsy ``city`` (for example an empty string) is normalized to ``None``.
    """
    place_type = row.place_type
    state = row.state
    city = row.city
    if not city:
        city = None
    return place_type, state, city
def get_data(input_filename):
    """Yield one consolidated dict per (date, place) carrying the latest figures.

    For every distinct date found in the input (ascending) and every key
    returned by ``demographics.place_keys()``, the case with the highest
    ``order_for_place`` among those dated on or before that date is selected
    and turned into an output row. Places with no case up to a given date are
    skipped for that date.

    Args:
        input_filename: Cases CSV, read through ``read_cases``.

    Yields:
        dict with the place identification, ``date``, ``epidemiological_week``,
        the ``last_available_*`` figures copied from the selected case, and:

        - ``is_repeated``: the selected case is dated before ``date``.
        - ``is_last``: ``date`` equals both the selected case's date and the
          date of the newest case of that place.
        - ``order_for_place``: 1-based counter of rows yielded for the place.
        - ``new_confirmed`` / ``new_deaths``: difference to the previously
          yielded row of the same place (the totals themselves on the first
          row of a place).
    """
    casos = read_cases(input_filename, order_by="date")
    dates = sorted({caso.date for caso in casos})

    caso_by_key = defaultdict(list)
    for caso in casos:
        caso_by_key[row_key(caso)].append(caso)
    # Newest date first, so index 0 of each list is the place's newest case.
    for place_cases in caso_by_key.values():
        place_cases.sort(key=attrgetter("date"), reverse=True)

    order_key = attrgetter("order_for_place")
    last_case_for_place = {}
    order_for_place = Counter()
    for date in dates:
        for place_key in demographics.place_keys():
            place_type, state, city = place_key
            # .get() rather than indexing: indexing the defaultdict would
            # insert an empty list for every place that has no cases at all.
            place_cases = caso_by_key.get(place_key)
            if not place_cases:
                continue
            valid_place_cases = [item for item in place_cases if item.date <= date]
            if not valid_place_cases:
                # There are no cases for this place up to this date - skip
                continue

            # This place has at least one case for this date (or before), so
            # use the one with the highest order. max() returns the first
            # maximal item, which is the same element a stable descending
            # sort would put at index 0, without sorting the whole list.
            last_valid_case = max(valid_place_cases, key=order_key)
            newest_case = place_cases[0]
            is_last = date == last_valid_case.date == newest_case.date
            order_for_place[place_key] += 1
            new_case = {
                "city": city,
                "city_ibge_code": last_valid_case.city_ibge_code,
                "date": date,
                "epidemiological_week": epidemiological_week(date),
                "estimated_population": last_valid_case.estimated_population,
                "estimated_population_2019": last_valid_case.estimated_population_2019,
                "is_last": is_last,
                "is_repeated": last_valid_case.date != date,
                "last_available_confirmed": last_valid_case.confirmed,
                "last_available_confirmed_per_100k_inhabitants": last_valid_case.confirmed_per_100k_inhabitants,
                "last_available_date": last_valid_case.date,
                "last_available_death_rate": last_valid_case.death_rate,
                "last_available_deaths": last_valid_case.deaths,
                "order_for_place": order_for_place[place_key],
                "place_type": place_type,
                "state": state,
            }

            # Daily increments are computed against the row previously
            # yielded for the same place; the first row has no predecessor
            # and therefore reports the totals themselves.
            last_case = last_case_for_place.get(place_key)
            if last_case is None:
                new_confirmed = new_case["last_available_confirmed"]
                new_deaths = new_case["last_available_deaths"]
            else:
                new_confirmed = new_case["last_available_confirmed"] - last_case["last_available_confirmed"]
                new_deaths = new_case["last_available_deaths"] - last_case["last_available_deaths"]
            new_case["new_confirmed"] = new_confirmed
            new_case["new_deaths"] = new_deaths
            last_case_for_place[place_key] = new_case

            yield new_case
if __name__ == "__main__":
    # Command-line entry point: read the cases CSV given as the first argument
    # and write every row produced by get_data() to the CSV given as the
    # second argument, showing a progress bar while doing so.
    import argparse

    from tqdm import tqdm

    parser = argparse.ArgumentParser()
    parser.add_argument("input_filename")
    parser.add_argument("output_filename")
    args = parser.parse_args()

    writer = rows.utils.CsvLazyDictWriter(args.output_filename)
    try:
        for row in tqdm(get_data(args.input_filename)):
            writer.writerow(row)
    finally:
        # Close the writer explicitly (even if get_data() raises) instead of
        # relying on garbage collection at interpreter exit to flush the
        # output file.
        writer.close()