forked from iMeiDo/asm_reborn
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnotify.py
178 lines (167 loc) · 5.75 KB
/
notify.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# -*- coding: utf-8 -*-
# @Time : 2021/2/23 06:30
# @Author : srcrs
# @Email : [email protected]
import smtplib,traceback,os,requests,urllib,json
from email.mime.text import MIMEText
#返回要推送的通知内容
#对markdown的适配要更好
#增加文件关闭操作
def readFile(filepath):
content = ''
with open(filepath, encoding='utf-8') as f:
for line in f.readlines():
content += line + '\n\n'
return content
#返回要推送的通知内容
#对text的适配要更好
#增加文件关闭操作
def readFile_text(filepath):
content = ''
with open(filepath, encoding='utf-8') as f:
for line in f.readlines():
content += line
return content
#返回要推送的通知内容
#对html的适配要更好
#增加文件关闭操作
def readFile_html(filepath):
content = ''
with open(filepath, "r" , encoding='utf-8') as f:
for line in f.readlines():
content += line + '<br>'
return content
#邮件推送api来自流星云
#备用方案推送api来自BER
def sendEmail(email):
try:
#要发送邮件内容
content = readFile('./log.txt')
#接收方邮箱
receivers = email
#邮件主题
subject = 'UnicomTask每日报表'
param1 = '?address=' + receivers + '&name=' + subject + '&certno=' + content
param2 = '?to=' + receivers + '&title=' + subject + '&text=' + content
res1 = requests.get('http://liuxingw.com/api/mail/api.php' + param1)
res1.encoding = 'utf-8'
res1 = res1.json()
if res1['Code'] == '1':
print(res1['msg'])
else:
#备用推送
requests.get('https://email.berfen.com/api' + param2)
print('email push BER')
#这里不知道为什么,在很多情况下返回的不是 json,
# 但在测试过程中成功率极高,因此直接输出
except Exception as e:
print('邮件推送异常,原因为: ' + str(e))
print(traceback.format_exc())
#钉钉群自定义机器人推送
def sendDing(webhook):
try:
#要发送邮件内容
content = readFile('./log.txt')
data = {
'msgtype': 'markdown',
'markdown': {
'title': 'UnicomTask每日报表',
'text': content
}
}
headers = {
'Content-Type': 'application/json;charset=utf-8'
}
res = requests.post(webhook,headers=headers,json=data)
res.encoding = 'utf-8'
res = res.json()
print('dinngTalk push : ' + res['errmsg'])
except Exception as e:
print('钉钉机器人推送异常,原因为: ' + str(e))
print(traceback.format_exc())
#发送Tg通知
def sendTg(tgBot):
try:
token = tgBot['tgToken']
chat_id = tgBot['tgUserId']
#发送内容
content = readFile_text('./log.txt')
data = {
'UnicomTask每日报表':content
}
content = urllib.parse.urlencode(data)
#TG_BOT的token
#token = os.environ.get('TG_TOKEN')
#用户的ID
#chat_id = os.environ.get('TG_USERID')
url = f'https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={content}'
session = requests.Session()
resp = session.post(url)
print(resp)
except Exception as e:
print('Tg通知推送异常,原因为: ' + str(e))
print(traceback.format_exc())
#发送push+通知
def sendPushplus(token):
try:
#发送内容
data = {
"token": token,
"title": "UnicomTask每日报表",
"content": readFile_html('./log.txt')
}
url = 'http://www.pushplus.plus/send'
headers = {'Content-Type': 'application/json'}
body = json.dumps(data).encode(encoding='utf-8')
resp = requests.post(url, data=body, headers=headers)
print(resp)
except Exception as e:
print('push+通知推送异常,原因为: ' + str(e))
print(traceback.format_exc())
#企业微信通知,普通微信可接收
def sendWechat(wex):
#获得access_token
url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
token_param = '?corpid=' + wex['id'] + '&corpsecret=' + wex['secret']
token_data = requests.get(url + token_param)
token_data.encoding = 'utf-8'
token_data = token_data.json()
access_token = token_data['access_token']
#发送内容
content = readFile_text('./log.txt')
#创建要发送的消息
data = {
"touser": "@all",
"msgtype": "text",
"agentid": wex['agentld'],
"text": {"content": content}
}
send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + access_token
message = requests.post(send_url,json=data)
message.encoding = 'utf-8'
res = message.json()
print('Wechat send : ' + res['errmsg'])
#发送IFTTT通知
def sendIFTTT(ifttt):
try:
content = readFile('./log.txt')
body = { ifttt['subjectKey']: 'UnicomTask每日报表', ifttt['contentKey']: content }
url = 'https://maker.ifttt.com/trigger/{event_name}/with/key/{key}'.format(event_name=ifttt['eventName'], key=ifttt['apiKey'])
response = requests.post(url, json=body)
print(response)
except Exception as e:
print('IFTTT通知推送异常,原因为: ' + str(e))
print(traceback.format_exc())
#发送Bark通知
def sendBarkkey(Barkkey):
#发送内容
content = readFile_text('./log.txt')
data = {
'UnicomTask每日报表':content
}
content = urllib.parse.urlencode(data)
url = f'https://api.day.app/{Barkkey}/{content}'
session = requests.Session()
resp = session.post(url)
state=json.loads(resp.text)
print(state)