// queue-function.js
const fli = require('fli-webtask');
const wt = require('webtask-tools');
const bodyParser = require('body-parser');
const mongodb = require('mongodb');
const mongoDbQueue = require('mongodb-queue');
const express = fli.npm.express;
const as = fli.npm.async;
const _ = fli.npm.lodash;
const responseHandler = fli.lib.responseHandler;
// Express application; it is wrapped with wt.fromExpress and exported at the bottom of the file.
const app = express();
// Router that receives the queue routes (/add, /get, /ack); mounted on `app` under `/:qq`.
const router = express.Router();
/**
 * Express middleware that guards every queue route.
 *
 * Rejects the request when the `token` query parameter differs from the
 * `token` webtask secret, or when the `:qq` route parameter (the queue name)
 * is missing. A rejection is reported through `responseHandler` and the chain
 * stops there; otherwise control moves on to the next middleware.
 *
 * @param {object} req - Express request; reads `webtaskContext.secrets.token`,
 *   `query.token` and `params.qq`.
 * @param {object} res - Express response, handed to `responseHandler` on rejection.
 * @param {Function} next - Express continuation; invoked only when validation passes.
 * @returns {*} Whatever `responseHandler` or `next` returns.
 */
const validateMiddleware = (req, res, next) => {
  if (req.webtaskContext.secrets.token !== req.query.token) {
    // Report once and stop. Also calling next(err) would pass the same request
    // on to Express's error handling after responseHandler already dealt with it.
    return responseHandler('No token.', res);
  }
  if (!req.params.qq) {
    return responseHandler('No queue name provided.', res);
  }
  return next();
};
/**
 * Express middleware that connects to MongoDB and attaches a queue to the request.
 *
 * Connects using the `mongo` webtask secret, builds a queue named after the
 * `:qq` route parameter together with a companion `<qq>-dead` queue passed as
 * the `deadQueue` option, and stores the result on `req.queue`.
 *
 * @param {object} req - Express request; reads `webtaskContext.secrets.mongo`
 *   and `params.qq`, receives `queue`.
 * @param {object} res - Express response (unused here).
 * @param {Function} next - Express continuation; called with the connection
 *   error when connecting fails, otherwise with no arguments.
 * @returns {void}
 */
const mongoDbQueueMiddleware = (req, res, next) => {
  mongodb.MongoClient.connect(req.webtaskContext.secrets.mongo, (err, db) => {
    // Bail out before touching `db`: it is not usable when the connection failed,
    // and building a queue on it would hide the real error behind a secondary one.
    if (err) {
      return next(err);
    }
    // NOTE(review): nothing in this file closes the connection opened here;
    // confirm whether the hosting environment reuses or reclaims it.
    const queueName = req.params.qq;
    req.queue = mongoDbQueue(db, queueName, {
      // Option meanings are defined by mongodb-queue (presumably seconds for
      // visibility/delay; verify against its documentation).
      visibility: 1,
      delay: 0,
      maxRetries: 2,
      deadQueue: mongoDbQueue(db, `${queueName}-dead`),
    });
    return next();
  });
};
// Queue routes. Each handler expects `req.queue` to have been attached by an
// earlier middleware and reports its outcome through `responseHandler`.
router
  /**
   * ANY /add/:msg? — enqueue a message taken from the JSON body (`msg`) or,
   * failing that, from the optional `:msg` path segment. Replies with `{ id }`.
   */
  .all('/add/:msg?', (req, res) => {
    const msg = _.get(req, 'body.msg') || _.get(req, 'params.msg');
    if (!msg) {
      return responseHandler('No msg provided.', res);
    }
    return req.queue.add(msg, (err, id) => responseHandler(err, res, { id }));
  })
  /**
   * GET /get — fetch the next message from the queue and reply with whatever
   * the queue yields (nothing when no message is available).
   */
  .get('/get', (req, res) => {
    req.queue.get((err, msg) => responseHandler(err, res, msg));
  })
  /**
   * GET /ack — fetch the next message, acknowledge it, clean the queue, and
   * reply with `{ id }`. Replies with 'Queue is empty.' when there is no
   * message to acknowledge.
   */
  .get('/ack', (req, res) => {
    const reply = (err, data) => responseHandler(err, res, data);

    req.queue.get((getErr, item) => {
      if (getErr) {
        return reply(getErr);
      }
      // An empty queue is recognised by the missing `ack` token, so this works
      // whether get() yields nothing at all or an explicit null/undefined.
      const ackToken = _.get(item, 'ack');
      if (!ackToken) {
        return reply('Queue is empty.');
      }
      return req.queue.ack(ackToken, (ackErr, id) => {
        if (ackErr) {
          return reply(ackErr);
        }
        // The id is reported even if clean() fails, alongside the clean error.
        return req.queue.clean((cleanErr) => reply(cleanErr, { id }));
      });
    });
  });
// Parse JSON request bodies before any route runs.
app.use(bodyParser.json());

// Everything lives under /:qq (the queue name): validate the request, attach
// the queue to it, then dispatch to the queue routes.
app.use('/:qq', validateMiddleware, mongoDbQueueMiddleware, router);

// Expose the Express app as the webtask entry point.
module.exports = wt.fromExpress(app);