Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,082 changes: 3,082 additions & 0 deletions examples/send_bytes/Cargo.lock

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions examples/send_bytes/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "send_bytes"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.47.1", features = ["full"] }
livekit = { path = "../../livekit", features = ["native-tls"] }
log = "0.4.28"
env_logger = "0.11.8"
bitfield-struct = "0.11.0"
rand = "0.9.2"
colored = "3.0.0"

[workspace]
43 changes: 43 additions & 0 deletions examples/send_bytes/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Send Bytes

Example demonstrating the `send_bytes` method for sending a custom packet to participants in a room.

## Usage

1. Run the example in sender mode:

```sh
export LIVEKIT_URL="..."
export LIVEKIT_TOKEN="<first participant token>"
cargo run -- sender
```

2. In a second terminal, run the example in receiver mode:

```sh
export LIVEKIT_URL="..."
export LIVEKIT_TOKEN="<second participant token>"
cargo run
```

## Custom Packet

This example uses the following hypothetical 4-byte packet structure to teleoperate 16 discrete LED indicators by setting their power states and RGB values:

```mermaid
---
title: "LED Control Packet"
config:
packet:
bitsPerRow: 8
---
packet
+2: "Version"
+5: "Channel"
+1: "On"
+8: "Red"
+8: "Green"
+8: "Blue"
```

The [_bitfield-struct_](https://crates.io/crates/bitfield-struct) crate is used to create a type-safe wrapper for getting and setting the bitfields by name.
73 changes: 73 additions & 0 deletions examples/send_bytes/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use livekit::{Room, RoomEvent, RoomOptions, StreamByteOptions, StreamReader};
use packet::LedControlPacket;
use rand::Rng;
use std::{env, error::Error, time::Duration};
use tokio::{sync::mpsc::UnboundedReceiver, time::sleep};

mod packet;

const LED_CONTROL_TOPIC: &str = "led-control";

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();

let url = env::var("LIVEKIT_URL").expect("LIVEKIT_URL is not set");
let token = env::var("LIVEKIT_TOKEN").expect("LIVEKIT_TOKEN is not set");

let is_sender = env::args().nth(1).map_or(false, |arg| arg == "sender");

let (room, rx) = Room::connect(&url, &token, RoomOptions::default()).await?;
println!("Connected to room: {} - {}", room.name(), room.sid().await);

if is_sender { run_sender(room).await } else { run_receiver(room, rx).await }
}

async fn run_sender(room: Room) -> Result<(), Box<dyn Error>> {
println!("Running as sender");
let mut rng = rand::rng();

loop {
// Send control packets with randomized channel and color.
let packet = packet::LedControlPacket::new()
.with_version(1)
.with_channel(rng.random_range(0..16))
.with_is_on(true)
.with_red(rng.random())
.with_green(rng.random())
.with_blue(rng.random());

println!("[tx] {}", packet);

let options = StreamByteOptions { topic: LED_CONTROL_TOPIC.into(), ..Default::default() };
let be_bytes = packet.into_bits().to_be_bytes();
room.local_participant().send_bytes(&be_bytes, options).await?;

sleep(Duration::from_millis(500)).await;
}
}

async fn run_receiver(
_room: Room,
mut rx: UnboundedReceiver<RoomEvent>,
) -> Result<(), Box<dyn Error>> {
println!("Running as receiver");
println!("Waiting for LED control packets…");
while let Some(event) = rx.recv().await {
match event {
RoomEvent::BytesReceived { bytes, info, participant_identity: _ } => {
if info.topic != LED_CONTROL_TOPIC {
continue;
};
let Ok(be_bytes) = bytes[..4].try_into() else {
log::warn!("Unexpected packet length");
continue;
};
let packet = LedControlPacket::from(u32::from_be_bytes(be_bytes));
println!("[rx] {}", packet);
}
_ => {}
}
}
Ok(())
}
56 changes: 56 additions & 0 deletions examples/send_bytes/src/packet.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use bitfield_struct::bitfield;
use colored::Colorize;
use std::fmt::Display;

/// Custom 4-byte packet structure used for controlling LED
/// state through a LiveKit room.
#[bitfield(u32)]
pub struct LedControlPacket {
/// Packet version (0-4).
#[bits(2)]
pub version: u8,
/// Which LED is being controlled (0-15).
#[bits(5)]
pub channel: u8,
/// Whether or not the channel is on.
#[bits(1)]
pub is_on: bool,
/// Red intensity (0-255).
#[bits(8)]
pub red: u8,
/// Green intensity (0-255).
#[bits(8)]
pub green: u8,
/// Blue intensity (0-255).
#[bits(8)]
pub blue: u8,
}

impl Display for LedControlPacket {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let color_display = if colored::control::SHOULD_COLORIZE.should_colorize() {
" ".on_truecolor(self.red(), self.green(), self.blue())
} else {
// Display RGB value if terminal color is disabled.
format!("rgb({:>3}, {:>3}, {:>3})", self.red(), self.green(), self.blue()).into()
};
write!(f, "Channel {:02} => {}", self.channel(), color_display)
}
}

#[cfg(test)]
mod tests {
use super::LedControlPacket;

#[test]
fn test_bit_representation() {
let packet = LedControlPacket::new()
.with_version(1)
.with_channel(4)
.with_is_on(true)
.with_red(31)
.with_green(213)
.with_blue(249);
assert_eq!(packet.into_bits(), 0xF9D51F91);
}
}
1 change: 1 addition & 0 deletions livekit-ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ edition = "2021"
license = "Apache-2.0"
description = "FFI interface for bindings in other languages"
repository = "https://github.com/livekit/rust-sdks"
readme = "README.md"

[features]
default = ["rustls-tls-native-roots"]
Expand Down
9 changes: 9 additions & 0 deletions livekit-ffi/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# LiveKit FFI

Foreign function interface (FFI) bindings for LiveKit, used to support the following client SDKs:

- [Python](https://github.com/livekit/python-sdks)
- [NodeJS](https://github.com/livekit/node-sdks)
- [Unity](https://github.com/livekit/client-sdk-unity)

This crate is compiled as dynamic library, allowing client languages to invoke APIs from the core [_livekit_](https://crates.io/crates/livekit) crate.
22 changes: 22 additions & 0 deletions livekit-ffi/protocol/data_stream.proto
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,28 @@ message StreamSendFileCallback {
}
}

// MARK: - Send bytes

// Sends bytes over a data stream.
message StreamSendBytesRequest {
required uint64 local_participant_handle = 1;

required StreamByteOptions options = 2;

// Bytes to send.
required bytes bytes = 3;
}
message StreamSendBytesResponse {
required uint64 async_id = 1;
}
message StreamSendBytesCallback {
required uint64 async_id = 1;
oneof result {
ByteStreamInfo info = 2;
StreamError error = 3;
}
}

// MARK: - Send text

// Sends text over a data stream.
Expand Down
9 changes: 7 additions & 2 deletions livekit-ffi/protocol/ffi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,9 @@ message FfiRequest {
TextStreamWriterWriteRequest text_stream_write = 65;
TextStreamWriterCloseRequest text_stream_close = 66;

// NEXT_ID: 67
StreamSendBytesRequest send_bytes = 67;

// NEXT_ID: 68
}
}

Expand Down Expand Up @@ -243,7 +245,9 @@ message FfiResponse {
TextStreamWriterWriteResponse text_stream_write = 64;
TextStreamWriterCloseResponse text_stream_close = 65;

// NEXT_ID: 66
StreamSendBytesResponse send_bytes = 66;

// NEXT_ID: 67
}
}

Expand Down Expand Up @@ -298,6 +302,7 @@ message FfiEvent {
TextStreamWriterWriteCallback text_stream_writer_write = 38;
TextStreamWriterCloseCallback text_stream_writer_close = 39;
StreamSendTextCallback send_text = 40;
StreamSendBytesCallback send_bytes = 41;
}
}

Expand Down
51 changes: 48 additions & 3 deletions livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2447,6 +2447,45 @@ pub mod stream_send_file_callback {
Error(super::StreamError),
}
}
// MARK: - Send bytes

/// Sends bytes over a data stream.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StreamSendBytesRequest {
#[prost(uint64, required, tag="1")]
pub local_participant_handle: u64,
#[prost(message, required, tag="2")]
pub options: StreamByteOptions,
/// Bytes to send.
#[prost(bytes="vec", required, tag="3")]
pub bytes: ::prost::alloc::vec::Vec<u8>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StreamSendBytesResponse {
#[prost(uint64, required, tag="1")]
pub async_id: u64,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StreamSendBytesCallback {
#[prost(uint64, required, tag="1")]
pub async_id: u64,
#[prost(oneof="stream_send_bytes_callback::Result", tags="2, 3")]
pub result: ::core::option::Option<stream_send_bytes_callback::Result>,
}
/// Nested message and enum types in `StreamSendBytesCallback`.
pub mod stream_send_bytes_callback {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Result {
#[prost(message, tag="2")]
Info(super::ByteStreamInfo),
#[prost(message, tag="3")]
Error(super::StreamError),
}
}
// MARK: - Send text

/// Sends text over a data stream.
Expand Down Expand Up @@ -4889,7 +4928,7 @@ pub struct RpcMethodInvocationEvent {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiRequest {
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 48, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66")]
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 48, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67")]
pub message: ::core::option::Option<ffi_request::Message>,
}
/// Nested message and enum types in `FfiRequest`.
Expand Down Expand Up @@ -5037,13 +5076,15 @@ pub mod ffi_request {
TextStreamWrite(super::TextStreamWriterWriteRequest),
#[prost(message, tag="66")]
TextStreamClose(super::TextStreamWriterCloseRequest),
#[prost(message, tag="67")]
SendBytes(super::StreamSendBytesRequest),
}
}
/// This is the output of livekit_ffi_request function.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiResponse {
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 47, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65")]
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 47, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66")]
pub message: ::core::option::Option<ffi_response::Message>,
}
/// Nested message and enum types in `FfiResponse`.
Expand Down Expand Up @@ -5189,6 +5230,8 @@ pub mod ffi_response {
TextStreamWrite(super::TextStreamWriterWriteResponse),
#[prost(message, tag="65")]
TextStreamClose(super::TextStreamWriterCloseResponse),
#[prost(message, tag="66")]
SendBytes(super::StreamSendBytesResponse),
}
}
/// To minimize complexity, participant events are not included in the protocol.
Expand All @@ -5197,7 +5240,7 @@ pub mod ffi_response {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiEvent {
#[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40")]
#[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41")]
pub message: ::core::option::Option<ffi_event::Message>,
}
/// Nested message and enum types in `FfiEvent`.
Expand Down Expand Up @@ -5285,6 +5328,8 @@ pub mod ffi_event {
TextStreamWriterClose(super::TextStreamWriterCloseCallback),
#[prost(message, tag="40")]
SendText(super::StreamSendTextCallback),
#[prost(message, tag="41")]
SendBytes(super::StreamSendBytesCallback),
}
}
/// Stop all rooms synchronously (Do we need async here?).
Expand Down
20 changes: 20 additions & 0 deletions livekit-ffi/src/server/participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,26 @@ impl FfiParticipant {
Ok(proto::StreamSendTextResponse { async_id })
}

pub fn send_bytes(
&self,
server: &'static FfiServer,
request: proto::StreamSendBytesRequest,
) -> FfiResult<proto::StreamSendBytesResponse> {
let async_id = server.next_id();
let local = self.guard_local_participant()?;

let handle = server.async_runtime.spawn(async move {
let result = match local.send_bytes(&request.bytes, request.options.into()).await {
Ok(info) => proto::stream_send_bytes_callback::Result::Info(info.into()),
Err(err) => proto::stream_send_bytes_callback::Result::Error(err.into()),
};
let callback = proto::StreamSendBytesCallback { async_id, result: Some(result) };
let _ = server.send_event(proto::ffi_event::Message::SendBytes(callback));
});
server.watch_panic(handle);
Ok(proto::StreamSendBytesResponse { async_id })
}

pub fn stream_bytes(
&self,
server: &'static FfiServer,
Expand Down
Loading
Loading