WIP: RedisAwaitedActionDb #1182
Reviewable status: 0 of 1 LGTMs obtained, and pending CI: Bazel Dev / ubuntu-22.04, Cargo Dev / macos-13, Cargo Dev / ubuntu-22.04, Installation / macos-13, Installation / macos-14, Installation / ubuntu-22.04, Local / ubuntu-22.04, Publish image, Publish nativelink-worker-init, Publish nativelink-worker-lre-cc, Remote / large-ubuntu-22.04, asan / ubuntu-22.04, docker-compose-compiles-nativelink (20.04), docker-compose-compiles-nativelink (22.04), integration-tests (20.04), integration-tests (22.04), macos-13, ubuntu-20.04 / stable, ubuntu-22.04, ubuntu-22.04 / stable, windows-2022 / stable, and 9 discussions need to be resolved
nativelink-scheduler/src/redis_awaited_action_db.rs
line 112 at r1 (raw file):
impl RedisAwaitedActionDbImpl { pub async fn get_operation_subscriber_count(&mut self, operation_id: &OperationId) -> usize {
Either inline these helpers or move the Err -> Option handling up into them, so that they actually save lines of code.
nativelink-scheduler/src/redis_awaited_action_db.rs
line 171 at r1 (raw file):
}; // TODO: Create a spawn which will wait on this drop_rx to notify the db of the drop
Implement this
nativelink-scheduler/src/redis_awaited_action_db.rs
line 350 at r1 (raw file):
} async fn get_all_awaited_actions(
It is very difficult to get this data in the right format for this signature without just using unwrap or logging events to discard errors.
I wasn't able to figure out how to get the closure to accept returning a result but I'm sure there's a way to do so and all this can be moved above the for loop with proper error handling.
nativelink-scheduler/src/redis_awaited_action_db.rs
line 378 at r1 (raw file):
) -> Result<Option<Self::Subscriber>, Error> { let result = self.get_operation_id_by_client_id(client_id).await; let operation_id = match result {
The Err -> Option handling here should probably be done in the helper functions above.
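A minimal sketch of what moving the Err -> Option handling into a helper could look like. `FakeStore` and `StoreError` are hypothetical stand-ins, not the real store or error types; only the `cid:` key prefix comes from this PR.

```rust
use std::collections::HashMap;

// Hypothetical error type standing in for the store's real error.
#[derive(Debug, PartialEq)]
enum StoreError {
    NotFound,
    Unavailable,
}

// Hypothetical in-memory stand-in for the Redis store.
struct FakeStore {
    online: bool,
    data: HashMap<String, String>,
}

impl FakeStore {
    // Raw lookup that reports a missing key as an error.
    fn get(&self, key: &str) -> Result<String, StoreError> {
        if !self.online {
            return Err(StoreError::Unavailable);
        }
        self.data.get(key).cloned().ok_or(StoreError::NotFound)
    }
}

// The helper folds "not found" into Ok(None), so callers can use `?`
// and only match on the Option.
fn get_operation_id_by_client_id(
    store: &FakeStore,
    client_id: &str,
) -> Result<Option<String>, StoreError> {
    match store.get(&format!("cid:{client_id}")) {
        Ok(operation_id) => Ok(Some(operation_id)),
        Err(StoreError::NotFound) => Ok(None),
        Err(other) => Err(other),
    }
}
```

With this shape the call site reduces to `let Some(operation_id) = get_operation_id_by_client_id(&store, client_id)? else { return Ok(None) };`.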
nativelink-scheduler/src/awaited_action_db/awaited_action.rs
line 61 at r1 (raw file):
} impl AwaitedAction {
These functions should be simple with serde_json since json can go to/from bytes.
nativelink-store/src/redis_store.rs
line 57 at r1 (raw file):
pub enum ConnectionKind { Cluster(ClusterConnection), Single((redis::Client, ConnectionManager)),
This was something @allada and I did partially together, but ultimately I wasn't able to get access to the Client by including it here.
Since I was able to get returning this somewhat compiling, I kept it, but I don't think this solution is going to work. Since it's nested below 2-3 levels of fairly complicated generics the types are really finicky.
nativelink-store/src/redis_store.rs
line 141 at r1 (raw file):
let client = redis::Client::open(params).map_err(from_redis_err)?; let client_clone = client.clone(); let init = async move { Ok((client_clone.clone(), client_clone.get_connection_manager().await.unwrap())) };
@jhpratt and I spent some time on this and could only get it to compile by using this code. Unfortunately returning it here didn't seem to make it accessible in the return value the way @allada and I thought it would, so I just returned the client as part of the tuple.
nativelink-store/src/redis_store.rs
line 268 at r1 (raw file):
todo!() // if config.addresses.is_empty() {
Technically this code compiles, but it causes error messages for all the tests to pass the client through. Until we decide on how to make the client accessible from inside the constructor it's easiest to just mark it `todo!` so there aren't a bunch of compiler errors thrown by the test cases.
nativelink-util/src/action_messages.rs
line 768 at r1 (raw file):
} impl Serialize for ActionStage {
Should be fairly trivial, I think I have a snippet for this somewhere.
Reviewable status: 0 of 1 LGTMs obtained, and pending CI: Bazel Dev / ubuntu-22.04, Cargo Dev / macos-13, Cargo Dev / ubuntu-22.04, Installation / macos-13, Installation / macos-14, Installation / ubuntu-22.04, Local / ubuntu-22.04, Publish image, Publish nativelink-worker-init, Publish nativelink-worker-lre-cc, Remote / large-ubuntu-22.04, asan / ubuntu-22.04, docker-compose-compiles-nativelink (20.04), docker-compose-compiles-nativelink (22.04), integration-tests (20.04), integration-tests (22.04), macos-13, ubuntu-20.04 / stable, ubuntu-22.04, ubuntu-22.04 / stable, windows-2022 / stable, and 16 discussions need to be resolved
nativelink-scheduler/src/redis_awaited_action_db.rs
line 67 at r1 (raw file):
enum RedisActionEvent { /// A client has sent a keep alive message. // ClientKeepAlive(ClientOperationId),
note: keep-alives still need to be processed to prevent the operation from being dropped in redis.
nativelink-scheduler/src/redis_awaited_action_db.rs
line 225 at r1 (raw file):
let client = store.get_client(); let weak_inner = Arc::downgrade(&inner); let _joinhandle = background_spawn!("redis_action_change_listener", async move {
nit: This join handle should be on the returned struct. This will allow the connection to be cleaned up if this struct is destroyed.
Also, don't use `background_spawn!`; use `spawn!`.
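A rough std-only analogue of keeping the handle on the struct, using a thread and a stop flag instead of an async task. `Listener` and its fields are hypothetical names; in async code an abort-on-drop guard around the task handle would play the same role.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Hypothetical owner of a background listener.
struct Listener {
    stop: Arc<AtomicBool>,
    finished: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}

impl Listener {
    fn start() -> Self {
        let stop = Arc::new(AtomicBool::new(false));
        let finished = Arc::new(AtomicBool::new(false));
        let (stop_flag, finished_flag) = (Arc::clone(&stop), Arc::clone(&finished));
        let handle = thread::spawn(move || {
            // Stand-in for the message loop: run until asked to stop.
            while !stop_flag.load(Ordering::Acquire) {
                thread::sleep(Duration::from_millis(1));
            }
            finished_flag.store(true, Ordering::Release);
        });
        Listener { stop, finished, handle: Some(handle) }
    }

    // Exposed only so a caller can observe that cleanup happened.
    fn finished_flag(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.finished)
    }
}

impl Drop for Listener {
    fn drop(&mut self) {
        // Because the handle lives on the struct, dropping the struct
        // stops the background work and waits for it to finish.
        self.stop.store(true, Ordering::Release);
        if let Some(handle) = self.handle.take() {
            let _ = handle.join();
        }
    }
}
```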
nativelink-scheduler/src/redis_awaited_action_db.rs
line 237 at r1 (raw file):
let mut stream = pubsub.into_on_message(); loop { let msg = stream.next().await.unwrap();
nit: Sadly this needs to be more complicated. It probably needs to use the raw poll functions, so we don't `upgrade()` on every cycle and instead only upgrade once, then traverse any items in our buffer, then release the upgrade.
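A sketch of the "upgrade once, drain the buffer, release" shape, using a std channel and `try_recv` in place of the raw poll functions. `Inner` and `process_batch` are hypothetical names; the real loop would work on the pubsub stream instead.

```rust
use std::sync::mpsc::Receiver;
use std::sync::{Mutex, Weak};

// Hypothetical shared state updated by incoming messages.
struct Inner {
    applied: Vec<String>,
}

// Called after the listener has received `first` without holding a strong
// reference. Returns None when the owner is gone and the listener should exit.
fn process_batch(
    weak_inner: &Weak<Mutex<Inner>>,
    rx: &Receiver<String>,
    first: String,
) -> Option<usize> {
    // Upgrade once per batch rather than once per message.
    let inner = weak_inner.upgrade()?;
    let mut guard = inner.lock().expect("lock poisoned");
    guard.applied.push(first);
    let mut count = 1;
    // Drain whatever is already buffered without blocking.
    while let Ok(msg) = rx.try_recv() {
        guard.applied.push(msg);
        count += 1;
    }
    // The lock guard and the strong reference are both released on return.
    Some(count)
}
```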
nativelink-scheduler/src/redis_awaited_action_db.rs
line 243 at r1 (raw file):
let mut inner_mut = inner_mutex.lock().await; let tx = inner_mut.get_operation_sender(state.operation_id()).await.unwrap(); tx.send_replace(state);
nit: Put a comment here on why not to use `.send()`.
nativelink-scheduler/src/redis_awaited_action_db.rs
line 276 at r1 (raw file):
let sub = self.subscribe_to_operation(&operation_id).await; let key = format!("cid:{client_id}"); self.store.update_oneshot(StoreKey::from(key), operation_id.to_string().into()).await?;
nit: I don't think you need `StoreKey::from`; it should implicitly take a `String` and do the conversion for you.
nativelink-scheduler/src/redis_awaited_action_db.rs
line 277 at r1 (raw file):
let key = format!("cid:{client_id}"); self.store.update_oneshot(StoreKey::from(key), operation_id.to_string().into()).await?; self.inc_operation_subscriber_count(&operation_id).await;
We should not be doing the increment of the subscriber count here. It needs to happen inside the construction of the `sub` object. We could easily end up with memory problems here because of this:
1. Subscribe to operation and acquire an owned `RedisAwaitedActionSubscriber`.
2. Future dies while `update_oneshot` is being processed.
3. `RedisAwaitedActionSubscriber::drop()` is called.
4. `drop()` calls `dec_operation_subscriber_count()`, but `inc_operation_subscriber_count()` was never called, so the state is out of sync.
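The pairing described in the steps above can be sketched as a guard that increments in its constructor and decrements in `drop()`, so the two can never get out of step. The `Subscriber` and `Counts` types here are hypothetical and use an in-memory map rather than Redis.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical shared table of subscriber counts keyed by operation id.
type Counts = Arc<Mutex<HashMap<String, usize>>>;

struct Subscriber {
    operation_id: String,
    counts: Counts,
}

impl Subscriber {
    // The increment happens as part of construction, so a Subscriber
    // can never exist without having been counted.
    fn new(operation_id: &str, counts: Counts) -> Self {
        *counts
            .lock()
            .expect("lock poisoned")
            .entry(operation_id.to_string())
            .or_insert(0) += 1;
        Subscriber { operation_id: operation_id.to_string(), counts }
    }
}

impl Drop for Subscriber {
    fn drop(&mut self) {
        let mut counts = self.counts.lock().expect("lock poisoned");
        let remaining = match counts.get_mut(&self.operation_id) {
            Some(count) => {
                *count = count.saturating_sub(1);
                *count
            }
            None => return,
        };
        // Remove the entry once nobody is subscribed any more.
        if remaining == 0 {
            counts.remove(&self.operation_id);
        }
    }
}

fn subscriber_count(counts: &Counts, operation_id: &str) -> usize {
    counts.lock().expect("lock poisoned").get(operation_id).copied().unwrap_or(0)
}
```

If the surrounding future is dropped at any later `.await`, the guard's `drop()` undoes exactly the increment that its constructor made.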
nativelink-scheduler/src/redis_awaited_action_db.rs
line 288 at r1 (raw file):
self.store.update_oneshot(StoreKey::from(key), operation_id.to_string().into()).await?; let key = format!("ahk:{}", &action_info.unique_qualifier);
The Action Hash Key insertion needs to happen after the Operation ID insertion, or we could end up pointing to something that doesn't exist.
Remember, any time `.await` is called, think: "what might happen if the function returns here?"
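A small sketch of why the order matters, with an in-memory map standing in for Redis and an explicit `StopAfter` flag standing in for a future that is dropped at an `.await`. The `oid:` prefix, `FakeStore`, and `StopAfter` are hypothetical; only the `ahk:` prefix appears in this PR.

```rust
use std::collections::HashMap;

// Hypothetical in-memory stand-in for the Redis store.
#[derive(Default)]
struct FakeStore {
    data: HashMap<String, String>,
}

// Hypothetical interruption point standing in for a dropped future.
#[derive(Clone, Copy, PartialEq)]
enum StopAfter {
    Never,
    FirstWrite,
}

fn add_action(
    store: &mut FakeStore,
    operation_id: &str,
    action_hash: &str,
    state: &str,
    stop: StopAfter,
) {
    // 1. Write the operation record first.
    store.data.insert(format!("oid:{operation_id}"), state.to_string());
    if stop == StopAfter::FirstWrite {
        return;
    }
    // 2. Only then publish the pointer from the action hash to the operation.
    store.data.insert(format!("ahk:{action_hash}"), operation_id.to_string());
}

// True when every action-hash pointer resolves to a stored operation.
fn pointers_are_valid(store: &FakeStore) -> bool {
    store
        .data
        .iter()
        .filter(|(key, _)| key.starts_with("ahk:"))
        .all(|(_, operation_id)| store.data.contains_key(&format!("oid:{operation_id}")))
}
```

Stopping after the first write leaves an operation with no pointer to it, which is harmless; the reverse order would leave a pointer to nothing.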
nativelink-scheduler/src/redis_awaited_action_db.rs
line 350 at r1 (raw file):
Previously, zbirenbaum (Zach Birenbaum) wrote…
It is very difficult to get this data in the right format for this signature without just using unwrap or logging events to discard errors.
I wasn't able to figure out how to get the closure to accept returning a result but I'm sure there's a way to do so and all this can be moved above the for loop with proper error handling.
The way you generally want to handle this is by using an `ErrorRedisSubscriber` or something that always returns a stream of only the error, with `.left_stream()` and `.right_stream()` for the real stream.
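A std-only analogue of the same idea using iterators rather than streams: both branches are wrapped in one `Either` type so the function has a single return type, and the error branch yields just the error. `Either` and `awaited_action_ids` are hypothetical names; with streams, `.left_stream()` and `.right_stream()` do the wrapping.

```rust
use std::iter;

// Minimal two-variant wrapper so both branches share one concrete type.
enum Either<L, R> {
    Left(L),
    Right(R),
}

impl<L, R> Iterator for Either<L, R>
where
    L: Iterator,
    R: Iterator<Item = L::Item>,
{
    type Item = L::Item;

    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Either::Left(left) => left.next(),
            Either::Right(right) => right.next(),
        }
    }
}

// Hypothetical lookup result: either the ids or a failure to fetch them.
fn awaited_action_ids(
    lookup: Result<Vec<String>, String>,
) -> impl Iterator<Item = Result<String, String>> {
    match lookup {
        // Error branch: a sequence containing only the error.
        Err(err) => Either::Left(iter::once(Err(err))),
        // Success branch: the real sequence, with each item wrapped in Ok.
        Ok(ids) => Either::Right(ids.into_iter().map(Ok)),
    }
}
```

This keeps the setup error in the output instead of forcing an `unwrap` or a logged-and-discarded error before the loop.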
Implements Serialize/Deserialize for ActionStage to allow for storage and retrieval from stores such as Redis. towards: TraceMachina#1182
Reviewed 4 of 8 files at r1, all commit messages.
nativelink-store/src/redis_store.rs
line 141 at r1 (raw file):
Previously, zbirenbaum (Zach Birenbaum) wrote…
@jhpratt and I spent some time on this and could only get it to compile by using this code. Unfortunately returning it here didn't seem to make it accessible in the return value the way @allada and I thought it would, so I just returned the client as part of the tuple.
Would you like me to take a look at this in more detail? What we discussed on the call was largely to unblock your work by simply making it compile, regardless of whether it was the best in the end.
nativelink-store/src/redis_store.rs
line 141 at r1 (raw file):
Previously, jhpratt (Jacob Pratt) wrote…
Would you like me to take a look at this in more detail? What we discussed on the call was largely to unblock your work by simply making it compile, regardless of whether it was the best in the end.
Let's sync on it when you have a chance. I think the ideal case would be embedding it in a field like the connection where it already exists, so we can pull it out when needed but still have it hidden for the tests.
b17d637 to 421199c
Reviewable status: 0 of 1 LGTMs obtained, and pending CI: docker-compose-compiles-nativelink (20.04), pre-commit-checks, vale, and 13 discussions need to be resolved
nativelink-scheduler/src/redis_awaited_action_db.rs
line 225 at r1 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
nit: This join handle should be on the returned struct. This will allow the connection to be cleaned up if this struct is destroyed.
Also, don't use `background_spawn!`; use `spawn!`.
Done.
nativelink-scheduler/src/redis_awaited_action_db.rs
line 237 at r1 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
nit: Sadly this needs to be more complicated. It probably needs to use the raw poll functions, so we don't `upgrade()` on every cycle and instead only upgrade once, then traverse any items in our buffer, then release the upgrade.
Should we be creating a new struct that implements `Future` to handle this? Or do you mean calling `poll_next()`?
nativelink-scheduler/src/redis_awaited_action_db.rs
line 243 at r1 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
nit: Put a comment here on why not to use `.send()`.
Done.
nativelink-scheduler/src/redis_awaited_action_db.rs
line 276 at r1 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
nit: I don't think you need `StoreKey::from`; it should implicitly take a `String` and do the conversion for you.
Done. It doesn't take a `String`, but it does take `&str`, so I changed all those instances to use `.as_str()`.
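For illustration, a sketch of how an `impl Into<...>` parameter produces this behavior: the argument converts implicitly only for the types that have a `From` impl. The `StoreKey` below is a hypothetical stand-in that implements `From<&str>` only; the real type's conversions may differ.

```rust
use std::borrow::Cow;

// Hypothetical stand-in for the store's key type.
#[derive(Debug, PartialEq)]
struct StoreKey<'a>(Cow<'a, str>);

// Only `&str` gets a conversion here, mirroring the behavior reported above.
impl<'a> From<&'a str> for StoreKey<'a> {
    fn from(key: &'a str) -> Self {
        StoreKey(Cow::Borrowed(key))
    }
}

// Accepting `impl Into<StoreKey>` lets callers skip the explicit `StoreKey::from`.
fn normalize_key<'a>(key: impl Into<StoreKey<'a>>) -> StoreKey<'a> {
    key.into()
}
```

Under these assumptions `normalize_key(key.as_str())` compiles, while passing the `String` itself would not until a `From<String>` impl is added.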
nativelink-scheduler/src/redis_awaited_action_db.rs
line 288 at r1 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
Action Hash Key needs to happen after Operation ID insertion or we could end up pointing to something that doesn't exist.
Remember, any time `.await` is called, think: "what might happen if the function returns here?"
Done.
nativelink-util/src/action_messages.rs
line 768 at r1 (raw file):
Previously, zbirenbaum (Zach Birenbaum) wrote…
Should be fairly trivial, I think I have a snippet for this somewhere.
Commit for early feedback and design planning.
421199c to e2227df
Reviewable status: 0 of 1 LGTMs obtained, and pending CI: Bazel Dev / ubuntu-22.04, Cargo Dev / macos-13, Cargo Dev / ubuntu-22.04, Installation / macos-13, Installation / macos-14, Installation / ubuntu-22.04, Publish image, Publish nativelink-worker-init, Publish nativelink-worker-lre-cc, Remote / large-ubuntu-22.04, asan / ubuntu-22.04, docker-compose-compiles-nativelink (20.04), docker-compose-compiles-nativelink (22.04), integration-tests (20.04), integration-tests (22.04), macos-13, ubuntu-20.04 / stable, ubuntu-22.04, ubuntu-22.04 / stable, windows-2022 / stable, and 13 discussions need to be resolved
nativelink-scheduler/src/redis_awaited_action_db.rs
line 277 at r1 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
We should not be doing the increment of the subscriber count here. It needs to happen inside the construction of the `sub` object. We could easily end up with memory problems here because of this:
1. Subscribe to operation and acquire an owned `RedisAwaitedActionSubscriber`.
2. Future dies while `update_oneshot` is being processed.
3. `RedisAwaitedActionSubscriber::drop()` is called.
4. `drop()` calls `dec_operation_subscriber_count()`, but `inc_operation_subscriber_count()` was never called, so the state is out of sync.
Done.
Removes unnecessary `Sync` bounds from functions inside of the AwaitedActionDb trait. These bounds interfere with implementations that need to communicate with the Redis store. towards TraceMachina#1182
Description
Notable partial or full TODO items include: