Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rr.log_file_from_path now defaults to the active app/recording ID #7864

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 63 additions & 7 deletions crates/store/re_data_loader/src/loader_rrd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use re_log_encoding::decoder::Decoder;

#[cfg(not(target_arch = "wasm32"))]
use crossbeam::channel::Receiver;
use re_log_types::{ApplicationId, StoreId};

// ---

Expand All @@ -17,8 +18,7 @@ impl crate::DataLoader for RrdLoader {
#[cfg(not(target_arch = "wasm32"))]
fn load_from_path(
&self,
// NOTE: The Store ID comes from the rrd file itself.
_settings: &crate::DataLoaderSettings,
settings: &crate::DataLoaderSettings,
filepath: std::path::PathBuf,
tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
Expand Down Expand Up @@ -57,12 +57,21 @@ impl crate::DataLoader for RrdLoader {
.name(format!("decode_and_stream({filepath:?})"))
.spawn({
let filepath = filepath.clone();
let settings = settings.clone();
move || {
decode_and_stream(&filepath, &tx, decoder);
decode_and_stream(
&filepath,
&tx,
decoder,
settings.opened_application_id.as_ref(),
// We never want to patch blueprints' store IDs, only their app IDs.
None,
);
}
})
.with_context(|| format!("Failed to open spawn IO thread for {filepath:?}"))?;
}

"rrd" => {
// For .rrd files we retry reading despite reaching EOF to support live (writer) streaming.
// Decoder will give up when it sees end of file marker (i.e. end-of-stream message header)
Expand All @@ -76,8 +85,15 @@ impl crate::DataLoader for RrdLoader {
.name(format!("decode_and_stream({filepath:?})"))
.spawn({
let filepath = filepath.clone();
let settings = settings.clone();
move || {
decode_and_stream(&filepath, &tx, decoder);
decode_and_stream(
&filepath,
&tx,
decoder,
settings.opened_application_id.as_ref(),
settings.opened_store_id.as_ref(),
);
}
})
.with_context(|| format!("Failed to open spawn IO thread for {filepath:?}"))?;
Expand All @@ -90,8 +106,7 @@ impl crate::DataLoader for RrdLoader {

fn load_from_file_contents(
&self,
// NOTE: The Store ID comes from the rrd file itself.
_settings: &crate::DataLoaderSettings,
settings: &crate::DataLoaderSettings,
filepath: std::path::PathBuf,
contents: std::borrow::Cow<'_, [u8]>,
tx: std::sync::mpsc::Sender<crate::LoadedData>,
Expand All @@ -116,7 +131,13 @@ impl crate::DataLoader for RrdLoader {
},
};

decode_and_stream(&filepath, &tx, decoder);
decode_and_stream(
&filepath,
&tx,
decoder,
settings.opened_application_id.as_ref(),
settings.opened_store_id.as_ref(),
);

Ok(())
}
Expand All @@ -126,6 +147,8 @@ fn decode_and_stream<R: std::io::Read>(
filepath: &std::path::Path,
tx: &std::sync::mpsc::Sender<crate::LoadedData>,
decoder: Decoder<R>,
forced_application_id: Option<&ApplicationId>,
forced_store_id: Option<&StoreId>,
) {
re_tracing::profile_function!(filepath.display().to_string());

Expand All @@ -137,6 +160,39 @@ fn decode_and_stream<R: std::io::Read>(
continue;
}
};

let msg = if forced_application_id.is_some() || forced_store_id.is_some() {
match msg {
re_log_types::LogMsg::SetStoreInfo(set_store_info) => {
re_log_types::LogMsg::SetStoreInfo(re_log_types::SetStoreInfo {
info: re_log_types::StoreInfo {
application_id: forced_application_id
.cloned()
.unwrap_or(set_store_info.info.application_id),
store_id: forced_store_id
.cloned()
.unwrap_or(set_store_info.info.store_id),
..set_store_info.info
},
..set_store_info
})
}

re_log_types::LogMsg::ArrowMsg(store_id, arrow_msg) => {
re_log_types::LogMsg::ArrowMsg(
forced_store_id.cloned().unwrap_or(store_id),
arrow_msg,
)
}

re_log_types::LogMsg::BlueprintActivationCommand(blueprint_activation_command) => {
re_log_types::LogMsg::BlueprintActivationCommand(blueprint_activation_command)
}
}
} else {
msg
};

if tx.send(msg.into()).is_err() {
break; // The other end has decided to hang up, not our problem.
}
Expand Down
19 changes: 14 additions & 5 deletions crates/top/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,7 @@ impl RecordingStream {
entity_path_prefix: Option<EntityPath>,
static_: bool,
) -> RecordingStreamResult<()> {
self.log_file(filepath, None, entity_path_prefix, static_)
self.log_file(filepath, None, entity_path_prefix, static_, true)
}

/// Logs the given `contents` using all [`re_data_loader::DataLoader`]s available.
Expand All @@ -1138,16 +1138,20 @@ impl RecordingStream {
entity_path_prefix: Option<EntityPath>,
static_: bool,
) -> RecordingStreamResult<()> {
self.log_file(filepath, Some(contents), entity_path_prefix, static_)
self.log_file(filepath, Some(contents), entity_path_prefix, static_, true)
}

/// If `prefer_current_recording` is set (which is always the case for now), the dataloader settings
/// will be configured as if the current SDK recording is the currently opened recording.
/// Most dataloaders prefer logging to the currently opened recording if one is set.
#[cfg(feature = "data_loaders")]
fn log_file(
&self,
filepath: impl AsRef<std::path::Path>,
contents: Option<std::borrow::Cow<'_, [u8]>>,
entity_path_prefix: Option<EntityPath>,
static_: bool,
prefer_current_recording: bool,
) -> RecordingStreamResult<()> {
let Some(store_info) = self.store_info().clone() else {
re_log::warn!("Ignored call to log_file() because RecordingStream has not been properly initialized");
Expand All @@ -1162,10 +1166,10 @@ impl RecordingStream {
re_smart_channel::SmartChannelSource::File(filepath.into()),
);

let settings = crate::DataLoaderSettings {
let mut settings = crate::DataLoaderSettings {
application_id: Some(store_info.application_id.clone()),
opened_application_id: None,
store_id: store_info.store_id,
store_id: store_info.store_id.clone(),
opened_store_id: None,
entity_path_prefix,
timepoint: (!static_).then(|| {
Expand All @@ -1183,9 +1187,14 @@ impl RecordingStream {
now
})
.unwrap_or_default()
}), // timepoint: self.time,
}),
};

if prefer_current_recording {
settings.opened_application_id = Some(store_info.application_id.clone());
settings.opened_store_id = Some(store_info.store_id);
}

if let Some(contents) = contents {
re_data_loader::load_from_file_contents(
&settings,
Expand Down
Loading