Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

End-to-end testing of python logging -> store ingestion #1817

Merged
merged 9 commits into from
Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ jobs:
- name: Run tests
run: cd rerun_py/tests && pytest

- name: Run e2e test
run: scripts/run_python_e2e_test.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are running this as part of the wheel job, there's not a great reason to run:

subprocess.Popen(["just", "py-build", "--quiet"], env=build_env).wait()

below. The wheel is installed into the local environment just a few lines up from here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, but when running it from the command line it's quite useful… I guess I could make it opt-out

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's see how much time it adds to the CI

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, but when running it from the command line it's quite useful… I guess I could make it opt-out

I believe you can silently skip it if the CI env var is set.


- name: Unpack the wheel
shell: bash
run: |
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/re_sdk_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ async fn run_client(
let msg = crate::decode_log_msg(&packet)?;

if matches!(msg, LogMsg::Goodbye(_)) {
re_log::debug!("Client sent goodbye message.");
re_log::debug!("Received goodbye message.");
tx.send(msg)?;
return Ok(());
}

Expand Down
1 change: 1 addition & 0 deletions crates/rerun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ web_viewer = [

[dependencies]
re_build_info.workspace = true
re_data_store.workspace = true
re_format.workspace = true
re_log_encoding = { workspace = true, features = ["decoder", "encoder"] }
re_log_types.workspace = true
Expand Down
103 changes: 76 additions & 27 deletions crates/rerun/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,58 +36,67 @@ use crate::web_viewer::host_web_viewer;
#[derive(Debug, clap::Parser)]
#[clap(author, about)]
struct Args {
/// Print version and quit
// Note: arguments are sorted lexicographically for nicer `--help` message:
#[command(subcommand)]
commands: Option<Commands>,

/// Set a maximum input latency, e.g. "200ms" or "10s".
///
/// If we go over this, we start dropping packets.
///
/// The default is no limit, which means Rerun might eat more and more memory,
/// and have longer and longer latency, if you are logging data faster
/// than Rerun can index it.
#[clap(long)]
version: bool,
drop_at_latency: Option<String>,

/// Either a path to a `.rrd` file to load, an http url to an `.rrd` file,
/// or a websocket url to a Rerun Server from which to read data
/// An upper limit on how much memory the Rerun Viewer should use.
///
/// If none is given, a server will be hosted which the Rerun SDK can connect to.
url_or_path: Option<String>,
/// When this limit is used, Rerun will purge the oldest data.
///
/// Example: `16GB`
#[clap(long)]
memory_limit: Option<String>,

/// What TCP port do we listen to (for SDK:s to connect to)?
#[cfg(feature = "server")]
#[clap(long, default_value_t = re_sdk_comms::DEFAULT_SERVER_PORT)]
port: u16,

/// Start the viewer in the browser (instead of locally).
/// Requires Rerun to have been compiled with the 'web_viewer' feature.
/// Start with the puffin profiler running.
#[clap(long)]
web_viewer: bool,
profile: bool,

/// Stream incoming log events to an .rrd file at the given path.
#[clap(long)]
save: Option<String>,

/// Start with the puffin profiler running.
#[clap(long)]
profile: bool,

/// Exit with a non-zero exit code if any warning or error is logged. Useful for tests.
#[clap(long)]
strict: bool,

/// An upper limit on how much memory the Rerun Viewer should use.
/// Ingest data and then quit once the goodbye message has been received.
///
/// When this limit is used, Rerun will purge the oldest data.
/// Used for testing together with the `--strict` argument.
///
/// Example: `16GB`
/// Fails if no messages are received, or if no messages are received within a dozen or so seconds.
#[clap(long)]
memory_limit: Option<String>,
test_receive: bool,

/// Set a maximum input latency, e.g. "200ms" or "10s".
///
/// If we go over this, we start dropping packets.
/// Either a path to a `.rrd` file to load, an http url to an `.rrd` file,
/// or a websocket url to a Rerun Server from which to read data
///
/// The default is no limit, which means Rerun might eat more and more memory,
/// and have longer and longer latency, if you are logging data faster
/// than Rerun can index it.
/// If none is given, a server will be hosted which the Rerun SDK can connect to.
url_or_path: Option<String>,

/// Print version and quit
#[clap(long)]
drop_at_latency: Option<String>,
version: bool,

#[command(subcommand)]
commands: Option<Commands>,
/// Start the viewer in the browser (instead of locally).
/// Requires Rerun to have been compiled with the 'web_viewer' feature.
#[clap(long)]
web_viewer: bool,
}

#[derive(Debug, Clone, Subcommand)]
Expand Down Expand Up @@ -329,7 +338,9 @@ async fn run_impl(

// Now what do we do with the data?

if let Some(rrd_path) = args.save {
if args.test_receive {
receive_into_log_db(&rx).map(|_db| ())
} else if let Some(rrd_path) = args.save {
Ok(stream_to_rrd(&rx, &rrd_path.into(), &shutdown_bool)?)
} else if args.web_viewer {
#[cfg(feature = "web_viewer")]
Expand Down Expand Up @@ -404,6 +415,44 @@ async fn run_impl(
}
}

fn receive_into_log_db(rx: &Receiver<LogMsg>) -> anyhow::Result<re_data_store::LogDb> {
use re_smart_channel::RecvTimeoutError;

re_log::info!("Receiving messages into a LogDb…");

let mut db = re_data_store::LogDb::default();

let mut num_messages = 0;

let timeout = std::time::Duration::from_secs(12);

loop {
match rx.recv_timeout(timeout) {
Ok(msg) => {
re_log::info_once!("Received first message.");
let is_goodbye = matches!(msg, re_log_types::LogMsg::Goodbye(_));
db.add(msg)?;
num_messages += 1;
if is_goodbye {
db.entity_db.data_store.sanity_check()?;
anyhow::ensure!(0 < num_messages, "No messages received");
re_log::info!("Successfully ingested {num_messages} messages.");
return Ok(db);
}
}
Err(RecvTimeoutError::Timeout) => {
anyhow::bail!(
"Didn't receive any messages within {} seconds. Giving up.",
timeout.as_secs()
);
}
Err(RecvTimeoutError::Disconnected) => {
anyhow::bail!("Channel disconnected without a Goodbye message.");
}
}
}
}

enum ArgumentCategory {
/// A remote RRD file, served over http.
RrdHttpUrl(String),
Expand Down
5 changes: 3 additions & 2 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ py-run-all: py-build
fd main.py | xargs -I _ sh -c "echo _ && python3 _"

# Build and install the package into the venv
py-build:
py-build *ARGS:
#!/usr/bin/env bash
set -euo pipefail
unset CONDA_PREFIX && \
source venv/bin/activate && \
maturin develop \
-m rerun_py/Cargo.toml \
--extras="tests"
--extras="tests" \
{{ARGS}}

# Run autoformatting
py-format:
Expand Down
79 changes: 79 additions & 0 deletions scripts/run_python_e2e_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#!/usr/bin/env python3

"""
Run some of our python exeamples, piping their log stream to the rerun process.

This is an end-to-end test for testing:
* Our Python API
* LogMsg encoding/decoding
* Arrow encoding/decoding
* TCP connection
* Data store ingestion
"""

import os
import subprocess
import sys
import time


def main() -> None:
build_env = os.environ.copy()
if "RUST_LOG" in build_env:
del build_env["RUST_LOG"] # The user likely only meant it for the actual tests; not the setup

print("----------------------------------------------------------")
print("Building rerun-sdk…")
start_time = time.time()
subprocess.Popen(["just", "py-build", "--quiet"], env=build_env).wait()
elapsed = time.time() - start_time
print(f"rerun-sdk built in {elapsed:.1f} seconds")
print("")

examples = [
# Trivial examples that don't require weird dependencies, or downloading data
"examples/python/api_demo/main.py",
"examples/python/car/main.py",
"examples/python/multithreading/main.py",
"examples/python/plots/main.py",
"examples/python/text_logging/main.py",
]
for example in examples:
print("----------------------------------------------------------")
print(f"Testing {example}…\n")
start_time = time.time()
run_example(example)
elapsed = time.time() - start_time
print(f"{example} done in {elapsed:.1f} seconds")
print()

print()
print("All tests passed successfully!")


def run_example(example: str) -> None:
port = 9752

# sys.executable: the absolute path of the executable binary for the Python interpreter
python_executable = sys.executable
if python_executable is None:
python_executable = "python3"

rerun_process = subprocess.Popen(
[python_executable, "-m", "rerun", "--port", str(port), "--strict", "--test-receive"]
)
time.sleep(0.3) # Wait for rerun server to start to remove a logged warning

python_process = subprocess.Popen([python_executable, example, "--connect", "--addr", f"127.0.0.1:{port}"])

print("Waiting for python process to finish…")
returncode = python_process.wait(timeout=30)
assert returncode == 0, f"python process exited with error code {returncode}"

print("Waiting for rerun process to finish…")
returncode = rerun_process.wait(timeout=30)
assert returncode == 0, f"rerun process exited with error code {returncode}"


if __name__ == "__main__":
main()