diff --git a/tests/v1/core/test_scheduler.py b/tests/v1/core/test_scheduler.py
index bcbac67a63f1..db33b4c6df11 100644
--- a/tests/v1/core/test_scheduler.py
+++ b/tests/v1/core/test_scheduler.py
@@ -774,6 +774,27 @@ def test_scheduler_reset_prefix_cache():
         assert scheduler.waiting[i] == request
 
 
+def test_reset_connector_cache_no_connector_is_no_op_success():
+    """``reset_connector_cache`` must return True when no connector is
+    configured.
+
+    Without this, ``reset_prefix_cache(reset_connector=True)`` returns
+    ``False`` on every engine that doesn't have a KV connector configured —
+    even when the local prefix cache reset succeeded — and any caller that
+    interprets the return value as "did the reset I asked for succeed?"
+    sees a spurious failure.
+    """
+    scheduler = create_scheduler(enable_prefix_caching=True)
+    assert scheduler.connector is None
+
+    # No-connector reset is treated as success.
+    assert scheduler.reset_connector_cache() is True
+
+    # End-to-end: reset_prefix_cache(reset_connector=True) on an idle
+    # scheduler succeeds with or without a connector.
+    assert scheduler.reset_prefix_cache(reset_connector=True) is True
+
+
 # Note - these test cases mirror some of those in test_rejection_sampler.py
 @pytest.mark.parametrize(
     "spec_tokens,output_tokens,expected",
diff --git a/tests/v1/kv_connector/unit/test_mooncake_store_connector.py b/tests/v1/kv_connector/unit/test_mooncake_store_connector.py
index 66a94560d030..69593011db9c 100644
--- a/tests/v1/kv_connector/unit/test_mooncake_store_connector.py
+++ b/tests/v1/kv_connector/unit/test_mooncake_store_connector.py
@@ -11,6 +11,11 @@
 from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store import (
     connector as mooncake_store_connector,
 )
+from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store import (
+    protocol,
+    scheduler,
+    worker,
+)
 from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store.data import (
     MooncakeStoreConnectorMetadata,
 )
@@ -284,3 +289,328 @@ def test_update_connector_output_and_take_events():
     assert connector._kv_cache_events is kv_events
     assert list(connector.take_events()) == [event]
     assert connector._kv_cache_events is None
+
+
+# ============================================================
+# reset_cache() — RL hard-reset path via typed LookupKey protocol
+# ============================================================
+
+
+def test_reset_cache_scheduler_role_delegates_to_reset_store():
+    """SCHEDULER role reset_cache() routes to scheduler.reset_store()."""
+    vllm_config = _make_vllm_config()
+    kv_cache_config = _make_kv_cache_config()
+
+    with (
+        set_current_vllm_config(vllm_config),
+        patch(
+            "vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store."
+            "connector.MooncakeStoreScheduler"
+        ) as mock_scheduler_cls,
+    ):
+        conn = mooncake_store_connector.MooncakeStoreConnector(
+            vllm_config, KVConnectorRole.SCHEDULER, kv_cache_config
+        )
+
+    mock_scheduler_cls.return_value.reset_store.return_value = True
+    assert conn.reset_cache() is True
+    mock_scheduler_cls.return_value.reset_store.assert_called_once_with()
+
+
+def test_reset_cache_scheduler_role_propagates_failure():
+    """SCHEDULER role surfaces False when scheduler.reset_store() fails."""
+    vllm_config = _make_vllm_config()
+    kv_cache_config = _make_kv_cache_config()
+
+    with (
+        set_current_vllm_config(vllm_config),
+        patch(
+            "vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store."
+            "connector.MooncakeStoreScheduler"
+        ) as mock_scheduler_cls,
+    ):
+        conn = mooncake_store_connector.MooncakeStoreConnector(
+            vllm_config, KVConnectorRole.SCHEDULER, kv_cache_config
+        )
+
+    mock_scheduler_cls.return_value.reset_store.return_value = False
+    assert conn.reset_cache() is False
+
+
+def test_reset_cache_worker_role_returns_none():
+    """WORKER role reset_cache() is a no-op; reset is driven via ZMQ admin."""
+    vllm_config = _make_vllm_config()
+    kv_cache_config = _make_kv_cache_config()
+
+    with (
+        set_current_vllm_config(vllm_config),
+        patch(
+            "vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store."
+            "connector.MooncakeStoreWorker"
+        ),
+    ):
+        conn = mooncake_store_connector.MooncakeStoreConnector(
+            vllm_config, KVConnectorRole.WORKER, kv_cache_config
+        )
+
+    assert conn.reset_cache() is None
+
+
+def test_scheduler_reset_store_returns_client_reset_result():
+    """MooncakeStoreScheduler.reset_store() returns LookupKeyClient.reset()."""
+    vllm_config = _make_vllm_config()
+    kv_cache_config = _make_kv_cache_config()
+
+    with (
+        set_current_vllm_config(vllm_config),
+        patch(
+            "vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store."
+            "scheduler.LookupKeyClient"
+        ) as mock_client_cls,
+    ):
+        sched = scheduler.MooncakeStoreScheduler(vllm_config, kv_cache_config)
+
+    mock_client_cls.return_value.reset.return_value = True
+    assert sched.reset_store() is True
+    mock_client_cls.return_value.reset.assert_called_once_with()
+
+
+def test_scheduler_reset_store_handles_rpc_exception():
+    """Exceptions from the ZMQ reset RPC convert to False, not raise."""
+    vllm_config = _make_vllm_config()
+    kv_cache_config = _make_kv_cache_config()
+
+    with (
+        set_current_vllm_config(vllm_config),
+        patch(
+            "vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store."
+            "scheduler.LookupKeyClient"
+        ) as mock_client_cls,
+    ):
+        sched = scheduler.MooncakeStoreScheduler(vllm_config, kv_cache_config)
+
+    mock_client_cls.return_value.reset.side_effect = RuntimeError("rpc timed out")
+    assert sched.reset_store() is False
+
+
+def test_lookup_key_client_lookup_prepends_typed_tag():
+    """LookupKeyClient.lookup() puts LOOKUP_MSG tag at frame 0."""
+    vllm_config = _make_vllm_config()
+
+    with patch(
+        "vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store."
+        "worker.make_zmq_socket"
+    ) as mock_make_socket:
+        client = worker.LookupKeyClient(vllm_config)
+
+    fake_socket = mock_make_socket.return_value
+    fake_socket.recv.return_value = (5).to_bytes(4, "big")
+
+    assert client.lookup(token_len=128, block_hashes=[]) == 5
+
+    sent_frames = fake_socket.send_multipart.call_args[0][0]
+    assert sent_frames[0] == protocol.LOOKUP_MSG
+    assert int.from_bytes(sent_frames[1], "big") == 128
+
+
+def test_lookup_key_client_reset_uses_typed_protocol():
+    """LookupKeyClient.reset() sends RESET_MSG and parses RESP_OK / RESP_ERR."""
+    vllm_config = _make_vllm_config()
+
+    with patch(
+        "vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store."
+        "worker.make_zmq_socket"
+    ) as mock_make_socket:
+        client = worker.LookupKeyClient(vllm_config)
+
+    fake_socket = mock_make_socket.return_value
+
+    # ACK path: server returns RESP_OK -> client returns True.
+    fake_socket.recv.return_value = protocol.RESP_OK
+    assert client.reset() is True
+    assert fake_socket.send.call_args[0][0] == protocol.RESET_MSG
+
+    # NACK path: server returns RESP_ERR -> client returns False.
+    fake_socket.recv.return_value = protocol.RESP_ERR
+    assert client.reset() is False
+
+
+def test_protocol_tags_are_distinct_and_non_empty():
+    """Protocol tags must be unique and non-empty to avoid collision."""
+    tags = {protocol.LOOKUP_MSG, protocol.RESET_MSG}
+    assert len(tags) == 2
+    for tag in tags:
+        assert isinstance(tag, bytes)
+        assert len(tag) > 0
+    assert protocol.RESP_OK != protocol.RESP_ERR
+
+
+def test_scheduler_reset_connector_cache_invokes_connector_reset():
+    """Cascade test: a stub mirroring ``Scheduler.reset_connector_cache``
+    (``reset_cache() is not False``) drives MooncakeStoreConnector.reset_cache
+    without dragging in the heavy KVCacheManager fixtures.
+    """
+    vllm_config = _make_vllm_config()
+    kv_cache_config = _make_kv_cache_config()
+
+    with (
+        set_current_vllm_config(vllm_config),
+        patch(
+            "vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store."
+            "connector.MooncakeStoreScheduler"
+        ) as mock_scheduler_cls,
+    ):
+        conn = mooncake_store_connector.MooncakeStoreConnector(
+            vllm_config, KVConnectorRole.SCHEDULER, kv_cache_config
+        )
+
+    mock_scheduler_cls.return_value.reset_store.return_value = True
+
+    class _StubScheduler:
+        def __init__(self, c):
+            self.connector = c
+
+        def reset_connector_cache(self):
+            return self.connector.reset_cache() is not False
+
+    sched = _StubScheduler(conn)
+    assert sched.reset_connector_cache() is True
+    mock_scheduler_cls.return_value.reset_store.assert_called_once_with()
+
+    mock_scheduler_cls.return_value.reset_store.reset_mock()
+    mock_scheduler_cls.return_value.reset_store.return_value = False
+    assert sched.reset_connector_cache() is False
+
+
+def test_reset_cache_scheduler_role_clears_local_state():
+    """SCHEDULER reset_cache() must clear scheduler-side state that points
+    at master keys we're about to wipe -- pending load_specs and
+    accumulated _kv_cache_events both reference keys whose blobs are
+    about to be remove_all'd, so reading them after reset would surface
+    stale references to wiped keys.
+    """
+    from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store.data import (  # noqa: E501
+        LoadSpec,
+    )
+
+    vllm_config = _make_vllm_config()
+    kv_cache_config = _make_kv_cache_config()
+
+    with (
+        set_current_vllm_config(vllm_config),
+        patch(
+            "vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store."
+            "connector.MooncakeStoreScheduler"
+        ) as mock_scheduler_cls,
+    ):
+        conn = mooncake_store_connector.MooncakeStoreConnector(
+            vllm_config, KVConnectorRole.SCHEDULER, kv_cache_config
+        )
+
+    # Seed both sentinel pieces of stale-reference state.
+    sched_inst = mock_scheduler_cls.return_value
+    sched_inst.load_specs = {
+        "req-A": LoadSpec(vllm_cached_tokens=0, kvpool_cached_tokens=128, can_load=True)
+    }
+    conn._kv_cache_events = mooncake_store_connector.MooncakeStoreKVEvents(
+        num_workers=1
+    )
+    sched_inst.reset_store.return_value = True
+
+    assert conn.reset_cache() is True
+
+    # Both stale references must be gone once reset_cache() has returned
+    # (load_specs emptied, events set to None).
+    assert sched_inst.load_specs == {}
+    assert conn._kv_cache_events is None
+
+
+def test_lookup_key_server_reset_drains_send_queue_before_remove_all():
+    """LookupKeyServer RESET handler must drain the send thread's
+    request_queue BEFORE calling store.remove_all -- otherwise stale
+    puts that were already in flight when the caller paused generation
+    can land on the master AFTER remove_all and silently repopulate it
+    with KV hashed against the previous-policy weights.
+    """
+    # Exercise the handler logic directly with mocks for the send thread
+    # and store. We assert (a) join() is called, (b) remove_all is called,
+    # and (c) join() comes BEFORE remove_all in the call order. The full
+    # LookupKeyServer is heavy (binds a real ZMQ REP socket), so we drive
+    # just the dispatch branch here via a stub equivalent.
+    from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store import (
+        protocol,
+    )
+
+    call_order: list[str] = []
+
+    fake_send_queue = MagicMock()
+    fake_send_queue.join.side_effect = lambda: call_order.append("join")
+
+    fake_store = MagicMock()
+    fake_store.remove_all.side_effect = lambda force: call_order.append(
+        f"remove_all(force={force})"
+    )
+
+    fake_send_thread = MagicMock()
+    fake_send_thread.request_queue = fake_send_queue
+
+    fake_store_worker = MagicMock()
+    fake_store_worker.kv_send_thread = fake_send_thread
+    fake_store_worker.store = fake_store
+
+    fake_socket = MagicMock()
+    sent: list[bytes] = []
+    fake_socket.send.side_effect = lambda frame: sent.append(frame)
+
+    # NOTE: hand-copied mirror of the RESET_MSG branch of
+    # LookupKeyServer.process_request, not the real handler. Keep both in
+    # sync: this test cannot catch a regression made only in worker.py.
+    msg_type = protocol.RESET_MSG
+    if msg_type == protocol.RESET_MSG:
+        try:
+            if fake_store_worker.kv_send_thread is not None:
+                fake_store_worker.kv_send_thread.request_queue.join()
+            fake_store_worker.store.remove_all(force=True)
+            fake_socket.send(protocol.RESP_OK)
+        except Exception:
+            fake_socket.send(protocol.RESP_ERR)
+
+    # Drain must happen before remove_all.
+    assert call_order == ["join", "remove_all(force=True)"]
+    # Worker reported success.
+    assert sent == [protocol.RESP_OK]
+
+
+def test_lookup_key_server_reset_skips_drain_when_no_send_thread():
+    """When the worker has no send thread (e.g. consumer-only role
+    configurations), the RESET handler must still call remove_all
+    instead of dereferencing a None send thread.
+    """
+    from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store import (
+        protocol,
+    )
+
+    call_order: list[str] = []
+    fake_store = MagicMock()
+    fake_store.remove_all.side_effect = lambda force: call_order.append("remove_all")
+
+    fake_store_worker = MagicMock()
+    fake_store_worker.kv_send_thread = None
+    fake_store_worker.store = fake_store
+
+    fake_socket = MagicMock()
+    sent: list[bytes] = []
+    fake_socket.send.side_effect = lambda frame: sent.append(frame)
+
+    msg_type = protocol.RESET_MSG
+    if msg_type == protocol.RESET_MSG:
+        try:
+            if fake_store_worker.kv_send_thread is not None:
+                fake_store_worker.kv_send_thread.request_queue.join()
+            fake_store_worker.store.remove_all(force=True)
+            fake_socket.send(protocol.RESP_OK)
+        except Exception:
+            fake_socket.send(protocol.RESP_ERR)
+
+    assert call_order == ["remove_all"]
+    assert sent == [protocol.RESP_OK]
diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/connector.py
index 774d1b3a8fad..14d4b381a3c4 100644
--- a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/connector.py
+++ b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/connector.py
@@ -200,6 +200,23 @@ def request_finished_all_groups(
         assert self.connector_scheduler is not None
         return self.connector_scheduler.request_finished(request, block_ids)
 
+    def reset_cache(self) -> bool | None:
+        """Reset the external Mooncake store on prefix-cache reset.
+
+        Drains the worker send queue, then runs ``remove_all`` on the
+        Mooncake master. Caller must first pause generation (e.g.
+        ``pause_generation``) so no new puts are enqueued during drain.
+
+        Returns True on ack, False on failure, None for the worker role.
+        """
+        if self.role == KVConnectorRole.SCHEDULER:
+            assert self.connector_scheduler is not None
+            # Clear local references to keys we're about to wipe.
+            self.connector_scheduler.load_specs.clear()
+            self._kv_cache_events = None
+            return self.connector_scheduler.reset_store()
+        return None
+
     def update_connector_output(self, connector_output: KVConnectorOutput):
         kv_cache_events = connector_output.kv_cache_events
         if not kv_cache_events or not isinstance(
diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/protocol.py b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/protocol.py
new file mode 100644
index 000000000000..1317d7816734
--- /dev/null
+++ b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/protocol.py
@@ -0,0 +1,36 @@
+# SPDX-License-Identifier: Apache-2.0
+# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
+"""Wire-format constants for the LookupKey ZMQ admin channel.
+
+This is the single source of truth shared by ``LookupKeyClient`` and
+``LookupKeyServer`` on the scheduler<->worker rank-0 admin channel.
+
+Wire format (REQ/REP over IPC):
+
+    Request:  [msg_type: bytes] [payload_frames...]
+
+    msg_type == LOOKUP_MSG:
+        frame 1:        token_len (u32 big-endian, 4 bytes)
+        frame 2..n:     msgpack-encoded list[str] of block-hash hex digests
+        Response:       [hit_count: u32 big-endian, 4 bytes]
+
+    msg_type == RESET_MSG:
+        (no payload frames)
+        Response:       [RESP_OK] or [RESP_ERR]
+
+The first frame of every request is a named bytes tag (not a numeric
+sentinel that aliases the data field) so the protocol stays
+self-describing and extensible: adding new admin commands requires
+only a new tag and a new dispatch branch.
+
+Mirrors the named-tag convention used by the NIXL connector (see
+``vllm/distributed/kv_transfer/kv_connector/v1/nixl/metadata.py``).
+"""
+
+# Request message-type tags. Frame 0 of every request.
+LOOKUP_MSG: bytes = b"lookup"
+RESET_MSG: bytes = b"reset"
+
+# Single-byte response status codes for admin commands.
+RESP_OK: bytes = b"\x01"
+RESP_ERR: bytes = b"\x00"
diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/scheduler.py b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/scheduler.py
index 46e6c9f12e26..52bab591a9be 100644
--- a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/scheduler.py
+++ b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/scheduler.py
@@ -371,3 +371,30 @@ def request_finished(
                 request.request_id,
             )
         return delay_free_blocks, None
+
+    def reset_store(self) -> bool:
+        """Trigger a global ``remove_all(force=True)`` on the Mooncake master.
+
+        Routes through the existing LookupKey ZMQ admin channel to worker
+        rank 0, which owns the ``MooncakeDistributedStore`` handle.
+
+        Ordering assumption: caller (typically
+        ``Scheduler.reset_connector_cache``, invoked via
+        ``reset_prefix_cache(reset_connector=True)``) MUST ensure no
+        in-flight Mooncake lookups or transfers. For RL workflows this is
+        satisfied at the step boundary after weight updates and rollout
+        drain. Violating this can allow stale KV to be served on the next
+        request, defeating the hard-reset guarantee.
+
+        Returns True on ACK from worker, False on NACK or RPC error.
+        """
+        try:
+            ok = self.client.reset()
+            if ok:
+                logger.info("Mooncake store reset via remove_all succeeded.")
+            else:
+                logger.warning("Mooncake store reset returned NACK from worker.")
+            return ok
+        except Exception as e:
+            logger.exception("Mooncake reset_store RPC failed: %s", e)
+            return False
diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/worker.py b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/worker.py
index bdb6405d445e..486c2553b6d1 100644
--- a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/worker.py
+++ b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/worker.py
@@ -50,6 +50,12 @@
     PoolKey,
     ReqMeta,
 )
+from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store.protocol import (  # noqa: E501
+    LOOKUP_MSG,
+    RESET_MSG,
+    RESP_ERR,
+    RESP_OK,
+)
 from vllm.logger import init_logger
 from vllm.utils.network_utils import get_ip, make_zmq_socket
 from vllm.v1.core.kv_cache_utils import (
@@ -1424,7 +1430,14 @@ def get_kv_events(self) -> list[BlockStored]:
 
 
 class LookupKeyServer:
-    """ZMQ server on worker rank 0 for handling prefix lookup queries."""
+    """ZMQ server on worker rank 0 for the LookupKey admin channel.
+
+    Handles two request types, tagged at frame 0:
+      - ``LOOKUP_MSG``: prefix-cache hit query, returns hit count.
+      - ``RESET_MSG``: drains the send thread queue, then runs
+        ``store.remove_all(force=True)``. Caller must have paused the
+        scheduler first.
+    """
 
     def __init__(
         self,
@@ -1450,13 +1463,44 @@ def __init__(
         def process_request():
             while self.running:
                 all_frames = self.socket.recv_multipart(copy=False)
-                token_len = int.from_bytes(all_frames[0], byteorder="big")
-                hash_frames = all_frames[1:]
-                hashes_str = self.decoder.decode(hash_frames)
-                block_hashes = [BlockHash(bytes.fromhex(s)) for s in hashes_str]
-                result = self.store_worker.lookup(token_len, block_hashes)
-                response = result.to_bytes(4, "big")
-                self.socket.send(response)
+                msg_type = bytes(all_frames[0])
+
+                if msg_type == LOOKUP_MSG:
+                    # A REP socket must answer every request: an exception
+                    # escaping here would end this thread and leave the
+                    # client waiting in recv(). Report a miss instead.
+                    try:
+                        token_len = int.from_bytes(all_frames[1], byteorder="big")
+                        hash_frames = all_frames[2:]
+                        hashes_str = self.decoder.decode(hash_frames)
+                        block_hashes = [BlockHash(bytes.fromhex(s)) for s in hashes_str]
+                        result = self.store_worker.lookup(token_len, block_hashes)
+                    except Exception as e:
+                        logger.exception("Mooncake lookup failed: %s", e)
+                        result = 0
+                    self.socket.send(result.to_bytes(4, "big"))
+
+                elif msg_type == RESET_MSG:
+                    try:
+                        # Drain in-flight puts before wiping the master;
+                        # otherwise stale puts can repopulate it post-reset.
+                        # Safe across HMA: store.remove_all wipes the underlying
+                        # flat key space, clearing every (group_id, hash) entry.
+                        if self.store_worker.kv_send_thread is not None:
+                            self.store_worker.kv_send_thread.request_queue.join()
+                        self.store_worker.store.remove_all(force=True)
+                        logger.info("Mooncake store reset via remove_all succeeded.")
+                        self.socket.send(RESP_OK)
+                    except Exception as e:
+                        logger.exception("Mooncake remove_all failed: %s", e)
+                        self.socket.send(RESP_ERR)
+
+                else:
+                    logger.warning(
+                        "LookupKeyServer received unknown msg_type: %r",
+                        msg_type,
+                    )
+                    self.socket.send(RESP_ERR)
 
         self.thread = threading.Thread(target=process_request, daemon=True)
         self.thread.start()
@@ -1473,7 +1517,12 @@ def close(self):
 
 
 class LookupKeyClient:
-    """ZMQ client for querying prefix cache hits from worker."""
+    """ZMQ client for the LookupKey admin channel.
+
+    Routes both prefix-cache lookups and admin commands (currently:
+    ``reset``) to ``LookupKeyServer`` on worker rank 0. The first frame
+    of every request is a named tag from ``protocol.py``.
+    """
 
     def __init__(self, vllm_config: VllmConfig):
         self.encoder = MsgpackEncoder()
@@ -1490,12 +1539,24 @@ def lookup(self, token_len: int, block_hashes: list[BlockHash]) -> int:
         hash_strs = [h.hex() for h in block_hashes]
         hash_frames = self.encoder.encode(hash_strs)
         token_len_bytes = token_len.to_bytes(4, byteorder="big")
-        all_frames = [token_len_bytes] + list(hash_frames)
+        all_frames = [LOOKUP_MSG, token_len_bytes] + list(hash_frames)
         self.socket.send_multipart(all_frames, copy=False)
         resp = self.socket.recv()
         result = int.from_bytes(resp, "big")
         return result
 
+    def reset(self) -> bool:
+        """Trigger ``store.remove_all(force=True)`` on worker rank 0.
+
+        Ordering assumption: caller MUST ensure no in-flight Mooncake
+        lookups or transfers when invoking reset. In RL workflows this
+        holds naturally at the step boundary after weight updates and
+        rollout drain. Returns True on ACK, False on NACK.
+        """
+        self.socket.send(RESET_MSG)
+        resp = self.socket.recv()
+        return bytes(resp) == RESP_OK
+
     def close(self):
         self.socket.close(linger=0)
 
diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py
index c69c9a8119ab..5911859c9d7a 100644
--- a/vllm/v1/core/sched/scheduler.py
+++ b/vllm/v1/core/sched/scheduler.py
@@ -1945,8 +1945,16 @@ def reset_prefix_cache(
 
     def reset_connector_cache(self) -> bool:
         if self.connector is None:
-            logger.warning("reset_connector called but no KV connector is configured.")
-            return False
+            # No connector attached -> nothing to reset, treat as success so
+            # callers that unconditionally request a connector reset (e.g. as
+            # part of a cache-clearing cascade after a weight update) don't
+            # see reset_prefix_cache() flip to False purely because they
+            # didn't configure a connector.
+            logger.debug(
+                "reset_connector requested but no KV connector is configured; "
+                "treating as no-op success."
+            )
+            return True
 
         if self.connector.reset_cache() is False:
             return False
diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py
index c9503e46dd88..330bf83988e0 100644
--- a/vllm/v1/engine/core.py
+++ b/vllm/v1/engine/core.py
@@ -655,8 +655,18 @@ def reset_encoder_cache(self) -> None:
         # Reset the GPU model runner's encoder cache (physical storage)
         self.model_executor.reset_encoder_cache()
 
-    def _reset_caches(self, reset_running_requests=True) -> None:
-        self.reset_prefix_cache(reset_running_requests=reset_running_requests)
+    def _reset_caches(
+        self,
+        reset_running_requests: bool = True,
+        reset_connector: bool = True,
+    ) -> None:
+        # reset_connector=True so external connectors clear alongside
+        # local caches, matching the pause_generation(clear_cache=True)
+        # contract. No-op when no connector is configured.
+        self.reset_prefix_cache(
+            reset_running_requests=reset_running_requests,
+            reset_connector=reset_connector,
+        )
         self.reset_mm_cache()
         self.reset_encoder_cache()
 