
[BOUNTY - $200] Share kv cache between nodes for redundancy #52

Open
AlexCheema opened this issue Jul 21, 2024 · 12 comments
Labels: enhancement (New feature or request)

Comments

@AlexCheema
Contributor

#23 (comment)

Perhaps after each inference, we synchronise the full KV cache between all nodes. This should be fairly straightforward: we can broadcast the entire cache.

This would allow the context to be preserved even when a node goes down.

@dhruvmalik007

dhruvmalik007 commented Jul 21, 2024

Hi Alex, just curious: I'm currently learning about CRDT strategies and wanted to understand the challenge of broadcasting the cache changes with either:

  1. Strong consistency (i.e. using a mutex mechanism to share the corresponding KV cache with each entity sequentially).

  2. Eventual consistency: applying the updates independently and then implementing strategies like last-write-wins or other conflict-resolution rules.

  3. Strong and eventual consistency: combining both techniques by pairing each KV cache update with a corresponding timestamp.

So is that a good way to think about the KV cache updates?

@AlexCheema
Contributor Author

AlexCheema commented Jul 21, 2024

I think eventual consistency is preferred here. Just broadcast the updates optimistically with a monotonically increasing version number, and pick max(current_kv_cache.version, version).
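
For illustration, a minimal sketch of that merge rule, assuming a simple wrapper around the engine-specific cache (the VersionedKVCache type and merge_kv_cache helper are hypothetical, not existing exo code):

    from dataclasses import dataclass
    from typing import Any

    @dataclass
    class VersionedKVCache:
        version: int  # monotonically increasing, bumped by the node that produced the update
        data: Any     # engine-specific KV tensors

    def merge_kv_cache(current: VersionedKVCache, incoming: VersionedKVCache) -> VersionedKVCache:
        # Last-writer-wins by version: keep whichever cache carries the higher version
        return incoming if incoming.version > current.version else current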

@dhruvmalik007

Sounds good. I'm happy to be assigned to this and create a PR if it's a public issue.

@AlexCheema
Contributor Author

> Sounds good. I'm happy to be assigned to this and create a PR if it's a public issue.

Great! I will actually assign a $200 bounty to this, as it's an important upgrade and a highly valued contribution if you can get it working reliably!

AlexCheema changed the title from "Share kv cache between nodes for redundancy" to "[BOUNTY - $200] Share kv cache between nodes for redundancy" on Jul 22, 2024
@dhruvmalik007

Thanks. I will try to set up the PR by Wednesday, as I have limited time bandwidth tomorrow. I just wanted to ask whether the gRPCPeerHandler sendMessage function will work as the endpoint for broadcasting?

@stephanj

More context for people who want to understand the why and the how, provided by Claude :)

Synchronizing the full KV cache between all nodes after each inference could indeed provide some unique benefits, particularly in terms of fault tolerance and maintaining context across the cluster. Let's explore this idea and consider its implications:

Benefits:

  1. Fault Tolerance: As you mentioned, this would allow for preserving context even if a node goes down. Any node could potentially take over the work of a failed node without losing the context.

  2. Load Balancing: It could potentially allow for more flexible load balancing, as any node would have the full context and could theoretically handle the next request.

  3. Consistent State: It ensures that all nodes have a consistent view of the conversation state, which could be beneficial for debugging and monitoring.

Implementation Considerations:

  1. Cache Broadcast Mechanism: We'd need to implement an efficient broadcast mechanism. This could be done using the existing gRPC infrastructure:

    async def broadcast_kv_cache(self, kv_cache):
        async def send_cache_to_peer(peer):
            try:
                await asyncio.wait_for(peer.send_kv_cache(kv_cache), timeout=15.0)
            except asyncio.TimeoutError:
                print(f"Timeout broadcasting KV cache to {peer.id()}")
            except Exception as e:
                print(f"Error broadcasting KV cache to {peer.id()}: {e}")
    
        await asyncio.gather(*[send_cache_to_peer(peer) for peer in self.peers], return_exceptions=True)
  2. Cache Reception and Update: Each node would need a method to receive and update its local KV cache:

    async def update_kv_cache(self, new_kv_cache):
        # Logic to update the local KV cache
        pass
  3. Synchronization Point: We'd need to decide when exactly to synchronize. After each token generation might be too frequent and could introduce latency. After each full response might be more appropriate:

    async def process_prompt(self, base_shard: Shard, prompt: str, request_id: Optional[str] = None, inference_state: Optional[str] = None) -> Optional[np.ndarray]:
        resp = await self._process_prompt(base_shard, prompt, request_id, inference_state)
        kv_cache = self.get_current_kv_cache()  # Method to get the current KV cache
        await self.broadcast_kv_cache(kv_cache)
        return resp

Challenges and Considerations:

  1. Network Overhead: Broadcasting the entire KV cache after each inference could introduce significant network overhead, especially for larger models or longer conversations.

  2. Synchronization Delay: The time taken to broadcast and update the KV cache on all nodes could introduce latency in the system.

  3. Cache Size: For very large models or long conversations, the KV cache could become quite large, potentially causing memory issues on nodes with less available RAM.

  4. Consistency Management: We'd need to ensure that the cache remains consistent across all nodes, even in the face of network delays or failures.

  5. API Compatibility: This approach differs from how most LLM APIs work, which typically don't maintain state between requests. We'd need to consider how to make this transparent to API users or whether to expose it as a feature.

Potential Optimizations:

  1. Differential Updates: Instead of broadcasting the entire cache each time, we could send only the changes since the last synchronization.

  2. Compression: Implement compression for the KV cache before transmission to reduce network overhead (see the sketch after this list).

  3. Asynchronous Updates: Perform the cache broadcast asynchronously, allowing the node to continue processing while the update happens in the background.

  4. Configurable Behavior: Allow users to choose whether they want this "stateful" behavior or a more traditional stateless API.
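
For illustration, a minimal sketch of the compression step from item 2, assuming the cache can be flattened into a dict of NumPy arrays (the helper names and this representation are assumptions, not the real engine-specific structures):

    import io
    import zlib
    import numpy as np

    def compress_kv_cache(kv_cache: dict) -> bytes:
        # Serialize the arrays into a single buffer, then compress the bytes
        buffer = io.BytesIO()
        np.savez(buffer, **kv_cache)
        return zlib.compress(buffer.getvalue())

    def decompress_kv_cache(payload: bytes) -> dict:
        # Decompress, then load the arrays back into a plain dict
        buffer = io.BytesIO(zlib.decompress(payload))
        data = np.load(buffer)
        return {name: data[name] for name in data.files}

Differential updates (item 1) could then be layered on top by serializing only the key/value positions appended since the last broadcast.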

Implementation Steps:

  1. Modify the InferenceEngine interface and implementations to expose methods for getting and setting the KV cache.

  2. Implement the broadcast and update mechanisms in the StandardNode class.

  3. Modify the processing methods (process_prompt, process_tensor) to trigger cache synchronization at appropriate points.

  4. Update the PeerHandle interface and implementations to include methods for sending and receiving KV cache updates (see the sketch after this list).

  5. Implement error handling and recovery mechanisms to deal with synchronization failures.

  6. Add configuration options to enable/disable this feature and control its behavior (e.g., synchronization frequency).
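
As a rough sketch of step 4, the peer interface could gain a method along these lines; the abstract-base-class shape and exact signature are assumptions rather than the current exo API:

    from abc import ABC, abstractmethod

    class PeerHandle(ABC):
        # ...existing methods for sending prompts/tensors would live here...

        @abstractmethod
        async def send_kv_cache(self, kv_cache) -> None:
            # Push a (possibly compressed, versioned) KV cache snapshot to this peer
            ...

On the receiving side, the node would hand the payload to update_kv_cache and apply the version check suggested earlier in the thread.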

While this approach could offer some unique advantages, it's important to carefully consider the trade-offs, particularly in terms of performance and complexity. It might be worth implementing this as an optional feature that can be enabled for specific use cases where maintaining context across the cluster is particularly valuable.

@dhruvmalik007

Thanks @stephanj for adding context and sharing the checklists on the subject. It's currently a work in progress here.

I am going to implement a modified version of the gossip protocol to provide strong consistency.

I do have some general feedback on the viewpoints that you've shared:

  • Currently, fault tolerance is not the priority for this PR; however, it will be possible with an additional function that detects changes in the topology and then synchronises the shard state with the KV cache from the quantised model file.

  • For update_kv_cache: I am thinking of implementing the initialisation process (when the reset for the given model and shard is done) in order to broadcast to a subset of the peers, and then, during the inference phase, broadcasting the updated cache from each of these peers to their close peers until the delta (between the corresponding KV values) is zero (see the sketch after this list).

  • I think making the changes in process_tensor will be too much overhead, so I will first test with prompt generation and analyze both approaches later.
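
For illustration, a minimal sketch of such a gossip round, assuming self.peers exposes the send_kv_cache method discussed above and that each cache carries a version used for the delta check (method names and the fanout value are illustrative):

    import asyncio
    import random

    async def gossip_kv_cache(self, kv_cache, fanout: int = 3):
        # Push the (versioned) cache to a random subset of peers; peers that already
        # hold an equal or newer version ignore the update, so repeated rounds
        # converge without flooding every node on every update.
        targets = random.sample(self.peers, min(fanout, len(self.peers)))
        results = await asyncio.gather(
            *[peer.send_kv_cache(kv_cache) for peer in targets],
            return_exceptions=True,
        )
        for peer, result in zip(targets, results):
            if isinstance(result, Exception):
                print(f"Gossip to {peer.id()} failed: {result}")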

@pranav4501

Hi,
I noticed this bounty issue has been open for about a month without recent activity. I'm interested in working on this task. Before I start, I wanted to check whether anyone is still working on it.

@dhruvmalik007

Hi @pranav4501, yes, I started working on this two weeks ago but didn't have time in between: https://github.com/dhruvmalik007/exo/tree/feat/adding_KV_broadcast_cache

I will try until the end of this week, but if you are interested in implementing it before then, I can have it assigned to you.

@pranav4501

Hi @dhruvmalik007,
I was just checking whether someone was still working on it. I will look at other issues. Thank you.

@FFAMax
Contributor

FFAMax commented Oct 6, 2024

Hello, @dhruvmalik007
Is it possible, without extra effort, to stick to Ethernet multicast groups where local peers are discovered, so that they can subscribe individually in case several clusters are split into groups?
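
For context, subscribing a host to an IP multicast group in Python looks roughly like this (the group address and port are placeholders; whether this fits exo's peer discovery is a separate question):

    import socket
    import struct

    MCAST_GRP, MCAST_PORT = "224.1.1.1", 5007  # placeholder group/port

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("", MCAST_PORT))
    # Join the multicast group on all interfaces so this node receives group traffic
    mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)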

@dhruvmalik007

Hi,

You mean that in grpc_peer_handle.py we can parse the current topology from the stored state and then use the protobuf function to send the cache value?
