-
Notifications
You must be signed in to change notification settings - Fork 393
/
Copy pathcustom_data.py
executable file
·75 lines (54 loc) · 2.23 KB
/
custom_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
"""Shows how to implement custom archetypes and components."""
from __future__ import annotations
import argparse
from typing import Any
import numpy as np
import numpy.typing as npt
import pyarrow as pa
import rerun as rr
class ConfidenceBatch(rr.ComponentBatchMixin):
    """Custom component batch holding per-point confidence values.

    The batch is described as the `user.Confidence` component and is
    serialized as an Arrow array of 32-bit floats.
    """

    def __init__(self: Any, confidence: npt.ArrayLike) -> None:
        # Keep the raw input; conversion to Arrow happens lazily in `as_arrow_array`.
        self.confidence = confidence

    def component_descriptor(self) -> rr.ComponentDescriptor:
        """Return the descriptor that identifies this custom component."""
        descriptor = rr.ComponentDescriptor("user.Confidence")
        return descriptor

    def as_arrow_array(self) -> pa.Array:
        """Return the stored confidence values as a float32 Arrow array."""
        arrow_type = pa.float32()
        return pa.array(self.confidence, type=arrow_type)
class CustomPoints3D(rr.AsComponents):
    """Custom archetype: Rerun's builtin `Points3D` plus a `confidences` component."""

    def __init__(self: Any, positions: npt.ArrayLike, confidences: npt.ArrayLike) -> None:
        # The builtin archetype carries the point positions.
        self.points3d = rr.Points3D(positions)

        # Tag the custom batch as the `confidences` field of `user.CustomPoints3D`.
        confidence_batch = ConfidenceBatch(confidences)
        self.confidences = confidence_batch.or_with_descriptor_overrides(
            archetype_name="user.CustomPoints3D",
            archetype_field_name="confidences",
        )

    def as_component_batches(self) -> list[rr.DescribedComponentBatch]:
        """Return the `Points3D` component batches followed by the confidence batch."""
        return [
            *self.points3d.as_component_batches(),  # Everything Points3D provides
            self.confidences,  # Plus the custom confidence data
        ]
def log_custom_data() -> None:
    """Log two `CustomPoints3D` point clouds built on the same 3x3x3 grid.

    The grid spans -5..5 along each axis. The cloud under `left/` is logged
    with a single confidence value, the one under `right/` with one
    confidence value per point (0, 1, 2, ...).
    """
    axis = np.linspace(-5, 5, 3)
    z, y, x = np.meshgrid(axis, axis, axis, indexing="ij")

    # One row per point, columns ordered (x, y, z).
    columns = [x.flatten(), y.flatten(), z.flatten()]
    point_grid = np.vstack(columns).T

    confident_cloud = CustomPoints3D(
        positions=point_grid,
        confidences=[42],
    )
    rr.log("left/my_confident_point_cloud", confident_cloud)

    per_point_confidences = np.arange(0, len(point_grid))
    polarized_cloud = CustomPoints3D(
        positions=point_grid,
        confidences=per_point_confidences,
    )
    rr.log("right/my_polarized_point_cloud", polarized_cloud)
def main() -> None:
    """Parse the command line, set up Rerun, log the custom data, and tear down."""
    arg_parser = argparse.ArgumentParser(
        description="Logs rich data using the Rerun SDK.",
    )
    # Let the Rerun SDK register its own command-line arguments on the parser.
    rr.script_add_args(arg_parser)
    cli_args = arg_parser.parse_args()

    rr.script_setup(cli_args, "rerun_example_custom_data")

    log_custom_data()

    rr.script_teardown(cli_args)
# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()