Skip to content

Commit

Permalink
Error on mal-use of Context.started()
Browse files Browse the repository at this point in the history
Previously we were ignoring a race where the callee an opened task
context could enter `Context.open_stream()` before calling `.started().
Disallow this as well as calling `.started()` more then once.
  • Loading branch information
goodboy committed Nov 9, 2021
1 parent 35ac323 commit 5e2a94f
Showing 1 changed file with 24 additions and 1 deletion.
25 changes: 24 additions & 1 deletion tractor/_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@ class Context:
_recv_chan: Optional[trio.MemoryReceiveChannel] = None
_result: Optional[Any] = False
_cancel_called: bool = False
_started_called: bool = False
_started_received: bool = False

# only set on the callee side
_scope_nursery: Optional[trio.Nursery] = None
Expand Down Expand Up @@ -455,6 +457,11 @@ async def open_stream(
f'Context around {actor.uid[0]}:{task} was already cancelled!'
)

if not self._portal and not self._started_called:
raise RuntimeError(
'Context.started()` must be called before opening a stream'
)

# NOTE: in one way streaming this only happens on the
# caller side inside `Actor.send_cmd()` so if you try
# to send a stop from the caller to the callee in the
Expand Down Expand Up @@ -536,13 +543,29 @@ async def result(self) -> Any:

return self._result

async def started(self, value: Optional[Any] = None) -> None:
async def started(
self,
value: Optional[Any] = None

) -> None:
'''
Indicate to calling actor's task that this linked context
has started and send ``value`` to the other side.
On the calling side ``value`` is the second item delivered
in the tuple returned by ``Portal.open_context()``.
'''
if self._portal:
raise RuntimeError(
f"Caller side context {self} can not call started!")

elif self._started_called:
raise RuntimeError(
f"called 'started' twice on context with {self.chan.uid}")

await self.chan.send({'started': value, 'cid': self.cid})
self._started_called = True

# TODO: do we need a restart api?
# async def restart(self) -> None:
Expand Down

0 comments on commit 5e2a94f

Please sign in to comment.