Skip to content

Commit d3f4f2b

Browse files
committed
tests: create loop device for block helper tests
The upcoming vmtest rework won't have any block devices, so let's add a loop device so that we always have a device to test with.
1 parent 2a0bbe7 commit d3f4f2b

File tree

1 file changed

+84
-4
lines changed

1 file changed

+84
-4
lines changed

tests/helpers/linux/test_block.py

Lines changed: 84 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1+
import errno
2+
from fcntl import ioctl
13
import os
24
import os.path
5+
import sys
6+
import tempfile
37

48
from drgn.helpers.linux.block import (
59
disk_devt,
@@ -9,11 +13,63 @@
913
part_devt,
1014
part_name,
1115
)
12-
from drgn.helpers.linux.device import MAJOR, MINOR
16+
from drgn.helpers.linux.device import MAJOR, MINOR, MKDEV
1317
from tests.helpers.linux import LinuxHelperTestCase
1418

1519

20+
LOOP_SET_FD = 0x4C00
21+
LOOP_SET_STATUS64 = 0x4C04
22+
LOOP_GET_STATUS64 = 0x4C05
23+
LOOP_CTL_GET_FREE = 0x4C82
24+
25+
LO_FLAGS_AUTOCLEAR = 4
26+
27+
1628
class TestBlock(LinuxHelperTestCase):
29+
@staticmethod
30+
def _losetup():
31+
with tempfile.TemporaryFile() as temp:
32+
os.truncate(temp.fileno(), 1024 * 1024 * 1024)
33+
with open("/dev/loop-control", "r") as loop_control:
34+
while True:
35+
index = ioctl(loop_control.fileno(), LOOP_CTL_GET_FREE)
36+
close_loop = True
37+
loop = open(f"/dev/loop{index}", "r")
38+
try:
39+
try:
40+
ioctl(loop.fileno(), LOOP_SET_FD, temp.fileno())
41+
except OSError as e:
42+
if e.errno == errno.EBUSY:
43+
continue
44+
raise
45+
info = bytearray(232) # sizeof(struct loop_info64)
46+
ioctl(loop.fileno(), LOOP_GET_STATUS64, info)
47+
lo_flags = int.from_bytes(info[52:56], sys.byteorder)
48+
lo_flags |= LO_FLAGS_AUTOCLEAR
49+
info[52:56] = lo_flags.to_bytes(4, sys.byteorder)
50+
ioctl(loop.fileno(), LOOP_SET_STATUS64, info, False)
51+
close_loop = False
52+
return loop
53+
finally:
54+
if close_loop:
55+
loop.close()
56+
57+
@classmethod
58+
def setUpClass(cls):
59+
super().setUpClass()
60+
# Try to set up a loop device so that there's at least one block
61+
# device.
62+
try:
63+
cls.loop = cls._losetup()
64+
except OSError:
65+
cls.loop = None
66+
67+
@classmethod
68+
def tearDownClass(cls):
69+
if cls.loop:
70+
cls.loop.close()
71+
super().tearDownClass()
72+
1773
def test_disk_devt(self):
1874
for disk in for_each_disk(self.prog):
1975
path = os.path.join(b"/sys/block", disk_name(disk), b"dev")
@@ -24,8 +80,8 @@ def test_disk_devt(self):
2480

2581
def test_for_each_disk(self):
2682
self.assertEqual(
27-
set(os.listdir("/sys/block")),
2883
{disk_name(disk).decode() for disk in for_each_disk(self.prog)},
84+
set(os.listdir("/sys/block")),
2985
)
3086

3187
def test_part_devt(self):
@@ -36,8 +92,32 @@ def test_part_devt(self):
3692
devt = part_devt(part).value_()
3793
self.assertEqual(f"{MAJOR(devt)}:{MINOR(devt)}", expected)
3894

39-
def test_for_each_part(self):
95+
def test_for_each_partition(self):
4096
self.assertEqual(
41-
set(os.listdir("/sys/class/block")),
4297
{part_name(part).decode() for part in for_each_partition(self.prog)},
98+
set(os.listdir("/sys/class/block")),
4399
)
100+
101+
def test_loop_disk(self):
102+
if not self.loop:
103+
self.skipTest("could not create loop device")
104+
rdev = os.stat(self.loop.fileno()).st_rdev
105+
devt = MKDEV(os.major(rdev), os.minor(rdev))
106+
for disk in for_each_disk(self.prog):
107+
if disk_devt(disk) == devt:
108+
break
109+
else:
110+
self.fail("loop disk not found")
111+
self.assertEqual(disk_name(disk), os.path.basename(self.loop.name).encode())
112+
113+
def test_loop_part(self):
114+
if not self.loop:
115+
self.skipTest("could not create loop device")
116+
rdev = os.stat(self.loop.fileno()).st_rdev
117+
devt = MKDEV(os.major(rdev), os.minor(rdev))
118+
for part in for_each_partition(self.prog):
119+
if part_devt(part) == devt:
120+
break
121+
else:
122+
self.fail("loop partition not found")
123+
self.assertEqual(part_name(part), os.path.basename(self.loop.name).encode())

0 commit comments

Comments
 (0)