Skip to content

Commit

Permalink
Merge pull request #88 from linuxmaniac/vseva/trio
Browse files Browse the repository at this point in the history
trio subprocs: small fixes for #87
  • Loading branch information
goodboy authored Dec 20, 2022
2 parents 57ffd9e + bb22d6f commit 6f524e4
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 49 deletions.
27 changes: 7 additions & 20 deletions pysipp/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@
import tempfile
from collections import namedtuple
from collections import OrderedDict
from functools import partial
from copy import deepcopy
from functools import partial
from os import path
from distutils import spawn

import trio
from distutils import spawn

from . import command
from . import launch
from . import plugin
from . import utils
from . import launch

log = utils.get_logger()

Expand Down Expand Up @@ -72,11 +72,7 @@ def name(self):
def __call__(self, *args, **kwargs):
return self.run(*args, **kwargs)

def run(
self,
timeout=180,
**kwargs
):
def run(self, timeout=180, **kwargs):

# create and configure a temp scenario
scen = plugin.mng.hook.pysipp_conf_scen_protocol(
Expand Down Expand Up @@ -472,20 +468,11 @@ async def arun(
timeout=timeout,
)

def run(
self,
timeout=180,
**kwargs
):
def run(self, timeout=180, **kwargs):
"""Run scenario blocking to completion."""
return trio.run(
partial(
self.arun,
timeout=timeout,
**kwargs
)
)
return trio.run(partial(self.arun, timeout=timeout, **kwargs))

def __call__(self, *args, **kwargs):
# TODO: deprecation warning here
kwargs.pop("block", None)
return self.run(*args, **kwargs)
47 changes: 19 additions & 28 deletions pysipp/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@
from functools import partial
from pprint import pformat

from . import utils

import trio

from . import report
from . import utils

log = utils.get_logger()

Expand All @@ -26,8 +25,7 @@ class TimeoutError(Exception):


class SIPpFailure(RuntimeError):
"""SIPp commands failed
"""
"""SIPp commands failed"""


class TrioRunner:
Expand All @@ -42,13 +40,7 @@ def __init__(
# store proc results
self._procs = OrderedDict()

async def run(
self,
nursery,
cmds,
rate=300,
**kwargs
):
async def run(self, nursery, cmds, rate=300, **kwargs):
if self.is_alive():
raise RuntimeError(
"Not all processes from a prior run have completed"
Expand All @@ -59,16 +51,15 @@ async def run(
)
# run agent commands in sequence
for cmd in cmds:
log.debug(
"launching cmd:\n\"{}\"\n".format(cmd)
)
log.debug('launching cmd:\n"{}"\n'.format(cmd))

proc = await nursery.start(
partial(
trio.run_process,
shlex.split(cmd),
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE
stderr=subprocess.PIPE,
check=False,
)
)
self._procs[cmd] = proc
Expand Down Expand Up @@ -125,15 +116,19 @@ async def wait_on_proc(proc):
# all procs were killed by SIGUSR1
raise TimeoutError(
"pids '{}' failed to complete after '{}' seconds".format(
pformat([p.pid for p in signalled.values()]), timeout)
pformat([p.pid for p in signalled.values()]), timeout
)
)

def iterprocs(self):
"""Iterate all processes which are still alive yielding
(cmd, proc) pairs
"""
return ((cmd, proc) for cmd, proc in self._procs.items()
if proc and proc.poll() is None)
return (
(cmd, proc)
for cmd, proc in self._procs.items()
if proc and proc.poll() is None
)

def stop(self):
"""Stop all agents with SIGUSR1 as per SIPp's signal handling"""
Expand All @@ -160,8 +155,7 @@ def is_alive(self):
return any(self.iterprocs())

def clear(self):
"""Clear all processes from the last run
"""
"""Clear all processes from the last run"""
assert not self.is_alive(), "Not all processes have completed"
self._procs.clear()

Expand All @@ -170,28 +164,25 @@ async def run_all_agents(
runner,
agents,
timeout=180,

) -> TrioRunner:
"""Run a sequencec of agents using a ``TrioRunner``.
"""
"""Run a sequencec of agents using a ``TrioRunner``."""

async def finalize():
# this might raise TimeoutError
cmds2procs = await runner.get(timeout=timeout)
agents2procs = list(zip(agents, cmds2procs.values()))
msg = report.err_summary(agents2procs)
if msg:
# report logs and stderr
await report.emit_logfiles(agents2procs)
report.emit_logfiles(agents2procs)
raise SIPpFailure(msg)

return cmds2procs

try:
async with trio.open_nursery() as nurse:
await runner.run(
nurse,
(ua.render() for ua in agents),
timeout=timeout
nurse, (ua.render() for ua in agents), timeout=timeout
)
await finalize()
return runner
Expand All @@ -201,5 +192,5 @@ async def finalize():
try:
await finalize()
except SIPpFailure as err:
assert 'exit code -9' in str(err)
assert "exit code -9" in str(err)
raise terr
2 changes: 1 addition & 1 deletion tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def test_server():
# test client failure on bad remote destination
(agent.client(destaddr=("99.99.99.99", 5060)), 1, {}, RuntimeError),
# test if server times out it is signalled
(agent.server(), 0, {"timeout": 1}, launch.TimeoutError),
(agent.server(), -9, {"timeout": 1}, launch.TimeoutError),
],
ids=["ua", "uac", "uas"],
)
Expand Down

0 comments on commit 6f524e4

Please sign in to comment.