Skip to content

Commit

Permalink
End-to-end testing of python logging -> store ingestion (#1817)
Browse files Browse the repository at this point in the history
* Sort the arguments to `rerun`

* Pass on `LogMsg::Goodbye` just like any other message

* Add `rerun --test-receive`

* `just py-build --quiet` is now possible

* Add scripts/run_python_e2e_test.py

* replace `cargo r -p rerun` with `python3 -m rerun`

* lint and explain choice of examples

* Add to CI

* check returncode
  • Loading branch information
emilk authored Apr 12, 2023
1 parent c254e2f commit 5da248d
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 30 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ jobs:
- name: Run tests
run: cd rerun_py/tests && pytest

- name: Run e2e test
run: scripts/run_python_e2e_test.py

- name: Unpack the wheel
shell: bash
run: |
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/re_sdk_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ async fn run_client(
let msg = crate::decode_log_msg(&packet)?;

if matches!(msg, LogMsg::Goodbye(_)) {
re_log::debug!("Client sent goodbye message.");
re_log::debug!("Received goodbye message.");
tx.send(msg)?;
return Ok(());
}

Expand Down
1 change: 1 addition & 0 deletions crates/rerun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ web_viewer = [

[dependencies]
re_build_info.workspace = true
re_data_store.workspace = true
re_format.workspace = true
re_log_encoding = { workspace = true, features = ["decoder", "encoder"] }
re_log_types.workspace = true
Expand Down
103 changes: 76 additions & 27 deletions crates/rerun/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,58 +36,67 @@ use crate::web_viewer::host_web_viewer;
#[derive(Debug, clap::Parser)]
#[clap(author, about)]
struct Args {
/// Print version and quit
// Note: arguments are sorted lexicographically for nicer `--help` message:
#[command(subcommand)]
commands: Option<Commands>,

/// Set a maximum input latency, e.g. "200ms" or "10s".
///
/// If we go over this, we start dropping packets.
///
/// The default is no limit, which means Rerun might eat more and more memory,
/// and have longer and longer latency, if you are logging data faster
/// than Rerun can index it.
#[clap(long)]
version: bool,
drop_at_latency: Option<String>,

/// Either a path to a `.rrd` file to load, an http url to an `.rrd` file,
/// or a websocket url to a Rerun Server from which to read data
/// An upper limit on how much memory the Rerun Viewer should use.
///
/// If none is given, a server will be hosted which the Rerun SDK can connect to.
url_or_path: Option<String>,
/// When this limit is used, Rerun will purge the oldest data.
///
/// Example: `16GB`
#[clap(long)]
memory_limit: Option<String>,

/// What TCP port do we listen to (for SDK:s to connect to)?
#[cfg(feature = "server")]
#[clap(long, default_value_t = re_sdk_comms::DEFAULT_SERVER_PORT)]
port: u16,

/// Start the viewer in the browser (instead of locally).
/// Requires Rerun to have been compiled with the 'web_viewer' feature.
/// Start with the puffin profiler running.
#[clap(long)]
web_viewer: bool,
profile: bool,

/// Stream incoming log events to an .rrd file at the given path.
#[clap(long)]
save: Option<String>,

/// Start with the puffin profiler running.
#[clap(long)]
profile: bool,

/// Exit with a non-zero exit code if any warning or error is logged. Useful for tests.
#[clap(long)]
strict: bool,

/// An upper limit on how much memory the Rerun Viewer should use.
/// Ingest data and then quit once the goodbye message has been received.
///
/// When this limit is used, Rerun will purge the oldest data.
/// Used for testing together with the `--strict` argument.
///
/// Example: `16GB`
/// Fails if no messages are received, or if no messages are received within a dozen or so seconds.
#[clap(long)]
memory_limit: Option<String>,
test_receive: bool,

/// Set a maximum input latency, e.g. "200ms" or "10s".
///
/// If we go over this, we start dropping packets.
/// Either a path to a `.rrd` file to load, an http url to an `.rrd` file,
/// or a websocket url to a Rerun Server from which to read data
///
/// The default is no limit, which means Rerun might eat more and more memory,
/// and have longer and longer latency, if you are logging data faster
/// than Rerun can index it.
/// If none is given, a server will be hosted which the Rerun SDK can connect to.
url_or_path: Option<String>,

/// Print version and quit
#[clap(long)]
drop_at_latency: Option<String>,
version: bool,

#[command(subcommand)]
commands: Option<Commands>,
/// Start the viewer in the browser (instead of locally).
/// Requires Rerun to have been compiled with the 'web_viewer' feature.
#[clap(long)]
web_viewer: bool,
}

#[derive(Debug, Clone, Subcommand)]
Expand Down Expand Up @@ -329,7 +338,9 @@ async fn run_impl(

// Now what do we do with the data?

if let Some(rrd_path) = args.save {
if args.test_receive {
receive_into_log_db(&rx).map(|_db| ())
} else if let Some(rrd_path) = args.save {
Ok(stream_to_rrd(&rx, &rrd_path.into(), &shutdown_bool)?)
} else if args.web_viewer {
#[cfg(feature = "web_viewer")]
Expand Down Expand Up @@ -404,6 +415,44 @@ async fn run_impl(
}
}

/// Ingest every message arriving on `rx` into a fresh [`re_data_store::LogDb`],
/// stopping as soon as a `LogMsg::Goodbye` has been ingested.
///
/// Used by the `--test-receive` code path: once the goodbye message has been added,
/// the data store is sanity-checked and the populated database is returned.
///
/// # Errors
/// * Adding a message to the database fails.
/// * The data store sanity check fails after the goodbye message.
/// * A single receive waits longer than the timeout (12 seconds) without getting a message.
/// * The channel disconnects before a goodbye message was received.
fn receive_into_log_db(rx: &Receiver<LogMsg>) -> anyhow::Result<re_data_store::LogDb> {
    use re_smart_channel::RecvTimeoutError;

    re_log::info!("Receiving messages into a LogDb…");

    let mut db = re_data_store::LogDb::default();

    // Counts every ingested message, including the final goodbye.
    let mut num_messages = 0;

    // Applies to each individual receive, not to the run as a whole.
    let timeout = std::time::Duration::from_secs(12);

    loop {
        match rx.recv_timeout(timeout) {
            Ok(msg) => {
                re_log::info_once!("Received first message.");

                // Must be checked before `msg` is moved into the database.
                let is_goodbye = matches!(msg, re_log_types::LogMsg::Goodbye(_));
                db.add(msg)?;
                num_messages += 1;

                if is_goodbye {
                    db.entity_db.data_store.sanity_check()?;
                    // No "at least one message" check here: the goodbye itself was just
                    // counted, so `num_messages` is always >= 1 at this point.
                    re_log::info!("Successfully ingested {num_messages} messages.");
                    return Ok(db);
                }
            }
            Err(RecvTimeoutError::Timeout) => {
                anyhow::bail!(
                    "Didn't receive any messages within {} seconds. Giving up.",
                    timeout.as_secs()
                );
            }
            Err(RecvTimeoutError::Disconnected) => {
                anyhow::bail!("Channel disconnected without a Goodbye message.");
            }
        }
    }
}

enum ArgumentCategory {
/// A remote RRD file, served over http.
RrdHttpUrl(String),
Expand Down
5 changes: 3 additions & 2 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ py-run-all: py-build
fd main.py | xargs -I _ sh -c "echo _ && python3 _"
# Build and install the package into the venv
py-build:
py-build *ARGS:
#!/usr/bin/env bash
set -euo pipefail
unset CONDA_PREFIX && \
source venv/bin/activate && \
maturin develop \
-m rerun_py/Cargo.toml \
--extras="tests"
--extras="tests" \
{{ARGS}}
# Run autoformatting
py-format:
Expand Down
79 changes: 79 additions & 0 deletions scripts/run_python_e2e_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#!/usr/bin/env python3

"""
Run some of our python examples, piping their log stream to the rerun process.
This is an end-to-end test for testing:
* Our Python API
* LogMsg encoding/decoding
* Arrow encoding/decoding
* TCP connection
* Data store ingestion
"""

import os
import subprocess
import sys
import time


def main() -> None:
    """
    Build the rerun-sdk, then run each selected example against a fresh rerun process.

    Raises `subprocess.CalledProcessError` if the build step fails, so that we never
    go on to test a stale or missing install. Failures in the examples themselves
    surface from `run_example`.
    """
    build_env = os.environ.copy()
    # The user likely only meant RUST_LOG for the actual tests; not the setup
    build_env.pop("RUST_LOG", None)

    print("----------------------------------------------------------")
    print("Building rerun-sdk…")
    start_time = time.time()
    # check=True: a failed build must abort the run instead of being silently ignored.
    subprocess.run(["just", "py-build", "--quiet"], env=build_env, check=True)
    elapsed = time.time() - start_time
    print(f"rerun-sdk built in {elapsed:.1f} seconds")
    print("")

    examples = [
        # Trivial examples that don't require weird dependencies, or downloading data
        "examples/python/api_demo/main.py",
        "examples/python/car/main.py",
        "examples/python/multithreading/main.py",
        "examples/python/plots/main.py",
        "examples/python/text_logging/main.py",
    ]
    for example in examples:
        print("----------------------------------------------------------")
        print(f"Testing {example}\n")
        start_time = time.time()
        run_example(example)
        elapsed = time.time() - start_time
        print(f"{example} done in {elapsed:.1f} seconds")
        print()

    print()
    print("All tests passed successfully!")


def run_example(example: str) -> None:
    """
    Run one example script against a freshly started `rerun --strict --test-receive` process.

    The example is started with `--connect --addr 127.0.0.1:<port>`. Both the example
    and the rerun process must exit with code 0 within 30 seconds each.

    Raises `AssertionError` on a non-zero exit code and `subprocess.TimeoutExpired`
    if either process does not finish in time. In every case, any child process that
    is still running is killed before returning, so the port is free for the next example.
    """
    port = 9752

    # sys.executable: the absolute path of the executable binary for the Python interpreter.
    # It may be None *or* an empty string if Python cannot determine it.
    python_executable = sys.executable or "python3"

    rerun_process = subprocess.Popen(
        [python_executable, "-m", "rerun", "--port", str(port), "--strict", "--test-receive"]
    )
    python_process = None
    try:
        time.sleep(0.3)  # Wait for rerun server to start to remove a logged warning

        python_process = subprocess.Popen([python_executable, example, "--connect", "--addr", f"127.0.0.1:{port}"])

        print("Waiting for python process to finish…")
        returncode = python_process.wait(timeout=30)
        assert returncode == 0, f"python process exited with error code {returncode}"

        print("Waiting for rerun process to finish…")
        returncode = rerun_process.wait(timeout=30)
        assert returncode == 0, f"rerun process exited with error code {returncode}"
    finally:
        # On failure or timeout, don't leave children running (and holding the port).
        for process in (python_process, rerun_process):
            if process is not None and process.poll() is None:
                process.kill()
                process.wait()


if __name__ == "__main__":
main()

1 comment on commit 5da248d

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rust Benchmark

Benchmark suite Current: 5da248d Previous: c254e2f Ratio
datastore/num_rows=1000/num_instances=1000/packed=false/insert/default 10871026 ns/iter (± 350957) 10256134 ns/iter (± 434056) 1.06
datastore/num_rows=1000/num_instances=1000/packed=false/latest_at/default 1825 ns/iter (± 20) 1817 ns/iter (± 20) 1.00
datastore/num_rows=1000/num_instances=1000/packed=false/latest_at_missing/primary/default 280 ns/iter (± 1) 279 ns/iter (± 0) 1.00
datastore/num_rows=1000/num_instances=1000/packed=false/latest_at_missing/secondaries/default 434 ns/iter (± 1) 436 ns/iter (± 0) 1.00
datastore/num_rows=1000/num_instances=1000/packed=false/range/default 11345752 ns/iter (± 529709) 10381084 ns/iter (± 297466) 1.09
mono_points_arrow/generate_message_bundles 45871565 ns/iter (± 793224) 43572009 ns/iter (± 1120680) 1.05
mono_points_arrow/generate_messages 152002884 ns/iter (± 1349616) 149214024 ns/iter (± 1115181) 1.02
mono_points_arrow/encode_log_msg 181778304 ns/iter (± 845219) 180387034 ns/iter (± 693931) 1.01
mono_points_arrow/encode_total 378345137 ns/iter (± 2345952) 379472133 ns/iter (± 2300136) 1.00
mono_points_arrow/decode_log_msg 234493797 ns/iter (± 1013798) 232193823 ns/iter (± 742826) 1.01
mono_points_arrow/decode_message_bundles 71050291 ns/iter (± 652376) 69529504 ns/iter (± 900827) 1.02
mono_points_arrow/decode_total 311528052 ns/iter (± 1317039) 306361429 ns/iter (± 1434216) 1.02
mono_points_arrow_batched/generate_message_bundles 38035383 ns/iter (± 1042329) 36630803 ns/iter (± 986396) 1.04
mono_points_arrow_batched/generate_messages 7698709 ns/iter (± 533096) 7102180 ns/iter (± 304370) 1.08
mono_points_arrow_batched/encode_log_msg 1486983 ns/iter (± 3295) 1469316 ns/iter (± 3969) 1.01
mono_points_arrow_batched/encode_total 48290705 ns/iter (± 1138444) 45984798 ns/iter (± 1237785) 1.05
mono_points_arrow_batched/decode_log_msg 861353 ns/iter (± 2424) 856740 ns/iter (± 2370) 1.01
mono_points_arrow_batched/decode_message_bundles 11916978 ns/iter (± 559697) 11429057 ns/iter (± 179088) 1.04
mono_points_arrow_batched/decode_total 13015582 ns/iter (± 598282) 12510562 ns/iter (± 309773) 1.04
batch_points_arrow/generate_message_bundles 344593 ns/iter (± 2060) 331320 ns/iter (± 601) 1.04
batch_points_arrow/generate_messages 6401 ns/iter (± 38) 6362 ns/iter (± 16) 1.01
batch_points_arrow/encode_log_msg 401666 ns/iter (± 2424) 393486 ns/iter (± 879) 1.02
batch_points_arrow/encode_total 765135 ns/iter (± 4155) 762488 ns/iter (± 1337) 1.00
batch_points_arrow/decode_log_msg 344846 ns/iter (± 1445) 333884 ns/iter (± 2112) 1.03
batch_points_arrow/decode_message_bundles 2284 ns/iter (± 17) 2328 ns/iter (± 6) 0.98
batch_points_arrow/decode_total 351331 ns/iter (± 1725) 337767 ns/iter (± 639) 1.04
arrow_mono_points/insert 6524020995 ns/iter (± 31705062) 6417918245 ns/iter (± 18029498) 1.02
arrow_mono_points/query 1794474 ns/iter (± 16825) 1754581 ns/iter (± 14456) 1.02
arrow_batch_points/insert 3230521 ns/iter (± 12475) 3186548 ns/iter (± 8533) 1.01
arrow_batch_points/query 16291 ns/iter (± 83) 16415 ns/iter (± 34) 0.99
arrow_batch_vecs/insert 45236 ns/iter (± 292) 44602 ns/iter (± 82) 1.01
arrow_batch_vecs/query 388092 ns/iter (± 2317) 388468 ns/iter (± 1506) 1.00
tuid/Tuid::random 41 ns/iter (± 0) 34 ns/iter (± 0) 1.21

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.