-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmongodb_queue.py
90 lines (78 loc) · 3.37 KB
/
mongodb_queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from datetime import datetime, timedelta
from pymongo import MongoClient, errors
class MogoQueue():
OUTSTANDING = 1 ##初始状态
PROCESSING = 2 ##正在下载状态
COMPLETE = 3 ##下载完成状态
def __init__(self, db, collection, timeout=300):##初始mongodb连接
self.client = MongoClient()
self.Client = self.client[db]
self.db = self.Client[collection]
self.timeout = timeout
def __bool__(self):
"""
这个函数,我的理解是如果下面的表达为真,则整个类为真
至于有什么用,后面我会注明的(如果我的理解有误,请指点出来谢谢,我也是Python新手)
$ne的意思是不匹配
"""
record = self.db.find_one(
{'status': {'$ne': self.COMPLETE}}
)
return True if record else False
def push(self, url, sha1): ##这个函数用来添加新的URL进队列
try:
self.db.insert({'_id': url, 'status': self.OUTSTANDING, 'sha1': sha1})
print(url, '插入队列成功')
except errors.DuplicateKeyError as e: ##报错则代表已经存在于队列之中了
record = self.db.find_one({'_id': url, 'sha1': sha1})
if record:
//文件内容没有改变过
print(url, '已经存在于队列中了')
else:
self.db.update({'_id': url}, {'$set': {'status': self.OUTSTANDING, 'timestamp': datetime.now(), 'sha1':sha1}})
print(url, '文件变更,需要重新下载')
pass
def pop(self):
"""
这个函数会查询队列中的所有状态为OUTSTANDING的值,
更改状态,(query后面是查询)(update后面是更新)
并返回_id(就是我们的URL),MongDB好使吧,^_^
如果没有OUTSTANDING的值则调用repair()函数重置所有超时的状态为OUTSTANDING,
$set是设置的意思,和MySQL的set语法一个意思
"""
record = self.db.find_and_modify(
query={'status': self.OUTSTANDING},
update={'$set': {'status': self.PROCESSING, 'timestamp': datetime.now()}}
)
if record:
return record['_id']
else:
self.repair()
raise KeyError
def pop_sha1(self, url):
record = self.db.find_one({'_id': url})
return record['sha1']
def peek(self):
"""这个函数是取出状态为 OUTSTANDING的文档并返回_id(URL)"""
record = self.db.find_one({'status': self.OUTSTANDING})
if record:
return record['_id']
def complete(self, url):
"""这个函数是更新已完成的URL完成"""
self.db.update({'_id': url}, {'$set': {'status': self.COMPLETE}})
def repair(self):
"""这个函数是重置状态$lt是比较"""
record = self.db.find_and_modify(
query={
'timestamp': {'$lt': datetime.now() - timedelta(seconds=self.timeout)},
'status': {'$ne': self.COMPLETE}
},
update={'$set': {'status': self.OUTSTANDING}}
)
if record:
print('重置URL状态', record['_id'])
def clear(self):
"""这个函数只有第一次才调用、后续不要调用、因为这是删库啊!"""
self.db.drop()