Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions src/lerobot/utils/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import platform
from contextlib import suppress
from queue import Empty
from typing import Any

Expand All @@ -30,10 +32,21 @@ def get_last_item_from_queue(queue: Queue, block=True, timeout: float = 0.1) ->
item = None

# Drain queue and keep only the most recent parameters
try:
while True:
if platform.system() == "Darwin":
# On Mac, avoid using `qsize` due to unreliable implementation.
# There is a comment on `qsize` code in the Python source:
# Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
try:
while True:
item = queue.get_nowait()
except Empty:
pass

return item

# Details about using qsize in https://github.com/huggingface/lerobot/issues/1523
while queue.qsize() > 0:
with suppress(Empty):
item = queue.get_nowait()
except Empty:
pass

return item
16 changes: 16 additions & 0 deletions tests/utils/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import time
from queue import Queue

from torch.multiprocessing import Queue as TorchMPQueue

from lerobot.utils.queue import get_last_item_from_queue


Expand Down Expand Up @@ -46,6 +48,20 @@ def test_get_last_item_multiple_items():
assert queue.empty()


def test_get_last_item_multiple_items_with_torch_queue():
"""Test getting the last item when queue has multiple items."""
queue = TorchMPQueue()
items = ["first", "second", "third", "fourth", "last"]

for item in items:
queue.put(item)

result = get_last_item_from_queue(queue)

assert result == "last"
assert queue.empty()


def test_get_last_item_different_types():
"""Test with different data types in the queue."""
queue = Queue()
Expand Down
Loading