Skip to content

Commit b571e5f

Browse files
committed
add tests for sink dependent batcher configuration
1 parent 65e5c1c commit b571e5f

File tree

2 files changed

+167
-1
lines changed

2 files changed

+167
-1
lines changed

crates/store/re_chunk/src/batcher.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ pub struct BatcherHooks {
5151
#[allow(clippy::type_complexity)]
5252
pub on_insert: Option<Arc<dyn Fn(&[PendingRow]) + Send + Sync>>,
5353

54+
/// Called when the batcher's configuration changes.
55+
///
56+
/// Called for initial configuration as well as subsequent changes.
57+
/// Used for testing.
58+
pub on_config_change: Option<Arc<dyn Fn(&ChunkBatcherConfig) + Send + Sync>>,
59+
5460
/// Callback to be run when an Arrow Chunk goes out of scope.
5561
///
5662
/// See [`re_log_types::ArrowRecordBatchReleaseCallback`] for more information.
@@ -62,6 +68,7 @@ pub struct BatcherHooks {
6268
impl BatcherHooks {
6369
pub const NONE: Self = Self {
6470
on_insert: None,
71+
on_config_change: None,
6572
on_release: None,
6673
};
6774
}
@@ -70,6 +77,7 @@ impl PartialEq for BatcherHooks {
7077
fn eq(&self, other: &Self) -> bool {
7178
let Self {
7279
on_insert,
80+
on_config_change,
7381
on_release,
7482
} = self;
7583

@@ -79,18 +87,26 @@ impl PartialEq for BatcherHooks {
7987
_ => false,
8088
};
8189

82-
on_insert_eq && on_release == &other.on_release
90+
let on_config_change_eq = match (on_config_change, &other.on_config_change) {
91+
(Some(a), Some(b)) => Arc::ptr_eq(a, b),
92+
(None, None) => true,
93+
_ => false,
94+
};
95+
96+
on_insert_eq && on_config_change_eq && on_release == &other.on_release
8397
}
8498
}
8599

86100
impl std::fmt::Debug for BatcherHooks {
87101
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88102
let Self {
89103
on_insert,
104+
on_config_change,
90105
on_release,
91106
} = self;
92107
f.debug_struct("BatcherHooks")
93108
.field("on_insert", &on_insert.as_ref().map(|_| "…"))
109+
.field("on_config_change", &on_config_change.as_ref().map(|_| "…"))
94110
.field("on_release", &on_release)
95111
.finish()
96112
}
@@ -617,6 +633,10 @@ fn batching_thread(
617633
config.flush_num_rows,
618634
re_format::format_bytes(config.flush_num_bytes as _),
619635
);
636+
// Signal initial config
637+
if let Some(on_config_change) = hooks.on_config_change.as_ref() {
638+
on_config_change(&config);
639+
}
620640

621641
// Set to `true` when a flush is triggered for a reason other than hitting the time threshold,
622642
// so that the next tick will not unnecessarily fire early.
@@ -683,6 +703,9 @@ fn batching_thread(
683703
}
684704

685705
re_log::trace!("Updated batcher config: {:?}", new_config);
706+
if let Some(on_config_change) = hooks.on_config_change.as_ref() {
707+
on_config_change(&new_config);
708+
}
686709

687710
config = new_config;
688711
rx_tick = crossbeam::channel::tick(config.flush_tick);

crates/top/re_sdk/src/recording_stream.rs

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3069,4 +3069,147 @@ mod tests {
30693069
)
30703070
.unwrap();
30713071
}
3072+
3073+
struct BatcherConfigTestSink {
3074+
config: ChunkBatcherConfig,
3075+
}
3076+
3077+
impl LogSink for BatcherConfigTestSink {
3078+
fn default_batcher_config(&self) -> ChunkBatcherConfig {
3079+
self.config.clone()
3080+
}
3081+
3082+
fn send(&self, _msg: LogMsg) {
3083+
// noop
3084+
}
3085+
3086+
fn flush_blocking(&self) {
3087+
// noop
3088+
}
3089+
3090+
fn as_any(&self) -> &dyn std::any::Any {
3091+
self
3092+
}
3093+
}
3094+
3095+
struct ScopedEnvVarSet {
3096+
key: &'static str,
3097+
}
3098+
3099+
impl ScopedEnvVarSet {
3100+
#[allow(unsafe_code)]
3101+
fn new(key: &'static str, value: &'static str) -> Self {
3102+
unsafe { std::env::set_var(key, value) };
3103+
Self { key }
3104+
}
3105+
}
3106+
3107+
impl Drop for ScopedEnvVarSet {
3108+
#[allow(unsafe_code)]
3109+
fn drop(&mut self) {
3110+
unsafe {
3111+
std::env::remove_var(self.key);
3112+
}
3113+
}
3114+
}
3115+
3116+
const CONFIG_CHANGE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1);
3117+
3118+
#[test]
3119+
3120+
fn test_sink_dependent_batcher_config() {
3121+
let (tx, rx) = std::sync::mpsc::channel();
3122+
3123+
let rec = RecordingStreamBuilder::new("rerun_example_test_batcher_config")
3124+
.batcher_hooks(BatcherHooks {
3125+
on_config_change: Some(Arc::new(move |config: &ChunkBatcherConfig| {
3126+
tx.send(config.clone()).unwrap();
3127+
})),
3128+
..BatcherHooks::NONE
3129+
})
3130+
.buffered()
3131+
.unwrap();
3132+
3133+
let new_config = rx
3134+
.recv_timeout(CONFIG_CHANGE_TIMEOUT)
3135+
.expect("no config change message received within timeout");
3136+
assert_eq!(new_config, ChunkBatcherConfig::DEFAULT); // buffered sink uses the default config.
3137+
3138+
// Change sink to our custom sink. Will it take over the setting?
3139+
let injected_config = ChunkBatcherConfig {
3140+
flush_tick: std::time::Duration::from_secs(123),
3141+
flush_num_bytes: 123,
3142+
flush_num_rows: 123,
3143+
..ChunkBatcherConfig::DEFAULT
3144+
};
3145+
rec.set_sink(Box::new(BatcherConfigTestSink {
3146+
config: injected_config.clone(),
3147+
}));
3148+
let new_config = rx
3149+
.recv_timeout(CONFIG_CHANGE_TIMEOUT)
3150+
.expect("no config change message received within timeout");
3151+
3152+
assert_eq!(new_config, injected_config);
3153+
3154+
// Set flush num bytes through env var and set the sink again.
3155+
// check that the env var is respected.
3156+
let _scoped_env_guard = ScopedEnvVarSet::new("RERUN_FLUSH_NUM_BYTES", "456");
3157+
rec.set_sink(Box::new(BatcherConfigTestSink {
3158+
config: injected_config.clone(),
3159+
}));
3160+
let new_config = rx
3161+
.recv_timeout(CONFIG_CHANGE_TIMEOUT)
3162+
.expect("no config change message received within timeout");
3163+
assert_eq!(
3164+
new_config,
3165+
ChunkBatcherConfig {
3166+
flush_num_bytes: 456,
3167+
..injected_config
3168+
},
3169+
);
3170+
}
3171+
3172+
#[test]
3173+
fn test_explicit_batcher_config() {
3174+
// This environment variable should still override the explicit config.
3175+
let _scoped_env_guard = ScopedEnvVarSet::new("RERUN_FLUSH_TICK_SECS", "456");
3176+
let explicit_config = ChunkBatcherConfig {
3177+
flush_tick: std::time::Duration::from_secs(123),
3178+
flush_num_bytes: 123,
3179+
flush_num_rows: 123,
3180+
..ChunkBatcherConfig::DEFAULT
3181+
};
3182+
let expected_config = ChunkBatcherConfig {
3183+
flush_tick: std::time::Duration::from_secs(456),
3184+
..explicit_config
3185+
};
3186+
3187+
let (tx, rx) = std::sync::mpsc::channel();
3188+
let rec = RecordingStreamBuilder::new("rerun_example_test_batcher_config")
3189+
.batcher_config(explicit_config)
3190+
.batcher_hooks(BatcherHooks {
3191+
on_config_change: Some(Arc::new(move |config: &ChunkBatcherConfig| {
3192+
tx.send(config.clone()).unwrap();
3193+
})),
3194+
..BatcherHooks::NONE
3195+
})
3196+
.buffered()
3197+
.unwrap();
3198+
3199+
let new_config = rx
3200+
.recv_timeout(CONFIG_CHANGE_TIMEOUT)
3201+
.expect("no config change message received within timeout");
3202+
assert_eq!(new_config, expected_config);
3203+
3204+
// Changing the sink should have no effect since an explicit config is in place.
3205+
rec.set_sink(Box::new(BatcherConfigTestSink {
3206+
config: ChunkBatcherConfig::ALWAYS,
3207+
}));
3208+
// Don't want to stall the test for CONFIG_CHANGE_TIMEOUT here.
3209+
let new_config_recv_result = rx.recv_timeout(std::time::Duration::from_millis(100));
3210+
assert_eq!(
3211+
new_config_recv_result,
3212+
Err(std::sync::mpsc::RecvTimeoutError::Timeout)
3213+
);
3214+
}
30723215
}

0 commit comments

Comments
 (0)