SDK DataLoaders 6: customizable (external) loaders for Python #5355

Merged · 3 commits · Mar 1, 2024
2 changes: 1 addition & 1 deletion crates/re_data_source/src/data_loader/mod.rs
@@ -30,7 +30,7 @@ pub struct DataLoaderSettings {
     // TODO(#5350): actually support this
     pub opened_store_id: Option<re_log_types::StoreId>,

-    /// What should the entity paths be prefixed with?
+    /// What should the logged entity paths be prefixed with?
     pub entity_path_prefix: Option<EntityPath>,

     /// At what time(s) should the data be logged to?
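The prefix applies to every entity a loader emits. A minimal sketch of the intended effect from the Python side — the app ID, file name, and prefix below are illustrative, and a local viewer install is assumed:

```python
import rerun as rr

rr.init("rerun_example_prefix", spawn=True)  # assumes a local viewer is installed

# Assuming the viewer has a loader for this file type, its data is logged
# under `datasets/image.jpg` instead of plain `image.jpg`.
rr.log_file_from_path("image.jpg", entity_path_prefix="datasets")
```
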
47 changes: 45 additions & 2 deletions examples/python/external_data_loader/main.py
@@ -14,6 +14,9 @@
 # It is up to you whether you make use of that shared recording ID or not.
 # If you use it, the data will end up in the same recording as all other plugins interested in
 # that file, otherwise you can just create a dedicated recording for it. Or both.
+#
+# Check out `re_data_source::DataLoaderSettings` documentation for an exhaustive listing of
+# the available CLI parameters.
 parser = argparse.ArgumentParser(
     description="""
 This is an example executable data-loader plugin for the Rerun Viewer.
@@ -28,7 +31,23 @@
"""
)
parser.add_argument("filepath", type=str)
parser.add_argument("--recording-id", type=str)
parser.add_argument("--recording-id", type=str, help="optional recommended ID for the recording")
parser.add_argument("--entity-path-prefix", type=str, help="optional prefix for all entity paths")
parser.add_argument(
"--timeless", action="store_true", default=False, help="optionally mark data to be logged as timeless"
)
parser.add_argument(
"--time",
type=str,
action="append",
help="optional timestamps to log at (e.g. `--time sim_time=1709203426`)",
)
parser.add_argument(
"--sequence",
type=str,
action="append",
help="optional sequences to log at (e.g. `--sequence sim_frame=42`)",
)
args = parser.parse_args()
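For context: the viewer spawns registered external loaders as child processes and passes the `DataLoaderSettings` through exactly these flags. A hedged sketch of what such an invocation looks like — the executable name and every value below are illustrative, not part of this PR:

```python
import subprocess

# Illustrative only: mirrors the flags declared above. In practice the Rerun
# Viewer itself spawns the loader; this just shows the expected CLI shape.
subprocess.run([
    "rerun-loader-python-file",    # hypothetical loader executable
    "/path/to/example.py",         # the file to be loaded
    "--recording-id", "9d0b1f2c",  # shared ID so all plugins log into one recording
    "--entity-path-prefix", "imports",
    "--time", "sim_time=1709203426",
    "--sequence", "sim_frame=42",
])
```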


@@ -44,10 +63,34 @@ def main() -> None:
     # The most important part of this: log to standard output so the Rerun Viewer can ingest it!
     rr.stdout()

+    set_time_from_args()
+
+    if args.entity_path_prefix:
+        entity_path = f"{args.entity_path_prefix}/{args.filepath}"
+    else:
+        entity_path = args.filepath
+
     with open(args.filepath) as file:
         body = file.read()
         text = f"""## Some Python code\n```python\n{body}\n```\n"""
-        rr.log(args.filepath, rr.TextDocument(text, media_type=rr.MediaType.MARKDOWN), timeless=True)
+        rr.log(entity_path, rr.TextDocument(text, media_type=rr.MediaType.MARKDOWN), timeless=args.timeless)


+def set_time_from_args() -> None:
+    if not args.timeless and args.time is not None:
+        for time_str in args.time:
+            parts = time_str.split("=")
+            if len(parts) != 2:
+                continue
+            timeline_name, time = parts
+            rr.set_time_seconds(timeline_name, float(time))
+
+    if not args.timeless and args.sequence is not None:
+        for time_str in args.sequence:
+            parts = time_str.split("=")
+            if len(parts) != 2:
+                continue
+            timeline_name, time = parts
+            rr.set_time_sequence(timeline_name, int(time))


 if __name__ == "__main__":
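Each `--time`/`--sequence` pair becomes part of the stateful clock that stamps everything logged afterwards. A minimal sketch of that behavior — timeline names and values are illustrative:

```python
import rerun as rr

rr.init("rerun_example_time", spawn=True)  # assumes a local viewer is installed

# Equivalent to the loader receiving `--time sim_time=1.5 --sequence sim_frame=42`:
rr.set_time_seconds("sim_time", 1.5)
rr.set_time_sequence("sim_frame", 42)

# Every subsequent log call is stamped with both timepoints.
rr.log("notes", rr.TextLog("stamped at sim_time=1.5, sim_frame=42"))
```
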
4 changes: 2 additions & 2 deletions examples/python/log_file/main.py
@@ -34,12 +34,12 @@
 for filepath in args.filepaths:
     if not args.from_contents:
         # Either log the file using its path…
-        rr.log_file_from_path(filepath)
+        rr.log_file_from_path(filepath, entity_path_prefix="log_file_example")
     else:
         # …or using its contents if you already have them loaded for some reason.
         try:
             with open(filepath, "rb") as file:
-                rr.log_file_from_contents(filepath, file.read())
+                rr.log_file_from_contents(filepath, file.read(), entity_path_prefix="log_file_example")
         except Exception:
             pass
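The prefix composes with the file path the same way the external loader example does it. A quick sketch of the resulting entity path, assuming a hypothetical input file:

```python
# Mirrors the composition in external_data_loader/main.py above (illustrative):
entity_path_prefix = "log_file_example"
filepath = "data/cloud.ply"  # hypothetical input file

entity_path = f"{entity_path_prefix}/{filepath}" if entity_path_prefix else filepath
assert entity_path == "log_file_example/data/cloud.ply"
```
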
2 changes: 1 addition & 1 deletion examples/rust/external_data_loader/src/main.rs
@@ -37,7 +37,7 @@ struct Args {
     #[argh(option)]
     entity_path_prefix: Option<String>,

-    /// optional mark data to be logged as timeless
+    /// optionally mark data to be logged as timeless
     #[argh(switch)]
     timeless: bool,
43 changes: 41 additions & 2 deletions rerun_py/rerun_sdk/rerun/_log.py
@@ -283,10 +283,14 @@ def log_components(
 )


+# TODO(#3841): expose timepoint settings once we implement stateless APIs
 @catch_and_log_exceptions()
 def log_file_from_path(
     file_path: str | Path,
     *,
+    recording_id: str | None = None,
+    entity_path_prefix: str | None = None,
+    timeless: bool | None = None,
     recording: RecordingStream | None = None,
 ) -> None:
     r"""
@@ -304,21 +308,40 @@ def log_file_from_path(
     file_path:
         Path to the file to be logged.

+    recording_id:
+        The recommended `RecordingId` to log the data to.
+
+    entity_path_prefix:
+        What should the logged entity paths be prefixed with?
+
+    timeless:
+        Should the logged data be timeless?
+
     recording:
         Specifies the [`rerun.RecordingStream`][] to use. If left unspecified,
         defaults to the current active data recording, if there is one. See
         also: [`rerun.init`][], [`rerun.set_global_data_recording`][].

     """

-    bindings.log_file_from_path(Path(file_path), recording=recording)
+    bindings.log_file_from_path(
+        Path(file_path),
+        recording_id=recording_id,
+        entity_path_prefix=entity_path_prefix,
+        timeless=timeless,
+        recording=recording,
+    )
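
A minimal usage sketch of the extended signature — the file path and prefix are illustrative, and a local viewer install is assumed:

```python
import rerun as rr

rr.init("rerun_example_log_file", spawn=True)  # assumes a local viewer is installed

rr.log_file_from_path(
    "data/cloud.ply",              # hypothetical file; assumes a loader exists for it
    entity_path_prefix="imports",  # data lands under `imports/…`
    timeless=True,                 # log as timeless rather than at the current time
)
```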


# TODO(cmc): expose timepoint settings once we implement stateless APIs
 @catch_and_log_exceptions()
 def log_file_from_contents(
     file_path: str | Path,
     file_contents: bytes,
     *,
+    recording_id: str | None = None,
+    entity_path_prefix: str | None = None,
+    timeless: bool | None = None,
     recording: RecordingStream | None = None,
 ) -> None:
     r"""
@@ -339,14 +362,30 @@ def log_file_from_contents(
     file_contents:
         Contents to be logged.

+    recording_id:
+        The recommended `RecordingId` to log the data to.
+
+    entity_path_prefix:
+        What should the logged entity paths be prefixed with?
+
+    timeless:
+        Should the logged data be timeless?
+
     recording:
         Specifies the [`rerun.RecordingStream`][] to use. If left unspecified,
         defaults to the current active data recording, if there is one. See
         also: [`rerun.init`][], [`rerun.set_global_data_recording`][].

     """

-    bindings.log_file_from_contents(Path(file_path), file_contents, recording=recording)
+    bindings.log_file_from_contents(
+        Path(file_path),
+        file_contents,
+        recording_id=recording_id,
+        entity_path_prefix=entity_path_prefix,
+        timeless=timeless,
+        recording=recording,
+    )


def escape_entity_path_part(part: str) -> str:
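And a sketch of the in-memory counterpart, for when the bytes are already at hand — path and prefix are again illustrative:

```python
import rerun as rr

rr.init("rerun_example_log_file_contents", spawn=True)  # assumes a local viewer

with open("data/cloud.ply", "rb") as f:  # hypothetical file
    contents = f.read()

# The path is still passed along so a loader can be picked from the extension.
rr.log_file_from_contents(
    "data/cloud.ply",
    contents,
    entity_path_prefix="imports",
)
```
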
87 changes: 62 additions & 25 deletions rerun_py/src/python_bridge.rs
@@ -967,59 +967,96 @@ fn log_arrow_msg(
 #[pyfunction]
 #[pyo3(signature = (
     file_path,
+    recording_id = None,
+    entity_path_prefix = None,
+    timeless = None,
     recording=None,
 ))]
 fn log_file_from_path(
     py: Python<'_>,
     file_path: std::path::PathBuf,
+    recording_id: Option<String>,
+    entity_path_prefix: Option<String>,
+    timeless: Option<bool>,
     recording: Option<&PyRecordingStream>,
 ) -> PyResult<()> {
-    let Some(recording) = get_data_recording(recording) else {
-        return Ok(());
-    };
-
-    let Some(recording_id) = recording.store_info().map(|info| info.store_id.clone()) else {
-        return Ok(());
-    };
-    let settings = rerun::DataLoaderSettings::recommended(recording_id);
-
-    recording
-        .log_file_from_path(&settings, file_path)
-        .map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
-
-    py.allow_threads(flush_garbage_queue);
-
-    Ok(())
+    log_file(
+        py,
+        file_path,
+        None,
+        recording_id,
+        entity_path_prefix,
+        timeless,
+        recording,
+    )
 }

 #[pyfunction]
 #[pyo3(signature = (
     file_path,
     file_contents,
+    recording_id = None,
+    entity_path_prefix = None,
+    timeless = None,
     recording=None,
 ))]
 fn log_file_from_contents(
     py: Python<'_>,
     file_path: std::path::PathBuf,
     file_contents: &[u8],
+    recording_id: Option<String>,
+    entity_path_prefix: Option<String>,
+    timeless: Option<bool>,
     recording: Option<&PyRecordingStream>,
 ) -> PyResult<()> {
+    log_file(
+        py,
+        file_path,
+        Some(file_contents),
+        recording_id,
+        entity_path_prefix,
+        timeless,
+        recording,
+    )
+}

+fn log_file(
+    py: Python<'_>,
+    file_path: std::path::PathBuf,
+    file_contents: Option<&[u8]>,
+    recording_id: Option<String>,
+    entity_path_prefix: Option<String>,
+    timeless: Option<bool>,
+    recording: Option<&PyRecordingStream>,
+) -> PyResult<()> {
     let Some(recording) = get_data_recording(recording) else {
         return Ok(());
     };

-    let Some(recording_id) = recording.store_info().map(|info| info.store_id.clone()) else {
+    let Some(recording_id) = recording
+        .store_info()
+        .map(|info| info.store_id.clone())
+        .or(recording_id.map(|id| StoreId::from_string(StoreKind::Recording, id)))
+    else {
         return Ok(());
     };
-    let settings = rerun::DataLoaderSettings::recommended(recording_id);
-
-    recording
-        .log_file_from_contents(
-            &settings,
-            file_path,
-            std::borrow::Cow::Borrowed(file_contents),
-        )
-        .map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
+    let settings = rerun::DataLoaderSettings {
+        store_id: recording_id,
+        opened_store_id: None,
+        entity_path_prefix: entity_path_prefix.map(Into::into),
+        timepoint: timeless.unwrap_or(false).then(TimePoint::timeless),
+    };
+
+    if let Some(contents) = file_contents {
+        recording
+            .log_file_from_contents(&settings, file_path, std::borrow::Cow::Borrowed(contents))
+            .map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
+    } else {
+        recording
+            .log_file_from_path(&settings, file_path)
+            .map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
+    }

py.allow_threads(flush_garbage_queue);

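One behavioral detail of the bridge worth spelling out: an active recording's own store ID always takes precedence, and the `recording_id` argument only kicks in as a fallback when no store info is available. A hedged sketch of what that means for a Python caller — app ID and values are illustrative:

```python
import rerun as rr

rr.init("rerun_example_loaders", spawn=True)  # assumes a local viewer is installed

# The recording created by `rr.init` carries its own store ID, so the hint
# below is ignored here. It only matters for a stream without store info,
# e.g. inside an external loader that only received `--recording-id`.
rr.log_file_from_path(
    "data/example.py",            # hypothetical file
    recording_id="my-shared-id",  # fallback hint, not an override
)
```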