
Conversation


@hoytak (Collaborator) commented Jul 10, 2025

This PR implements a robust controller for adaptively adjusting the concurrency of upload and download transfers.

This controller uses two statistical models that adapt over time using exponentially weighted moving averages. The first predicts the current overall bandwidth, and the second models the deviance between the actual transfer time and the time predicted from a linear scaling with the concurrency.

The key idea is this:

  1. When a network connection is underutilized, the latency scales sublinearly with the number of parallel connections. In other words, adding another transfer does not affect the speed of the other transfers significantly.
  2. When a network connection is fully utilized, the latency scales linearly with the concurrency. In other words, increasing the concurrency from N to N+1 causes the latency of all the other transfers to increase by a factor of (N+1) / N.
  3. When a network connection is oversaturated, then the latency scales superlinearly. In other words, adding an additional connection causes the overall throughput to decrease.

Now, because latency is a noisy observation, we track a running clipped average of the deviance between the predicted and actual transfer times, increasing the concurrency when this is reliably sublinear and decreasing it when it is superlinear. The model clips observations so that no single observation is weighted too heavily; failures and retries max out the deviance.
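As a rough illustration of this decision rule, here is a minimal sketch; the type, field names, clip bounds, and thresholds are all illustrative assumptions, not the code in this PR:

// Minimal sketch of the adjustment logic described above; names and constants
// are illustrative only.
struct ConcurrencyModel {
    ewma_deviance: f64, // running clipped average of the observed/predicted time ratio
    alpha: f64,         // EWMA smoothing factor, e.g. 0.1
}

impl ConcurrencyModel {
    fn record_transfer(&mut self, predicted_secs: f64, actual_secs: f64) {
        // Clip each observation so a single outlier cannot dominate the average;
        // failures and retries are recorded at the maximum deviance.
        let deviance = (actual_secs / predicted_secs).clamp(0.25, 4.0);
        self.ewma_deviance = self.alpha * deviance + (1.0 - self.alpha) * self.ewma_deviance;
    }

    /// +1 to raise the concurrency, -1 to lower it, 0 to hold it steady.
    fn concurrency_adjustment(&self) -> i32 {
        if self.ewma_deviance < 0.9 {
            1 // reliably sublinear: the connection is underutilized
        } else if self.ewma_deviance > 1.25 {
            -1 // superlinear: the connection is oversaturated
        } else {
            0 // roughly linear: saturated but healthy
        }
    }
}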

@hoytak force-pushed the hoytak/250701-rate-adjustor branch from daedbb3 to b7423e4 on July 10, 2025 23:04
@hoytak requested a review from bpronan on July 11, 2025 00:52
@hoytak changed the title from "Automatic concurrency adjustment for parallel uploads and downloads" to "Automatic concurrency adjustment for transfers" on Jul 11, 2025
@seanses (Collaborator) left a comment


I understand the controller logic, but experiments are needed before this becomes a convincing solution. Places I have doubts about:

  • A success signal is defined as a 200 status code plus the transfer finishing within a linearly interpolated, hardcoded time limit. This yields different speed expectations for xorbs of different sizes, e.g. 6.9 Mbps for a 10 MB xorb and 25.6 Mbps for a 64 MB xorb. In practice they should have the same speed.

  • After 90% success signals within an observation window, the controller attempts to increment the concurrency by 1. It's unclear what target this controller is trying to maximize. Since 25.6 Mbps can easily be reached, this control logic means the concurrency will shoot up to the max value (100). But the overall throughput with 100 concurrent flows is not guaranteed to be greater than with, say, 20 flows. And it's unknown whether, with 100 concurrent flows, a single flow's speed will drop below 25.6 Mbps and trigger a decrease signal. This implies that users need to manually tune the speed expectation argument based on their network condition, and thus it provides no better experience than tuning the concurrency directly.


/// The minimum number of simultaneous xorb and/or shard upload streams that the
/// adaptive concurrency control may reduce the concurrency down to on slower connections.
ref MIN_CONCURRENT_UPLOADS: usize = 2;
Collaborator:

It seems 1 is more reasonable for the worst case.

  let n_bytes_transmitted = session
      .client
-     .upload_xorb(&cas_prefix, cas_object, Some(completion_tracker))
+     .upload_xorb_with_permit(&cas_prefix, cas_object, Some(completion_tracker), upload_permit)
Collaborator:

An upload_xorb function is exposed through the client interface that acquires the permit automatically. It's better to use this function instead of managing permits from outside.

Is it possible to make the upload_xorb_with_permit function private?

Collaborator Author (@hoytak):

It's done this way (more so with upload_shard) because we first acquire a permit and then spawn the background task to upload the xorb/shard. If we fold them both in here, we build up background tasks waiting on permit acquisition and run out of memory, as it's the permit acquisition that keeps the number of in-memory xorbs under control. The non-permit versions are just here to make the testing code easy.
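For illustration, a minimal sketch of that ordering (the limiter name and exact call shape here are assumptions, not this PR's exact API):

// Acquire the permit first, *then* spawn the background upload task; the permit
// acquisition is what bounds the number of xorbs held in memory at any one time.
let upload_permit = upload_concurrency_limiter.acquire().await?;

let upload_task = tokio::spawn(async move {
    session
        .client
        .upload_xorb_with_permit(&cas_prefix, cas_object, Some(completion_tracker), upload_permit)
        .await
    // The permit travels with the task and is released when the upload finishes.
});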

- shard_client
-     .upload_shard(&shard_prefix, &si.shard_hash, false, data, &salt)
+ client
+     .upload_shard_with_permit(&shard_prefix, &si.shard_hash, false, data, &salt, upload_permit)
Collaborator:

Same as above, use the upload_shard version that acquires the permit automatically.

Collaborator Author (@hoytak):

See comment above.

/// The minimum time in milliseconds between adjustments when decreasing the concurrency.
ref CONCURRENCY_CONTROL_MIN_DECREASE_WINDOW_MS : u64 = 250;

/// The maximum number of connection successes and failures to examine when adjusting the concurrency.
Collaborator:

This value and the two below it, CONCURRENCY_CONTROL_TARGET_SUCCESS_RATIO_LOWER and CONCURRENCY_CONTROL_TARGET_SUCCESS_RATIO_UPPER, share the same comment, but they mean different things.

// Try to remove a permit immediately; if none is available, enqueue the
// decrease so that an outstanding permit can absorb it when it is dropped.
if let Ok(permit) = self.semaphore.clone().try_acquire_owned() {
    permit.forget();
} else {
    self.enqueued_permit_decreases.fetch_add(1, Ordering::Relaxed);
}
Collaborator:

I don't think this effectively reduces the concurrency: when permits are acquired by tasks as soon as they become available, the attempts to decrement always get enqueued, and ignored (until there's a signal to increment). I think we need to check this queue in acquire() before the actual acquisition happens?

Collaborator Author (@hoytak):

The drop function in the permit checks this value before returning itself to the main semaphore; if it can decrease this value, it does so and then forgets itself, as this is the only way to decrease the number of permits in a semaphore.

Checking it in acquire has two issues: first, it breaks fairness, which we depend on for the uploads and downloads to run in order and not block other things; and second, we need an active permit to actually decrease the concurrency.

The tests below cover this pretty heavily, including a fairly robust stress test, so I'm quite confident in this design.
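A rough sketch of that drop path, with assumed type and field names (only enqueued_permit_decreases comes from the diff above), not the actual implementation:

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::OwnedSemaphorePermit;

struct SemaphoreState {
    enqueued_permit_decreases: AtomicUsize,
}

struct AdjustablePermit {
    inner: Option<OwnedSemaphorePermit>,
    state: Arc<SemaphoreState>,
}

impl Drop for AdjustablePermit {
    fn drop(&mut self) {
        // Atomically take one pending decrease, if any are queued.
        let absorb_decrease = self
            .state
            .enqueued_permit_decreases
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| n.checked_sub(1))
            .is_ok();

        if let Some(permit) = self.inner.take() {
            if absorb_decrease {
                // Forgetting the permit is the only way to shrink the total
                // permit count of the underlying tokio semaphore.
                permit.forget();
            }
            // Otherwise the permit is dropped here and returned to the semaphore.
        }
    }
}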

@hoytak (Collaborator Author) commented Jul 15, 2025

I understand the controller logic, but experiments are needed before this becomes a convincing solution. Places I have doubts about:

  • A success signal is defined as a 200 status code plus the transfer finishing within a linearly interpolated, hardcoded time limit. This yields different speed expectations for xorbs of different sizes, e.g. 6.9 Mbps for a 10 MB xorb and 25.6 Mbps for a 64 MB xorb. In practice they should have the same speed.

I definitely agree it should have more testing, and I'll try to set some things up to do that. Note that the speed calculations also account for a constant time overhead due to server processing, which is what I was trying to capture in those numbers. Granted, though, they are empirically calculated.

  • After 90% success signals within an observation window, the controller attempts to increment the concurrency by 1. It's unclear what target this controller is trying to maximize. Since 25.6 Mbps can easily be reached, this control logic means the concurrency will shoot up to the max value (100). But the overall throughput with 100 concurrent flows is not guaranteed to be greater than with, say, 20 flows. And it's unknown whether, with 100 concurrent flows, a single flow's speed will drop below 25.6 Mbps and trigger a decrease signal. This implies that users need to manually tune the speed expectation argument based on their network condition, and thus it provides no better experience than tuning the concurrency directly.

It's not really accurate to think of this in terms of speed, but rather in terms of whether or not the bandwidth is saturated, and then backing off before it's overloaded. There's a wide range of concurrency values where it remains saturated. In this window, the time per transfer increases due to congestion, which is fine, until it becomes too congested and things start getting dropped or failing.

The idea is to increase the concurrency to the point where we're confident the connection is saturated, but before problems arise. Early detection of problems is indicated by the latency of a connection increasing past a threshold. That said, the value of this threshold is rather arbitrary and tuned for my connection, so I think I'm going to change that part to be a bit more automatic.

-     Ok(ok_response) => process_fn(ok_response).await,
-     Err(e) => Err(e),
+ let (reply_bytes, processing_result) = match checked_result {
+     Ok(ok_response) => (ok_response.content_length().unwrap_or(0), process_fn(ok_response).await),
Contributor:

This size, IIUC, is the size of the body of the response, not the size of the data sent in the request.

If it is important here to know the size of the request payload, then we should enforce it in another way.

Collaborator Author (@hoytak):

It's only used here for the download path; on upload this value is specified when the permit is given to this class. So hopefully it accurately reflects the transfer size. This distinction is made below, but I added a comment.

Comment on lines 7 to 10
/// An adjustable semaphore in which the total number of permits can be adjusted at any time
/// between a minimum and a maximum bound. Adjustments do not affect any permits currently
/// issued; if an adjustment cannot be performed immediately, then it is resolved before any
/// new permits are issued.
Contributor:

Can you add a comment as to why this is needed over the standard tokio::sync::Semaphore, which also has the functionality to add and forget permits?

Collaborator Author (@hoytak):

The tokio Semaphore itself cannot directly reduce its permit count; an acquired permit may be forgotten and never returned, but there isn't any other way to decrease the number of permits. This turned into quite the headache to manage in the controller class, so I created this utility class to implement and test the forgetting functionality reliably and cleanly. This simplified the logic in the adaptive concurrency code a lot.
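To illustrate the constraint (this is not the PR's code, just the bare tokio API): the permit count can grow at any time, but shrinking it requires first holding a permit and then forgetting it.

use std::sync::Arc;
use tokio::sync::{AcquireError, Semaphore};

// Increasing the permit count is immediate.
fn grow_by_one(sem: &Semaphore) {
    sem.add_permits(1);
}

// Decreasing it means waiting our turn for a permit and never returning it,
// which is the bookkeeping the adjustable semaphore utility wraps up.
async fn shrink_by_one(sem: Arc<Semaphore>) -> Result<(), AcquireError> {
    let permit = sem.acquire_owned().await?;
    permit.forget();
    Ok(())
}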
