Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shared memory array API and optional tight integration with numpy #338

Open
wants to merge 12 commits into
base: ctx_cancel_semantics_and_overruns
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions requirements-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ mypy
trio_typing
pexpect
towncrier
numpy
167 changes: 167 additions & 0 deletions tests/test_shm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
"""
Shared mem primitives and APIs.

"""
import uuid

# import numpy
import pytest
import trio
import tractor
from tractor._shm import (
open_shm_list,
attach_shm_list,
)


@tractor.context
async def child_attach_shml_alot(
ctx: tractor.Context,
shm_key: str,
) -> None:

await ctx.started(shm_key)

# now try to attach a boatload of times in a loop..
for _ in range(1000):
shml = attach_shm_list(
key=shm_key,
readonly=False,
)
assert shml.shm.name == shm_key
await trio.sleep(0.001)


def test_child_attaches_alot():
async def main():
async with tractor.open_nursery() as an:

# allocate writeable list in parent
key = f'shml_{uuid.uuid4()}'
shml = open_shm_list(
key=key,
)

portal = await an.start_actor(
'shm_attacher',
enable_modules=[__name__],
)

async with (
portal.open_context(
child_attach_shml_alot,
shm_key=shml.key,
) as (ctx, start_val),
):
assert start_val == key
await ctx.result()

await portal.cancel_actor()

trio.run(main)


@tractor.context
async def child_read_shm_list(
ctx: tractor.Context,
shm_key: str,
use_str: bool,
frame_size: int,
) -> None:

# attach in child
shml = attach_shm_list(
key=shm_key,
# dtype=str if use_str else float,
)
await ctx.started(shml.key)

async with ctx.open_stream() as stream:
async for i in stream:
print(f'(child): reading shm list index: {i}')

if use_str:
expect = str(float(i))
else:
expect = float(i)

if frame_size == 1:
val = shml[i]
assert expect == val
print(f'(child): reading value: {val}')
else:
frame = shml[i - frame_size:i]
print(f'(child): reading frame: {frame}')


@pytest.mark.parametrize(
'use_str',
[False, True],
ids=lambda i: f'use_str_values={i}',
)
@pytest.mark.parametrize(
'frame_size',
[1, 2**6, 2**10],
ids=lambda i: f'frame_size={i}',
)
def test_parent_writer_child_reader(
use_str: bool,
frame_size: int,
):

async def main():
async with tractor.open_nursery(
# debug_mode=True,
) as an:

portal = await an.start_actor(
'shm_reader',
enable_modules=[__name__],
debug_mode=True,
)

# allocate writeable list in parent
key = 'shm_list'
seq_size = int(2 * 2 ** 10)
shml = open_shm_list(
key=key,
size=seq_size,
dtype=str if use_str else float,
readonly=False,
)

async with (
portal.open_context(
child_read_shm_list,
shm_key=key,
use_str=use_str,
frame_size=frame_size,
) as (ctx, sent),

ctx.open_stream() as stream,
):

assert sent == key

for i in range(seq_size):

val = float(i)
if use_str:
val = str(val)

# print(f'(parent): writing {val}')
shml[i] = val

# only on frame fills do we
# signal to the child that a frame's
# worth is ready.
if (i % frame_size) == 0:
print(f'(parent): signalling frame full on {val}')
await stream.send(i)
else:
print(f'(parent): signalling final frame on {val}')
await stream.send(i)

await portal.cancel_actor()

trio.run(main)
Loading