Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 90 additions & 29 deletions crates/store/re_chunk/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ pub struct BatcherHooks {
#[allow(clippy::type_complexity)]
pub on_insert: Option<Arc<dyn Fn(&[PendingRow]) + Send + Sync>>,

/// Called when the batcher's configuration changes.
///
/// Called for initial configuration as well as subsequent changes.
/// Used for testing.
#[allow(clippy::type_complexity)]
pub on_config_change: Option<Arc<dyn Fn(&ChunkBatcherConfig) + Send + Sync>>,

/// Callback to be run when an Arrow Chunk goes out of scope.
///
/// See [`re_log_types::ArrowRecordBatchReleaseCallback`] for more information.
Expand All @@ -62,6 +69,7 @@ pub struct BatcherHooks {
impl BatcherHooks {
// A `BatcherHooks` value with no callbacks installed — the no-op default.
// `on_insert`, `on_config_change` and `on_release` are all `None`.
pub const NONE: Self = Self {
on_insert: None,
on_config_change: None,
on_release: None,
};
}
Expand All @@ -70,6 +78,7 @@ impl PartialEq for BatcherHooks {
fn eq(&self, other: &Self) -> bool {
let Self {
on_insert,
on_config_change,
on_release,
} = self;

Expand All @@ -79,18 +88,26 @@ impl PartialEq for BatcherHooks {
_ => false,
};

on_insert_eq && on_release == &other.on_release
let on_config_change_eq = match (on_config_change, &other.on_config_change) {
(Some(a), Some(b)) => Arc::ptr_eq(a, b),
(None, None) => true,
_ => false,
};

on_insert_eq && on_config_change_eq && on_release == &other.on_release
}
}

impl std::fmt::Debug for BatcherHooks {
// Manual `Debug`: the closure-holding fields (`on_insert`, `on_config_change`)
// are rendered as "…" when present rather than being formatted directly
// (presumably because `dyn Fn` has no `Debug` impl — the `.map(|_| "…")`
// redaction suggests so); `on_release` is passed straight through to its own
// `Debug` impl.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Exhaustive destructuring: adding a field to `BatcherHooks` without
// updating this impl becomes a compile error instead of a silent omission.
let Self {
on_insert,
on_config_change,
on_release,
} = self;
f.debug_struct("BatcherHooks")
.field("on_insert", &on_insert.as_ref().map(|_| "…"))
.field("on_config_change", &on_config_change.as_ref().map(|_| "…"))
.field("on_release", &on_release)
.finish()
}
Expand All @@ -101,7 +118,7 @@ impl std::fmt::Debug for BatcherHooks {
/// Defines the different thresholds of the associated [`ChunkBatcher`].
///
/// See [`Self::default`] and [`Self::from_env`].
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChunkBatcherConfig {
/// Duration of the periodic tick.
//
Expand All @@ -126,15 +143,14 @@ pub struct ChunkBatcherConfig {
/// Size of the internal channel of commands.
///
/// Unbounded if left unspecified.
/// Once a batcher is created, this property cannot be changed.
pub max_commands_in_flight: Option<u64>,

/// Size of the internal channel of [`Chunk`]s.
///
/// Unbounded if left unspecified.
/// Once a batcher is created, this property cannot be changed.
pub max_chunks_in_flight: Option<u64>,

/// Callbacks you can install on the [`ChunkBatcher`].
pub hooks: BatcherHooks,
}

impl Default for ChunkBatcherConfig {
Expand All @@ -146,13 +162,18 @@ impl Default for ChunkBatcherConfig {
impl ChunkBatcherConfig {
/// Default configuration, applicable to most use cases.
pub const DEFAULT: Self = Self {
flush_tick: Duration::from_millis(8), // We want it fast enough for 60 Hz for real time camera feel
flush_num_bytes: 1024 * 1024, // 1 MiB
flush_tick: Duration::from_millis(200),
flush_num_bytes: 1024 * 1024, // 1 MiB
flush_num_rows: u64::MAX,
chunk_max_rows_if_unsorted: 256,
max_commands_in_flight: None,
max_chunks_in_flight: None,
hooks: BatcherHooks::NONE,
};

/// Low-latency configuration, preferred when streaming directly to a viewer.
pub const LOW_LATENCY: Self = Self {
flush_tick: Duration::from_millis(8), // We want it fast enough for 60 Hz for real time camera feel
..Self::DEFAULT
};

/// Always flushes ASAP.
Expand All @@ -163,7 +184,6 @@ impl ChunkBatcherConfig {
chunk_max_rows_if_unsorted: 256,
max_commands_in_flight: None,
max_chunks_in_flight: None,
hooks: BatcherHooks::NONE,
};

/// Never flushes unless manually told to (or hitting one the builtin invariants).
Expand All @@ -174,7 +194,6 @@ impl ChunkBatcherConfig {
chunk_max_rows_if_unsorted: 256,
max_commands_in_flight: None,
max_chunks_in_flight: None,
hooks: BatcherHooks::NONE,
};

/// Environment variable to configure [`Self::flush_tick`].
Expand Down Expand Up @@ -384,6 +403,7 @@ enum Command {
AppendChunk(Chunk),
AppendRow(EntityPath, PendingRow),
Flush(Sender<()>),
UpdateConfig(ChunkBatcherConfig),
Shutdown,
}

Expand All @@ -401,7 +421,7 @@ impl ChunkBatcher {
/// batcher.
#[must_use = "Batching threads will automatically shutdown when this object is dropped"]
#[allow(clippy::needless_pass_by_value)]
pub fn new(config: ChunkBatcherConfig) -> ChunkBatcherResult<Self> {
pub fn new(config: ChunkBatcherConfig, hooks: BatcherHooks) -> ChunkBatcherResult<Self> {
let (tx_cmds, rx_cmd) = match config.max_commands_in_flight {
Some(cap) => crossbeam::channel::bounded(cap as _),
None => crossbeam::channel::unbounded(),
Expand All @@ -418,7 +438,7 @@ impl ChunkBatcher {
.name(NAME.into())
.spawn({
let config = config.clone();
move || batching_thread(config, rx_cmd, tx_chunk)
move || batching_thread(config, hooks, rx_cmd, tx_chunk)
})
.map_err(|err| ChunkBatcherError::SpawnThread {
name: NAME,
Expand Down Expand Up @@ -472,6 +492,11 @@ impl ChunkBatcher {
self.inner.flush_blocking();
}

/// Updates the batcher's configuration as far as possible.
pub fn update_config(&self, config: ChunkBatcherConfig) {
self.inner.update_config(config);
}

// --- Subscribe to chunks ---

/// Returns a _shared_ channel in which are sent the batched [`Chunk`]s.
Expand Down Expand Up @@ -507,6 +532,10 @@ impl ChunkBatcherInner {
oneshot.recv().ok();
}

fn update_config(&self, config: ChunkBatcherConfig) {
self.send_cmd(Command::UpdateConfig(config));
}

fn send_cmd(&self, cmd: Command) {
// NOTE: Internal channels can never be closed outside of the `Drop` impl, this cannot
// fail.
Expand All @@ -515,8 +544,13 @@ impl ChunkBatcherInner {
}

#[allow(clippy::needless_pass_by_value)]
fn batching_thread(config: ChunkBatcherConfig, rx_cmd: Receiver<Command>, tx_chunk: Sender<Chunk>) {
let rx_tick = crossbeam::channel::tick(config.flush_tick);
fn batching_thread(
mut config: ChunkBatcherConfig,
hooks: BatcherHooks,
rx_cmd: Receiver<Command>,
tx_chunk: Sender<Chunk>,
) {
let mut rx_tick = crossbeam::channel::tick(config.flush_tick);

struct Accumulator {
latest: Instant,
Expand Down Expand Up @@ -600,6 +634,10 @@ fn batching_thread(config: ChunkBatcherConfig, rx_cmd: Receiver<Command>, tx_chu
config.flush_num_rows,
re_format::format_bytes(config.flush_num_bytes as _),
);
// Signal initial config
if let Some(on_config_change) = hooks.on_config_change.as_ref() {
on_config_change(&config);
}

// Set to `true` when a flush is triggered for a reason other than hitting the time threshold,
// so that the next tick will not unnecessarily fire early.
Expand Down Expand Up @@ -636,7 +674,7 @@ fn batching_thread(config: ChunkBatcherConfig, rx_cmd: Receiver<Command>, tx_chu
.or_insert_with(|| Accumulator::new(entity_path));
do_push_row(acc, row);

if let Some(config) = config.hooks.on_insert.as_ref() {
if let Some(config) = hooks.on_insert.as_ref() {
config(&acc.pending_rows);
}

Expand All @@ -657,6 +695,23 @@ fn batching_thread(config: ChunkBatcherConfig, rx_cmd: Receiver<Command>, tx_chu
drop(oneshot); // signals the oneshot
},

Command::UpdateConfig(new_config) => {
// Warn if properties changed that we can't change here.
if config.max_commands_in_flight != new_config.max_commands_in_flight ||
config.max_chunks_in_flight != new_config.max_chunks_in_flight {
re_log::warn!("Cannot change max commands/chunks in flight after batcher has been created. Previous max commands/chunks: {:?}/{:?}, new max commands/chunks: {:?}/{:?}",
config.max_commands_in_flight, config.max_chunks_in_flight, new_config.max_commands_in_flight, new_config.max_chunks_in_flight);
}

re_log::trace!("Updated batcher config: {:?}", new_config);
if let Some(on_config_change) = hooks.on_config_change.as_ref() {
on_config_change(&new_config);
}

config = new_config;
rx_tick = crossbeam::channel::tick(config.flush_tick);
}

Command::Shutdown => break,
};
},
Expand Down Expand Up @@ -1039,7 +1094,7 @@ mod tests {
/// A bunch of rows that don't fit any of the split conditions should end up together.
#[test]
fn simple() -> anyhow::Result<()> {
let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER)?;
let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;

let timeline1 = Timeline::new_duration("log_time");

Expand Down Expand Up @@ -1148,7 +1203,7 @@ mod tests {
#[test]
#[allow(clippy::len_zero)]
fn simple_but_hashes_might_not_match() -> anyhow::Result<()> {
let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER)?;
let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;

let timeline1 = Timeline::new_duration("log_time");

Expand Down Expand Up @@ -1260,7 +1315,7 @@ mod tests {
/// A bunch of rows that don't fit any of the split conditions should end up together.
#[test]
fn simple_static() -> anyhow::Result<()> {
let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER)?;
let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;

let static_ = TimePoint::default();

Expand Down Expand Up @@ -1331,7 +1386,7 @@ mod tests {
/// A bunch of rows belonging to different entities will end up in different batches.
#[test]
fn different_entities() -> anyhow::Result<()> {
let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER)?;
let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;

let timeline1 = Timeline::new_duration("log_time");

Expand Down Expand Up @@ -1434,7 +1489,7 @@ mod tests {
/// A bunch of rows with different sets of timelines will end up in different batches.
#[test]
fn different_timelines() -> anyhow::Result<()> {
let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER)?;
let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;

let timeline1 = Timeline::new_duration("log_time");
let timeline2 = Timeline::new_sequence("frame_nr");
Expand Down Expand Up @@ -1547,7 +1602,7 @@ mod tests {
/// A bunch of rows with different datatypes will end up in different batches.
#[test]
fn different_datatypes() -> anyhow::Result<()> {
let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER)?;
let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;

let timeline1 = Timeline::new_duration("log_time");

Expand Down Expand Up @@ -1651,10 +1706,13 @@ mod tests {
/// threshold, we don't do anything special.
#[test]
fn unsorted_timeline_below_threshold() -> anyhow::Result<()> {
let batcher = ChunkBatcher::new(ChunkBatcherConfig {
chunk_max_rows_if_unsorted: 1000,
..ChunkBatcherConfig::NEVER
})?;
let batcher = ChunkBatcher::new(
ChunkBatcherConfig {
chunk_max_rows_if_unsorted: 1000,
..ChunkBatcherConfig::NEVER
},
BatcherHooks::NONE,
)?;

let timeline1 = Timeline::new_duration("log_time");
let timeline2 = Timeline::new_duration("frame_nr");
Expand Down Expand Up @@ -1755,10 +1813,13 @@ mod tests {
/// threshold, we split it.
#[test]
fn unsorted_timeline_above_threshold() -> anyhow::Result<()> {
let batcher = ChunkBatcher::new(ChunkBatcherConfig {
chunk_max_rows_if_unsorted: 3,
..ChunkBatcherConfig::NEVER
})?;
let batcher = ChunkBatcher::new(
ChunkBatcherConfig {
chunk_max_rows_if_unsorted: 3,
..ChunkBatcherConfig::NEVER
},
BatcherHooks::NONE,
)?;

let timeline1 = Timeline::new_duration("log_time");
let timeline2 = Timeline::new_duration("frame_nr");
Expand Down
3 changes: 2 additions & 1 deletion crates/store/re_chunk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ pub use self::range::{RangeQuery, RangeQueryOptions};

#[cfg(not(target_arch = "wasm32"))]
pub use self::batcher::{
ChunkBatcher, ChunkBatcherConfig, ChunkBatcherError, ChunkBatcherResult, PendingRow,
BatcherHooks, ChunkBatcher, ChunkBatcherConfig, ChunkBatcherError, ChunkBatcherResult,
PendingRow,
};

// Re-exports
Expand Down
14 changes: 9 additions & 5 deletions crates/store/re_chunk_store/tests/memory_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ fn memory_use<R>(run: impl Fn() -> R) -> (usize, usize) {
// ----------------------------------------------------------------------------

use re_chunk::{
ChunkBatcher, ChunkBatcherConfig, PendingRow, external::crossbeam::channel::TryRecvError,
BatcherHooks, ChunkBatcher, ChunkBatcherConfig, PendingRow,
external::crossbeam::channel::TryRecvError,
};
use re_chunk_store::{ChunkStore, ChunkStoreConfig};
use re_log_types::{TimePoint, Timeline};
Expand All @@ -92,10 +93,13 @@ fn scalar_memory_overhead() {
ChunkStoreConfig::default(),
);

let batcher = ChunkBatcher::new(ChunkBatcherConfig {
flush_num_rows: 1000,
..ChunkBatcherConfig::NEVER
})
let batcher = ChunkBatcher::new(
ChunkBatcherConfig {
flush_num_rows: 1000,
..ChunkBatcherConfig::NEVER
},
BatcherHooks::NONE,
)
.unwrap();

for i in 0..NUM_SCALARS {
Expand Down
6 changes: 6 additions & 0 deletions crates/top/re_sdk/src/grpc_server.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use re_chunk::ChunkBatcherConfig;
use re_log_types::LogMsg;

/// A [`crate::sink::LogSink`] tied to a hosted Rerun gRPC server.
Expand Down Expand Up @@ -81,6 +82,11 @@ impl crate::sink::LogSink for GrpcServerSink {
}
}

fn default_batcher_config(&self) -> ChunkBatcherConfig {
// The GRPC sink is typically used for live streams.
ChunkBatcherConfig::LOW_LATENCY
}

fn as_any(&self) -> &dyn std::any::Any {
self
}
Expand Down
Loading
Loading