-
Notifications
You must be signed in to change notification settings - Fork 0
/
util.py
113 lines (85 loc) · 3.45 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# util.py
#
# utility functions common to multiple files
import json
import os
from pyzipcode import ZipCodeDatabase
import random
from geopy.geocoders import Nominatim
def address_to_coors(address):
'''converts an address string to lat lon coordiantes. Currently not
used due to being really slow'''
locator = Nominatim(user_agent = "myGeocoder")
try:
location = locator.geocode(address)
except BaseException as e:
return [None, None]
if location is None:
return [None, None]
return [location.latitude, location.longitude]
def zip_to_coors(zip_code):
'''helper function converts zip code to lat lon coordinates'''
zcdb = ZipCodeDatabase()
try:
zip_info = zcdb[zip_code]
longitude = zip_info.longitude
latitude = zip_info.latitude
except KeyError as e:
missing_zips = {94158: [37.771, -122.391],
98391: [47.174, -122.170],
97086: [45.443, -122.513],
60503: [41.7127, -88.2607]}
latitude = missing_zips[zip_code][0]
longitude = missing_zips[zip_code][1]
return [latitude, longitude]
def clean_email_address(email_address):
'''changes an email address to all lower case and strips out spaces
at the end'''
email_address = email_address.lower() # convert to lower case
email_address = email_address.replace(' ', '') # strip out spaces
return email_address
def parse_by_suffix(input_string, suffix, start_char=" "):
"""searches through input_string until it finds the suffix. Walks backwards
from the suffix and returns everything in the string between suffix and
the end_char"""
end = input_string.find(suffix)
start = end
while input_string[start] != start_char:
start -= 1
return input_string[start+1:end]
def parse_by_prefix(input_string, prefix, end_char=[" "]):
"""searches through input_string until it finds the prefix. Returns
everything in the string between prefix and the next instance of
end_char"""
start = input_string.find(prefix) + len(prefix)
end = start
while input_string[end] not in end_char:
end += 1
return input_string[start:end]
def parse_by_line(input_string, prefix_string, offset):
"""breaks input string by lines and finds the index of the prefix_line.
Returns the line that is offset past the prefix_line"""
split_by_line = input_string.splitlines()
prefix_line = split_by_line.index(prefix_string)
return split_by_line[prefix_line + offset]
def modify_all_users(field, value):
'''modify a data field for all users'''
user_files = os.listdir(os.path.join(os.getcwd(), "data/user"))
user_files = filter(lambda x: "_users.json" in x, user_files)
# iterate through all state json files
for state_file in user_files:
# prepend the relative path
state_json_path = os.path.join("data/user", state_file)
with open(state_json_path, "r") as f:
user_dict = json.load(f)
# modify all values
for email in user_dict.keys():
user_dict[email][field] = value #random.randint(0, 9999)
# write value back to file
state_data = json.dumps(user_dict, indent=4)
with open(state_json_path, "w") as f:
f.write(state_data)
return
class APIUseExceededError(Exception):
'''custom error class for when the API use is exceeded'''
pass