1+ import errno
2+ from fcntl import ioctl
13import os
24import os .path
5+ import sys
6+ import tempfile
37
48from drgn .helpers .linux .block import (
59 disk_devt ,
913 part_devt ,
1014 part_name ,
1115)
12- from drgn .helpers .linux .device import MAJOR , MINOR
16+ from drgn .helpers .linux .device import MAJOR , MINOR , MKDEV
1317from tests .helpers .linux import LinuxHelperTestCase
1418
1519
20+ LOOP_SET_FD = 0x4C00
21+ LOOP_SET_STATUS64 = 0x4C04
22+ LOOP_GET_STATUS64 = 0x4C05
23+ LOOP_CTL_GET_FREE = 0x4C82
24+
25+ LO_FLAGS_AUTOCLEAR = 4
26+
27+
1628class TestBlock (LinuxHelperTestCase ):
29+ @staticmethod
30+ def _losetup ():
31+ with tempfile .TemporaryFile () as temp :
32+ os .truncate (temp .fileno (), 1024 * 1024 * 1024 )
33+ with open ("/dev/loop-control" , "r" ) as loop_control :
34+ while True :
35+ index = ioctl (loop_control .fileno (), LOOP_CTL_GET_FREE )
36+ close_loop = True
37+ loop = open (f"/dev/loop{ index } " , "r" )
38+ try :
39+ try :
40+ ioctl (loop .fileno (), LOOP_SET_FD , temp .fileno ())
41+ except OSError as e :
42+ if e .errno == errno .EBUSY :
43+ continue
44+ raise
45+ info = bytearray (232 ) # sizeof(struct loop_info64)
46+ ioctl (loop .fileno (), LOOP_GET_STATUS64 , info )
47+ lo_flags = int .from_bytes (info [52 :56 ], sys .byteorder )
48+ lo_flags |= LO_FLAGS_AUTOCLEAR
49+ info [52 :56 ] = lo_flags .to_bytes (4 , sys .byteorder )
50+ ioctl (loop .fileno (), LOOP_SET_STATUS64 , info , False )
51+ close_loop = False
52+ return loop
53+ finally :
54+ if close_loop :
55+ loop .close ()
56+
57+ @classmethod
58+ def setUpClass (cls ):
59+ super ().setUpClass ()
60+ # Try to set up a loop device so that there's at least one block
61+ # device.
62+ try :
63+ cls .loop = cls ._losetup ()
64+ except OSError :
65+ cls .loop = None
66+
67+ @classmethod
68+ def tearDownClass (cls ):
69+ if cls .loop :
70+ cls .loop .close ()
71+ super ().tearDownClass ()
72+
1773 def test_disk_devt (self ):
1874 for disk in for_each_disk (self .prog ):
1975 path = os .path .join (b"/sys/block" , disk_name (disk ), b"dev" )
@@ -24,8 +80,8 @@ def test_disk_devt(self):
2480
2581 def test_for_each_disk (self ):
2682 self .assertEqual (
27- set (os .listdir ("/sys/block" )),
2883 {disk_name (disk ).decode () for disk in for_each_disk (self .prog )},
84+ set (os .listdir ("/sys/block" )),
2985 )
3086
3187 def test_part_devt (self ):
@@ -36,8 +92,32 @@ def test_part_devt(self):
3692 devt = part_devt (part ).value_ ()
3793 self .assertEqual (f"{ MAJOR (devt )} :{ MINOR (devt )} " , expected )
3894
39- def test_for_each_part (self ):
95+ def test_for_each_partition (self ):
4096 self .assertEqual (
41- set (os .listdir ("/sys/class/block" )),
4297 {part_name (part ).decode () for part in for_each_partition (self .prog )},
98+ set (os .listdir ("/sys/class/block" )),
4399 )
100+
101+ def test_loop_disk (self ):
102+ if not self .loop :
103+ self .skipTest ("could not create loop device" )
104+ rdev = os .stat (self .loop .fileno ()).st_rdev
105+ devt = MKDEV (os .major (rdev ), os .minor (rdev ))
106+ for disk in for_each_disk (self .prog ):
107+ if disk_devt (disk ) == devt :
108+ break
109+ else :
110+ self .fail ("loop disk not found" )
111+ self .assertEqual (disk_name (disk ), os .path .basename (self .loop .name ).encode ())
112+
113+ def test_loop_part (self ):
114+ if not self .loop :
115+ self .skipTest ("could not create loop device" )
116+ rdev = os .stat (self .loop .fileno ()).st_rdev
117+ devt = MKDEV (os .major (rdev ), os .minor (rdev ))
118+ for part in for_each_partition (self .prog ):
119+ if part_devt (part ) == devt :
120+ break
121+ else :
122+ self .fail ("loop partition not found" )
123+ self .assertEqual (part_name (part ), os .path .basename (self .loop .name ).encode ())
0 commit comments