-
-
Notifications
You must be signed in to change notification settings - Fork 343
/
aio-guest-test.py
53 lines (35 loc) · 1.12 KB
/
aio-guest-test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import asyncio
import trio
async def aio_main():
    """Asyncio-side entry point: run ``trio_main`` as a Trio guest and wait.

    Starts a Trio guest run on top of the currently running asyncio loop,
    then blocks (asynchronously) until Trio reports that ``trio_main`` has
    finished, and finally unwraps the reported outcome.
    """
    host_loop = asyncio.get_running_loop()

    # Bridge from Trio's completion callback back into asyncio: the callback
    # resolves this future with the outcome object Trio hands it.
    trio_done_fut = host_loop.create_future()

    def trio_done_callback(main_outcome):
        """Report the guest run's outcome and wake up ``aio_main``."""
        print(f"trio_main finished: {main_outcome!r}")
        trio_done_fut.set_result(main_outcome)

    # Trio schedules its own work on the asyncio loop through
    # call_soon_threadsafe and invokes the callback once trio_main is done.
    trio.lowlevel.start_guest_run(
        trio_main,
        run_sync_soon_threadsafe=host_loop.call_soon_threadsafe,
        done_callback=trio_done_callback,
    )

    finished_outcome = await trio_done_fut
    finished_outcome.unwrap()
async def trio_main():
    """Trio-side half of the ping-pong demo.

    Spawns ``aio_pingpong`` as an asyncio task, seeds the exchange with ``0``
    and then, for every number received from the asyncio side, prints it,
    sleeps one second under Trio and sends back ``n + 1``. Returns once a
    received value is 10 or greater.
    """
    print("trio_main!")

    # Unbounded Trio channel carries values asyncio -> Trio; an asyncio queue
    # carries values Trio -> asyncio.
    aio_to_trio_send, aio_to_trio_recv = trio.open_memory_channel(float("inf"))
    trio_to_aio_queue = asyncio.Queue()

    # Hold the task in a local so a strong reference exists while the loop
    # below is running.
    pingpong_task = asyncio.create_task(
        aio_pingpong(trio_to_aio_queue, aio_to_trio_send)
    )

    # Kick off the exchange.
    trio_to_aio_queue.put_nowait(0)

    async for n in aio_to_trio_recv:
        print(f"trio got: {n}")
        await trio.sleep(1)
        next_value = n + 1
        trio_to_aio_queue.put_nowait(next_value)
        if n >= 10:
            return
    del pingpong_task
async def aio_pingpong(from_trio, to_trio):
    """Asyncio-side half of the ping-pong demo; loops forever.

    Args:
        from_trio: Object with an awaitable ``get()`` yielding the next
            number sent by the Trio side.
        to_trio: Object with a synchronous ``send_nowait(value)`` used to
            hand the reply back to the Trio side.

    Each round prints the received number, sleeps one second under asyncio
    and replies with the number plus one. The loop has no exit condition of
    its own; it ends only when an awaited call raises.
    """
    print("aio_pingpong!")

    while True:
        n = await from_trio.get()
        print(f"aio got: {n}")
        await asyncio.sleep(1)
        reply = n + 1
        to_trio.send_nowait(reply)
# Start the demo only when this file is executed as a script, so that loading
# the module for inspection or reuse does not spin up an event loop.
if __name__ == "__main__":
    asyncio.run(aio_main())