-
Notifications
You must be signed in to change notification settings - Fork 3
/
process.py
128 lines (106 loc) · 3.6 KB
/
process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
"""
Process raw tweets and store in new database.
The id, text, created_at and lang fields are kept, with SA2 code and sentiment added.
Polygon matching sourced from
https://gis.stackexchange.com/questions/208546/check-if-a-point-falls-within-a-multipolygon-with-python
"""
import couchdb
from couchdb import design
import logging
import sys
import fiona
import time
from tweetAnalyzer import TweetAnalyzer
from shapely.geometry import shape, Point
# Names of the CouchDB databases holding the raw tweets (input) and the
# processed tweets (output).
DB_RAW_NAME = "raw_tweets"
DB_PRO_NAME = "processed_tweets"
# Server URLs for the raw and the processed database respectively; both are
# on localhost but on different ports.
DB_RAW_ADDRESS = "http://127.0.0.1:15984"
DB_PRO_ADDRESS = "http://127.0.0.1:5984"
# Path handed to fiona.open() to load the SA2 region features.
GEO_JSON = "web/data/sa2_dump.json"
# Fixed (longitude, latitude) used for tweets whose place name is 'Melbourne',
# instead of the midpoint of that place's bounding box.
MELBOURNE_COORDS = (144.9631, -37.8136)
def view_unprocessed_raw(db):
    """Define the 'raw_tweets/unprocessed' view and sync it to ``db``.

    The view's map function emits one row per document whose ``processed``
    field is falsy or missing, keyed by the document id with a null value.
    """
    # JavaScript map function, assembled line by line.
    js_map = (
        "function(doc) {\n"
        "if (!doc.processed) {\n"
        "emit(doc._id, null);\n"
        "}\n"
        "}"
    )
    unprocessed = design.ViewDefinition('raw_tweets', 'unprocessed', js_map)
    unprocessed.sync(db)
def _tweet_coords(tweet):
    """Return a (lng, lat) location for ``tweet``, or None if it has none.

    Exact coordinates on the tweet win. Otherwise the tweet's place is used:
    a place named 'Melbourne' maps to MELBOURNE_COORDS, any other place to
    the midpoint of its bounding box.
    """
    exact = tweet.get('coordinates')
    if exact:
        return tuple(exact['coordinates'])
    place = tweet.get('place')
    if not place:
        return None
    # Don't take the midpoint of the city; use the fixed coordinates instead.
    if place['name'] == 'Melbourne':
        return MELBOURNE_COORDS
    return average_bounding_box(place['bounding_box']['coordinates'])


def tag_tweets(db_raw, db_pro, multipol):
    """Tag raw tweets with SA2 and store in processed db.

    Args:
        db_raw: Database of raw tweets; must expose the
            'raw_tweets/unprocessed' view.
        db_pro: Database that receives the processed tweets.
        multipol: Iterable of region features, each with a 'geometry' and a
            'properties' mapping containing 'SA2_Code_2011'.

    For every unprocessed tweet a location is derived. If it falls within a
    region, the tweet's id, text, created_at and lang are saved to ``db_pro``
    together with the code of the first matching region and the tweet's
    sentiment. Every visited tweet is then flagged ``processed`` in
    ``db_raw``, whether or not it was stored.
    """
    # Build each region's geometry once per call instead of once per tweet.
    # Features without a geometry can never contain a point, so skip them.
    regions = [
        (shape(feature['geometry']), feature['properties']['SA2_Code_2011'])
        for feature in multipol
        if feature['geometry'] is not None
    ]

    for row in db_raw.view('raw_tweets/unprocessed'):
        tweet_id = row['id']
        tweet = db_raw[tweet_id]

        # Computed fresh for every tweet, so a tweet without any location
        # can never inherit the coordinates of the previous one.
        coords = _tweet_coords(tweet)

        if coords:
            point = Point(coords)
            for region, code in regions:
                if point.within(region):
                    sentiment = TweetAnalyzer(tweet).analyzeSentiment()
                    db_pro.save({
                        '_id': tweet_id, 'code': code,
                        'text': tweet['text'], 'sentiment': sentiment,
                        'created_at': tweet['created_at'],
                        'lang': tweet['lang'],
                    })
                    # Only the first matching region is used.
                    break
        else:
            logging.info("No coordinates found.")

        # Tag tweet as processed, using a freshly fetched copy of the doc.
        doc = db_raw.get(tweet_id)
        doc['processed'] = True
        db_raw.save(doc)
def average_bounding_box(box):
    """Average the points of a bounding box to a midpoint.

    Args:
        box: Nested list whose first element, ``box[0]``, is the list of
            corner points; each point is indexed as ``[lng, lat]``.

    Returns:
        A ``(lng, lat)`` tuple holding the mean of the corner points. The
        mean is taken over however many points are present rather than
        assuming exactly four. An empty point list yields ``(0.0, 0.0)``,
        which is what the previous fixed-divisor implementation returned.
    """
    corners = box[0]
    if not corners:
        return 0.0, 0.0
    count = len(corners)
    lng = sum(point[0] for point in corners) / count
    lat = sum(point[1] for point in corners) / count
    return lng, lat
def main():
    """Process raw tweets into the processed database, forever.

    Opens the region file and both databases, then repeatedly syncs the
    'unprocessed' view, tags and stores the pending tweets, and sleeps.
    Exits with status 2 if the raw tweets database cannot be opened. The
    processed database is created when it does not exist yet.
    """
    # Seconds to wait between two processing passes.
    sleep_seconds = 1200

    logging.basicConfig(level=logging.INFO)

    # Read locations; the context manager closes the file on any exit path.
    with fiona.open(GEO_JSON) as multipol:
        # Get raw tweets db.
        couch_raw = couchdb.Server(DB_RAW_ADDRESS)
        try:
            db_raw = couch_raw[DB_RAW_NAME]
        except couchdb.ResourceNotFound:
            logging.error("Raw tweets DB does not exist.")
            sys.exit(2)
        except Exception:
            # Any other failure (e.g. server unreachable) is not a missing
            # database; report the real cause but keep the same exit status.
            logging.exception("Could not open raw tweets DB.")
            sys.exit(2)

        # Get processed tweets db.
        couch_pro = couchdb.Server(DB_PRO_ADDRESS)
        if DB_PRO_NAME in couch_pro:
            db_pro = couch_pro[DB_PRO_NAME]
        else:
            db_pro = couch_pro.create(DB_PRO_NAME)

        # Tag and store tweets.
        while True:
            view_unprocessed_raw(db_raw)
            tag_tweets(db_raw, db_pro, multipol)
            logging.info("Tweets processed, sleeping...")
            time.sleep(sleep_seconds)


if __name__ == "__main__":
    main()