Skip to content
This repository was archived by the owner on Sep 26, 2022. It is now read-only.

Commit 05bc1ba

Browse files
committed
重构前的最后commits
1 parent 81d1e88 commit 05bc1ba

File tree

4 files changed

+172
-72
lines changed

4 files changed

+172
-72
lines changed

APIKey.py

-1
This file was deleted.

LiveStreams.py

+131-12
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,131 @@
1-
streamID={}
2-
3-
def manualAdd(name,id):
4-
streamID[name]=id
5-
6-
def manualDel(string):
7-
if string in streamID:
8-
streamID.pop(string)
9-
else:
10-
for name,id in streamID.items():
11-
if string==id:
12-
streamID.pop(name)
1+
import subprocess
2+
import threading
3+
import logging
4+
import sys
5+
from time import sleep
6+
import requests
7+
import main
8+
import APIKey
9+
10+
#streamID = {}
11+
#streaming = {}
12+
streamers = {}
13+
rbcThreads = {}
14+
threadLock = threading.Lock()
15+
16+
17+
class StreamerManager (threading.Thread):
18+
def __init__(self):
19+
threading.Thread.__init__(self)
20+
21+
def run(self):
22+
logging.debug('启动直播流管理')
23+
while True:
24+
for name, streamer in list(streamers.items):
25+
if streamer.check():
26+
pass
27+
else:
28+
pass
29+
30+
def Add(self, name, id):
31+
threadLock.acquire()
32+
streamers[name] = Streamer(name, id)
33+
threadLock.release()
34+
print('成功添加'+name+' '+id)
35+
36+
def Del(self, string):
37+
threadLock.acquire()
38+
if string in streamers:
39+
streamers.pop(string)
40+
threadLock.release()
41+
print('成功删除'+string)
42+
else:
43+
for name, streamer in list(streamers.items()):
44+
if streamer.id == string:
45+
streamers.pop(name)
46+
print('成功删除'+string)
47+
threadLock.release()
48+
return
49+
threadLock.release()
50+
print('找不到'+string)
51+
52+
53+
class Streamer(threading.Thread):
54+
def __init__(self, name, id):
55+
threading.Thread.__init__(self)
56+
self.name = name
57+
self.id = id
58+
self.state = False
59+
60+
def run(self):
61+
while True:
62+
tempState = self.check()
63+
if self.state!='开始' and tempState=="开始":
64+
self.state = tempState
65+
sleep(15)
66+
67+
def check(self):
68+
self.log('直播状态检测')
69+
return getyoutubevideostatus(self.id)
70+
71+
def start(self):
72+
pass
73+
def stop(self):
74+
pass
75+
76+
def log(self, string):
77+
logging.debug('[%s,%s]%s' % (self.name, self.id, string))
78+
79+
# 此处参考:https://github.com/lovezzzxxx/livemonitor/blob/73e616cd4fc3972701cb57d0cd56b75bca0dbb57/spider.py#L1417
80+
81+
82+
def getyoutubevideostatus(video_id):
83+
try:
84+
url = 'https://www.youtube.com/heartbeat?video_id=%s' % video_id
85+
response = requests.get(url, stream=True, timeout=(3, 7))
86+
if response.status_code == 200:
87+
try:
88+
if response.json()["stop_heartbeat"] == "1":
89+
video_status = "上传"
90+
return video_status
91+
else:
92+
# 测试中stop_heartbeat只在类型为视频的情况下出现且值为1
93+
return False
94+
except:
95+
if response.json()["status"] == "stop":
96+
video_status = "删除"
97+
elif response.json()["status"] == "ok":
98+
video_status = "开始"
99+
elif "liveStreamability" not in response.json() or "displayEndscreen" in response.json()["liveStreamability"]["liveStreamabilityRenderer"]:
100+
video_status = "结束"
101+
else:
102+
video_status = "等待"
103+
# 不可能为空 不可以为空
104+
return video_status
105+
else:
106+
return False
107+
except:
108+
return False
109+
110+
111+
# def checkLiveV3Api(): # 已弃用
112+
# for name, id in list(streamID.items()):
113+
# #id = LiveStreams.streamID[name]
114+
# r = requests.get(
115+
# "https://www.googleapis.com/youtube/v3/videos?part=snippet&id=%s&key=%s" % (id, APIKey.key))
116+
# j = r.json()
117+
# logging.debug('[%s,%s]liveBroadcastContent:%s' %
118+
# (name, id, j['items'][0]['snippet']['liveBroadcastContent']))
119+
# if j['items'][0]['snippet']['liveBroadcastContent'] == 'live':
120+
# if name in rbcThreads:
121+
# logging.debug('[%s,%s]already start' % (name, id))
122+
# else:
123+
# rbcThreads[name] = main.RebroadcastThread(
124+
# len(rbcThreads), name, id)
125+
# rbcThreads[name].start()
126+
# elif j['items'][0]['snippet']['liveBroadcastContent'] == 'none':
127+
# logging.debug('[%s,%s]not a live stream' % (name, id))
128+
# streamID.pop(name)
129+
# else:
130+
# logging.error('[%s,%s]err:liveBroadcastContent' % (name, id))
131+
# sleep(2)

Rebroadcast.py

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import threading
2+
3+
TemplateArg = r'streamlink --hls-live-edge 1 "https://www.youtube.com/watch?v={}" "best" -O | ffmpeg -re -i pipe:0 -c copy -f flv rtmp://localhost/flv/{}'
4+
workingdir = r'/web/hls'
5+
6+
class RebroadcastThread (threading.Thread):
7+
def __init__(self, threadID, name, id):
8+
threading.Thread.__init__(self)
9+
self.threadID = threadID
10+
self.name = name
11+
self.id = id
12+
13+
def run(self):
14+
arg = TemplateArg.format(self.id, self.name)
15+
logging.debug(self.id+":开始转播")
16+
pp = subprocess.Popen(
17+
arg, shell=True, encoding='utf-8', cwd=workingdir)
18+
while True:
19+
if not self.name in LiveStreams.streamID:
20+
pp.terminate()
21+
break
22+
if pp.returncode == None:
23+
continue
24+
elif pp.returncode == 0:
25+
logging.debug(self.id+':转播完成')
26+
break
27+
else:
28+
logging.error("err:startRebroadcast")
29+
break
30+
# rbcThreads.pop(self.name)

main.py

+11-59
Original file line numberDiff line numberDiff line change
@@ -5,70 +5,22 @@
55
import subprocess
66
import threading
77
import logging
8-
9-
TemplateArg = r'streamlink --hls-live-edge 1 "https://www.youtube.com/watch?v={}" "best" -O | ffmpeg -re -i pipe:0 -c copy -f flv rtmp://localhost/flv/{}'
10-
workingdir = r'/web/hls'
11-
rbcThreads = {}
12-
13-
14-
def checkLive():
15-
for name in LiveStreams.streamID:
16-
id = LiveStreams.streamID[name]
17-
r = requests.get(
18-
"https://www.googleapis.com/youtube/v3/videos?part=snippet&id=%s&key=%s" % (id, APIKey.key))
19-
j = r.json()
20-
logging.debug('[%s,%s]liveBroadcastContent:%s' %
21-
(name, id, j['items'][0]['snippet']['liveBroadcastContent']))
22-
if j['items'][0]['snippet']['liveBroadcastContent'] == 'live':
23-
if name in rbcThreads:
24-
logging.debug('[%s,%s]already start' % (name, id))
25-
else:
26-
rbcThreads[name] = RebroadcastThread(len(rbcThreads), name, id)
27-
rbcThreads[name].start()
28-
elif j['items'][0]['snippet']['liveBroadcastContent'] == 'none':
29-
logging.debug('[%s,%s]not a live stream' % (name, id))
30-
LiveStreams.streamID.pop(name)
31-
else:
32-
logging.error('[%s,%s]err:liveBroadcastContent' % (name, id))
33-
sleep(1)
34-
35-
36-
class CheckLiveThread (threading.Thread):
37-
def __init__(self):
38-
threading.Thread.__init__(self)
39-
40-
def run(self):
41-
logging.debug('启动直播检测')
42-
while True:
43-
checkLive()
44-
sleep(1)
45-
46-
47-
class RebroadcastThread (threading.Thread):
48-
def __init__(self, threadID, name, id):
49-
threading.Thread.__init__(self)
50-
self.threadID = threadID
51-
self.name = name
52-
self.id = id
53-
54-
def run(self):
55-
arg = TemplateArg.format(self.id, self.name)
56-
logging.debug(self.id+":开始转播")
57-
ret = subprocess.run(arg, shell=True, encoding='utf-8', cwd=workingdir)
58-
if ret.returncode == 0:
59-
logging.debug(self.id+':转播完成')
60-
else:
61-
logging.error("err:startRebroadcast")
62-
# rbcThreads.pop(self.name)
63-
8+
import sys
649

6510
if __name__ == "__main__":
6611
logging.basicConfig(
6712
format='%(asctime)s[%(levelname)s]%(threadName)s>%(message)s', level=logging.DEBUG)
68-
checkthread = CheckLiveThread().start()
13+
manager = LiveStreams.StreamerManager()
14+
manager.start()
6915
while True:
7016
cmd = input('rbc>').split(" ")
71-
if cmd[0] == 'add':
17+
if cmd[0]=='':
18+
continue
19+
elif cmd[0] == 'add':
7220
LiveStreams.manualAdd(cmd[1], cmd[2])
21+
elif cmd[0] == 'del':
22+
LiveStreams.manualDel(cmd[1])
23+
elif cmd[0]=='quit' or cmd[0]=='exit' or cmd[0]=='q':
24+
sys.exit()
7325
else:
74-
pass
26+
print('未知的控制台命令')

0 commit comments

Comments
 (0)