@@ -298,48 +298,90 @@ impl StoreDriver for RedisStore {
298
298
// As a result of this, there will be a span of time where a key in Redis has only partial data. We want other
299
299
// observers to notice atomic updates to keys, rather than partial updates, so we first write to a temporary key
300
300
// and then rename that key once we're done appending data.
301
- //
302
- // TODO(caass): Remove potential for infinite loop (https://reviewable.io/reviews/TraceMachina/nativelink/1188#-O2pu9LV5ux4ILuT6MND)
303
- ' outer: loop {
304
- let mut expecting_first_chunk = true ;
305
- let pipe = client. pipeline ( ) ;
306
-
307
- while expecting_first_chunk || !reader. is_empty ( ) {
308
- let chunk = reader
309
- . recv ( )
310
- . await
311
- . err_tip ( || "Failed to reach chunk in update in redis store" ) ?;
301
+ let mut is_first_chunk = true ;
302
+ let mut eof_reached = false ;
303
+ let mut pipe = client. pipeline ( ) ;
312
304
313
- if chunk . is_empty ( ) {
314
- if is_zero_digest ( key . borrow ( ) ) {
315
- return Ok ( ( ) ) ;
316
- }
305
+ while !eof_reached {
306
+ pipe = client . pipeline ( ) ;
307
+ let mut pipe_size = 0 ;
308
+ const MAX_PIPE_SIZE : usize = 5 * 1024 * 1024 ; // 5 MB
317
309
318
- // Reader sent empty chunk, we're done here.
319
- break ' outer;
320
- }
310
+ let chunk = reader
311
+ . recv ( )
312
+ . await
313
+ . err_tip ( || "Failed to reach chunk in update in redis store" ) ?;
314
+
315
+ if chunk. is_empty ( ) {
316
+ // There are three cases where we receive an empty chunk:
317
+ // 1. The first chunk of a zero-digest key. We're required to treat all zero-digest keys as if they exist
318
+ // and are empty per the RBE spec, so we can just return early as if we've pushed it -- any attempts to
319
+ // read the value later will similarly avoid the network trip.
320
+ // 2. This is an empty first chunk of a non-zero-digest key. In this case, we _do_ need to push up an
321
+ // empty key, but can skip the rest of the process around renaming since there's only the one operation.
322
+ // 3. This is the last chunk (EOF) of a regular key. In that case we can skip pushing this chunk.
323
+ //
324
+ // In all three cases, we're done pushing data and can move it from the temporary key to the final key.
325
+ if is_first_chunk && is_zero_digest ( key. borrow ( ) ) {
326
+ // Case 1, a zero-digest key.
327
+ return Ok ( ( ) ) ;
328
+ } else if is_first_chunk {
329
+ // Case 2, an empty non-zero-digest key.
330
+ pipe. append :: < ( ) , _ , _ > ( & temp_key, "" )
331
+ . await
332
+ . err_tip ( || "While appending to temp key in RedisStore::update" ) ?;
333
+ } ;
321
334
322
- // Queue the append, but don't execute until we've received all the chunks.
323
- pipe. append :: < ( ) , _ , _ > ( & temp_key, chunk)
324
- . await
325
- . err_tip ( || "Failed to append to temp key in RedisStore::update" ) ?;
326
- expecting_first_chunk = false ;
335
+ // Note: setting `eof_reached = true` and calling `continue` is semantically equivalent to `break`.
336
+ // Since we need to use the `eof_reached` flag in the inner loop, we do the same here
337
+ // for consistency.
338
+ eof_reached = true ;
339
+ continue ;
340
+ } else {
341
+ // Not EOF, but we've now received our first chunk.
342
+ is_first_chunk = false ;
343
+ }
327
344
328
- // Give other tasks a chance to run to populate the reader's
329
- // buffer if possible.
330
- tokio:: task:: yield_now ( ) . await ;
345
+ // Queue the append, but don't execute until we've received all the chunks.
346
+ pipe_size += chunk. len ( ) ;
347
+ pipe. append :: < ( ) , _ , _ > ( & temp_key, chunk)
348
+ . await
349
+ . err_tip ( || "Failed to append to temp key in RedisStore::update" ) ?;
350
+
351
+ // Opportunistically grab any other chunks already in the reader.
352
+ while let Some ( chunk) = reader
353
+ . try_recv ( )
354
+ . transpose ( )
355
+ . err_tip ( || "Failed to reach chunk in update in redis store" ) ?
356
+ {
357
+ if chunk. is_empty ( ) {
358
+ eof_reached = true ;
359
+ break ;
360
+ } else {
361
+ pipe_size += chunk. len ( ) ;
362
+ pipe. append :: < ( ) , _ , _ > ( & temp_key, chunk)
363
+ . await
364
+ . err_tip ( || "Failed to append to temp key in RedisStore::update" ) ?;
365
+ }
366
+
367
+ // Stop appending if the pipeline is already holding 5MB of data.
368
+ if pipe_size >= MAX_PIPE_SIZE {
369
+ break ;
370
+ }
331
371
}
332
372
333
- // Here the reader is empty but more data is expected.
373
+ // We've exhausted the reader (or hit the 5MB cap), but more data is expected.
334
374
// Executing the queued commands appends the data we just received to the temp key.
335
375
pipe. all :: < ( ) > ( )
336
376
. await
337
377
. err_tip ( || "Failed to append to temporary key in RedisStore::update" ) ?;
338
378
}
339
379
340
380
// Rename the temp key so that the data appears under the real key. Any data already present in the real key is lost.
341
- client
342
- . rename :: < ( ) , _ , _ > ( & temp_key, final_key. as_ref ( ) )
381
+ pipe. rename :: < ( ) , _ , _ > ( & temp_key, final_key. as_ref ( ) )
382
+ . await
383
+ . err_tip ( || "While queueing key rename in RedisStore::update()" ) ?;
384
+ pipe. all :: < ( ) > ( )
343
385
. await
344
386
. err_tip ( || "While renaming key in RedisStore::update()" ) ?;
345
387
0 commit comments