-
Notifications
You must be signed in to change notification settings - Fork 0
/
responsewaiter.cpp
181 lines (159 loc) · 4.62 KB
/
responsewaiter.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
/* Copyright (C) 2009 Frederik M.J.V
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
* LGPL is available on the internet at
* http://www.gnu.org/licenses/lgpl-2.1.html and from Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301 USA
*/
#include "responsewaiterprivate.h"
#include <google/protobuf/message.h>
#include <google/protobuf/service.h>
#include <google/protobuf/descriptor.h>
/**
* <p>
* This class implements a way to make non blocking Rpc calls blocking. It may
* be used like this:
* </p>
*
* <pre>
* ResponseWaiter<E> waiter = new ResponseWaiter<E>(breakableRpcChannel);
* SimpleRpcController cont=new SimpleRpcController();
* service.callMethod(cont, request, waiter.getCallback());
* try {
* E response = waiter.await();
* waiter.cleanup();
*
* //handle response
*
* } catch (InterruptedException e) {
* //handle exception
* } catch (TimeoutException e) {
* //handle exception
* }
* waiter.reset(rpcChannel);//if you want to use it again
* </pre>
*
* @author Frederik
*
* @param <E>
* - the callback type for the Rpc call
*/
namespace protorpc{
/**
* @param ch Channel that should notify the waiter if it is broken
* @param co Controller that should notify the waiter if the call
* is canceled or fails
*/
ResponseWaiter::ResponseWaiter(QObject *ch, QObject *ctrl,google::protobuf::Message *resp) {
child=new ResponseWaiterPrivate(this,resp);
al= new QMutex(QMutex::Recursive);
wl= new QMutex();
listen(ch, ctrl,resp);
}
ResponseWaiter::~ResponseWaiter(){
delete child;
delete al;
delete wl;
}
/**
* Wait for the method to return
*
* @param timeout
* - the time in TimeUnit before returning timeout exception
* @param unit
* - the time unit the timeout is specified in
* @return Response
* @throws InterruptedException
* if the thread is interrupted
* @throws TimeoutException
* if the waiting timed out
*/
google::protobuf::Message *ResponseWaiter::wait(long timeout){
QMutexLocker wlckr(wl);
QMutexLocker alckr(al);
if (responded)
return cbr;
if (timeout == 0)
wc.wait(wl);
else{
if(!wc.wait(wl,timeout)){//timed out
return NULL;
}
}
if (responded)
return cbr;
else
return NULL;
}
/**
* Reset the waiter to make it wait for new responses
*
* @param newchan
* if null, or doesnt implement BreakableRpcChannel the waiter
* will wait infinitly if the channel is broken and it doesn't
* timeout
*/
void ResponseWaiter::reset(QObject *newchan,QObject *newco,google::protobuf::Message* resp) {
if (al->tryLock()) {
cbr = NULL;
responded = false;
cleanup();
listen(newchan,newco,resp);
al->unlock();
} else {
throw "The response is allready waiting on something";
}
}
/**
* Clean up the waiter after use and remove the pointer to the channel
*/
void ResponseWaiter::cleanup() {
if (al->tryLock()) {
child->cleanup();
al->unlock();
} else {
throw "The response is allready waiting on something";
}
}
void ResponseWaiter::listen(QObject *chan, QObject *ctrl,google::protobuf::Message* resp) {
if (al->tryLock()) {
child->connectSignals(chan,ctrl);
child->changeResponse(resp);
cbr=resp;
responded=false;
al->unlock();
} else {
throw "The response is allready waiting on something";
}
}
google::protobuf::Closure *ResponseWaiter::getClosure(){
return child->getClosure();
}
void ResponseWaiter::channelBroken(google::protobuf::RpcChannel *b) {
QMutexLocker wlcker(wl);
cbr = NULL;
responded = true;
wc.wakeAll();
}
void ResponseWaiter::callback(google::protobuf::Message *param) {
QMutexLocker wlcker(wl);
cbr=param;
responded = true;
wc.wakeAll();
}
void ResponseWaiter::methodCanceled(google::protobuf::RpcController *ctrl){
QMutexLocker wlcker(wl);
cbr = NULL;
responded = true;
wc.wakeAll();
}
void ResponseWaiter::methodFailed(google::protobuf::RpcController *ctrl,std::string reason){
QMutexLocker wlcker(wl);
cbr = NULL;
responded = true;
wc.wakeAll();
}
};