-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathmain.py
147 lines (121 loc) · 4.74 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# Copyright 2017 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
The app for the 'frontend' service, which handles cron job requests to
fetch tweets and store them in the Datastore, and to launch the Dataflow
pipeline from its job template.
"""
import datetime
import logging
import os
from google.appengine.ext import ndb
import twitter
import webapp2
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
class Tweet(ndb.Model):
    """Define the Tweet model: one stored tweet from the home timeline.

    The FetchTweets handler keys each entity by the tweet's id (the same value
    is copied into ``tid``) and finds the newest stored tweet by ordering on
    ``tid`` descending, so it can request only newer tweets on the next run.
    """
    # Screen name of the tweet's author.
    user = ndb.StringProperty()
    # Tweet body text.
    text = ndb.StringProperty()
    # Creation time parsed with a literal "+0000" offset, stored as a naive
    # datetime (i.e. the code assumes Twitter reports UTC).
    created_at = ndb.DateTimeProperty()
    # Numeric tweet id; also used as the entity key id.
    tid = ndb.IntegerProperty()
    # Expanded URLs of the links contained in the tweet.
    urls = ndb.StringProperty(repeated=True)
class LaunchJob(webapp2.RequestHandler):
    """Launch the Dataflow pipeline using a job template.

    Only requests carrying the ``X-Appengine-Cron`` header are served; any
    other request receives a 403 with the body 'Blocked.'.
    """

    def get(self):
        is_cron = self.request.headers.get('X-Appengine-Cron', False)
        # Comment out the following check to allow non-cron-initiated requests.
        if not is_cron:
            # Do not *return* a string here: webapp2 uses a handler's non-None
            # return value as the Response object and calls it, so a str would
            # raise TypeError and surface as a 500. Respond explicitly instead.
            self.response.set_status(403)
            self.response.write('Blocked.')
            return

        # These env vars are set in app.yaml.
        project = os.environ['PROJECT']
        bucket = os.environ['BUCKET']
        template = os.environ['TEMPLATE_NAME']
        # Optional override for the worker zone; defaults to the zone this
        # handler has always used.
        zone = os.environ.get('DATAFLOW_ZONE', 'us-central1-f')
        # Because we're using the same job name each time, if you try to launch
        # one job while another is still running, the second will fail.
        job_name = project + '-twproc-template'

        credentials = GoogleCredentials.get_application_default()
        service = build('dataflow', 'v1b3', credentials=credentials)
        body = {
            "jobName": job_name,
            "gcsPath": "gs://{bucket}/templates/{template}".format(
                bucket=bucket, template=template),
            # Pass the launch time to the template as its 'timestamp' param.
            "parameters": {"timestamp": str(datetime.datetime.utcnow())},
            "environment": {
                "tempLocation": "gs://{bucket}/temp".format(bucket=bucket),
                "zone": zone,
            },
        }
        dfrequest = service.projects().templates().create(
            projectId=project, body=body)
        dfresponse = dfrequest.execute()
        logging.info(dfresponse)
        self.response.write('Done')
class FetchTweets(webapp2.RequestHandler):
    """Fetch home timeline tweets from the given twitter account and store them.

    If tweets are already stored, only tweets newer than the highest stored
    ``tid`` are requested (up to 200); otherwise the 20 most recent are
    fetched. Each tweet is saved as a ``Tweet`` entity keyed by its id, so
    storing the same tweet again overwrites rather than duplicates it.
    """

    def get(self):
        # Set up the twitter client. These env vars are set in app.yaml.
        api = twitter.Api(consumer_key=os.environ['CONSUMER_KEY'],
                          consumer_secret=os.environ['CONSUMER_SECRET'],
                          access_token_key=os.environ['ACCESS_TOKEN'],
                          access_token_secret=os.environ['ACCESS_TOKEN_SECRET'])

        last_id = self._latest_stored_id()
        public_tweets = self._fetch_timeline(api, last_id)

        # Store the retrieved tweets in the datastore with one batched write
        # instead of one RPC per tweet.
        logging.info("got %s tweets", len(public_tweets))
        ndb.put_multi([self._to_entity(tweet) for tweet in public_tweets])
        self.response.write('Done')

    @staticmethod
    def _latest_stored_id():
        """Return the highest stored tweet id, or None if nothing is stored."""
        latest = Tweet.query().order(-Tweet.tid).get()
        return latest.tid if latest is not None else None

    @staticmethod
    def _fetch_timeline(api, last_id):
        """Return home-timeline tweets newer than last_id ([] on API failure)."""
        if last_id is not None:
            logging.info("last id is: %s", last_id)
        else:
            logging.warning("Could not get last tweet id from datastore.")
        # Grab tweets from the home timeline of the auth'd account.
        # NOTE(review): if more than 200 tweets arrived since last_id, the
        # overflow may be skipped -- confirm against Twitter's paging rules.
        try:
            if last_id is not None:
                return api.GetHomeTimeline(count=200, since_id=last_id)
            return api.GetHomeTimeline(count=20)
        except Exception as e:
            # Best-effort cron job: log and store nothing this run.
            logging.warning("Error getting tweets: %s", e)
            return []

    @staticmethod
    def _to_entity(tweet):
        """Build (but do not save) a Tweet entity from one timeline result."""
        return Tweet(
            id=tweet.id,  # key the entity by tweet id, making re-puts idempotent
            text=tweet.text,
            user=tweet.user.screen_name,
            # The format hard-codes '+0000', i.e. assumes UTC timestamps.
            created_at=datetime.datetime.strptime(
                tweet.created_at, "%a %b %d %H:%M:%S +0000 %Y"),
            tid=tweet.id,
            urls=[u.expanded_url for u in tweet.urls])
class MainPage(webapp2.RequestHandler):
    """Placeholder handler for the site root."""

    def get(self):
        """Answer GET / with a fixed placeholder message."""
        placeholder = 'nothing to see.'
        self.response.write(placeholder)
# webapp2's debug mode renders full tracebacks to the client on uncaught
# errors, so only enable it under the local development server, whose
# SERVER_SOFTWARE value starts with "Development".
_DEBUG = os.environ.get('SERVER_SOFTWARE', '').startswith('Development')

app = webapp2.WSGIApplication(
    [('/', MainPage),
     ('/timeline', FetchTweets),
     ('/launchtemplatejob', LaunchJob)],
    debug=_DEBUG)