-
Notifications
You must be signed in to change notification settings - Fork 1
/
barbora.py
140 lines (113 loc) · 4.23 KB
/
barbora.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
from twilio.rest import Client
import requests
import os
import json
import re
import time
import sys
# --- Configuration, read once from the environment when the script starts ---

# Twilio SMS settings; each is None when its variable is unset.
TWILIO_ACCOUNT_ID = os.environ.get("TWILIO_ACCOUNT_ID")
TWILIO_AUTH_TOKEN = os.environ.get("TWILIO_ACCOUNT_AUTH_TOKEN")
NUMBER_TO_SEND_SMS = os.environ.get("TWILIO_NUMBER_TO_SEND")
TWILIO_OUTGOING_NUMBER = os.environ.get("TWILIO_FROM_NUMBER")

# How long to pause once the notification budget has been used up.
HOURS_TO_SLEEP = int(os.environ.get("HOURS_TO_SLEEP_AFTER_GETTING_SLOT", "6"))

# Mandatory: a missing BARBORA_COOKIE raises KeyError right here.
COOKIE = os.environ["BARBORA_COOKIE"]

MS_TEAMS_WEBHOOK = os.environ.get("MS_TEAMS_WEBHOOK")
NOTIFICATIONS_TO_SEND = int(os.environ.get("NOTIFICATIONS_TO_SEND", "2"))

# Dry-run is enabled by any value except an explicit "off" spelling.
# Previously the raw string was used, so DRY_RUN=false or DRY_RUN=0 (both
# non-empty, hence truthy) switched dry-run ON.
DRY_RUN = os.environ.get("DRY_RUN", "").strip().lower() not in (
    "",
    "0",
    "false",
    "no",
    "off",
)

# Delay between two polls of the delivery endpoint.
PUSH_BACK_SECONDS = int(os.environ.get("PUSH_BACK_BARBORA_API_IN_SECONDS", "30"))

# HOURS_TO_SLEEP expressed in seconds, ready for time.sleep().
sleep_long_as_informed = HOURS_TO_SLEEP * 60 * 60
# SMS notifications are enabled only when every Twilio setting is present.
if (
    TWILIO_ACCOUNT_ID
    and TWILIO_AUTH_TOKEN
    and TWILIO_OUTGOING_NUMBER
    and NUMBER_TO_SEND_SMS
):
    twilio_client = Client(TWILIO_ACCOUNT_ID, TWILIO_AUTH_TOKEN)
    print(f"Twilio Number to send SMS: {NUMBER_TO_SEND_SMS}")
    print(f"Twilio Outgoing Number: {TWILIO_OUTGOING_NUMBER}")
else:
    # Always bind the name: later code tests `if twilio_client:` and would
    # otherwise raise NameError whenever SMS is not configured.
    twilio_client = None
    print("Twilio SMS Notifications disabled")
# Report at start-up whether a Teams webhook has been configured.
teams_status = (
    "MS Teams Notifications Enabled"
    if MS_TEAMS_WEBHOOK
    else "MS Teams Notifications disabled"
)
print(teams_status)
# Endpoint polled for delivery slots.
BARBORA_URL = "https://www.barbora.lt/api/eshop/v1/cart/deliveries"

# Request headers sent with every poll; the session cookie from the
# environment is attached as the last entry.
# NOTE(review): "br" is advertised in Accept-Encoding; the HTTP client may not
# be able to decode Brotli bodies unless a Brotli package is installed —
# confirm against the deployed environment.
BARBORA_HEADERS = dict(
    [
        ("Accept", "*/*"),
        ("Accept-Encoding", "gzip, deflate, br"),
        ("Accept-Language", "en-US,en;q=0.5"),
        ("Authorization", "Basic YXBpa2V5OlNlY3JldEtleQ=="),
        ("Connection", "keep-alive"),
        ("Host", "www.barbora.lt"),
        ("Referer", "https://www.barbora.lt/"),
        (
            "User-Agent",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36",
        ),
        ("cookie", COOKIE),
    ]
)
def send_notifications(timestamp=None):
    """Notify every configured channel that a delivery slot was found.

    Sends an SMS through Twilio when a client has been created, and posts a
    message card to MS Teams when ``MS_TEAMS_WEBHOOK`` is set.

    Args:
        timestamp: Text describing when the slot was found, embedded in the
            SMS body. Defaults to the current ``time.ctime()`` so the
            function no longer depends on a global set by the polling loop.
    """
    if timestamp is None:
        timestamp = time.ctime()

    # The module only binds `twilio_client` when SMS is fully configured, so
    # look it up defensively instead of risking a NameError.
    sms_client = globals().get("twilio_client")
    if sms_client:
        sms_client.messages.create(
            to=NUMBER_TO_SEND_SMS,
            from_=TWILIO_OUTGOING_NUMBER,
            body=f"Barbora slot found at {timestamp}",
        )

    if MS_TEAMS_WEBHOOK:
        send_message_to_teams(
            MS_TEAMS_WEBHOOK,
            "There are slots, book now [barbora.lt](https://www.barbora.lt)",
        )
def send_message_to_teams(wehook, message):
    """Post ``message`` to an MS Teams incoming webhook as a MessageCard.

    Delivery is best-effort: a failed or timed-out request is printed and
    swallowed so that a notification problem cannot stop the bot.

    Args:
        wehook: URL of the Teams incoming webhook.
        message: Text shown in the card body.
    """
    payload = {
        "@context": "http://schema.org/extensions",
        "@type": "MessageCard",
        "title": "Barbora Bot",
        "text": message,
    }
    try:
        # Without a timeout a stalled connection would block the bot forever.
        reply = requests.post(url=wehook, json=payload, timeout=30)
        reply.raise_for_status()
    except requests.exceptions.RequestException as err:
        print("Failed to send MS Teams notification:", err)
def get_delivery_data(timeout=30):
    """Fetch the delivery-slot data from ``BARBORA_URL``.

    Args:
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The ``requests`` response on success, or ``None`` when the request
        failed; the failure is printed.

    Raises:
        SystemExit: When the server answers 401. The failure is reported to
            MS Teams if a webhook is configured, then the process exits with
            status 0.
    """
    # Handlers go from most to least specific. RequestException is the base
    # class of the others, so listing it first made them unreachable.
    try:
        r = requests.request(
            "GET", BARBORA_URL, headers=BARBORA_HEADERS, timeout=timeout
        )
        r.raise_for_status()
        return r
    except requests.exceptions.HTTPError as errh:
        print("Http Error:", errh)
        # Read the status from the exception: the local `r` is unbound when
        # the request itself never completed.
        failed = errh.response
        if failed is not None and failed.status_code == 401:
            print("No need to run, cookie expired - access denied")
            if MS_TEAMS_WEBHOOK:
                send_message_to_teams(
                    MS_TEAMS_WEBHOOK, "Failure: Coockie expired. Bot stopped working"
                )
            sys.exit(0)
    except requests.exceptions.ConnectionError as errc:
        print("Error Connecting:", errc)
    except requests.exceptions.Timeout as errt:
        print("Timeout Error:", errt)
    except requests.exceptions.RequestException as err:
        print("OOps: Something Else", err)
    return None
# Notifications still allowed before the bot pauses; refilled after each pause.
NOTIFICATIONS_THROTTLE = NOTIFICATIONS_TO_SEND

# Poll the delivery endpoint forever, notifying whenever a slot is available.
while True:
    if NOTIFICATIONS_THROTTLE == 0:
        # Announce the pause BEFORE sleeping; previously the message was only
        # emitted once the multi-hour sleep had already finished.
        print(f"Going to sleep for {HOURS_TO_SLEEP} hours")
        if MS_TEAMS_WEBHOOK:
            send_message_to_teams(
                MS_TEAMS_WEBHOOK, f"Going for sleep for {HOURS_TO_SLEEP} hours"
            )
        time.sleep(sleep_long_as_informed)
        NOTIFICATIONS_THROTTLE = NOTIFICATIONS_TO_SEND

    # Kept as a module-level name because other code in this file reads it.
    today = time.ctime()

    resp_str = ""
    if not DRY_RUN:
        response = get_delivery_data()
        if response is None:
            # The request failed and was already reported. Back off before
            # retrying instead of immediately hitting the API again.
            print("Was not able to parse response. Error: no response received")
            time.sleep(PUSH_BACK_SECONDS)
            continue
        try:
            # Re-serialise so the checks below see json.dumps() formatting.
            resp_str = json.dumps(response.json())
        except ValueError as e:
            print(f"Was not able to parse response. Error: {e}")
            time.sleep(PUSH_BACK_SECONDS)
            continue

    if DRY_RUN or '"available": true' in resp_str:
        print(f"Slot found at {today}")
        send_notifications()
        NOTIFICATIONS_THROTTLE -= 1
    elif '"title": null' in resp_str:
        print(f"Empty response returned at {today}")
    else:
        print(f"No slots at {today}")

    time.sleep(PUSH_BACK_SECONDS)