Skip to content

Commit

Permalink
Seed of C and C++ SDKs (#2594)
Browse files Browse the repository at this point in the history
Part of #2516

### What
This PR introduces the seeds of the C and C++ SDK. It's all a hack right
now, and only tested on Mac, but I'd rather merge many small PRs than
build this in isolation over a long time.

A new crate `rerun_c` is introduced, which implements the C SDK (over
FFI).

The C SDK consists of a single header, `rerun.h`, with no dependencies
except on `rerun_c`.

The C++ SDK consists of a single header, `rerun.hpp`, which depends on:
* Arrow C++
* `rerun.h` (the C SDK)
* `rerun_c` (the Rust crate)

The C SDK can create recording streams and log data to them using the
arrow IPC format.

The C++ SDK adds a few helper functions for generating some test data and
converting it to the Arrow IPC format.

### Testing it
* Install CMake
* Start rerun with `cargo rerun`
* Run `crates/rerun_c/example_cpp/build_and_run.sh`

You should see three points in a 3D space view. Those points come from
C++!

### Checklist
* [x] I have read and agree to the [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested [demo.rerun.io](https://demo.rerun.io/pr/2594) (if
applicable)

- [PR Build Summary](https://build.rerun.io/pr/2594)
- [Docs preview](https://rerun.io/preview/pr%3Aemilk%2Fc-sdk/docs)
- [Examples
preview](https://rerun.io/preview/pr%3Aemilk%2Fc-sdk/examples)

---------

Co-authored-by: Clement Rey <[email protected]>
  • Loading branch information
emilk and teh-cmc authored Jul 4, 2023
1 parent 2b1c955 commit 0540bcc
Show file tree
Hide file tree
Showing 43 changed files with 931 additions and 59 deletions.
20 changes: 20 additions & 0 deletions .clang-format
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
BasedOnStyle: Google

# Make it slightly more similar to Rust.
# Based loosely on https://gist.github.com/YodaEmbedding/c2c77dc693d11f3734d78489f9a6eea4
AllowShortBlocksOnASingleLine: false
AllowShortCaseLabelsOnASingleLine: false
AllowShortFunctionsOnASingleLine: Empty
AllowShortIfStatementsOnASingleLine: Never
AlwaysBreakAfterReturnType: None
AlwaysBreakBeforeMultilineStrings: true
BinPackArguments: false
ColumnLimit: 100
ContinuationIndentWidth: 4
IndentWidth: 4
IndentWrappedFunctionNames: true
InsertTrailingCommas: Wrapped
MaxEmptyLinesToKeep: 1
NamespaceIndentation: All
PointerAlignment: Left
SpacesBeforeTrailingComments: 1
2 changes: 1 addition & 1 deletion .github/workflows/labels.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ jobs:
with:
mode: minimum
count: 1
labels: "📊 analytics, 🪳 bug, codegen/idl, 🧑‍💻 dev experience, dependencies, 📖 documentation, 💬 discussion, examples, 📉 performance, 🐍 python API, ⛃ re_datastore, 📺 re_viewer, 🔺 re_renderer, 🚜 refactor, ⛴ release, 🦀 rust SDK, 🔨 testing, ui, 🕸️ web"
labels: "📊 analytics, 🪳 bug, C/C++ SDK, codegen/idl, 🧑‍💻 dev experience, dependencies, 📖 documentation, 💬 discussion, examples, 📉 performance, 🐍 python API, ⛃ re_datastore, 📺 re_viewer, 🔺 re_renderer, 🚜 refactor, ⛴ release, 🦀 rust SDK, 🔨 testing, ui, 🕸️ web"
3 changes: 2 additions & 1 deletion .vscode/extensions.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// for the documentation about the extensions.json format
"recommendations": [
"charliermarsh.ruff",
"gaborv.flatbuffers",
"github.vscode-github-actions",
"ms-python.python",
"ms-vsliveshare.vsliveshare",
Expand All @@ -14,7 +15,7 @@
"vadimcn.vscode-lldb",
"wayou.vscode-todo-highlight",
"webfreak.debug",
"xaver.clang-format", // C++ formatter
"zxh404.vscode-proto3",
"gaborv.flatbuffers"
]
}
20 changes: 20 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,26 @@
],
"cwd": "${workspaceFolder}"
},
{
"name": "Debug 'minimal' example",
"type": "lldb",
"request": "launch",
"cargo": {
"args": [
"build",
"--package=minimal",
],
"filter": {
"name": "minimal",
"kind": "bin"
}
},
"args": [],
"cwd": "${workspaceFolder}",
"env": {
"RUST_LOG": "trace"
}
},
{
"name": "Debug re_renderer --example=multiview",
"type": "lldb",
Expand Down
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ macaw = "0.18"
mimalloc = "0.1.29"
ndarray = "0.15"
nohash-hasher = "0.2"
once_cell = "1.17"
parking_lot = "0.12"
polars-core = "0.29"
polars-lazy = "0.29"
Expand Down
2 changes: 1 addition & 1 deletion crates/re_analytics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ re_log.workspace = true
# External dependencies:
anyhow.workspace = true
crossbeam.workspace = true
once_cell = "1.17"
once_cell.workspace = true
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sha2 = "0.10"
Expand Down
4 changes: 3 additions & 1 deletion crates/re_data_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,17 @@ serde = ["dep:serde", "re_log_types/serde"]

[dependencies]
re_arrow_store.workspace = true
re_components.workspace = true
re_format.workspace = true
re_int_histogram.workspace = true
re_log.workspace = true
re_log_encoding = { workspace = true, optional = true }
re_log_types.workspace = true
re_log.workspace = true
re_smart_channel.workspace = true
re_tracing.workspace = true

ahash.workspace = true
arrow2.workspace = true
document-features = "0.2"
itertools = { workspace = true }
nohash-hasher = "0.2"
Expand Down
43 changes: 43 additions & 0 deletions crates/re_data_store/src/store_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ impl EntityDb {
fn try_add_arrow_msg(&mut self, msg: &ArrowMsg) -> Result<(), Error> {
re_tracing::profile_function!();

#[cfg(debug_assertions)]
check_known_component_schemas(msg);

// TODO(#1760): Compute the size of the datacells in the batching threads on the clients.
let mut table = DataTable::from_arrow_msg(msg)?;
table.compute_all_size_bytes();
Expand Down Expand Up @@ -171,6 +174,46 @@ impl EntityDb {
}
}

/// Check that known (`rerun.`) components have the expected schemas.
#[cfg(debug_assertions)]
fn check_known_component_schemas(msg: &ArrowMsg) {
    // Build a lookup table from component name to its registered (expected) field,
    // so each incoming field can be validated with a single hash lookup.
    let expected_fields: ahash::HashMap<&str, &arrow2::datatypes::Field> =
        re_components::iter_registered_field_types()
            .map(|registered| (registered.name.as_str(), registered))
            .collect();

    for incoming in &msg.schema.fields {
        // Unknown (non-registered) components have nothing to be checked against.
        let expected = match expected_fields.get(incoming.name.as_str()) {
            Some(expected) => expected,
            None => continue,
        };

        match &incoming.data_type {
            arrow2::datatypes::DataType::List(incoming_inner) => {
                // Known components are expected to arrive as lists; compare the
                // inner element type against the registered one.
                if incoming_inner.data_type != expected.data_type {
                    re_log::warn_once!(
                        "The incoming component {:?} had the type:\n{:#?}\nExpected type:\n{:#?}",
                        incoming.name,
                        incoming_inner.data_type,
                        expected.data_type,
                    );
                }
                if incoming.is_nullable != expected.is_nullable {
                    re_log::warn_once!(
                        "The incoming component {:?} has is_nullable={}, expected is_nullable={}",
                        incoming.name,
                        incoming.is_nullable,
                        expected.is_nullable,
                    );
                }
            }
            _ => {
                // Not even a list — warn with the full (unexpected) datatype.
                re_log::warn_once!(
                    "The incoming component {:?} was:\n{:#?}\nExpected:\n{:#?}",
                    incoming.name,
                    incoming.data_type,
                    expected.data_type,
                );
            }
        }
    }
}

// ----------------------------------------------------------------------------

/// A in-memory database built from a stream of [`LogMsg`]es.
Expand Down
36 changes: 24 additions & 12 deletions crates/re_log_encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub enum DecodeError {
// ----------------------------------------------------------------------------

pub fn decode_bytes(bytes: &[u8]) -> Result<Vec<LogMsg>, DecodeError> {
re_tracing::profile_function!();
let decoder = Decoder::new(std::io::Cursor::new(bytes))?;
let mut msgs = vec![];
for msg in decoder {
Expand Down Expand Up @@ -129,33 +130,44 @@ impl<R: std::io::Read> Iterator for Decoder<R> {
},
};

let uncompressed_len = header.uncompressed_len as usize;
self.uncompressed
.resize(self.uncompressed.len().max(uncompressed_len), 0);

match self.compression {
Compression::Off => {
self.uncompressed
.resize(header.uncompressed_len as usize, 0);
if let Err(err) = self.read.read_exact(&mut self.uncompressed) {
re_tracing::profile_scope!("read uncompressed");
if let Err(err) = self
.read
.read_exact(&mut self.uncompressed[..uncompressed_len])
{
return Some(Err(DecodeError::Read(err)));
}
}
Compression::LZ4 => {
self.compressed.resize(header.compressed_len as usize, 0);
if let Err(err) = self.read.read_exact(&mut self.compressed) {
return Some(Err(DecodeError::Read(err)));
let compressed_len = header.compressed_len as usize;
self.compressed
.resize(self.compressed.len().max(compressed_len), 0);

{
re_tracing::profile_scope!("read compressed");
if let Err(err) = self.read.read_exact(&mut self.compressed[..compressed_len]) {
return Some(Err(DecodeError::Read(err)));
}
}
self.uncompressed
.resize(header.uncompressed_len as usize, 0);

re_tracing::profile_scope!("lz4");
if let Err(err) =
lz4_flex::block::decompress_into(&self.compressed, &mut self.uncompressed)
{
if let Err(err) = lz4_flex::block::decompress_into(
&self.compressed[..compressed_len],
&mut self.uncompressed[..uncompressed_len],
) {
return Some(Err(DecodeError::Lz4(err)));
}
}
}

re_tracing::profile_scope!("MsgPack deser");
match rmp_serde::from_slice(&self.uncompressed) {
match rmp_serde::from_slice(&self.uncompressed[..uncompressed_len]) {
Ok(msg) => Some(Ok(msg)),
Err(err) => Some(Err(err.into())),
}
Expand Down
12 changes: 8 additions & 4 deletions crates/re_log_encoding/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub use file_sink::{FileSink, FileSinkError};

#[cfg(any(feature = "encoder", feature = "decoder"))]
const RRD_HEADER: &[u8; 4] = b"RRF2";

#[cfg(feature = "decoder")]
const OLD_RRD_HEADERS: &[[u8; 4]] = &[*b"RRF0", *b"RRF1"];

// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -113,6 +115,7 @@ pub(crate) struct FileHeader {
}

impl FileHeader {
#[cfg(feature = "decoder")]
pub const SIZE: usize = 12;

#[cfg(feature = "encoder")]
Expand Down Expand Up @@ -154,6 +157,7 @@ pub(crate) struct MessageHeader {
}

impl MessageHeader {
#[cfg(feature = "decoder")]
pub const SIZE: usize = 8;

#[cfg(feature = "encoder")]
Expand All @@ -170,6 +174,10 @@ impl MessageHeader {

#[cfg(feature = "decoder")]
pub fn decode(read: &mut impl std::io::Read) -> Result<Self, decoder::DecodeError> {
fn u32_from_le_slice(bytes: &[u8]) -> u32 {
u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
}

let mut buffer = [0_u8; Self::SIZE];
read.read_exact(&mut buffer)
.map_err(decoder::DecodeError::Read)?;
Expand All @@ -181,7 +189,3 @@ impl MessageHeader {
})
}
}

pub(crate) fn u32_from_le_slice(bytes: &[u8]) -> u32 {
u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
}
41 changes: 30 additions & 11 deletions crates/re_log_types/src/arrow_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ impl serde::Serialize for ArrowMsg {
where
S: serde::Serializer,
{
re_tracing::profile_scope!("ArrowMsg::serialize");

use arrow2::io::ipc::write::StreamWriter;
use serde::ser::SerializeTuple;

Expand Down Expand Up @@ -76,12 +78,14 @@ impl<'de> serde::Deserialize<'de> for ArrowMsg {
where
A: serde::de::SeqAccess<'de>,
{
re_tracing::profile_scope!("ArrowMsg::deserialize");

let table_id: Option<TableId> = seq.next_element()?;
let timepoint_min: Option<TimePoint> = seq.next_element()?;
let timepoint_max: Option<TimePoint> = seq.next_element()?;
let buf: Option<serde_bytes::ByteBuf> = seq.next_element()?;

if let (Some(table_id), Some(timepoint_min), Some(buf)) =
(table_id, timepoint_min, buf)
if let (Some(table_id), Some(timepoint_max), Some(buf)) =
(table_id, timepoint_max, buf)
{
let mut cursor = std::io::Cursor::new(buf);
let metadata = match read_stream_metadata(&mut cursor) {
Expand All @@ -92,21 +96,36 @@ impl<'de> serde::Deserialize<'de> for ArrowMsg {
)))
}
};
let mut stream = StreamReader::new(cursor, metadata, None);
let chunk = stream
.find_map(|state| match state {
Ok(StreamState::Some(chunk)) => Some(chunk),
let schema = metadata.schema.clone();
let stream = StreamReader::new(cursor, metadata, None);
let chunks: Result<Vec<_>, _> = stream
.map(|state| match state {
Ok(StreamState::Some(chunk)) => Ok(chunk),
Ok(StreamState::Waiting) => {
unreachable!("cannot be waiting on a fixed buffer")
}
_ => None,
Err(err) => Err(err),
})
.ok_or_else(|| serde::de::Error::custom("No Chunk found in stream"))?;
.collect();

let chunks = chunks
.map_err(|err| serde::de::Error::custom(format!("Arrow error: {err}")))?;

if chunks.is_empty() {
return Err(serde::de::Error::custom("No Chunk found in stream"));
}
if chunks.len() > 1 {
return Err(serde::de::Error::custom(format!(
"Found {} chunks in stream - expected just one.",
chunks.len()
)));
}
let chunk = chunks.into_iter().next().unwrap();

Ok(ArrowMsg {
table_id,
timepoint_max: timepoint_min,
schema: stream.metadata().schema.clone(),
timepoint_max,
schema,
chunk,
})
} else {
Expand Down
Loading

0 comments on commit 0540bcc

Please sign in to comment.