Skip to content

Commit accc1ca

Browse files
feat: Record source performance counters in uproot.dask report (#1156)
* Record source performance counters in uproot.dask report
* style: pre-commit fixes
* Fix lint
* style: pre-commit fixes

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent a3e99d6 commit accc1ca

File tree

3 files changed

+55
-6
lines changed

3 files changed

+55
-6
lines changed

src/uproot/_dask.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import time
77
from collections.abc import Callable, Iterable, Mapping
88

9+
from uproot.source.chunk import SourcePerformanceCounters
10+
911
try:
1012
from typing import TYPE_CHECKING, Final
1113

@@ -972,7 +974,9 @@ def allowed_exceptions(self):
972974
def return_report(self) -> bool:
    """
    Whether a report is returned alongside the data.

    Returns:
        bool: the truth value of ``self.allow_read_errors_with_report``,
        always as a plain ``True`` or ``False``.
    """
    if self.allow_read_errors_with_report:
        return True
    return False
974976

975-
def read_tree(self, tree: HasBranches, start: int, stop: int) -> AwkArray:
977+
def read_tree(
978+
self, tree: HasBranches, start: int, stop: int
979+
) -> tuple[AwkArray, SourcePerformanceCounters]:
976980
assert start <= stop
977981

978982
from awkward._nplikes.numpy import Numpy
@@ -1017,13 +1021,15 @@ def read_tree(self, tree: HasBranches, start: int, stop: int) -> AwkArray:
10171021
dtype=dtype,
10181022
)
10191023

1020-
return awkward.from_buffers(
1024+
out = awkward.from_buffers(
10211025
self.expected_form,
10221026
stop - start,
10231027
container,
10241028
behavior=self.form_mapping_info.behavior,
10251029
buffer_key=self.form_mapping_info.buffer_key,
10261030
)
1031+
assert tree.source # we must be reading something here
1032+
return out, tree.source.performance_counters
10271033

10281034
def mock(self) -> AwkArray:
10291035
awkward = uproot.extras.awkward()
@@ -1114,6 +1120,7 @@ def _report_failure(exception, call_time, *args, **kwargs):
11141120
{
11151121
"call_time": call_time,
11161122
"duration": None,
1123+
"performance_counters": None,
11171124
"args": [repr(a) for a in args],
11181125
"kwargs": [[k, repr(v)] for k, v in kwargs.items()],
11191126
"exception": type(exception).__name__,
@@ -1127,11 +1134,13 @@ def _report_failure(exception, call_time, *args, **kwargs):
11271134

11281135
def _report_success(duration, *args, **kwargs):
11291136
awkward = uproot.extras.awkward()
1137+
counters = kwargs.pop("counters")
11301138
return awkward.Array(
11311139
[
11321140
{
11331141
"call_time": None,
11341142
"duration": duration,
1143+
"performance_counters": counters.asdict(),
11351144
"args": [repr(a) for a in args],
11361145
"kwargs": [[k, repr(v)] for k, v in kwargs.items()],
11371146
"exception": None,
@@ -1195,14 +1204,17 @@ def __call__(self, i_start_stop):
11951204
if self.return_report:
11961205
call_time = time.time_ns()
11971206
try:
1198-
result, duration = with_duration(self._call_impl)(i, start, stop)
1207+
(result, counters), duration = with_duration(self._call_impl)(
1208+
i, start, stop
1209+
)
11991210
return (
12001211
result,
12011212
_report_success(
12021213
duration,
12031214
self.ttrees[i],
12041215
start,
12051216
stop,
1217+
counters=counters,
12061218
),
12071219
)
12081220
except self.allowed_exceptions as err:
@@ -1217,7 +1229,8 @@ def __call__(self, i_start_stop):
12171229
),
12181230
)
12191231

1220-
return self._call_impl(i, start, stop)
1232+
result, _ = self._call_impl(i, start, stop)
1233+
return result
12211234

12221235
def _call_impl(self, i, start, stop):
12231236
return self.read_tree(
@@ -1305,7 +1318,7 @@ def __call__(self, blockwise_args):
13051318
if self.return_report:
13061319
call_time = time.time_ns()
13071320
try:
1308-
result, duration = with_duration(self._call_impl)(
1321+
(result, counters), duration = with_duration(self._call_impl)(
13091322
file_path, object_path, i_step_or_start, n_steps_or_stop, is_chunk
13101323
)
13111324
return (
@@ -1317,6 +1330,7 @@ def __call__(self, blockwise_args):
13171330
i_step_or_start,
13181331
n_steps_or_stop,
13191332
is_chunk,
1333+
counters=counters,
13201334
),
13211335
)
13221336
except self.allowed_exceptions as err:
@@ -1333,9 +1347,10 @@ def __call__(self, blockwise_args):
13331347
),
13341348
)
13351349

1336-
return self._call_impl(
1350+
result, _ = self._call_impl(
13371351
file_path, object_path, i_step_or_start, n_steps_or_stop, is_chunk
13381352
)
1353+
return result
13391354

13401355
def project_keys(self: T, keys: frozenset[str]) -> T:
13411356
return _UprootOpenAndRead(

src/uproot/behaviors/TBranch.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import uproot
2424
import uproot.interpretation.grouped
2525
import uproot.language.python
26+
import uproot.source.chunk
2627
from uproot._util import no_filter
2728

2829
np_uint8 = numpy.dtype("u1")
@@ -1664,6 +1665,18 @@ def __iter__(self):
16641665
def __len__(self):
    """
    Number of items in ``self.branches``.
    """
    branches = self.branches
    return len(branches)
16661667

1668+
@property
def source(self) -> uproot.source.chunk.Source | None:
    """
    The data source of the file this container was read from, if there is one.

    Returns:
        uproot.source.chunk.Source or None: ``self._file.source`` when this
        object is a ``uproot.model.Model`` whose ``_file`` is a
        ``uproot.reading.ReadOnlyFile``; ``None`` in every other case.
    """
    # Only Model instances are inspected for a ``_file`` attribute.
    if not isinstance(self, uproot.model.Model):
        return None
    # Any other kind of ``_file`` is treated as having no source.
    if not isinstance(self._file, uproot.reading.ReadOnlyFile):
        return None
    return self._file.source
1679+
16671680

16681681
_branch_clean_name = re.compile(r"(.*\.)*([^\.\[\]]*)(\[.*\])*")
16691682
_branch_clean_parent_name = re.compile(r"(.*\.)*([^\.\[\]]*)\.([^\.\[\]]*)(\[.*\])*")

src/uproot/source/chunk.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
from __future__ import annotations
1414

15+
import dataclasses
1516
import numbers
1617
import queue
1718

@@ -41,6 +42,18 @@ def file_path(self) -> str:
4142
return self._file_path
4243

4344

45+
@dataclasses.dataclass
class SourcePerformanceCounters:
    """
    Record holding three integer performance counters.

    The fields are positional in the order declared below, so
    ``SourcePerformanceCounters(b, r, c)`` sets bytes, requests and chunks
    respectively.
    """

    # NOTE(review): presumably these mirror the same-named counters kept by a
    # Source; confirm the exact semantics against the Source documentation.
    num_requested_bytes: int
    num_requests: int
    num_requested_chunks: int

    def asdict(self) -> dict[str, int]:
        """
        Return the counters as a newly built ``dict`` keyed by field name,
        in field declaration order.
        """
        return dataclasses.asdict(self, dict_factory=dict)
55+
56+
4457
class Source:
4558
"""
4659
Abstract class for physically reading and writing data from a file, which
@@ -138,6 +151,14 @@ def num_requested_bytes(self) -> int:
138151
"""
139152
return self._num_requested_bytes
140153

154+
@property
def performance_counters(self) -> SourcePerformanceCounters:
    """
    The current values of this object's request counters, bundled into a
    newly constructed ``SourcePerformanceCounters``.
    """
    return SourcePerformanceCounters(
        num_requested_bytes=self._num_requested_bytes,
        num_requests=self._num_requests,
        num_requested_chunks=self._num_requested_chunks,
    )
161+
141162
def close(self):
142163
"""
143164
Manually closes the file(s) and stops any running threads.

0 commit comments

Comments
 (0)