
Seed of C and C++ SDKs #2594

Merged · 42 commits · Jul 4, 2023
Commits
f644195
Optimize rrd decoding slightly
emilk Jun 29, 2023
8c15469
Add some deserialization profiling scopes
emilk Jun 29, 2023
106dacd
Make `ArrowMsg` deserialization more robust
emilk Jun 29, 2023
8061b69
Fix some warnings in `re_log_encoding`
emilk Jun 30, 2023
4382f5b
Add `once_cell` to workspace dependency list
emilk Jun 30, 2023
b22a946
Add `re_sdk::build_info()`
emilk Jun 30, 2023
8306302
C SDK: initial commit
emilk Jun 28, 2023
d10b292
C SDK: Return Rerun version string
emilk Jun 30, 2023
28361e5
C SDK example: Add a Makefile
emilk Jun 30, 2023
a8e8243
Add `StoreSource::CSdk` and `AppEnvironment::CSdk`
emilk Jun 30, 2023
3b6196e
C SDK: Create and destroy recording streams
emilk Jun 30, 2023
4492266
Minimal C++ SDK seed
emilk Jun 30, 2023
68b7f3a
Better logging around sinks and shutdown
emilk Jun 30, 2023
8b6bc1a
Add TCP connection timeout
emilk Jun 30, 2023
fc11921
.vscode: Add debug launcher of minimal example
emilk Jun 30, 2023
24f725d
C SDK: Fix hanging during shutdown
emilk Jun 30, 2023
f46478e
Implement log function
emilk Jun 30, 2023
c3dde55
Add Makefiles
emilk Jul 2, 2023
d34a012
vscode: Recommend clang-format
emilk Jul 2, 2023
888b151
Require C++17
emilk Jul 2, 2023
87cf111
Format code and fix some warnings
emilk Jul 2, 2023
bce0d69
Turn on more compiler warnings
emilk Jul 2, 2023
a23962d
CMake file for C++ Arrow library
emilk Jul 2, 2023
e3611f6
Add clang-format
emilk Jul 2, 2023
85ffa46
Replace Makefile with cmake for C++
emilk Jul 2, 2023
4f5b910
Format C++ more like Rust
emilk Jul 3, 2023
b20f9a6
Use Loguru to get stack traces
emilk Jul 3, 2023
322e396
Log rerun.point3d in C++, have it ingest into the Rerun Viewer
emilk Jul 3, 2023
4db2a87
Sanity-check schemas of incoming arrow messages in debug builds
emilk Jul 3, 2023
604677b
Fix: points3d is not nullable
emilk Jul 3, 2023
535b6c1
Add accepted label `C/C++ SDK`
emilk Jul 3, 2023
b63a03a
Typos
emilk Jul 3, 2023
dddbdad
Cleanup
emilk Jul 3, 2023
8a3626c
Fix a couple of TODOs
emilk Jul 3, 2023
1950a7e
Fix doclink
emilk Jul 3, 2023
99019d9
add Linux support to C example
teh-cmc Jul 4, 2023
a47ac5c
add Linux support to C++ example
teh-cmc Jul 4, 2023
7eae18d
Merge branch 'main' into emilk/c-sdk
emilk Jul 4, 2023
5f7dcd8
Update .clang-format
emilk Jul 4, 2023
83bbb9f
use `rr_snake_case` convention for all of C SDK
emilk Jul 4, 2023
18b32b1
clang-format
emilk Jul 4, 2023
e91883a
change field name and explain it a bit
emilk Jul 4, 2023
20 changes: 20 additions & 0 deletions .clang-format
@@ -0,0 +1,20 @@
BasedOnStyle: Google

# Make it slightly more similar to Rust.
# Based loosely on https://gist.github.com/YodaEmbedding/c2c77dc693d11f3734d78489f9a6eea4
AllowShortBlocksOnASingleLine: false
AllowShortCaseLabelsOnASingleLine: false
AllowShortFunctionsOnASingleLine: Empty
AllowShortIfStatementsOnASingleLine: Never
AlwaysBreakAfterReturnType: None
AlwaysBreakBeforeMultilineStrings: true
BinPackArguments: false
ColumnLimit: 100
ContinuationIndentWidth: 4
IndentWidth: 4
IndentWrappedFunctionNames: true
InsertTrailingCommas: Wrapped
MaxEmptyLinesToKeep: 1
NamespaceIndentation: All
PointerAlignment: Left
SpacesBeforeTrailingComments: 1
2 changes: 1 addition & 1 deletion .github/workflows/labels.yml
@@ -29,4 +29,4 @@ jobs:
with:
mode: minimum
count: 1
labels: "📊 analytics, 🪳 bug, codegen/idl, 🧑‍💻 dev experience, dependencies, 📖 documentation, 💬 discussion, examples, 📉 performance, 🐍 python API, ⛃ re_datastore, 📺 re_viewer, 🔺 re_renderer, 🚜 refactor, ⛴ release, 🦀 rust SDK, 🔨 testing, ui, 🕸️ web"
labels: "📊 analytics, 🪳 bug, C/C++ SDK, codegen/idl, 🧑‍💻 dev experience, dependencies, 📖 documentation, 💬 discussion, examples, 📉 performance, 🐍 python API, ⛃ re_datastore, 📺 re_viewer, 🔺 re_renderer, 🚜 refactor, ⛴ release, 🦀 rust SDK, 🔨 testing, ui, 🕸️ web"
3 changes: 2 additions & 1 deletion .vscode/extensions.json
@@ -3,6 +3,7 @@
// for the documentation about the extensions.json format
"recommendations": [
"charliermarsh.ruff",
"gaborv.flatbuffers",
"github.vscode-github-actions",
"ms-python.python",
"ms-vsliveshare.vsliveshare",
@@ -14,7 +15,7 @@
"vadimcn.vscode-lldb",
"wayou.vscode-todo-highlight",
"webfreak.debug",
"xaver.clang-format", // C++ formatter
"zxh404.vscode-proto3",
"gaborv.flatbuffers"
]
}
20 changes: 20 additions & 0 deletions .vscode/launch.json
@@ -64,6 +64,26 @@
],
"cwd": "${workspaceFolder}"
},
{
"name": "Debug 'minimal' example",
"type": "lldb",
"request": "launch",
"cargo": {
"args": [
"build",
"--package=minimal",
],
"filter": {
"name": "minimal",
"kind": "bin"
}
},
"args": [],
"cwd": "${workspaceFolder}",
"env": {
"RUST_LOG": "trace"
}
},
{
"name": "Debug re_renderer --example=multiview",
"type": "lldb",
15 changes: 15 additions & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
@@ -95,6 +95,7 @@ macaw = "0.18"
mimalloc = "0.1.29"
ndarray = "0.15"
nohash-hasher = "0.2"
once_cell = "1.17"
parking_lot = "0.12"
polars-core = "0.29"
polars-lazy = "0.29"
2 changes: 1 addition & 1 deletion crates/re_analytics/Cargo.toml
@@ -24,7 +24,7 @@ re_log.workspace = true
# External dependencies:
anyhow.workspace = true
crossbeam.workspace = true
once_cell = "1.17"
once_cell.workspace = true
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sha2 = "0.10"
4 changes: 3 additions & 1 deletion crates/re_data_store/Cargo.toml
@@ -25,15 +25,17 @@ serde = ["dep:serde", "re_log_types/serde"]

[dependencies]
re_arrow_store.workspace = true
re_components.workspace = true
re_format.workspace = true
re_int_histogram.workspace = true
re_log.workspace = true
re_log_encoding = { workspace = true, optional = true }
re_log_types.workspace = true
re_log.workspace = true
re_smart_channel.workspace = true
re_tracing.workspace = true

ahash.workspace = true
arrow2.workspace = true
document-features = "0.2"
itertools = { workspace = true }
nohash-hasher = "0.2"
43 changes: 43 additions & 0 deletions crates/re_data_store/src/store_db.rs
@@ -63,6 +63,9 @@ impl EntityDb {
fn try_add_arrow_msg(&mut self, msg: &ArrowMsg) -> Result<(), Error> {
re_tracing::profile_function!();

#[cfg(debug_assertions)]
check_known_component_schemas(msg);

// TODO(#1760): Compute the size of the datacells in the batching threads on the clients.
let mut table = DataTable::from_arrow_msg(msg)?;
table.compute_all_size_bytes();
@@ -171,6 +174,46 @@ impl EntityDb {
}
}

/// Check that known (`rerun.`) components have the expected schemas.
#[cfg(debug_assertions)]
fn check_known_component_schemas(msg: &ArrowMsg) {
// Check that we have the expected schemas
let known_fields: ahash::HashMap<&str, &arrow2::datatypes::Field> =
re_components::iter_registered_field_types()
.map(|field| (field.name.as_str(), field))
.collect();

for actual in &msg.schema.fields {
if let Some(expected) = known_fields.get(actual.name.as_str()) {
if let arrow2::datatypes::DataType::List(actual_field) = &actual.data_type {
if actual_field.data_type != expected.data_type {
re_log::warn_once!(
"The incoming component {:?} had the type:\n{:#?}\nExpected type:\n{:#?}",
actual.name,
actual_field.data_type,
expected.data_type,
);
}
if actual.is_nullable != expected.is_nullable {
re_log::warn_once!(
"The incoming component {:?} has is_nullable={}, expected is_nullable={}",
actual.name,
actual.is_nullable,
expected.is_nullable,
);
}
} else {
re_log::warn_once!(
"The incoming component {:?} was:\n{:#?}\nExpected:\n{:#?}",
actual.name,
actual.data_type,
expected.data_type,
);
}
}
}
}

// ----------------------------------------------------------------------------

/// An in-memory database built from a stream of [`LogMsg`]es.
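The debug-build check added in this file compares each incoming Arrow field against the registered `rerun.` component schemas and warns on mismatches. Below is a minimal std-only sketch of that comparison logic: a hypothetical `Field` struct stands in for `arrow2::datatypes::Field`, warnings are collected into a `Vec` instead of going through `re_log::warn_once!`, and the real code's unwrapping of the outer `List` datatype is omitted.

```rust
use std::collections::HashMap;

/// Simplified stand-in for an Arrow field: name, a type tag, and nullability.
/// (Hypothetical type for illustration; the PR uses `arrow2::datatypes::Field`.)
#[derive(Clone, PartialEq, Debug)]
pub struct Field {
    pub name: String,
    pub data_type: String,
    pub is_nullable: bool,
}

/// Mirrors `check_known_component_schemas`: for every incoming field whose
/// name is registered, report any deviation in datatype or nullability.
pub fn check_schemas(known: &[Field], incoming: &[Field]) -> Vec<String> {
    // Index the registered schemas by component name, as the PR does.
    let known_fields: HashMap<&str, &Field> =
        known.iter().map(|f| (f.name.as_str(), f)).collect();

    let mut warnings = Vec::new();
    for actual in incoming {
        if let Some(expected) = known_fields.get(actual.name.as_str()) {
            if actual.data_type != expected.data_type {
                warnings.push(format!(
                    "component {:?}: type {:?}, expected {:?}",
                    actual.name, actual.data_type, expected.data_type
                ));
            }
            if actual.is_nullable != expected.is_nullable {
                warnings.push(format!(
                    "component {:?}: is_nullable={}, expected {}",
                    actual.name, actual.is_nullable, expected.is_nullable
                ));
            }
        }
        // Unknown component names pass through silently, as in the PR.
    }
    warnings
}
```

Keeping the check behind `#[cfg(debug_assertions)]` means release builds pay nothing for it, while debug builds surface schema drift (such as the `points3d` nullability fix in this very PR) the moment a message arrives.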
36 changes: 24 additions & 12 deletions crates/re_log_encoding/src/decoder.rs
@@ -55,6 +55,7 @@ pub enum DecodeError {
// ----------------------------------------------------------------------------

pub fn decode_bytes(bytes: &[u8]) -> Result<Vec<LogMsg>, DecodeError> {
re_tracing::profile_function!();
let decoder = Decoder::new(std::io::Cursor::new(bytes))?;
let mut msgs = vec![];
for msg in decoder {
@@ -129,33 +130,44 @@ impl<R: std::io::Read> Iterator for Decoder<R> {
},
};

let uncompressed_len = header.uncompressed_len as usize;
self.uncompressed
.resize(self.uncompressed.len().max(uncompressed_len), 0);

match self.compression {
Compression::Off => {
self.uncompressed
.resize(header.uncompressed_len as usize, 0);
if let Err(err) = self.read.read_exact(&mut self.uncompressed) {
re_tracing::profile_scope!("read uncompressed");
if let Err(err) = self
.read
.read_exact(&mut self.uncompressed[..uncompressed_len])
{
return Some(Err(DecodeError::Read(err)));
}
}
Compression::LZ4 => {
self.compressed.resize(header.compressed_len as usize, 0);
if let Err(err) = self.read.read_exact(&mut self.compressed) {
return Some(Err(DecodeError::Read(err)));
let compressed_len = header.compressed_len as usize;
self.compressed
.resize(self.compressed.len().max(compressed_len), 0);

{
re_tracing::profile_scope!("read compressed");
if let Err(err) = self.read.read_exact(&mut self.compressed[..compressed_len]) {
return Some(Err(DecodeError::Read(err)));
}
}
self.uncompressed
.resize(header.uncompressed_len as usize, 0);

re_tracing::profile_scope!("lz4");
if let Err(err) =
lz4_flex::block::decompress_into(&self.compressed, &mut self.uncompressed)
{
if let Err(err) = lz4_flex::block::decompress_into(
&self.compressed[..compressed_len],
&mut self.uncompressed[..uncompressed_len],
) {
return Some(Err(DecodeError::Lz4(err)));
}
}
}

re_tracing::profile_scope!("MsgPack deser");
match rmp_serde::from_slice(&self.uncompressed) {
match rmp_serde::from_slice(&self.uncompressed[..uncompressed_len]) {
Ok(msg) => Some(Ok(msg)),
Err(err) => Some(Err(err.into())),
}
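The decoder optimization in this hunk switches both scratch buffers to a grow-only pattern: `resize(self.buf.len().max(n), 0)` followed by reading into the slice `&mut buf[..n]`. The buffer is reused across messages and only ever grows, so a large message early in the stream does not cause repeated reallocation later. A sketch of that pattern in isolation (the `ScratchBuffer` name is illustrative, not from the PR):

```rust
/// Grow-only scratch buffer: grows to fit the largest request seen so far
/// and hands out exactly-sized slices, never shrinking in between.
pub struct ScratchBuffer {
    buf: Vec<u8>,
}

impl ScratchBuffer {
    pub fn new() -> Self {
        Self { buf: Vec::new() }
    }

    /// Return a zero-initialized slice of exactly `len` bytes, growing the
    /// backing storage only if it is currently too small.
    /// Mirrors `self.uncompressed.resize(self.uncompressed.len().max(len), 0)`.
    pub fn take(&mut self, len: usize) -> &mut [u8] {
        if self.buf.len() < len {
            self.buf.resize(len, 0);
        }
        &mut self.buf[..len]
    }

    /// Current backing size (high-water mark of all requests).
    pub fn capacity(&self) -> usize {
        self.buf.len()
    }
}
```

The same slicing discipline carries through to the consumers: `read_exact`, `decompress_into`, and `rmp_serde::from_slice` all operate on `&buf[..len]` rather than the whole (possibly larger) buffer.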
12 changes: 8 additions & 4 deletions crates/re_log_encoding/src/lib.rs
@@ -23,6 +23,8 @@ pub use file_sink::{FileSink, FileSinkError};

#[cfg(any(feature = "encoder", feature = "decoder"))]
const RRD_HEADER: &[u8; 4] = b"RRF2";

#[cfg(feature = "decoder")]
const OLD_RRD_HEADERS: &[[u8; 4]] = &[*b"RRF0", *b"RRF1"];

// ----------------------------------------------------------------------------
@@ -113,6 +115,7 @@ pub(crate) struct FileHeader {
}

impl FileHeader {
#[cfg(feature = "decoder")]
pub const SIZE: usize = 12;

#[cfg(feature = "encoder")]
@@ -154,6 +157,7 @@ pub(crate) struct MessageHeader {
}

impl MessageHeader {
#[cfg(feature = "decoder")]
pub const SIZE: usize = 8;

#[cfg(feature = "encoder")]
@@ -170,6 +174,10 @@ impl MessageHeader {

#[cfg(feature = "decoder")]
pub fn decode(read: &mut impl std::io::Read) -> Result<Self, decoder::DecodeError> {
fn u32_from_le_slice(bytes: &[u8]) -> u32 {
u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
}

let mut buffer = [0_u8; Self::SIZE];
read.read_exact(&mut buffer)
.map_err(decoder::DecodeError::Read)?;
@@ -181,7 +189,3 @@
})
}
}

pub(crate) fn u32_from_le_slice(bytes: &[u8]) -> u32 {
u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
}
41 changes: 30 additions & 11 deletions crates/re_log_types/src/arrow_msg.rs
@@ -32,6 +32,8 @@ impl serde::Serialize for ArrowMsg {
where
S: serde::Serializer,
{
re_tracing::profile_scope!("ArrowMsg::serialize");

use arrow2::io::ipc::write::StreamWriter;
use serde::ser::SerializeTuple;

@@ -76,12 +78,14 @@ impl<'de> serde::Deserialize<'de> for ArrowMsg {
where
A: serde::de::SeqAccess<'de>,
{
re_tracing::profile_scope!("ArrowMsg::deserialize");

let table_id: Option<TableId> = seq.next_element()?;
let timepoint_min: Option<TimePoint> = seq.next_element()?;
let timepoint_max: Option<TimePoint> = seq.next_element()?;
let buf: Option<serde_bytes::ByteBuf> = seq.next_element()?;

if let (Some(table_id), Some(timepoint_min), Some(buf)) =
(table_id, timepoint_min, buf)
if let (Some(table_id), Some(timepoint_max), Some(buf)) =
(table_id, timepoint_max, buf)
{
let mut cursor = std::io::Cursor::new(buf);
let metadata = match read_stream_metadata(&mut cursor) {
@@ -92,21 +96,36 @@
)))
}
};
let mut stream = StreamReader::new(cursor, metadata, None);
let chunk = stream
.find_map(|state| match state {
Ok(StreamState::Some(chunk)) => Some(chunk),
let schema = metadata.schema.clone();
let stream = StreamReader::new(cursor, metadata, None);
let chunks: Result<Vec<_>, _> = stream
.map(|state| match state {
Ok(StreamState::Some(chunk)) => Ok(chunk),
Ok(StreamState::Waiting) => {
unreachable!("cannot be waiting on a fixed buffer")
}
_ => None,
Err(err) => Err(err),
})
.ok_or_else(|| serde::de::Error::custom("No Chunk found in stream"))?;
.collect();

let chunks = chunks
.map_err(|err| serde::de::Error::custom(format!("Arrow error: {err}")))?;

if chunks.is_empty() {
return Err(serde::de::Error::custom("No Chunk found in stream"));
}
if chunks.len() > 1 {
return Err(serde::de::Error::custom(format!(
"Found {} chunks in stream - expected just one.",
chunks.len()
)));
}
let chunk = chunks.into_iter().next().unwrap();

Ok(ArrowMsg {
table_id,
timepoint_max: timepoint_min,
schema: stream.metadata().schema.clone(),
timepoint_max,
schema,
chunk,
})
} else {
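The rewritten `ArrowMsg` deserializer above is the "more robust" part of the PR: instead of `find_map`-ing the first chunk out of the IPC stream (and silently swallowing errors and extra chunks), it collects every stream item and fails loudly on a read error, an empty stream, or more than one chunk. That validation step can be sketched generically, with `String` standing in for the serde/arrow error types:

```rust
/// Collect a fallible stream and insist on exactly one item, as the new
/// `ArrowMsg` deserializer does for IPC chunks.
pub fn exactly_one<T>(
    items: impl Iterator<Item = Result<T, String>>,
) -> Result<T, String> {
    // Propagate the first stream error, as the PR does for `StreamState` errors.
    let chunks: Result<Vec<T>, String> = items.collect();
    let chunks = chunks.map_err(|err| format!("Arrow error: {err}"))?;

    if chunks.is_empty() {
        return Err("No Chunk found in stream".to_owned());
    }
    if chunks.len() > 1 {
        return Err(format!(
            "Found {} chunks in stream - expected just one.",
            chunks.len()
        ));
    }
    Ok(chunks.into_iter().next().unwrap())
}
```

Note that the same hunk also fixes a latent bug: the old code destructured `timepoint_min` and stored it as `timepoint_max`; the new code binds and stores `timepoint_max` consistently.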