Skip to content

Commit

Permalink
New data APIs 11: port all range-only views (plots, logs…) (#5992)
Browse files Browse the repository at this point in the history
Text logs, line plots and scatter plots.

A bit faster than `main`, with a bit less memory overhead.

---

Part of a PR series to completely revamp the data APIs in preparation
for the removal of instance keys and the introduction of promises:
- #5573
- #5574
- #5581
- #5605
- #5606
- #5633
- #5673
- #5679
- #5687
- #5755
- #5990
- #5992
- #5993 
- #5994
- #6035
- #6036
- #6037

Builds on top of the static data PR series:
- #5534
  • Loading branch information
teh-cmc authored Apr 26, 2024
1 parent 1cd97aa commit d0e9af0
Show file tree
Hide file tree
Showing 6 changed files with 341 additions and 190 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/re_space_view_text_log/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ re_entity_db.workspace = true
re_log_types.workspace = true
re_log.workspace = true
re_query_cache.workspace = true
re_query_cache2.workspace = true
re_renderer.workspace = true
re_tracing.workspace = true
re_types.workspace = true
Expand Down
108 changes: 78 additions & 30 deletions crates/re_space_view_text_log/src/visualizer_system.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use re_data_store::TimeRange;
use re_entity_db::EntityPath;
use re_log_types::{RowId, TimeInt};
use re_query_cache2::{clamped_zip_1x2, range_zip_1x2, CachedRangeData, PromiseResult};
use re_types::{
archetypes::TextLog,
components::{Color, Text, TextLogLevel},
Component, Loggable as _,
};
use re_viewer_context::{
IdentifiedViewSystem, SpaceViewSystemExecutionError, ViewContextCollection, ViewQuery,
Expand Down Expand Up @@ -46,45 +48,71 @@ impl VisualizerSystem for TextLogSystem {
fn execute(
&mut self,
ctx: &ViewerContext<'_>,
query: &ViewQuery<'_>,
view_query: &ViewQuery<'_>,
_view_ctx: &ViewContextCollection,
) -> Result<Vec<re_renderer::QueueableDrawData>, SpaceViewSystemExecutionError> {
let query_caches = ctx.recording().query_caches();
let store = ctx.recording_store();
let resolver = ctx.recording().resolver();
let query = re_data_store::RangeQuery::new(view_query.timeline, TimeRange::EVERYTHING);

for data_result in query.iter_visible_data_results(ctx, Self::identifier()) {
for data_result in view_query.iter_visible_data_results(ctx, Self::identifier()) {
re_tracing::profile_scope!("primary", &data_result.entity_path.to_string());

// We want everything, for all times:
let timeline_query =
re_data_store::RangeQuery::new(query.timeline, TimeRange::EVERYTHING);

// TODO(cmc): use raw API.
query_caches.query_archetype_pov1_comp2::<TextLog, Text, TextLogLevel, Color, _>(
store,
&timeline_query.clone().into(),
let results = ctx.recording().query_caches2().range(
ctx.recording_store(),
&query,
&data_result.entity_path,
|((time, row_id), _, bodies, levels, colors)| {
for (body, level, color) in itertools::izip!(
bodies.iter(),
re_query_cache::iter_or_repeat_opt(levels, bodies.len()),
re_query_cache::iter_or_repeat_opt(colors, bodies.len()),
) {
self.entries.push(Entry {
row_id,
entity_path: data_result.entity_path.clone(),
time,
color: *color,
body: body.clone(),
level: level.clone(),
});
}
},
)?;
[Text::name(), TextLogLevel::name(), Color::name()],
);

let all_bodies = {
let Some(all_bodies) = results.get(Text::name()) else {
continue;
};
all_bodies.to_dense::<Text>(resolver)
};
check_range(&all_bodies)?;

let all_levels = results
.get_or_empty(TextLogLevel::name())
.to_dense::<TextLogLevel>(resolver);
check_range(&all_levels)?;

let all_colors = results
.get_or_empty(Color::name())
.to_dense::<Color>(resolver);
check_range(&all_colors)?;

let all_frames = range_zip_1x2(
all_bodies.range_indexed(),
all_levels.range_indexed(),
all_colors.range_indexed(),
);

for (&(data_time, row_id), bodies, levels, colors) in all_frames {
let levels = levels.unwrap_or(&[]).iter().cloned().map(Some);
let colors = colors.unwrap_or(&[]).iter().copied().map(Some);

let level_default_fn = || None;
let color_default_fn = || None;

let results =
clamped_zip_1x2(bodies, levels, level_default_fn, colors, color_default_fn);

for (body, level, color) in results {
self.entries.push(Entry {
row_id,
entity_path: data_result.entity_path.clone(),
time: data_time,
color,
body: body.clone(),
level,
});
}
}
}

{
// Sort by currently selected tiemeline
// Sort by currently selected timeline
re_tracing::profile_scope!("sort");
self.entries.sort_by_key(|e| e.time);
}
Expand All @@ -96,3 +124,23 @@ impl VisualizerSystem for TextLogSystem {
self
}
}

// TODO(#5607): what should happen if the promise is still pending?
#[inline]
fn check_range<'a, C: Component>(
results: &'a CachedRangeData<'a, C>,
) -> re_query_cache2::Result<()> {
let (front_status, back_status) = results.status();
match front_status {
PromiseResult::Pending => return Ok(()),
PromiseResult::Error(err) => return Err(re_query_cache2::QueryError::Other(err.into())),
PromiseResult::Ready(_) => {}
}
match back_status {
PromiseResult::Pending => return Ok(()),
PromiseResult::Error(err) => return Err(re_query_cache2::QueryError::Other(err.into())),
PromiseResult::Ready(_) => {}
}

Ok(())
}
1 change: 1 addition & 0 deletions crates/re_space_view_time_series/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ re_log.workspace = true
re_log_types.workspace = true
re_query.workspace = true
re_query_cache.workspace = true
re_query_cache2.workspace = true
re_renderer.workspace = true
re_space_view.workspace = true
re_tracing.workspace = true
Expand Down
182 changes: 110 additions & 72 deletions crates/re_space_view_time_series/src/line_visualizer_system.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use itertools::Itertools as _;
use re_query_cache::QueryError;
use re_query_cache2::{PromiseResult, QueryError};
use re_types::archetypes;
use re_types::{
archetypes::SeriesLine,
components::{Color, MarkerShape, MarkerSize, Name, Scalar, StrokeWidth},
components::{Color, Name, Scalar, StrokeWidth},
Archetype as _, ComponentNameSet, Loggable,
};
use re_viewer_context::{
Expand Down Expand Up @@ -155,8 +155,7 @@ fn load_series(
) -> Result<(), QueryError> {
re_tracing::profile_function!();

let store = ctx.recording_store();
let query_caches = ctx.recording().query_caches();
let resolver = ctx.recording().resolver();

let annotation_info = annotations
.resolved_class_description(None)
Expand All @@ -181,7 +180,7 @@ fn load_series(
},
};

let mut points = Vec::new();
let mut points;

let time_range = determine_time_range(
ctx,
Expand All @@ -196,83 +195,122 @@ fn load_series(
let entity_path = &data_result.entity_path;
let query = re_data_store::RangeQuery::new(query.timeline, time_range);

// TODO(jleibs): need to do a "joined" archetype query
// The `Scalar` archetype queries for `MarkerShape` & `MarkerSize` in the point visualizer,
// and so it must do so here also.
// See https://github.com/rerun-io/rerun/pull/5029
query_caches.query_archetype_range_pov1_comp4::<
archetypes::Scalar,
Scalar,
Color,
StrokeWidth,
MarkerSize, // unused
MarkerShape, // unused
_,
>(
store,
let results = ctx.recording().query_caches2().range(
ctx.recording_store(),
&query,
entity_path,
|entry_range, (times, _, scalars, colors, stroke_widths, _, _)| {
let times = times.range(entry_range.clone()).map(|(time, _)| time.as_i64());
// Allocate all points.
points = times.map(|time| PlotPoint {
time,
..default_point.clone()
}).collect_vec();

// Fill in values.
for (i, scalar) in scalars.range(entry_range.clone()).enumerate() {
if scalar.len() > 1 {
re_log::warn_once!("found a scalar batch in {entity_path:?} -- those have no effect");
} else if scalar.is_empty() {
points[i].attrs.kind = PlotSeriesKind::Clear;
} else {
points[i].value = scalar.first().map_or(0.0, |s| s.0);
}
[Scalar::name(), Color::name(), StrokeWidth::name()],
);

let all_scalars = results
.get_required(Scalar::name())?
.to_dense::<Scalar>(resolver);
let all_scalars_entry_range = all_scalars.entry_range();

if !matches!(
all_scalars.status(),
(PromiseResult::Ready(()), PromiseResult::Ready(()))
) {
// TODO(#5607): what should happen if the promise is still pending?
}

// Allocate all points.
points = all_scalars
.range_indices(all_scalars_entry_range.clone())
.map(|(data_time, _)| PlotPoint {
time: data_time.as_i64(),
..default_point.clone()
})
.collect_vec();

// Fill in values.
for (i, scalars) in all_scalars
.range_data(all_scalars_entry_range.clone())
.enumerate()
{
if scalars.len() > 1 {
re_log::warn_once!(
"found a scalar batch in {entity_path:?} -- those have no effect"
);
} else if scalars.is_empty() {
points[i].attrs.kind = PlotSeriesKind::Clear;
} else {
points[i].value = scalars.first().map_or(0.0, |s| s.0);
}
}

// Make it as clear as possible to the optimizer that some parameters
// go completely unused as soon as overrides have been defined.

// Fill in colors -- if available _and_ not overridden.
if override_color.is_none() {
if let Some(all_colors) = results.get(Color::name()) {
let all_colors = all_colors.to_dense::<Color>(resolver);

if !matches!(
all_colors.status(),
(PromiseResult::Ready(()), PromiseResult::Ready(()))
) {
// TODO(#5607): what should happen if the promise is still pending?
}

// Make it as clear as possible to the optimizer that some parameters
// go completely unused as soon as overrides have been defined.
let all_scalars_indexed = all_scalars
.range_indices(all_scalars_entry_range.clone())
.map(|index| (index, ()));

// Fill in colors -- if available _and_ not overridden.
if override_color.is_none() {
if let Some(colors) = colors {
for (i, color) in colors.range(entry_range.clone()).enumerate() {
if i >= points.len() {
re_log::debug_once!("more color attributes than points in {entity_path:?} -- this points to a bug in the query cache");
break;
}
if let Some(color) = color.first().copied().flatten().map(|c| {
let [r,g,b,a] = c.to_array();
if a == 255 {
// Common-case optimization
re_renderer::Color32::from_rgb(r, g, b)
} else {
re_renderer::Color32::from_rgba_unmultiplied(r, g, b, a)
}
}) {
points[i].attrs.color = color;
let all_frames =
re_query_cache2::range_zip_1x1(all_scalars_indexed, all_colors.range_indexed())
.enumerate();

for (i, (_index, _scalars, colors)) in all_frames {
if let Some(color) = colors.and_then(|colors| {
colors.first().map(|c| {
let [r, g, b, a] = c.to_array();
if a == 255 {
// Common-case optimization
re_renderer::Color32::from_rgb(r, g, b)
} else {
re_renderer::Color32::from_rgba_unmultiplied(r, g, b, a)
}
}
})
}) {
points[i].attrs.color = color;
}
}
}
}

// Fill in radii -- if available _and_ not overridden.
if override_stroke_width.is_none() {
if let Some(stroke_widths) = stroke_widths {
for (i, stroke_width) in stroke_widths.range(entry_range.clone()).enumerate() {
if i >= stroke_widths.num_entries() {
re_log::debug_once!("more stroke width attributes than points in {entity_path:?} -- this points to a bug in the query cache");
break;
}
if let Some(stroke_width) = stroke_width.first().copied().flatten().map(|r| r.0) {
points[i].attrs.marker_size = stroke_width;
}
}
// Fill in stroke widths -- if available _and_ not overridden.
if override_stroke_width.is_none() {
if let Some(all_stroke_widths) = results.get(StrokeWidth::name()) {
let all_stroke_widths = all_stroke_widths.to_dense::<StrokeWidth>(resolver);

if !matches!(
all_stroke_widths.status(),
(PromiseResult::Ready(()), PromiseResult::Ready(()))
) {
// TODO(#5607): what should happen if the promise is still pending?
}

let all_scalars_indexed = all_scalars
.range_indices(all_scalars_entry_range.clone())
.map(|index| (index, ()));

let all_frames = re_query_cache2::range_zip_1x1(
all_scalars_indexed,
all_stroke_widths.range_indexed(),
)
.enumerate();

for (i, (_index, _scalars, stroke_widths)) in all_frames {
if let Some(stroke_width) =
stroke_widths.and_then(|stroke_widths| stroke_widths.first().map(|r| r.0))
{
points[i].attrs.marker_size = stroke_width;
}
}
},
)?;
}
}
}

// Check for an explicit label if any.
Expand All @@ -293,7 +331,7 @@ fn load_series(
data_result,
time_per_pixel,
points,
store,
ctx.recording_store(),
query,
series_name,
all_series,
Expand Down
Loading

0 comments on commit d0e9af0

Please sign in to comment.