|
1 |
| -import backend as F |
| 1 | +import os |
| 2 | +import tempfile |
| 3 | +import unittest |
2 | 4 |
|
| 5 | +import backend as F |
| 6 | +import numpy as np |
3 | 7 | import pytest
|
4 | 8 | import torch
|
5 | 9 |
|
6 | 10 | from dgl import graphbolt as gb
|
7 | 11 |
|
8 | 12 |
|
| 13 | +def to_on_disk_numpy(test_dir, name, t): |
| 14 | + path = os.path.join(test_dir, name + ".npy") |
| 15 | + np.save(path, t.numpy()) |
| 16 | + return path |
| 17 | + |
| 18 | + |
9 | 19 | @pytest.mark.parametrize(
|
10 | 20 | "dtype",
|
11 | 21 | [
|
@@ -93,3 +103,78 @@ def test_cpu_cached_feature(dtype, policy):
|
93 | 103 | # Test with different dimensionality
|
94 | 104 | feat_store_a.update(b)
|
95 | 105 | assert torch.equal(feat_store_a.read(), b)
|
| 106 | + |
| 107 | + |
| 108 | +@pytest.mark.parametrize( |
| 109 | + "dtype", |
| 110 | + [ |
| 111 | + torch.bool, |
| 112 | + torch.uint8, |
| 113 | + torch.int8, |
| 114 | + torch.int16, |
| 115 | + torch.int32, |
| 116 | + torch.int64, |
| 117 | + torch.float16, |
| 118 | + torch.bfloat16, |
| 119 | + torch.float32, |
| 120 | + torch.float64, |
| 121 | + ], |
| 122 | +) |
| 123 | +def test_cpu_cached_feature_read_async(dtype): |
| 124 | + a = torch.randint(0, 2, [1000, 13], dtype=dtype) |
| 125 | + |
| 126 | + cache_size = 256 * a[:1].nbytes |
| 127 | + |
| 128 | + feat_store = gb.CPUCachedFeature(gb.TorchBasedFeature(a), cache_size) |
| 129 | + |
| 130 | + # Test read with ids. |
| 131 | + ids1 = torch.tensor([0, 15, 71, 101]) |
| 132 | + ids2 = torch.tensor([71, 101, 202, 303]) |
| 133 | + for ids in [ids1, ids2]: |
| 134 | + reader = feat_store.read_async(ids) |
| 135 | + for _ in range(feat_store.read_async_num_stages(ids.device)): |
| 136 | + values = next(reader) |
| 137 | + assert torch.equal(values.wait(), a[ids]) |
| 138 | + |
| 139 | + |
| 140 | +@unittest.skipIf( |
| 141 | + not torch.ops.graphbolt.detect_io_uring(), |
| 142 | + reason="DiskBasedFeature is not available on this system.", |
| 143 | +) |
| 144 | +@pytest.mark.parametrize( |
| 145 | + "dtype", |
| 146 | + [ |
| 147 | + torch.bool, |
| 148 | + torch.uint8, |
| 149 | + torch.int8, |
| 150 | + torch.int16, |
| 151 | + torch.int32, |
| 152 | + torch.int64, |
| 153 | + torch.float16, |
| 154 | + torch.float32, |
| 155 | + torch.float64, |
| 156 | + ], |
| 157 | +) |
| 158 | +def test_cpu_cached_disk_feature_read_async(dtype): |
| 159 | + a = torch.randint(0, 2, [1000, 13], dtype=dtype) |
| 160 | + |
| 161 | + cache_size = 256 * a[:1].nbytes |
| 162 | + |
| 163 | + ids1 = torch.tensor([0, 15, 71, 101]) |
| 164 | + ids2 = torch.tensor([71, 101, 202, 303]) |
| 165 | + |
| 166 | + with tempfile.TemporaryDirectory() as test_dir: |
| 167 | + path = to_on_disk_numpy(test_dir, "tensor", a) |
| 168 | + |
| 169 | + feat_store = gb.CPUCachedFeature( |
| 170 | + gb.DiskBasedFeature(path=path), cache_size |
| 171 | + ) |
| 172 | + |
| 173 | + # Test read feature. |
| 174 | + for ids in [ids1, ids2]: |
| 175 | + reader = feat_store.read_async(ids) |
| 176 | + for _ in range(feat_store.read_async_num_stages(ids.device)): |
| 177 | + values = next(reader) |
| 178 | + assert torch.equal(values.wait(), a[ids]) |
| 179 | + |
| 180 | + feat_store = None |
0 commit comments