-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path threadsaver_bot.py
84 lines (67 loc) · 2.58 KB
/
threadsaver_bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import tweepy
import logging
import os
import time
# Root logger (no name is passed); every function in this module logs through it.
logger = logging.getLogger()
# Configure root logging so that records at INFO level and above are emitted.
logging.basicConfig(level=logging.INFO)
def create_api():
    """Build an authenticated tweepy API client from environment credentials.

    Reads CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN and ACCESS_TOKEN_SECRET
    from the process environment, builds an OAuth handler from them and
    verifies the credentials before handing the client back.

    Returns:
        The ``tweepy.API`` instance, created with ``wait_on_rate_limit=True``.

    Raises:
        KeyError: If any of the four environment variables is missing.
        Exception: Whatever ``verify_credentials`` raised, re-raised after
            being logged.
    """
    credential_names = (
        'CONSUMER_KEY',
        'CONSUMER_SECRET',
        'ACCESS_TOKEN',
        'ACCESS_TOKEN_SECRET',
    )
    # Look the variables up in the declared order, before any tweepy object
    # is built, so a missing variable fails first.
    key, secret, token, token_secret = [
        os.environ[name] for name in credential_names
    ]

    handler = tweepy.OAuthHandler(key, secret)
    handler.set_access_token(token, token_secret)
    client = tweepy.API(
        handler,
        wait_on_rate_limit=True,
        wait_on_rate_limit_notify=True,
    )

    try:
        client.verify_credentials()
        print('verified')
    except Exception as e:
        logger.error("Error creating API", exc_info=True)
        raise e

    logger.info("API created")
    return client
def direct_message(api, user_name, id, url):
    """Send the saved-thread link to a user as a direct message.

    Args:
        api: Client exposing ``get_user`` and ``send_direct_message``.
        user_name: Name of the requester; only printed for tracing.
        id: Identifier handed to ``api.get_user`` to look up the recipient.
        url: Link to the thread, embedded in the message text.
    """
    dm_body = f"Your saved thread {url}"
    print(user_name, url)

    # The DM call is given the id_str of the looked-up account, not the raw id.
    recipient = api.get_user(id)
    api.send_direct_message(recipient.id_str, dm_body)
    print(f"Successfully sent to {recipient.name}")
def getRootTweet(api, tweet):
    """Walk up a reply chain and identify the tweet that started the thread.

    Args:
        api: Client exposing ``get_status(tweet_id, tweet_mode=...)``.
        tweet: Status object to start from; must expose ``user``, ``id`` and
            ``in_reply_to_status_id``.

    Returns:
        A ``(screen_name, tweet_id)`` tuple for the first tweet in the chain
        whose ``in_reply_to_status_id`` is ``None``.

    Raises:
        ValueError: If ``tweet``, or a parent returned by ``api.get_status``,
            is ``None``.
    """
    current = tweet
    # Iterate instead of recursing so a very long thread cannot exhaust the
    # interpreter's recursion limit.
    while True:
        if current is None:
            # Dereferencing a missing tweet would only surface as an opaque
            # AttributeError; fail with an explicit, descriptive error.
            print("fatal error: Tweet null")
            raise ValueError("cannot find the root of a thread: tweet is None")
        if current.in_reply_to_status_id is None:
            print("Got root!")
            logger.info(f"{current.user.name},{current.id}")
            return current.user.screen_name, current.id
        current = api.get_status(
            current.in_reply_to_status_id, tweet_mode="extended"
        )
def retrieve_userthread(api):
    """Process new mentions and DM each requester a link to their thread's root.

    Reads the module-level ``since_id`` as the lower bound for the mentions
    query. For every mention whose lower-cased text contains "save" or
    "shot", the root of the reply chain is looked up and its URL is sent to
    the mentioning user as a direct message.

    Args:
        api: Authenticated tweepy API client.

    Returns:
        The numerically highest id among ``since_id`` and the ``id_str`` of
        every mention seen; ``since_id`` unchanged when there were none.
    """
    keywords = ('save', 'shot')
    logger.info("Retrieving mentions")
    new_since_id = since_id
    for tweet in tweepy.Cursor(api.mentions_timeline, since_id=since_id).items():
        logger.info(tweet.user.screen_name)
        logger.info(f"Since ID:{since_id}")
        # Ids are held as numeric strings. Compare them by value: plain string
        # comparison is lexicographic ("9" > "10"), which could pin the
        # watermark and make every cycle re-process the same mentions.
        new_since_id = max(tweet.id_str, new_since_id, key=int)
        logger.info(f"new Since ID:{new_since_id}")

        text = tweet.text.lower()
        if not any(keyword in text for keyword in keywords):
            continue

        tag_user = tweet.user.name
        tag_id = tweet.user.id
        logger.info(f"Finding parent of {tag_user}")
        parent_username, parent_id = getRootTweet(api, tweet)
        thread_url = f"https://twitter.com/{parent_username}/status/{parent_id}"
        direct_message(api, tag_user, tag_id, thread_url)
    return new_since_id
# Authenticate once up front; the same client serves every polling cycle.
api = create_api()
# Initial lower bound for the mentions query, supplied through the environment.
since_id = os.environ['id']

while True:
    # Rebind the module-level since_id: retrieve_userthread reads it again on
    # the next pass.
    since_id = retrieve_userthread(api)
    print("break")
    logger.info("Waiting...")
    # Pause before polling the mentions timeline again.
    time.sleep(10)