11
11
import com .microsoft .azure .proton .transport .proxy .ProxyHandler ;
12
12
13
13
import java .nio .ByteBuffer ;
14
+
14
15
import java .util .Map ;
16
+ import java .util .concurrent .atomic .AtomicInteger ;
15
17
16
18
import org .apache .qpid .proton .engine .Transport ;
17
19
import org .apache .qpid .proton .engine .TransportException ;
22
24
import org .apache .qpid .proton .engine .impl .TransportWrapper ;
23
25
24
26
public class ProxyImpl implements Proxy , TransportLayer {
25
- private final int proxyHandshakeBufferSize = 4 * 1024 ; // buffers used only for proxy-handshake
27
+ private final int proxyHandshakeBufferSize = 8 * 1024 ; // buffers used only for proxy-handshake
26
28
private final ByteBuffer inputBuffer ;
27
29
private final ByteBuffer outputBuffer ;
28
30
@@ -36,6 +38,9 @@ public class ProxyImpl implements Proxy, TransportLayer {
36
38
37
39
private ProxyHandler proxyHandler ;
38
40
41
+ private final String PROXY_AUTH_DIGEST = "Proxy-Authenticate: Digest" ;
42
+ private final String PROXY_AUTH_BASIC = "Proxy-Authenticate: Basic" ;
43
+ private final AtomicInteger nonceCounter = new AtomicInteger (0 );
39
44
/**
40
45
* Create proxy transport layer - which, after configuring using
41
46
* the {@link #configure(String, Map, ProxyHandler, Transport)} API
@@ -167,9 +172,17 @@ public void process() throws TransportException {
167
172
final ProxyHandler .ProxyResponseResult responseResult = proxyHandler
168
173
.validateProxyResponse (inputBuffer );
169
174
inputBuffer .compact ();
170
-
175
+ inputBuffer . clear ();
171
176
if (responseResult .getIsSuccess ()) {
172
177
proxyState = ProxyState .PN_PROXY_CONNECTED ;
178
+ } else if (responseResult .getError () != null && responseResult .getError ().contains (PROXY_AUTH_DIGEST )) {
179
+ proxyState = ProxyState .PN_PROXY_CHALLENGE ;
180
+ DigestProxyChallengeProcessorImpl digestProxyChallengeProcessor = new DigestProxyChallengeProcessorImpl (host , responseResult .getError ());
181
+ headers = digestProxyChallengeProcessor .getHeader ();
182
+ } else if (responseResult .getError () != null && responseResult .getError ().contains (PROXY_AUTH_BASIC )) {
183
+ proxyState = ProxyState .PN_PROXY_CHALLENGE ;
184
+ BasicProxyChallengeProcessorImpl basicProxyChallengeProcessor = new BasicProxyChallengeProcessorImpl (host );
185
+ headers = basicProxyChallengeProcessor .getHeader ();
173
186
} else {
174
187
tailClosed = true ;
175
188
underlyingTransport .closed (
@@ -178,6 +191,22 @@ public void process() throws TransportException {
178
191
+ responseResult .getError ()));
179
192
}
180
193
break ;
194
+ case PN_PROXY_CHALLENGE_RESPONDED :
195
+ inputBuffer .flip ();
196
+ final ProxyHandler .ProxyResponseResult challengeResponseResult = proxyHandler
197
+ .validateProxyResponse (inputBuffer );
198
+ inputBuffer .compact ();
199
+
200
+ if (challengeResponseResult .getIsSuccess ()) {
201
+ proxyState = ProxyState .PN_PROXY_CONNECTED ;
202
+ } else {
203
+ tailClosed = true ;
204
+ underlyingTransport .closed (
205
+ new TransportException (
206
+ "proxy connect request failed with error: "
207
+ + challengeResponseResult .getError ()));
208
+ }
209
+ break ;
181
210
default :
182
211
underlyingInput .process ();
183
212
}
@@ -192,7 +221,6 @@ public void close_tail() {
192
221
if (getIsHandshakeInProgress ()) {
193
222
headClosed = true ;
194
223
}
195
-
196
224
underlyingInput .close_tail ();
197
225
}
198
226
@@ -215,15 +243,35 @@ public int pending() {
215
243
} else {
216
244
return outputBuffer .position ();
217
245
}
246
+ case PN_PROXY_CHALLENGE :
247
+ if (outputBuffer .position () == 0 ) {
248
+ proxyState = ProxyState .PN_PROXY_CHALLENGE_RESPONDED ;
249
+ writeProxyRequest ();
218
250
251
+ head .limit (outputBuffer .position ());
252
+ if (headClosed ) {
253
+ proxyState = ProxyState .PN_PROXY_FAILED ;
254
+ return Transport .END_OF_STREAM ;
255
+ } else {
256
+ return outputBuffer .position ();
257
+ }
258
+ } else {
259
+ return outputBuffer .position ();
260
+ }
261
+ case PN_PROXY_CHALLENGE_RESPONDED :
262
+ if (headClosed && (outputBuffer .position () == 0 )) {
263
+ proxyState = ProxyState .PN_PROXY_FAILED ;
264
+ return Transport .END_OF_STREAM ;
265
+ } else {
266
+ return outputBuffer .position ();
267
+ }
219
268
case PN_PROXY_CONNECTING :
220
269
if (headClosed && (outputBuffer .position () == 0 )) {
221
270
proxyState = ProxyState .PN_PROXY_FAILED ;
222
271
return Transport .END_OF_STREAM ;
223
272
} else {
224
273
return outputBuffer .position ();
225
274
}
226
-
227
275
default :
228
276
return Transport .END_OF_STREAM ;
229
277
}
@@ -238,6 +286,8 @@ public ByteBuffer head() {
238
286
switch (proxyState ) {
239
287
case PN_PROXY_CONNECTING :
240
288
return head ;
289
+ case PN_PROXY_CHALLENGE_RESPONDED :
290
+ return head ;
241
291
default :
242
292
return underlyingOutput .head ();
243
293
}
@@ -261,6 +311,17 @@ public void pop(int bytes) {
261
311
underlyingOutput .pop (bytes );
262
312
}
263
313
break ;
314
+ case PN_PROXY_CHALLENGE_RESPONDED :
315
+ if (outputBuffer .position () != 0 ) {
316
+ outputBuffer .flip ();
317
+ outputBuffer .position (bytes );
318
+ outputBuffer .compact ();
319
+ head .position (0 );
320
+ head .limit (outputBuffer .position ());
321
+ } else {
322
+ underlyingOutput .pop (bytes );
323
+ }
324
+ break ;
264
325
default :
265
326
underlyingOutput .pop (bytes );
266
327
}
@@ -274,5 +335,6 @@ public void close_head() {
274
335
headClosed = true ;
275
336
underlyingOutput .close_head ();
276
337
}
338
+
277
339
}
278
340
}
0 commit comments