+ # -*- coding: utf-8 -*-
+
from asyncio import ensure_future, wait
from datetime import datetime, timedelta
from json import JSONDecodeError, dumps, loads
from typing import Optional, Tuple

- from torequests.utils import ttime
+ from torequests.utils import timeago, ttime
from uniparser import Crawler, RuleNotFoundError

from .config import Config
@@ -38,8 +40,7 @@ def kwargs(self):


def find_next_check_time(
-         work_hours: str,
-         interval: int,
+         task: Task,
        now: Optional[datetime] = None,
) -> Tuple[bool, datetime]:
    '''
@@ -63,30 +64,50 @@ def find_next_check_time(
        %w==5|20, 24 means every Friday or everyday 20:00 ~ 23:59
        %w==5|%w==2 means every Friday or Tuesday
        %w!=6&%w!=0 means everyday except Saturday & Sunday.
+     5. Set an ensured change interval
+         > If the work_hours string ends with `#` followed by `x` seconds, next_change_time is checked first.
+         > In other words: you are sure that the interval between two changes is more than `x` seconds,
+         > so the crawler for this task will not run again before `last_change_time + change_interval`.
+         %w==5#86400 means every Friday, unless it changed within the last day
+         0, 24#3600 means every hour, unless it changed within the last hour. Once it changes, it is not crawled again until the interval elapses.
    '''
    # find the latest hour that fits work_hours; if none exists, return next day 00:00
    now = now or datetime.now()
+     work_hours = task.work_hours or '0, 24'
+     if '#' in work_hours:
+         # check whether the task changed recently
+         last_change_time = task.last_change_time or datetime.fromtimestamp(0)
+         # split into work_hours and change_interval
+         work_hours, change_interval_str = work_hours.split('#')
+         change_interval = int(change_interval_str)
+         # if the change interval has not elapsed, wait for the remaining seconds
+         next_change_time = last_change_time + timedelta(seconds=change_interval)
+         if now < next_change_time:
+             Config.logger.info(
+                 f'Task [{task.name}] changed within the last {timeago(change_interval, accuracy=1, format=1, short_name=1)}.'
+             )
+             return False, next_change_time

-     ok = check_work_time(work_hours, now)
-     if ok:
-         # current time is ok, next_check_time is now+interval
-         next_check_time = now + timedelta(seconds=interval)
-         return ok, next_check_time
+     need_crawl = check_work_time(work_hours, now)
+     if need_crawl:
+         # now is within work hours, next_check_time is now + interval
+         next_check_time = now + timedelta(seconds=task.interval)
+         return need_crawl, next_check_time
    else:
-         # current time is not ok
+         # now is not within work hours
        next_check_time = now
-         # time machine to check time fast
+         # time machine to update next_check_time fast
        for _ in range(60):
-             # check next interval
-             next_check_time = next_check_time + timedelta(seconds=interval)
-             _ok = check_work_time(work_hours, next_check_time)
-             if _ok:
-                 # current is still False, but next_check_time is True
+             # step to the next interval
+             next_check_time = next_check_time + timedelta(seconds=task.interval)
+             _need_crawl = check_work_time(work_hours, next_check_time)
+             if _need_crawl:
+                 # now is still False, but next_check_time fits the work hours
                break
-     return ok, next_check_time
+     return need_crawl, next_check_time


- async def crawl(task):
+ async def crawl(task: Task):
    crawler: Crawler = Config.crawler
    logger = Config.logger
    logger.info(f'Start crawling: {task.name}')
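
A minimal standalone sketch of the change-interval gate that the new `#` suffix introduces; the helper name `change_interval_gate` and the timestamps are illustrative, not part of the code above:

    from datetime import datetime, timedelta

    def change_interval_gate(work_hours: str, last_change_time: datetime, now: datetime):
        # returns (plain_work_hours, ok_to_crawl, next_change_time)
        if '#' not in work_hours:
            return work_hours, True, now
        work_hours, change_interval_str = work_hours.split('#')
        next_change_time = last_change_time + timedelta(seconds=int(change_interval_str))
        return work_hours, now >= next_change_time, next_change_time

    # '0, 24#3600': any hour works, but skip while the last change is under an hour old
    wh, ok, nct = change_interval_gate('0, 24#3600', datetime(2020, 1, 1, 12, 0),
                                       datetime(2020, 1, 1, 12, 30))
    assert wh == '0, 24' and ok is False and nct == datetime(2020, 1, 1, 13, 0)
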
@@ -103,13 +124,17 @@ async def crawl(task):
    else:
        if len(crawl_result) == 1:
            # chain result for __request__ which fetches a new request
-             result_list = get_watchdog_result(item=crawl_result.popitem()[1])
-             if result_list == {'text': 'text not found'}:
+             formatted_result = get_watchdog_result(
+                 item=crawl_result.popitem()[1])
+             if formatted_result == {'text': 'text not found'}:
                error = f'{task.name} text not found, crawl result given: {crawl_result}'
                logger.error(error)
+                 result_list = None
            else:
-                 if not isinstance(result_list, list):
-                     result_list = [result_list]
+                 if isinstance(formatted_result, list):
+                     result_list = formatted_result
+                 else:
+                     result_list = [formatted_result]
                # use the 'force crawl one' web UI for more logs
                logger.info(f'{task.name} Crawl success: {result_list}'[:150])
        else:
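
The normalization branch above reduces to a small pure function; a sketch under an assumed name (`normalize_result` is not in the code above):

    def normalize_result(formatted_result):
        # map the 'text not found' sentinel to None, wrap a single dict, keep lists as-is
        if formatted_result == {'text': 'text not found'}:
            return None
        if isinstance(formatted_result, list):
            return formatted_result
        return [formatted_result]

    assert normalize_result({'text': 'ok'}) == [{'text': 'ok'}]
    assert normalize_result([{'text': 'a'}, {'text': 'b'}]) == [{'text': 'a'}, {'text': 'b'}]
    assert normalize_result({'text': 'text not found'}) is None
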
@@ -141,12 +166,11 @@ async def _crawl_once(task_name: Optional[str] = None, chunk_size: int = 20):
    for _task in fetched_tasks:
        task = Task(**dict(_task))
        # check work hours
-         ok, next_check_time = find_next_check_time(task.work_hours or '0, 24',
-                                                    task.interval, now)
+         need_crawl, next_check_time = find_next_check_time(task, now)
        if task_name:
            # always crawl for a given task_name
-             ok = True
-         if ok:
+             need_crawl = True
+         if need_crawl:
            t = ensure_future(crawl(task))
            # add task_name for the logger
            setattr(t, 'task_name', task.name)
@@ -160,7 +184,7 @@ async def _crawl_once(task_name: Optional[str] = None, chunk_size: int = 20):
        # update the task variables for the callback
        task.__dict__.update(values)
        update_values.append(values)
-         if not ok:
+         if not need_crawl:
            logger.info(
                f'Task [{task.name}] is not on work time, next_check_time reset to {next_check_time}'
            )
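
Finally, a condensed sketch of the "time machine" loop in find_next_check_time, with a stand-in predicate in place of check_work_time (the real one parses the work_hours syntax):

    from datetime import datetime, timedelta

    def in_work_hours(t: datetime) -> bool:
        return 9 <= t.hour < 18  # stand-in: work hours are 9:00 ~ 17:59

    def fast_forward(now: datetime, interval: int) -> datetime:
        next_check_time = now
        for _ in range(60):  # bounded, like the loop in the diff
            next_check_time += timedelta(seconds=interval)
            if in_work_hours(next_check_time):
                break
        return next_check_time

    # at 20:00 with a 2-hour interval, the next in-hours check lands at 10:00 next day
    print(fast_forward(datetime(2020, 1, 1, 20, 0), 7200))  # 2020-01-02 10:00:00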