Skip to content

Commit

Permalink
#521: use contextlib.closing() - fixes all ResourceWarnings during tests
Browse files Browse the repository at this point in the history
  • Loading branch information
giampaolo committed Nov 2, 2014
1 parent a113e88 commit 7c629de
Showing 1 changed file with 91 additions and 108 deletions.
199 changes: 91 additions & 108 deletions test/test_psutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import ast
import atexit
import collections
import contextlib
import datetime
import errno
import functools
Expand Down Expand Up @@ -312,31 +313,28 @@ def check_connection(conn):
# and that's rejected by bind()
if conn.family == AF_INET:
s = socket.socket(conn.family, conn.type)
try:
s.bind((conn.laddr[0], 0))
except socket.error as err:
if err.errno != errno.EADDRNOTAVAIL:
raise
s.close()
with contextlib.closing(s):
try:
s.bind((conn.laddr[0], 0))
except socket.error as err:
if err.errno != errno.EADDRNOTAVAIL:
raise
elif conn.family == AF_UNIX:
assert not conn.raddr, repr(conn.raddr)
assert conn.status == psutil.CONN_NONE, conn.status

if getattr(conn, 'fd', -1) != -1:
assert conn.fd > 0, conn
if hasattr(socket, 'fromfd') and not WINDOWS:
dupsock = None
try:
dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
except (socket.error, OSError) as err:
if err.args[0] != errno.EBADF:
raise
else:
assert dupsock.family == conn.family
assert dupsock.type == conn.type
finally:
if dupsock is not None:
dupsock.close()
with contextlib.closing(dupsock):
assert dupsock.family == conn.family
assert dupsock.type == conn.type


def safe_remove(file):
Expand Down Expand Up @@ -587,12 +585,9 @@ def test_PAGESIZE(self):
self.assertEqual(os.sysconf("SC_PAGE_SIZE"), resource.getpagesize())

def test_deprecated_apis(self):
s = socket.socket()
s.bind(('localhost', 0))
s.listen(1)
warnings.filterwarnings("error")
p = psutil.Process()
try:
with warnings.catch_warnings():
warnings.filterwarnings("error")
p = psutil.Process()
# system APIs
self.assertRaises(DeprecationWarning, getattr, psutil, 'NUM_CPUS')
self.assertRaises(DeprecationWarning, getattr, psutil, 'BOOT_TIME')
Expand Down Expand Up @@ -651,33 +646,29 @@ def test_deprecated_apis(self):
self.fail("%s did not raise DeprecationWarning" % name)

# named tuples
ret = call_until(p.connections, "len(ret) != 0")
self.assertRaises(DeprecationWarning,
getattr, ret[0], 'local_address')
self.assertRaises(DeprecationWarning,
getattr, ret[0], 'remote_address')
finally:
s.close()
warnings.resetwarnings()
with contextlib.closing(socket.socket()) as s:
s.bind(('localhost', 0))
s.listen(1)
ret = call_until(p.connections, "len(ret) != 0")
self.assertRaises(DeprecationWarning,
getattr, ret[0], 'local_address')
self.assertRaises(DeprecationWarning,
getattr, ret[0], 'remote_address')

# check value against new APIs
warnings.filterwarnings("ignore")
try:
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
self.assertEqual(psutil.get_pid_list(), psutil.pids())
self.assertEqual(psutil.NUM_CPUS, psutil.cpu_count())
self.assertEqual(psutil.BOOT_TIME, psutil.boot_time())
self.assertEqual(psutil.TOTAL_PHYMEM,
psutil.virtual_memory().total)
finally:
warnings.resetwarnings()

def test_deprecated_apis_retval(self):
warnings.filterwarnings("ignore")
try:
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
self.assertEqual(psutil.total_virtmem(),
psutil.swap_memory().total)
finally:
warnings.resetwarnings()

def test_virtual_memory(self):
mem = psutil.virtual_memory()
Expand Down Expand Up @@ -1281,12 +1272,11 @@ def test_io_counters(self):
assert io2.write_bytes >= io1.write_bytes, (io1, io2)
# test writes
io1 = p.io_counters()
f = tempfile.TemporaryFile(prefix=TESTFILE_PREFIX)
if PY3:
f.write(bytes("x" * 1000000, 'ascii'))
else:
f.write("x" * 1000000)
f.close()
with tempfile.TemporaryFile(prefix=TESTFILE_PREFIX) as f:
if PY3:
f.write(bytes("x" * 1000000, 'ascii'))
else:
f.write("x" * 1000000)
io2 = p.io_counters()
assert io2.write_count >= io1.write_count, (io1, io2)
assert io2.write_bytes >= io1.write_bytes, (io1, io2)
Expand Down Expand Up @@ -1739,22 +1729,21 @@ def test_connections(self):

@unittest.skipUnless(supports_ipv6(), 'IPv6 is not supported')
def test_connections_ipv6(self):
s = socket.socket(AF_INET6, SOCK_STREAM)
self.addCleanup(s.close)
s.bind(('::1', 0))
s.listen(1)
cons = psutil.Process().connections()
self.assertEqual(len(cons), 1)
self.assertEqual(cons[0].laddr[0], '::1')
self.compare_proc_sys_cons(os.getpid(), cons)
with contextlib.closing(socket.socket(AF_INET6, SOCK_STREAM)) as s:
s.bind(('::1', 0))
s.listen(1)
cons = psutil.Process().connections()
self.assertEqual(len(cons), 1)
self.assertEqual(cons[0].laddr[0], '::1')
self.compare_proc_sys_cons(os.getpid(), cons)

@unittest.skipUnless(hasattr(socket, 'AF_UNIX'),
'AF_UNIX is not supported')
def test_connections_unix(self):
def check(type):
safe_remove(TESTFN)
sock = socket.socket(AF_UNIX, type)
try:
with contextlib.closing(sock):
sock.bind(TESTFN)
cons = psutil.Process().connections(kind='unix')
conn = cons[0]
Expand All @@ -1768,8 +1757,6 @@ def check(type):
# XXX Solaris can't retrieve system-wide UNIX
# sockets.
self.compare_proc_sys_cons(os.getpid(), cons)
finally:
sock.close()

check(SOCK_STREAM)
check(SOCK_DGRAM)
Expand All @@ -1779,23 +1766,19 @@ def check(type):
@unittest.skipIf(WINDOWS or SUNOS,
'connection fd not available on this platform')
def test_connection_fromfd(self):
sock = socket.socket()
sock.bind(('localhost', 0))
sock.listen(1)
p = psutil.Process()
for conn in p.connections():
if conn.fd == sock.fileno():
break
else:
sock.close()
self.fail("couldn't find socket fd")
dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
try:
self.assertEqual(dupsock.getsockname(), conn.laddr)
self.assertNotEqual(sock.fileno(), dupsock.fileno())
finally:
sock.close()
dupsock.close()
with contextlib.closing(socket.socket()) as sock:
sock.bind(('localhost', 0))
sock.listen(1)
p = psutil.Process()
for conn in p.connections():
if conn.fd == sock.fileno():
break
else:
self.fail("couldn't find socket fd")
dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
with contextlib.closing(dupsock):
self.assertEqual(dupsock.getsockname(), conn.laddr)
self.assertNotEqual(sock.fileno(), dupsock.fileno())

def test_connections_all(self):
tcp_template = textwrap.dedent("""
Expand Down Expand Up @@ -1880,8 +1863,10 @@ def test_num_fds(self):
p = psutil.Process()
start = p.num_fds()
file = open(TESTFN, 'w')
self.addCleanup(file.close)
self.assertEqual(p.num_fds(), start + 1)
sock = socket.socket()
self.addCleanup(sock.close)
self.assertEqual(p.num_fds(), start + 2)
file.close()
sock.close()
Expand Down Expand Up @@ -2044,53 +2029,51 @@ def test_zombie_process(self):
# reap_children() as they are attributable to 'us'
# (os.getpid()) via children(recursive=True).
src = textwrap.dedent("""\
import os, sys, time, socket
import os, sys, time, socket, contextlib
child_pid = os.fork()
if child_pid > 0:
time.sleep(3000)
else:
# this is the zombie process
s = socket.socket(socket.AF_UNIX)
s.connect('%s')
if sys.version_info < (3, ):
pid = str(os.getpid())
else:
pid = bytes(str(os.getpid()), 'ascii')
s.sendall(pid)
s.close()
with contextlib.closing(s):
s.connect('%s')
if sys.version_info < (3, ):
pid = str(os.getpid())
else:
pid = bytes(str(os.getpid()), 'ascii')
s.sendall(pid)
""" % TESTFN)
sock = None
try:
sock = socket.socket(socket.AF_UNIX)
sock.settimeout(GLOBAL_TIMEOUT)
sock.bind(TESTFN)
sock.listen(1)
pyrun(src)
conn, _ = sock.accept()
select.select([conn.fileno()], [], [], GLOBAL_TIMEOUT)
zpid = int(conn.recv(1024))
zproc = psutil.Process(zpid)
call_until(lambda: zproc.status(), "ret == psutil.STATUS_ZOMBIE")
# A zombie process should always be instantiable
zproc = psutil.Process(zpid)
# ...and at least its status always be querable
self.assertEqual(zproc.status(), psutil.STATUS_ZOMBIE)
# ...its parent should 'see' it
descendants = [x.pid for x in psutil.Process().children(
recursive=True)]
self.assertIn(zpid, descendants)
# XXX should we also assume ppid be usable? Note: this
# would be an important use case as the only way to get
# rid of a zombie is to kill its parent.
# self.assertEqual(zpid.ppid(), os.getpid())
# ...and all other APIs should be able to deal with it
self.assertTrue(psutil.pid_exists(zpid))
self.assertIn(zpid, psutil.pids())
self.assertIn(zpid, [x.pid for x in psutil.process_iter()])
finally:
if sock is not None:
sock.close()
reap_children(search_all=True)
with contextlib.closing(socket.socket(socket.AF_UNIX)) as sock:
try:
sock.settimeout(GLOBAL_TIMEOUT)
sock.bind(TESTFN)
sock.listen(1)
pyrun(src)
conn, _ = sock.accept()
select.select([conn.fileno()], [], [], GLOBAL_TIMEOUT)
zpid = int(conn.recv(1024))
zproc = psutil.Process(zpid)
call_until(lambda: zproc.status(),
"ret == psutil.STATUS_ZOMBIE")
# A zombie process should always be instantiable
zproc = psutil.Process(zpid)
# ...and at least its status always be querable
self.assertEqual(zproc.status(), psutil.STATUS_ZOMBIE)
# ...its parent should 'see' it
descendants = [x.pid for x in psutil.Process().children(
recursive=True)]
self.assertIn(zpid, descendants)
# XXX should we also assume ppid be usable? Note: this
# would be an important use case as the only way to get
# rid of a zombie is to kill its parent.
# self.assertEqual(zpid.ppid(), os.getpid())
# ...and all other APIs should be able to deal with it
self.assertTrue(psutil.pid_exists(zpid))
self.assertIn(zpid, psutil.pids())
self.assertIn(zpid, [x.pid for x in psutil.process_iter()])
finally:
reap_children(search_all=True)

def test_pid_0(self):
# Process(0) is supposed to work on all platforms except Linux
Expand Down

0 comments on commit 7c629de

Please sign in to comment.