Skip to content

Commit

Permalink
aclosing wrap
Browse files Browse the repository at this point in the history
  • Loading branch information
kramstrom committed Oct 14, 2024
1 parent ae1d8f7 commit c382cac
Showing 1 changed file with 12 additions and 10 deletions.
22 changes: 12 additions & 10 deletions modal/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from ._serialization import serialize, serialize_proto_params
from ._utils.async_utils import (
TaskContext,
aclosing,
async_merge,
awaitable_to_aiter,
synchronize_api,
Expand Down Expand Up @@ -210,16 +211,17 @@ async def run_generator(self):

items_received = 0
items_total: Union[int, None] = None # populated when self.run_function() completes
async for item in async_merge(data_stream, awaitable_to_aiter(self.run_function())):
if isinstance(item, api_pb2.GeneratorDone):
items_total = item.items_total
else:
yield item
items_received += 1
# The comparison avoids infinite loops if a non-deterministic generator is retried
# and produces less data in the second run than what was already sent.
if items_total is not None and items_received >= items_total:
break
async with aclosing(async_merge(data_stream, awaitable_to_aiter(self.run_function()))) as stream:
async for item in stream:
if isinstance(item, api_pb2.GeneratorDone):
items_total = item.items_total
else:
yield item
items_received += 1
# The comparison avoids infinite loops if a non-deterministic generator is retried
# and produces less data in the second run than what was already sent.
if items_total is not None and items_received >= items_total:
break


# Wrapper type for api_pb2.FunctionStats
Expand Down

0 comments on commit c382cac

Please sign in to comment.