@@ -220,16 +220,42 @@ public int read(byte[] into, int off, int len) throws IOException {
220
220
221
221
public class ReadAheadRemoteFileInputStream
222
222
extends InputStream {
223
+ private class UnconfirmedRead {
224
+ private final long offset ;
225
+ private final Promise <Response , SFTPException > promise ;
226
+ private final int length ;
227
+
228
+ private UnconfirmedRead (long offset , int length , Promise <Response , SFTPException > promise ) {
229
+ this .offset = offset ;
230
+ this .length = length ;
231
+ this .promise = promise ;
232
+ }
233
+
234
+ UnconfirmedRead (long offset , int length ) throws IOException {
235
+ this (offset , length , RemoteFile .this .asyncRead (offset , length ));
236
+ }
237
+
238
+ public long getOffset () {
239
+ return offset ;
240
+ }
241
+
242
+ public Promise <Response , SFTPException > getPromise () {
243
+ return promise ;
244
+ }
245
+
246
+ public int getLength () {
247
+ return length ;
248
+ }
249
+ }
223
250
224
251
private final byte [] b = new byte [1 ];
225
252
226
253
private final int maxUnconfirmedReads ;
227
254
private final long readAheadLimit ;
228
- private final Queue <Promise <Response , SFTPException >> unconfirmedReads = new LinkedList <Promise <Response , SFTPException >>();
229
- private final Queue <Long > unconfirmedReadOffsets = new LinkedList <Long >();
255
+ private final Queue <UnconfirmedRead > unconfirmedReads = new LinkedList <>();
230
256
231
- private long requestOffset ;
232
- private long responseOffset ;
257
+ private long currentOffset ;
258
+ private int maxReadLength = Integer . MAX_VALUE ;
233
259
private boolean eof ;
234
260
235
261
public ReadAheadRemoteFileInputStream (int maxUnconfirmedReads ) {
@@ -247,28 +273,42 @@ public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads, long fileOffset,
247
273
assert 0 <= fileOffset ;
248
274
249
275
this .maxUnconfirmedReads = maxUnconfirmedReads ;
250
- this .requestOffset = this . responseOffset = fileOffset ;
276
+ this .currentOffset = fileOffset ;
251
277
this .readAheadLimit = readAheadLimit > 0 ? fileOffset + readAheadLimit : Long .MAX_VALUE ;
252
278
}
253
279
254
280
private ByteArrayInputStream pending = new ByteArrayInputStream (new byte [0 ]);
255
281
256
282
private boolean retrieveUnconfirmedRead (boolean blocking ) throws IOException {
257
- if (unconfirmedReads .size () <= 0 ) {
283
+ final UnconfirmedRead unconfirmedRead = unconfirmedReads .peek ();
284
+ if (unconfirmedRead == null || !blocking && !unconfirmedRead .getPromise ().isDelivered ()) {
258
285
return false ;
259
286
}
287
+ unconfirmedReads .remove (unconfirmedRead );
260
288
261
- if (!blocking && !unconfirmedReads .peek ().isDelivered ()) {
262
- return false ;
263
- }
264
-
265
- unconfirmedReadOffsets .remove ();
266
- final Response res = unconfirmedReads .remove ().retrieve (requester .getTimeoutMs (), TimeUnit .MILLISECONDS );
289
+ final Response res = unconfirmedRead .promise .retrieve (requester .getTimeoutMs (), TimeUnit .MILLISECONDS );
267
290
switch (res .getType ()) {
268
291
case DATA :
269
292
int recvLen = res .readUInt32AsInt ();
270
- responseOffset += recvLen ;
271
- pending = new ByteArrayInputStream (res .array (), res .rpos (), recvLen );
293
+ if (unconfirmedRead .offset == currentOffset ) {
294
+ currentOffset += recvLen ;
295
+ pending = new ByteArrayInputStream (res .array (), res .rpos (), recvLen );
296
+
297
+ if (recvLen < unconfirmedRead .length ) {
298
+ // The server returned a packet smaller than the client had requested.
299
+ // It can be caused by at least one of the following:
300
+ // * The file has been read fully. Then, few futile read requests can be sent during
301
+ // the next read(), but the file will be downloaded correctly anyway.
302
+ // * The server shapes the request length. Then, the read window will be adjusted,
303
+ // and all further read-ahead requests won't be shaped.
304
+ // * The file on the server is not a regular file, it is something like fifo.
305
+ // Then, the window will shrink, and the client will start reading the file slower than it
306
+ // hypothetically can. It must be a rare case, and it is not worth implementing a sort of
307
+ // congestion control algorithm here.
308
+ maxReadLength = recvLen ;
309
+ unconfirmedReads .clear ();
310
+ }
311
+ }
272
312
break ;
273
313
274
314
case STATUS :
@@ -296,49 +336,24 @@ public int read(byte[] into, int off, int len) throws IOException {
296
336
// we also need to go here for len <= 0, because pending may be at
297
337
// EOF in which case it would return -1 instead of 0
298
338
339
+ long requestOffset = currentOffset ;
299
340
while (unconfirmedReads .size () <= maxUnconfirmedReads ) {
300
341
// Send read requests as long as there is no EOF and we have not reached the maximum parallelism
301
- int reqLen = Math .max (1024 , len ); // don't be shy!
342
+ int reqLen = Math .min ( Math . max (1024 , len ), maxReadLength );
302
343
if (readAheadLimit > requestOffset ) {
303
344
long remaining = readAheadLimit - requestOffset ;
304
345
if (reqLen > remaining ) {
305
346
reqLen = (int ) remaining ;
306
347
}
307
348
}
308
- unconfirmedReads .add (RemoteFile .this .asyncRead (requestOffset , reqLen ));
309
- unconfirmedReadOffsets .add (requestOffset );
349
+ unconfirmedReads .add (new UnconfirmedRead (requestOffset , reqLen ));
310
350
requestOffset += reqLen ;
311
351
if (requestOffset >= readAheadLimit ) {
312
352
break ;
313
353
}
314
354
}
315
355
316
- long nextOffset = unconfirmedReadOffsets .peek ();
317
- if (responseOffset != nextOffset ) {
318
-
319
- // the server could not give us all the data we needed, so
320
- // we try to fill the gap synchronously
321
-
322
- assert responseOffset < nextOffset ;
323
- assert 0 < (nextOffset - responseOffset );
324
- assert (nextOffset - responseOffset ) <= Integer .MAX_VALUE ;
325
-
326
- byte [] buf = new byte [(int ) (nextOffset - responseOffset )];
327
- int recvLen = RemoteFile .this .read (responseOffset , buf , 0 , buf .length );
328
-
329
- if (recvLen < 0 ) {
330
- eof = true ;
331
- return -1 ;
332
- }
333
-
334
- if (0 == recvLen ) {
335
- // avoid infinite loops
336
- throw new SFTPException ("Unexpected response size (0), bailing out" );
337
- }
338
-
339
- responseOffset += recvLen ;
340
- pending = new ByteArrayInputStream (buf , 0 , recvLen );
341
- } else if (!retrieveUnconfirmedRead (true /*blocking*/ )) {
356
+ if (!retrieveUnconfirmedRead (true /*blocking*/ )) {
342
357
343
358
// this may happen if we change prefetch strategy
344
359
// currently, we should never get here...
0 commit comments