From c16f22806ed1f6f51861b432e762a2c8a6c2b46d Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Thu, 12 Dec 2024 14:15:51 -0500 Subject: [PATCH 1/8] Add sorbet tags for indexes --- crates/store/re_chunk_store/src/dataframe.rs | 16 ++++++++++++++++ crates/store/re_log_types/src/time_point/mod.rs | 9 +++++++++ 2 files changed, 25 insertions(+) diff --git a/crates/store/re_chunk_store/src/dataframe.rs b/crates/store/re_chunk_store/src/dataframe.rs index 2075d14f878e..755610cc22db 100644 --- a/crates/store/re_chunk_store/src/dataframe.rs +++ b/crates/store/re_chunk_store/src/dataframe.rs @@ -104,6 +104,21 @@ impl Ord for TimeColumnDescriptor { } impl TimeColumnDescriptor { + fn metadata(&self) -> arrow2::datatypes::Metadata { + let Self { + timeline, + datatype: _, + } = self; + + [ + Some(("sorbet.index_name".to_owned(), timeline.name().to_string())), + Some(("sorbet.index_type".to_owned(), timeline.typ().to_string())), + ] + .into_iter() + .flatten() + .collect() + } + #[inline] // Time column must be nullable since static data doesn't have a time. pub fn to_arrow_field(&self) -> Arrow2Field { @@ -113,6 +128,7 @@ impl TimeColumnDescriptor { datatype.clone(), true, /* nullable */ ) + .with_metadata(self.metadata()) } } diff --git a/crates/store/re_log_types/src/time_point/mod.rs b/crates/store/re_log_types/src/time_point/mod.rs index dbb5ea5bda21..4c820ea7e92d 100644 --- a/crates/store/re_log_types/src/time_point/mod.rs +++ b/crates/store/re_log_types/src/time_point/mod.rs @@ -118,6 +118,15 @@ pub enum TimeType { Sequence, } +impl std::fmt::Display for TimeType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Time => write!(f, "Time"), + Self::Sequence => write!(f, "Sequence"), + } + } +} + impl TimeType { #[inline] fn hash(&self) -> u64 { From 125236e3e1add79ee1df7bb0eecac0dde5eedf28 Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Thu, 12 Dec 2024 15:34:03 -0500 Subject: [PATCH 2/8] New helper to send a QueryResult-structured dataframe back to Rerun --- rerun_py/rerun_sdk/rerun/any_value.py | 2 +- rerun_py/rerun_sdk/rerun/dataframe.py | 87 +++++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 1 deletion(-) diff --git a/rerun_py/rerun_sdk/rerun/any_value.py b/rerun_py/rerun_sdk/rerun/any_value.py index c50318aab895..699441856f8c 100644 --- a/rerun_py/rerun_sdk/rerun/any_value.py +++ b/rerun_py/rerun_sdk/rerun/any_value.py @@ -8,7 +8,7 @@ from rerun._baseclasses import ComponentDescriptor -from . import ComponentColumn +from ._baseclasses import ComponentColumn from ._log import AsComponents, ComponentBatchLike from .error_utils import catch_and_log_exceptions diff --git a/rerun_py/rerun_sdk/rerun/dataframe.py b/rerun_py/rerun_sdk/rerun/dataframe.py index 4b34e955a49f..5c2b80d0ed03 100644 --- a/rerun_py/rerun_sdk/rerun/dataframe.py +++ b/rerun_py/rerun_sdk/rerun/dataframe.py @@ -1,5 +1,9 @@ from __future__ import annotations +from collections import defaultdict +from typing import Optional + +import pyarrow as pa from rerun_bindings import ( ComponentColumnDescriptor as ComponentColumnDescriptor, ComponentColumnSelector as ComponentColumnSelector, @@ -18,3 +22,86 @@ ComponentLike as ComponentLike, ViewContentsLike as ViewContentsLike, ) + +from ._baseclasses import ComponentColumn, ComponentDescriptor +from ._log import IndicatorComponentBatch +from ._send_columns import TimeColumnLike, send_columns +from .recording_stream import RecordingStream + +SORBET_INDEX_NAME = b"sorbet.index_name" +SORBET_ENTITY_PATH = b"sorbet.path" +SORBET_ARCHETYPE_NAME = b"sorbet.semantic_family" +SORBET_ARCHETYPE_FIELD = b"sorbet.logical_type" +SORBET_COMPONENT_NAME = b"sorbet.semantic_type" + + +class RawIndexColumn(TimeColumnLike): + def __init__(self, metadata: dict, col: pa.Array): + self.metadata = metadata + self.col = col + + def timeline_name(self) -> str: + return self.metadata[SORBET_INDEX_NAME].decode("utf-8") + + def as_arrow_array(self) -> pa.Array: + return self.col + + +class RawComponentBatchLike(ComponentColumn): + def __init__(self, metadata: dict, col: pa.Array): + self.metadata = metadata + self.col = col + + def component_descriptor(self) -> ComponentDescriptor: + kwargs = {} + if SORBET_ARCHETYPE_NAME in self.metadata: + kwargs["archetype_name"] = "rerun.archetypes" + self.metadata[SORBET_ARCHETYPE_NAME].decode("utf-8") + if SORBET_COMPONENT_NAME in self.metadata: + kwargs["component_name"] = "rerun.components." + self.metadata[SORBET_COMPONENT_NAME].decode("utf-8") + if SORBET_ARCHETYPE_FIELD in self.metadata: + kwargs["archetype_field_name"] = self.metadata[SORBET_ARCHETYPE_FIELD].decode("utf-8") + + if "component_name" not in kwargs: + kwargs["component_name"] = "Unknown" + + return ComponentDescriptor(**kwargs) + + def as_arrow_array(self) -> pa.Array: + return self.col + + +def send_record_batch(batch: pa.RecordBatch, rec: Optional[RecordingStream] = None): + """Coerce a single pyarrow `RecordBatch` to Rerun structure.""" + + indexes = [] + data = defaultdict(list) + archetypes = defaultdict(set) + for col in batch.schema: + metadata = col.metadata or {} + if SORBET_INDEX_NAME in metadata: + indexes.append(RawIndexColumn(metadata, batch.column(col.name))) + else: + entity_path = metadata.get(SORBET_ENTITY_PATH, b"/").decode("utf-8") + data[entity_path].append(RawComponentBatchLike(metadata, batch.column(col.name))) + if SORBET_ARCHETYPE_NAME in metadata: + archetypes[entity_path].add(metadata[SORBET_ARCHETYPE_NAME].decode("utf-8")) + for entity_path, archetypes in archetypes.items(): + for archetype in archetypes: + data[entity_path].append(IndicatorComponentBatch("rerun.archetypes." + archetype)) + + for entity_path, columns in data.items(): + send_columns( + entity_path, + indexes, + columns, + recording=rec, + ) + + +def send_dataframe(df: pa.RecordBatchReader | pa.Table, rec: Optional[RecordingStream] = None): + """Coerce a pyarrow `RecordBatchReader` or `Table` to Rerun structure.""" + if isinstance(df, pa.Table): + df = df.to_reader() + + for batch in df: + send_record_batch(batch, rec) From a2e19d280ab0cf8b5b1648a02412caa89bb22b0a Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Thu, 12 Dec 2024 16:14:31 -0500 Subject: [PATCH 3/8] Default the path from the column name --- rerun_py/rerun_sdk/rerun/dataframe.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rerun_py/rerun_sdk/rerun/dataframe.py b/rerun_py/rerun_sdk/rerun/dataframe.py index 5c2b80d0ed03..ee2612cd7199 100644 --- a/rerun_py/rerun_sdk/rerun/dataframe.py +++ b/rerun_py/rerun_sdk/rerun/dataframe.py @@ -81,7 +81,9 @@ def send_record_batch(batch: pa.RecordBatch, rec: Optional[RecordingStream] = No if SORBET_INDEX_NAME in metadata: indexes.append(RawIndexColumn(metadata, batch.column(col.name))) else: - entity_path = metadata.get(SORBET_ENTITY_PATH, b"/").decode("utf-8") + entity_path = metadata.get(SORBET_ENTITY_PATH, col.name.split(":")[0]) + if isinstance(entity_path, bytes): + entity_path = entity_path.decode("utf-8") data[entity_path].append(RawComponentBatchLike(metadata, batch.column(col.name))) if SORBET_ARCHETYPE_NAME in metadata: archetypes[entity_path].add(metadata[SORBET_ARCHETYPE_NAME].decode("utf-8")) From 66eb44c59ae425e5dc2e7c65c503a3d2369de2d1 Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Fri, 13 Dec 2024 13:47:15 -0500 Subject: [PATCH 4/8] Also support RerunChunk-style indexes --- rerun_py/rerun_sdk/rerun/dataframe.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/rerun_py/rerun_sdk/rerun/dataframe.py b/rerun_py/rerun_sdk/rerun/dataframe.py index ee2612cd7199..7aa2ec3f12e3 100644 --- a/rerun_py/rerun_sdk/rerun/dataframe.py +++ b/rerun_py/rerun_sdk/rerun/dataframe.py @@ -33,6 +33,9 @@ SORBET_ARCHETYPE_NAME = b"sorbet.semantic_family" SORBET_ARCHETYPE_FIELD = b"sorbet.logical_type" SORBET_COMPONENT_NAME = b"sorbet.semantic_type" +RERUN_KIND = b"rerun.kind" +RERUN_KIND_CONTROL = b"control" +RERUN_KIND_INDEX = b"time" class RawIndexColumn(TimeColumnLike): @@ -41,7 +44,10 @@ def __init__(self, metadata: dict, col: pa.Array): self.col = col def timeline_name(self) -> str: - return self.metadata[SORBET_INDEX_NAME].decode("utf-8") + name = self.metadata.get(SORBET_INDEX_NAME, "unknown") + if isinstance(name, bytes): + name = name.decode("utf-8") + return name def as_arrow_array(self) -> pa.Array: return self.col @@ -78,7 +84,11 @@ def send_record_batch(batch: pa.RecordBatch, rec: Optional[RecordingStream] = No archetypes = defaultdict(set) for col in batch.schema: metadata = col.metadata or {} - if SORBET_INDEX_NAME in metadata: + if metadata.get(RERUN_KIND) == RERUN_KIND_CONTROL: + continue + if SORBET_INDEX_NAME in metadata or metadata.get(RERUN_KIND) == RERUN_KIND_INDEX: + if SORBET_INDEX_NAME not in metadata: + metadata[SORBET_INDEX_NAME] = col.name indexes.append(RawIndexColumn(metadata, batch.column(col.name))) else: entity_path = metadata.get(SORBET_ENTITY_PATH, col.name.split(":")[0]) From 8facab4bd1aee4466c174fa32e6dd89534e741fa Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Fri, 13 Dec 2024 13:51:46 -0500 Subject: [PATCH 5/8] Recording lint --- rerun_py/rerun_sdk/rerun/dataframe.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rerun_py/rerun_sdk/rerun/dataframe.py b/rerun_py/rerun_sdk/rerun/dataframe.py index 7aa2ec3f12e3..2c715d68da63 100644 --- a/rerun_py/rerun_sdk/rerun/dataframe.py +++ b/rerun_py/rerun_sdk/rerun/dataframe.py @@ -106,7 +106,8 @@ def send_record_batch(batch: pa.RecordBatch, rec: Optional[RecordingStream] = No entity_path, indexes, columns, - recording=rec, + # This is fine, send_columns will handle the conversion + recording=rec, # NOLINT ) From 9decb0eb5c7a279c315433dd7096c24f53254612 Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Fri, 13 Dec 2024 13:54:18 -0500 Subject: [PATCH 6/8] Lint --- rerun_py/rerun_sdk/rerun/dataframe.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/rerun_py/rerun_sdk/rerun/dataframe.py b/rerun_py/rerun_sdk/rerun/dataframe.py index 2c715d68da63..b525bf1e9e81 100644 --- a/rerun_py/rerun_sdk/rerun/dataframe.py +++ b/rerun_py/rerun_sdk/rerun/dataframe.py @@ -1,7 +1,7 @@ from __future__ import annotations from collections import defaultdict -from typing import Optional +from typing import Any, Optional import pyarrow as pa from rerun_bindings import ( @@ -39,7 +39,7 @@ class RawIndexColumn(TimeColumnLike): - def __init__(self, metadata: dict, col: pa.Array): + def __init__(self, metadata: dict[bytes, bytes], col: pa.Array): self.metadata = metadata self.col = col @@ -54,7 +54,7 @@ def as_arrow_array(self) -> pa.Array: class RawComponentBatchLike(ComponentColumn): - def __init__(self, metadata: dict, col: pa.Array): + def __init__(self, metadata: dict[bytes, bytes], col: pa.Array): self.metadata = metadata self.col = col @@ -76,12 +76,12 @@ def as_arrow_array(self) -> pa.Array: return self.col -def send_record_batch(batch: pa.RecordBatch, rec: Optional[RecordingStream] = None): +def send_record_batch(batch: pa.RecordBatch, rec: Optional[RecordingStream] = None) -> None: """Coerce a single pyarrow `RecordBatch` to Rerun structure.""" indexes = [] - data = defaultdict(list) - archetypes = defaultdict(set) + data: defaultdict[str, list[Any]] = defaultdict(list) + archetypes: defaultdict[str, set[Any]] = defaultdict(set) for col in batch.schema: metadata = col.metadata or {} if metadata.get(RERUN_KIND) == RERUN_KIND_CONTROL: @@ -97,8 +97,8 @@ def send_record_batch(batch: pa.RecordBatch, rec: Optional[RecordingStream] = No data[entity_path].append(RawComponentBatchLike(metadata, batch.column(col.name))) if SORBET_ARCHETYPE_NAME in metadata: archetypes[entity_path].add(metadata[SORBET_ARCHETYPE_NAME].decode("utf-8")) - for entity_path, archetypes in archetypes.items(): - for archetype in archetypes: + for entity_path, archetype_set in archetypes.items(): + for archetype in archetype_set: data[entity_path].append(IndicatorComponentBatch("rerun.archetypes." + archetype)) for entity_path, columns in data.items(): @@ -111,7 +111,7 @@ def send_record_batch(batch: pa.RecordBatch, rec: Optional[RecordingStream] = No ) -def send_dataframe(df: pa.RecordBatchReader | pa.Table, rec: Optional[RecordingStream] = None): +def send_dataframe(df: pa.RecordBatchReader | pa.Table, rec: Optional[RecordingStream] = None) -> None: """Coerce a pyarrow `RecordBatchReader` or `Table` to Rerun structure.""" if isinstance(df, pa.Table): df = df.to_reader() From 8b08950172c77046e4e247992647a563c3acaad7 Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Mon, 16 Dec 2024 11:53:10 -0500 Subject: [PATCH 7/8] Don't need to track the type of timeline as metadata --- crates/store/re_chunk_store/src/dataframe.rs | 9 ++++----- crates/store/re_log_types/src/time_point/mod.rs | 9 --------- 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/crates/store/re_chunk_store/src/dataframe.rs b/crates/store/re_chunk_store/src/dataframe.rs index 755610cc22db..283bf10313f8 100644 --- a/crates/store/re_chunk_store/src/dataframe.rs +++ b/crates/store/re_chunk_store/src/dataframe.rs @@ -110,11 +110,10 @@ impl TimeColumnDescriptor { datatype: _, } = self; - [ - Some(("sorbet.index_name".to_owned(), timeline.name().to_string())), - Some(("sorbet.index_type".to_owned(), timeline.typ().to_string())), - ] - .into_iter() + std::iter::once(Some(( + "sorbet.index_name".to_owned(), + timeline.name().to_string(), + ))) .flatten() .collect() } diff --git a/crates/store/re_log_types/src/time_point/mod.rs b/crates/store/re_log_types/src/time_point/mod.rs index 4c820ea7e92d..dbb5ea5bda21 100644 --- a/crates/store/re_log_types/src/time_point/mod.rs +++ b/crates/store/re_log_types/src/time_point/mod.rs @@ -118,15 +118,6 @@ pub enum TimeType { Sequence, } -impl std::fmt::Display for TimeType { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Time => write!(f, "Time"), - Self::Sequence => write!(f, "Sequence"), - } - } -} - impl TimeType { #[inline] fn hash(&self) -> u64 { From 53da877de0e74aee7110b634f631bd76037019ca Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Mon, 16 Dec 2024 11:53:25 -0500 Subject: [PATCH 8/8] Add a basic unit test --- rerun_py/tests/unit/test_dataframe.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/rerun_py/tests/unit/test_dataframe.py b/rerun_py/tests/unit/test_dataframe.py index 69daa71d1da3..067f7fa63b34 100644 --- a/rerun_py/tests/unit/test_dataframe.py +++ b/rerun_py/tests/unit/test_dataframe.py @@ -380,3 +380,19 @@ def test_view_syntax(self) -> None: table = pa.Table.from_batches(batches, batches.schema) assert table.num_columns == 3 assert table.num_rows == 0 + + def test_roundtrip_send(self) -> None: + df = self.recording.view(index="my_index", contents="/**").select().read_all() + + with tempfile.TemporaryDirectory() as tmpdir: + rrd = tmpdir + "/tmp.rrd" + + rr.init("rerun_example_test_recording") + rr.dataframe.send_dataframe(df) + rr.save(rrd) + + round_trip_recording = rr.dataframe.load_recording(rrd) + + df_round_trip = round_trip_recording.view(index="my_index", contents="/**").select().read_all() + + assert df == df_round_trip