diff --git a/tractor/msg.py b/tractor/msg.py index 28e3405db..2897b8766 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -121,6 +121,7 @@ def pub( wrapped: typing.Callable = None, *, tasks: Set[str] = set(), + send_on_connect: Any = None, ): """Publisher async generator decorator. @@ -206,7 +207,7 @@ async def pub_service(get_topics): # handle the decorator not called with () case if wrapped is None: - return partial(pub, tasks=tasks) + return partial(pub, tasks=tasks, send_on_connect=send_on_connect) task2lock: Dict[str, trio.StrictFIFOLock] = {} @@ -249,6 +250,11 @@ async def _execute( try: modify_subs(topics2ctxs, topics, ctx) + + # if specified send the startup message back to consumer + if send_on_connect is not None: + await ctx.send_yield(send_on_connect) + # block and let existing feed task deliver # stream data until it is cancelled in which case # the next waiting task will take over and spawn it again