Skip to content

Commit

Permalink
Optimize loading time for chunks to be there (#19109)
Browse files Browse the repository at this point in the history
Co-authored-by: thomas <[email protected]>
  • Loading branch information
tchaton and thomas authored Dec 4, 2023
1 parent e7d7399 commit 35ea94f
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 6 deletions.
4 changes: 2 additions & 2 deletions src/lightning/app/utilities/logs_socket_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ def create_lightning_logs_socket(
project_id: str,
app_id: str,
component: str,
on_message_callback: Callable[[WebSocketApp, str], None],
on_error_callback: Optional[Callable[[Exception, str], None]] = None,
on_message_callback: Callable,
on_error_callback: Optional[Callable] = None,
) -> WebSocketApp:
"""Creates and returns WebSocketApp to listen to lightning app logs.
Expand Down
18 changes: 14 additions & 4 deletions src/lightning/data/streaming/item_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,16 @@ def load_item_from_chunk(self, index: int, chunk_index: int, chunk_filepath: str
del self._chunk_filepaths[chunk_filepath]

if chunk_filepath not in self._chunk_filepaths:
while not os.path.exists(chunk_filepath):
first_exists = exists = os.path.exists(chunk_filepath)

while not exists:
sleep(0.01)
exists = os.path.exists(chunk_filepath)

# Wait to avoid any corruption when the file appears
sleep(0.01)
if not first_exists:
sleep(0.001)

self._chunk_filepaths[chunk_filepath] = True

with open(chunk_filepath, "rb", 0) as fp:
Expand Down Expand Up @@ -146,11 +151,16 @@ def load_item_from_chunk(self, index: int, chunk_index: int, chunk_filepath: str
del self._chunk_filepaths[chunk_filepath]

if chunk_filepath not in self._chunk_filepaths:
while not os.path.exists(chunk_filepath):
first_exists = exists = os.path.exists(chunk_filepath)

while not exists:
sleep(0.01)
exists = os.path.exists(chunk_filepath)

# Wait to avoid any corruption when the file appears
sleep(0.01)
if not first_exists:
sleep(0.001)

self._chunk_filepaths[chunk_filepath] = True

if chunk_index not in self._mmaps:
Expand Down

0 comments on commit 35ea94f

Please sign in to comment.