diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index de72cb808..93fbaea9d 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -1730,7 +1730,7 @@ async fn outgoing_data_stream_task( loop { tokio::select! { Ok((packet, responder)) = packet_rx.recv() => { - let result = engine.publish_data(packet, DataPacketKind::Reliable).await; + let result = engine.publish_data(packet, DataPacketKind::Reliable, false).await; let _ = responder.respond(result); }, _ = close_rx.recv() => { diff --git a/livekit/src/room/participant/local_participant.rs b/livekit/src/room/participant/local_participant.rs index 982463e1d..d757f09f5 100644 --- a/livekit/src/room/participant/local_participant.rs +++ b/livekit/src/room/participant/local_participant.rs @@ -407,7 +407,7 @@ impl LocalParticipant { ..Default::default() }; - match self.inner.rtc_engine.publish_data(data, DataPacketKind::Reliable).await { + match self.inner.rtc_engine.publish_data(data, DataPacketKind::Reliable, false).await { Ok(_) => Ok(ChatMessage::from(chat_message)), Err(e) => Err(Into::into(e)), } @@ -433,7 +433,7 @@ impl LocalParticipant { ..Default::default() }; - match self.inner.rtc_engine.publish_data(data, DataPacketKind::Reliable).await { + match self.inner.rtc_engine.publish_data(data, DataPacketKind::Reliable, false).await { Ok(_) => Ok(ChatMessage::from(proto_msg)), Err(e) => Err(Into::into(e)), } @@ -477,7 +477,7 @@ impl LocalParticipant { true => DataPacketKind::Reliable, false => DataPacketKind::Lossy, }; - self.inner.rtc_engine.publish_data(packet, kind).await.map_err(Into::into) + self.inner.rtc_engine.publish_data(packet, kind, true).await.map_err(Into::into) } pub async fn publish_data(&self, packet: DataPacket) -> RoomResult<()> { @@ -498,7 +498,7 @@ impl LocalParticipant { ..Default::default() }; - self.inner.rtc_engine.publish_data(data, kind).await.map_err(Into::into) + self.inner.rtc_engine.publish_data(data, kind, false).await.map_err(Into::into) } pub fn set_data_channel_buffered_amount_low_threshold( @@ -553,7 +553,11 @@ impl LocalParticipant { value: Some(proto::data_packet::Value::Transcription(transcription_packet)), ..Default::default() }; - self.inner.rtc_engine.publish_data(data, DataPacketKind::Reliable).await.map_err(Into::into) + self.inner + .rtc_engine + .publish_data(data, DataPacketKind::Reliable, false) + .await + .map_err(Into::into) } pub async fn publish_dtmf(&self, dtmf: SipDTMF) -> RoomResult<()> { @@ -567,7 +571,11 @@ impl LocalParticipant { ..Default::default() }; - self.inner.rtc_engine.publish_data(data, DataPacketKind::Reliable).await.map_err(Into::into) + self.inner + .rtc_engine + .publish_data(data, DataPacketKind::Reliable, false) + .await + .map_err(Into::into) } async fn publish_rpc_request(&self, rpc_request: RpcRequest) -> RoomResult<()> { @@ -587,7 +595,11 @@ impl LocalParticipant { ..Default::default() }; - self.inner.rtc_engine.publish_data(data, DataPacketKind::Reliable).await.map_err(Into::into) + self.inner + .rtc_engine + .publish_data(data, DataPacketKind::Reliable, false) + .await + .map_err(Into::into) } async fn publish_rpc_response(&self, rpc_response: RpcResponse) -> RoomResult<()> { @@ -611,7 +623,11 @@ impl LocalParticipant { ..Default::default() }; - self.inner.rtc_engine.publish_data(data, DataPacketKind::Reliable).await.map_err(Into::into) + self.inner + .rtc_engine + .publish_data(data, DataPacketKind::Reliable, false) + .await + .map_err(Into::into) } async fn publish_rpc_ack(&self, rpc_ack: RpcAck) -> RoomResult<()> { @@ -625,7 +641,11 @@ impl LocalParticipant { ..Default::default() }; - self.inner.rtc_engine.publish_data(data, DataPacketKind::Reliable).await.map_err(Into::into) + self.inner + .rtc_engine + .publish_data(data, DataPacketKind::Reliable, false) + .await + .map_err(Into::into) } pub(crate) async fn update_track_subscription_permissions(&self) { diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index 363bd6eb2..f3014e156 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -246,13 +246,13 @@ impl RtcEngine { &self, data: proto::DataPacket, kind: DataPacketKind, + is_raw_packet: bool, ) -> EngineResult<()> { let (session, _r_lock) = { let (handle, _r_lock) = self.inner.wait_reconnection().await?; (handle.session.clone(), _r_lock) }; - - session.publish_data(data, kind).await + session.publish_data(data, kind, is_raw_packet).await } pub async fn simulate_scenario(&self, scenario: SimulateScenario) -> EngineResult<()> { diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index 0bd6f10a7..6d95adaf9 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -213,7 +213,7 @@ enum DataChannelEventDetail { #[derive(Debug)] struct PublishPacketRequest { - /// Unencoded data packewt. + /// Unencoded data packet. packet: proto::DataPacket, /// Notifies the caller once the request has been fulfilled. @@ -488,8 +488,9 @@ impl RtcSession { &self, data: proto::DataPacket, kind: DataPacketKind, + is_raw_packet: bool, ) -> Result<(), EngineError> { - self.inner.publish_data(data, kind).await + self.inner.publish_data(data, kind, is_raw_packet).await } pub async fn restart(&self) -> EngineResult { @@ -1339,12 +1340,15 @@ impl SessionInner { self: &Arc, mut packet: proto::DataPacket, kind: DataPacketKind, + is_raw_packet: bool, ) -> Result<(), EngineError> { self.ensure_publisher_connected(kind).await?; // Populate local participant info fields - packet.participant_identity = self.participant_info.identity.to_string(); - packet.participant_sid = self.participant_info.sid.to_string(); + if !is_raw_packet { + packet.participant_identity = self.participant_info.identity.to_string(); + packet.participant_sid = self.participant_info.sid.to_string(); + } let (completion_tx, completion_rx) = oneshot::channel(); let ev = DataChannelEvent {