Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

trio: small fixes #88

Merged
merged 4 commits into from
Dec 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 7 additions & 20 deletions pysipp/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@
import tempfile
from collections import namedtuple
from collections import OrderedDict
from functools import partial
from copy import deepcopy
from functools import partial
from os import path
from distutils import spawn

import trio
from distutils import spawn

from . import command
from . import launch
from . import plugin
from . import utils
from . import launch

log = utils.get_logger()

Expand Down Expand Up @@ -72,11 +72,7 @@ def name(self):
def __call__(self, *args, **kwargs):
return self.run(*args, **kwargs)

def run(
self,
timeout=180,
**kwargs
):
def run(self, timeout=180, **kwargs):

# create and configure a temp scenario
scen = plugin.mng.hook.pysipp_conf_scen_protocol(
Expand Down Expand Up @@ -472,20 +468,11 @@ async def arun(
timeout=timeout,
)

def run(
self,
timeout=180,
**kwargs
):
def run(self, timeout=180, **kwargs):
"""Run scenario blocking to completion."""
return trio.run(
partial(
self.arun,
timeout=timeout,
**kwargs
)
)
return trio.run(partial(self.arun, timeout=timeout, **kwargs))

def __call__(self, *args, **kwargs):
# TODO: deprecation warning here
kwargs.pop("block", None)
return self.run(*args, **kwargs)
47 changes: 19 additions & 28 deletions pysipp/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@
from functools import partial
from pprint import pformat

from . import utils

import trio

from . import report
from . import utils

log = utils.get_logger()

Expand All @@ -26,8 +25,7 @@ class TimeoutError(Exception):


class SIPpFailure(RuntimeError):
"""SIPp commands failed
"""
"""SIPp commands failed"""


class TrioRunner:
Expand All @@ -42,13 +40,7 @@ def __init__(
# store proc results
self._procs = OrderedDict()

async def run(
self,
nursery,
cmds,
rate=300,
**kwargs
):
async def run(self, nursery, cmds, rate=300, **kwargs):
if self.is_alive():
raise RuntimeError(
"Not all processes from a prior run have completed"
Expand All @@ -59,16 +51,15 @@ async def run(
)
# run agent commands in sequence
for cmd in cmds:
log.debug(
"launching cmd:\n\"{}\"\n".format(cmd)
)
log.debug('launching cmd:\n"{}"\n'.format(cmd))

proc = await nursery.start(
partial(
trio.run_process,
shlex.split(cmd),
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE
stderr=subprocess.PIPE,
check=False,
)
)
self._procs[cmd] = proc
Expand Down Expand Up @@ -125,15 +116,19 @@ async def wait_on_proc(proc):
# all procs were killed by SIGUSR1
raise TimeoutError(
"pids '{}' failed to complete after '{}' seconds".format(
pformat([p.pid for p in signalled.values()]), timeout)
pformat([p.pid for p in signalled.values()]), timeout
)
)

def iterprocs(self):
"""Iterate all processes which are still alive yielding
(cmd, proc) pairs
"""
return ((cmd, proc) for cmd, proc in self._procs.items()
if proc and proc.poll() is None)
return (
(cmd, proc)
for cmd, proc in self._procs.items()
if proc and proc.poll() is None
)

def stop(self):
"""Stop all agents with SIGUSR1 as per SIPp's signal handling"""
Expand All @@ -160,8 +155,7 @@ def is_alive(self):
return any(self.iterprocs())

def clear(self):
"""Clear all processes from the last run
"""
"""Clear all processes from the last run"""
assert not self.is_alive(), "Not all processes have completed"
self._procs.clear()

Expand All @@ -170,28 +164,25 @@ async def run_all_agents(
runner,
agents,
timeout=180,

) -> TrioRunner:
"""Run a sequencec of agents using a ``TrioRunner``.
"""
"""Run a sequencec of agents using a ``TrioRunner``."""

async def finalize():
# this might raise TimeoutError
cmds2procs = await runner.get(timeout=timeout)
agents2procs = list(zip(agents, cmds2procs.values()))
msg = report.err_summary(agents2procs)
if msg:
# report logs and stderr
await report.emit_logfiles(agents2procs)
report.emit_logfiles(agents2procs)
raise SIPpFailure(msg)

return cmds2procs

try:
async with trio.open_nursery() as nurse:
await runner.run(
nurse,
(ua.render() for ua in agents),
timeout=timeout
nurse, (ua.render() for ua in agents), timeout=timeout
)
await finalize()
return runner
Expand All @@ -201,5 +192,5 @@ async def finalize():
try:
await finalize()
except SIPpFailure as err:
assert 'exit code -9' in str(err)
assert "exit code -9" in str(err)
raise terr
2 changes: 1 addition & 1 deletion tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def test_server():
# test client failure on bad remote destination
(agent.client(destaddr=("99.99.99.99", 5060)), 1, {}, RuntimeError),
# test if server times out it is signalled
(agent.server(), 0, {"timeout": 1}, launch.TimeoutError),
(agent.server(), -9, {"timeout": 1}, launch.TimeoutError),
],
ids=["ua", "uac", "uas"],
)
Expand Down