|
| 1 | +import contextlib |
| 2 | +import ctypes |
| 3 | +import mmap |
| 4 | +import os |
1 | 5 | import platform |
2 | 6 | import re |
| 7 | +import struct |
| 8 | +import tempfile |
3 | 9 | import unittest |
4 | 10 |
|
5 | | -from drgn.helpers.linux.mm import pgtable_l5_enabled |
6 | | -from tests.helpers.linux import LinuxHelperTestCase |
| 11 | +from drgn.helpers.linux.mm import ( |
| 12 | + page_to_pfn, |
| 13 | + pfn_to_page, |
| 14 | + pfn_to_virt, |
| 15 | + pgtable_l5_enabled, |
| 16 | + virt_to_pfn, |
| 17 | +) |
| 18 | +from tests.helpers.linux import LinuxHelperTestCase, mlock |
7 | 19 |
|
8 | 20 |
|
9 | 21 | class TestMm(LinuxHelperTestCase): |
| 22 | + def test_page_constants(self): |
| 23 | + self.assertEqual(self.prog["PAGE_SIZE"], mmap.PAGESIZE) |
| 24 | + self.assertEqual(1 << self.prog["PAGE_SHIFT"], mmap.PAGESIZE) |
| 25 | + self.assertEqual(~self.prog["PAGE_MASK"] + 1, mmap.PAGESIZE) |
| 26 | + |
| 27 | + # Returns an mmap.mmap object for a file mapping and the pfns backing it. |
| 28 | + @contextlib.contextmanager |
| 29 | + def _pages(self): |
| 30 | + if not os.path.exists("/proc/self/pagemap"): |
| 31 | + self.skipTest("kernel does not support pagemap") |
| 32 | + |
| 33 | + pages = 4 |
| 34 | + with tempfile.TemporaryFile() as f: |
| 35 | + f.write(os.urandom(pages * mmap.PAGESIZE)) |
| 36 | + f.flush() |
| 37 | + with mmap.mmap(f.fileno(), pages * mmap.PAGESIZE) as map: |
| 38 | + f.close() |
| 39 | + address = ctypes.addressof(ctypes.c_char.from_buffer(map)) |
| 40 | + # Make sure the pages are faulted in and stay that way. |
| 41 | + mlock(address, pages * mmap.PAGESIZE) |
| 42 | + |
| 43 | + with open("/proc/self/pagemap", "rb", buffering=0) as pagemap: |
| 44 | + pagemap.seek(address // mmap.PAGESIZE * 8) |
| 45 | + pfns = [ |
| 46 | + entry & ((1 << 54) - 1) |
| 47 | + for entry in struct.unpack(f"{pages}Q", pagemap.read(pages * 8)) |
| 48 | + ] |
| 49 | + yield map, pfns |
| 50 | + |
| 51 | + def test_virt_to_from_pfn(self): |
| 52 | + with self._pages() as (map, pfns): |
| 53 | + for i, pfn in enumerate(pfns): |
| 54 | + virt = pfn_to_virt(self.prog, pfn) |
| 55 | + # Test that we got the correct virtual address by reading from |
| 56 | + # it and comparing it to the mmap. |
| 57 | + self.assertEqual( |
| 58 | + self.prog.read(virt, mmap.PAGESIZE), |
| 59 | + map[i * mmap.PAGESIZE : (i + 1) * mmap.PAGESIZE], |
| 60 | + ) |
| 61 | + # Test the opposite direction. |
| 62 | + self.assertEqual(virt_to_pfn(virt), pfn) |
| 63 | + |
| 64 | + def test_pfn_to_from_page(self): |
| 65 | + with self._pages() as (map, pfns): |
| 66 | + for i, pfn in enumerate(pfns): |
| 67 | + page = pfn_to_page(self.prog, pfn) |
| 68 | + # Test that we got the correct page by looking at the index: it |
| 69 | + # should be page i in the file. |
| 70 | + self.assertEqual(page.index, i) |
| 71 | + # Test the opposite direction. |
| 72 | + self.assertEqual(page_to_pfn(page), pfn) |
| 73 | + |
10 | 74 | @unittest.skipUnless(platform.machine() == "x86_64", "machine is not x86_64") |
11 | 75 | def test_pgtable_l5_enabled(self): |
12 | 76 | with open("/proc/cpuinfo", "r") as f: |
|
0 commit comments