Skip to content

Commit e171cae

Browse files
committed
fix: expiration cache hanging causing delay in driver
1 parent 74ae5de commit e171cae

File tree

2 files changed

+45
-19
lines changed

2 files changed

+45
-19
lines changed

aws_advanced_python_wrapper/utils/sliding_expiration_cache.py

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
from __future__ import annotations
1616

17-
from concurrent.futures import Executor, ThreadPoolExecutor
17+
from threading import Thread
1818
from time import perf_counter_ns, sleep
1919
from typing import Callable, Generic, ItemsView, KeysView, Optional, TypeVar
2020

@@ -119,28 +119,26 @@ def __init__(
119119
should_dispose_func: Optional[Callable] = None,
120120
item_disposal_func: Optional[Callable] = None):
121121
super().__init__(cleanup_interval_ns, should_dispose_func, item_disposal_func)
122-
self._executor: Executor = ThreadPoolExecutor(thread_name_prefix="SlidingExpirationCacheWithCleanupThreadExecutor")
123-
self.init_cleanup_thread()
124-
125-
def init_cleanup_thread(self) -> None:
126-
self._executor.submit(self._cleanup_thread_internal)
122+
self._cleanup_thread = Thread(target=self._cleanup_thread_internal, daemon=True)
123+
self._cleanup_thread.start()
127124

128125
def _cleanup_thread_internal(self):
129-
logger.debug("SlidingExpirationCache.CleaningUp")
130-
current_time = perf_counter_ns()
131-
sleep(self._cleanup_interval_ns / 1_000_000_000)
132-
self._cleanup_time_ns.set(current_time + self._cleanup_interval_ns)
133-
keys = [key for key, _ in self._cdict.items()]
134-
for key in keys:
126+
while True:
135127
try:
136-
self._remove_if_expired(key)
128+
sleep(self._cleanup_interval_ns / 1_000_000_000)
129+
logger.debug("SlidingExpirationCache.CleaningUp")
130+
self._cleanup_time_ns.set(perf_counter_ns() + self._cleanup_interval_ns)
131+
keys = [key for key, _ in self._cdict.items()]
132+
for key in keys:
133+
try:
134+
self._remove_if_expired(key)
135+
except Exception:
136+
pass # ignore
137137
except Exception:
138-
pass # ignore
139-
140-
self._executor.shutdown()
138+
break
141139

142140
def _cleanup(self):
143-
pass # do nothing, cleanup thread does the job
141+
pass # cleanup thread handles this
144142

145143

146144
class CacheItem(Generic[V]):

tests/unit/test_sliding_expiration_cache.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414

1515
import time
1616

17-
from aws_advanced_python_wrapper.utils.sliding_expiration_cache import \
18-
SlidingExpirationCache
17+
from aws_advanced_python_wrapper.utils.sliding_expiration_cache import (
18+
SlidingExpirationCache, SlidingExpirationCacheWithCleanupThread)
1919

2020

2121
def test_compute_if_absent():
@@ -89,6 +89,34 @@ def test_clear():
8989
assert item2.disposed is True
9090

9191

92+
def test_cleanup_thread_continuous_removal():
93+
# Use very short cleanup interval for testing (100ms)
94+
cache = SlidingExpirationCacheWithCleanupThread(
95+
cleanup_interval_ns=100_000_000, # 100ms
96+
item_disposal_func=lambda item: item.dispose()
97+
)
98+
99+
# First cycle: insert item that expires quickly
100+
item1 = DisposableItem(True)
101+
cache.compute_if_absent("key1", lambda _: item1, 50_000_000) # 50ms expiration
102+
assert cache.get("key1") == item1
103+
104+
# Wait for cleanup thread to remove expired item
105+
time.sleep(0.2) # Wait 200ms for cleanup
106+
assert cache.get("key1") is None
107+
assert item1.disposed is True
108+
109+
# Second cycle: insert another item that expires quickly
110+
item2 = DisposableItem(True)
111+
cache.compute_if_absent("key2", lambda _: item2, 50_000_000) # 50ms expiration
112+
assert cache.get("key2") == item2
113+
114+
# Wait for cleanup thread to remove second expired item
115+
time.sleep(0.2) # Wait 200ms for cleanup
116+
assert cache.get("key2") is None
117+
assert item2.disposed is True
118+
119+
92120
class DisposableItem:
93121
def __init__(self, should_dispose):
94122
self.should_dispose = should_dispose

0 commit comments

Comments
 (0)