diff --git a/android/src/main/kotlin/io/livekit/plugin/AudioProcessors.kt b/android/src/main/kotlin/io/livekit/plugin/AudioProcessors.kt new file mode 100644 index 000000000..743939fa1 --- /dev/null +++ b/android/src/main/kotlin/io/livekit/plugin/AudioProcessors.kt @@ -0,0 +1,39 @@ +/* + * Copyright 2024 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.livekit.plugin + +/** + * Container for managing audio processors (renderers and visualizers) for a specific audio track + * Similar to iOS AudioProcessors implementation + */ +class AudioProcessors( + val track: LKAudioTrack +) { + val renderers = mutableMapOf<String, AudioRenderer>() + val visualizers = mutableMapOf<String, Visualizer>() + + /** + * Clean up all processors and release resources + */ + fun cleanup() { + renderers.values.forEach { it.detach() } + renderers.clear() + + visualizers.values.forEach { it.stop() } + visualizers.clear() + } +}
diff --git a/android/src/main/kotlin/io/livekit/plugin/AudioRenderer.kt b/android/src/main/kotlin/io/livekit/plugin/AudioRenderer.kt new file mode 100644 index 000000000..735bb6fef --- /dev/null +++ b/android/src/main/kotlin/io/livekit/plugin/AudioRenderer.kt @@ -0,0 +1,299 @@ +/* + * Copyright 2024 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.livekit.plugin + +import android.os.Handler +import android.os.Looper +import io.flutter.plugin.common.BinaryMessenger +import io.flutter.plugin.common.EventChannel +import org.webrtc.AudioTrackSink +import java.nio.ByteBuffer +import java.nio.ByteOrder + +/** + * AudioRenderer for capturing audio data from WebRTC tracks and streaming to Flutter + * Similar to iOS AudioRenderer implementation + */ +class AudioRenderer( + private val audioTrack: LKAudioTrack, + private val binaryMessenger: BinaryMessenger, + private val rendererId: String, + private val targetFormat: RendererAudioFormat +) : EventChannel.StreamHandler, AudioTrackSink { + + private var eventChannel: EventChannel? = null + private var eventSink: EventChannel.EventSink? = null + private var isAttached = false + + private val handler: Handler by lazy { + Handler(Looper.getMainLooper()) + } + + init { + val channelName = "io.livekit.audio.renderer/channel-$rendererId" + eventChannel = EventChannel(binaryMessenger, channelName) + eventChannel?.setStreamHandler(this) + + // Attach to the audio track + audioTrack.addSink(this) + isAttached = true + } + + fun detach() { + if (isAttached) { + audioTrack.removeSink(this) + isAttached = false + } + eventChannel?.setStreamHandler(null) + eventSink = null + } + + override fun onListen(arguments: Any?, events: EventChannel.EventSink?) { + eventSink = events + } + + override fun onCancel(arguments: Any?) { + eventSink = null + } + + override fun onData( + audioData: ByteBuffer, + bitsPerSample: Int, + sampleRate: Int, + numberOfChannels: Int, + numberOfFrames: Int, + absoluteCaptureTimestampMs: Long + ) { + eventSink?.let { sink -> + try { + // Convert audio data to the target format + val convertedData = convertAudioData( + audioData, + bitsPerSample, + sampleRate, + numberOfChannels, + numberOfFrames + ) + + // Send to Flutter on the main thread + handler.post { + sink.success(convertedData) + } + } catch (e: Exception) { + handler.post { + sink.error( + "AUDIO_CONVERSION_ERROR", + "Failed to convert audio data: ${e.message}", + null + ) + } + } + } + } + + private fun convertAudioData( + audioData: ByteBuffer, + bitsPerSample: Int, + sampleRate: Int, + numberOfChannels: Int, + numberOfFrames: Int + ): Map<String, Any> { + // Create result similar to iOS implementation + val result = mutableMapOf<String, Any>( + "sampleRate" to sampleRate, + "channels" to numberOfChannels, + "frameLength" to numberOfFrames + ) + + // Convert based on target format + when (targetFormat.commonFormat) { + "int16" -> { + result["commonFormat"] = "int16" + result["data"] = + convertToInt16(audioData, bitsPerSample, numberOfChannels, numberOfFrames) + } + + "float32" -> { + result["commonFormat"] = "float32" + result["data"] = + convertToFloat32(audioData, bitsPerSample, numberOfChannels, numberOfFrames) + } + + else -> { + result["commonFormat"] = "int16" // Default fallback + result["data"] = + convertToInt16(audioData, bitsPerSample, numberOfChannels, numberOfFrames) + } + } + + return result + }
+ private fun convertToInt16( + audioData: ByteBuffer, + bitsPerSample: Int, + numberOfChannels: Int, + numberOfFrames: Int + ): List<List<Int>> { + val channelsData = mutableListOf<List<Int>>() + + // Prepare buffer for reading + val buffer = audioData.duplicate() + buffer.order(ByteOrder.LITTLE_ENDIAN) + buffer.rewind() + + when (bitsPerSample) { + 16 -> { + // Already 16-bit, just reformat by channels + for (channel in 0 until numberOfChannels) { + val channelData = mutableListOf<Int>() + buffer.position(0) // Start from beginning for each channel + + for (frame in 0 until numberOfFrames) { + val sampleIndex = frame * numberOfChannels + channel + val byteIndex = sampleIndex * 2 + + if (byteIndex + 1 < buffer.capacity()) { + buffer.position(byteIndex) + val sample = buffer.short.toInt() + channelData.add(sample) + } + } + channelsData.add(channelData) + } + } + + 32 -> { + // Convert from 32-bit to 16-bit + for (channel in 0 until numberOfChannels) { + val channelData = mutableListOf<Int>() + buffer.position(0) + + for (frame in 0 until numberOfFrames) { + val sampleIndex = frame * numberOfChannels + channel + val byteIndex = sampleIndex * 4 + + if (byteIndex + 3 < buffer.capacity()) { + buffer.position(byteIndex) + val sample32 = buffer.int + // Convert 32-bit to 16-bit by right-shifting + val sample16 = (sample32 shr 16).toShort().toInt() + channelData.add(sample16) + } + } + channelsData.add(channelData) + } + } + + else -> { + // Unsupported format, return empty data + repeat(numberOfChannels) { + channelsData.add(emptyList()) + } + } + } + + return channelsData + } + + private fun convertToFloat32( + audioData: ByteBuffer, + bitsPerSample: Int, + numberOfChannels: Int, + numberOfFrames: Int + ): List<List<Float>> { + val channelsData = mutableListOf<List<Float>>() + + val buffer = audioData.duplicate() + buffer.order(ByteOrder.LITTLE_ENDIAN) + buffer.rewind() + + when (bitsPerSample) { + 16 -> { + // Convert from 16-bit to float32 + for (channel in 0 until numberOfChannels) { + val channelData = mutableListOf<Float>() + buffer.position(0) + + for (frame in 0 until numberOfFrames) { + val sampleIndex = frame * numberOfChannels + channel + val byteIndex = sampleIndex * 2 + + if (byteIndex + 1 < buffer.capacity()) { + buffer.position(byteIndex) + val sample16 = buffer.short + // Convert to float (-1.0 to 1.0) + val sampleFloat = sample16.toFloat() / Short.MAX_VALUE + channelData.add(sampleFloat) + } + } + channelsData.add(channelData) + } + } + + 32 -> { + // Assume 32-bit float input + for (channel in 0 until numberOfChannels) { + val channelData = mutableListOf<Float>() + buffer.position(0) + + for (frame in 0 until numberOfFrames) { + val sampleIndex = frame * numberOfChannels + channel + val byteIndex = sampleIndex * 4 + + if (byteIndex + 3 < buffer.capacity()) { + buffer.position(byteIndex) + val sampleFloat = buffer.float + channelData.add(sampleFloat) + } + } + channelsData.add(channelData) + } + } + + else -> { + // Unsupported format + repeat(numberOfChannels) { + channelsData.add(emptyList()) + } + } + } + + return channelsData + } +} + +/** + * Audio format specification for the renderer + */ +data class RendererAudioFormat( + val bitsPerSample: Int, + val sampleRate: Int, + val numberOfChannels: Int, + val commonFormat: String = "int16" +) { + companion object { + fun fromMap(formatMap: Map<String, Any>): RendererAudioFormat? { + val bitsPerSample = formatMap["bitsPerSample"] as? Int ?: 16 + val sampleRate = formatMap["sampleRate"] as? Int ?: 48000 + val numberOfChannels = formatMap["channels"] as? Int ?: 1 + val commonFormat = formatMap["commonFormat"] as? String ?: "int16" + + return RendererAudioFormat(bitsPerSample, sampleRate, numberOfChannels, commonFormat) + } + } +}
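For orientation, the Dart side consumes this renderer through a per-renderer EventChannel. A minimal sketch, assuming a valid WebRTC track id (`Native.startAudioRenderer` and the channel name come from this PR; the payload keys mirror `convertAudioData` above):

```dart
import 'package:flutter/services.dart';
import 'package:uuid/uuid.dart';

Future<void> tapTrackAudio(String trackId) async {
  final rendererId = const Uuid().v4();
  // Attach a native AudioRenderer to the track (see native.dart below).
  final ok = await Native.startAudioRenderer(
    trackId: trackId,
    rendererId: rendererId,
    format: {'commonFormat': 'int16', 'sampleRate': 24000, 'channels': 1},
  );
  if (!ok) return;
  // Each event is a map shaped like the result of convertAudioData():
  // {sampleRate, channels, frameLength, commonFormat, data: [ch0, ch1, ...]}.
  EventChannel('io.livekit.audio.renderer/channel-$rendererId')
      .receiveBroadcastStream()
      .listen((event) {
    final mono = (event['data'] as List)[0].cast<int>();
    // ... buffer or process the int16 samples in `mono` ...
  });
}
```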
diff --git a/android/src/main/kotlin/io/livekit/plugin/LiveKitPlugin.kt b/android/src/main/kotlin/io/livekit/plugin/LiveKitPlugin.kt index 079988f83..88c73c658 100644 --- a/android/src/main/kotlin/io/livekit/plugin/LiveKitPlugin.kt +++ b/android/src/main/kotlin/io/livekit/plugin/LiveKitPlugin.kt @@ -31,15 +31,16 @@ import io.flutter.plugin.common.BinaryMessenger import org.webrtc.AudioTrack /** LiveKitPlugin */ -class LiveKitPlugin: FlutterPlugin, MethodCallHandler { - private var processors = mutableMapOf<String, Visualizer>() +class LiveKitPlugin : FlutterPlugin, MethodCallHandler { + private var audioProcessors = mutableMapOf<String, AudioProcessors>() private var flutterWebRTCPlugin = FlutterWebRTCPlugin.sharedSingleton private var binaryMessenger: BinaryMessenger? = null + /// The MethodChannel that will handle the communication between Flutter and native Android /// /// This local reference serves to register the plugin with the Flutter Engine and unregister it /// when the Flutter Engine is detached from the Activity - private lateinit var channel : MethodChannel + private lateinit var channel: MethodChannel override fun onAttachedToEngine(@NonNull flutterPluginBinding: FlutterPlugin.FlutterPluginBinding) { channel = MethodChannel(flutterPluginBinding.binaryMessenger, "livekit_client") @@ -55,33 +56,33 @@ class LiveKitPlugin: FlutterPlugin, MethodCallHandler { result.error("INVALID_ARGUMENT", "trackId and visualizerId are required", null) return } - var audioTrack: LKAudioTrack? = null + val barCount = call.argument<Int>("barCount") ?: 7 val isCentered = call.argument<Boolean>("isCentered") ?: true var smoothTransition = call.argument<Boolean>("smoothTransition") ?: true - val track = flutterWebRTCPlugin.getLocalTrack(trackId) - if (track != null) { - audioTrack = LKLocalAudioTrack(track as LocalAudioTrack) - } else { - val remoteTrack = flutterWebRTCPlugin.getRemoteTrack(trackId) - if (remoteTrack != null) { - audioTrack = LKRemoteAudioTrack(remoteTrack as AudioTrack) - } + val processors = getAudioProcessors(trackId) + if (processors == null) { + result.error("INVALID_ARGUMENT", "track not found", null) + return } - if(audioTrack == null) { - result.error("INVALID_ARGUMENT", "track not found", null) + // Check if visualizer already exists + if (processors.visualizers[visualizerId] != null) { + result.success(null) return } val visualizer = Visualizer( - barCount = barCount, isCentered = isCentered, + barCount = barCount, + isCentered = isCentered, smoothTransition = smoothTransition, - audioTrack = audioTrack, binaryMessenger = binaryMessenger!!, - visualizerId = visualizerId) + audioTrack = processors.track, + binaryMessenger = binaryMessenger!!, + visualizerId = visualizerId + ) - processors[visualizerId] = visualizer + processors.visualizers[visualizerId] = visualizer result.success(null) } @@ -92,28 +93,152 @@ class LiveKitPlugin: FlutterPlugin, MethodCallHandler { result.error("INVALID_ARGUMENT", "trackId and visualizerId are required", null) return } - processors.forEach { (k, visualizer) -> - if(k == visualizerId) { + + // Find and remove visualizer from all processors + for (processors in audioProcessors.values) { + processors.visualizers[visualizerId]?.let { visualizer -> visualizer.stop() + processors.visualizers.remove(visualizerId) } } - processors.entries.removeAll { (k, v) -> k == visualizerId } + result.success(null) } - override fun onMethodCall(@NonNull call: MethodCall, @NonNull result: Result) { - if(call.method == "startVisualizer") { - handleStartVisualizer(call, result) + /** + * Get or create AudioProcessors for a given trackId + */ + private fun getAudioProcessors(trackId: String): AudioProcessors? { + // Return existing if found + audioProcessors[trackId]?.let { return it } + + // Create new AudioProcessors for this track + var audioTrack: LKAudioTrack? = null
+ + val localTrack = flutterWebRTCPlugin.getLocalTrack(trackId) + if (localTrack != null) { + audioTrack = LKLocalAudioTrack(localTrack as LocalAudioTrack) + } else { + val remoteTrack = flutterWebRTCPlugin.getRemoteTrack(trackId) + if (remoteTrack != null) { + audioTrack = LKRemoteAudioTrack(remoteTrack as AudioTrack) + } + } + + return audioTrack?.let { track -> + val processors = AudioProcessors(track) + audioProcessors[trackId] = processors + processors + } + } + + /** + * Handle startAudioRenderer method call + */ + private fun handleStartAudioRenderer(@NonNull call: MethodCall, @NonNull result: Result) { + val trackId = call.argument<String>("trackId") + val rendererId = call.argument<String>("rendererId") + val formatMap = call.argument<Map<String, Any>>("format") + + if (trackId == null) { + result.error("INVALID_ARGUMENT", "trackId is required", null) + return + } + + if (rendererId == null) { + result.error("INVALID_ARGUMENT", "rendererId is required", null) + return + } + + if (formatMap == null) { + result.error("INVALID_ARGUMENT", "format is required", null) + return + } + + val format = RendererAudioFormat.fromMap(formatMap) + if (format == null) { + result.error("INVALID_ARGUMENT", "Failed to parse format", null) + return + } + + val processors = getAudioProcessors(trackId) + if (processors == null) { + result.error("INVALID_ARGUMENT", "No such track", null) return - } else if(call.method == "stopVisualizer") { - handleStopVisualizer(call, result) + } + + // Check if renderer already exists + if (processors.renderers[rendererId] != null) { + result.success(true) + return + } + + try { + val renderer = AudioRenderer( + processors.track, + binaryMessenger!!, + rendererId, + format, + ) + + processors.renderers[rendererId] = renderer + result.success(true) + } catch (e: Exception) { + result.error("RENDERER_ERROR", "Failed to create audio renderer: ${e.message}", null) + } + } + + /** + * Handle stopAudioRenderer method call + */ + private fun handleStopAudioRenderer(@NonNull call: MethodCall, @NonNull result: Result) { + val rendererId = call.argument<String>("rendererId") + + if (rendererId == null) { + result.error("INVALID_ARGUMENT", "rendererId is required", null) return } - // no-op for now - result.notImplemented() + + // Find and remove renderer from all processors + for (processors in audioProcessors.values) { + processors.renderers[rendererId]?.let { renderer -> + renderer.detach() + processors.renderers.remove(rendererId) + } + } + + result.success(true) + } + + override fun onMethodCall(@NonNull call: MethodCall, @NonNull result: Result) { + when (call.method) { + "startVisualizer" -> { + handleStartVisualizer(call, result) + } + + "stopVisualizer" -> { + handleStopVisualizer(call, result) + } + + "startAudioRenderer" -> { + handleStartAudioRenderer(call, result) + } + + "stopAudioRenderer" -> { + handleStopAudioRenderer(call, result) + } + + else -> { + result.notImplemented() + } + } } override fun onDetachedFromEngine(@NonNull binding: FlutterPlugin.FlutterPluginBinding) { channel.setMethodCallHandler(null) + + // Cleanup all processors + audioProcessors.values.forEach { it.cleanup() } + audioProcessors.clear() } } diff --git a/ios/Classes/AudioConverter.swift b/ios/Classes/AudioConverter.swift new file mode 120000 index 000000000..cc657a8ec --- /dev/null +++ b/ios/Classes/AudioConverter.swift @@ -0,0 +1 @@ +../../shared_swift/AudioConverter.swift \ No newline at end of file diff --git a/ios/Classes/AudioRenderer.swift b/ios/Classes/AudioRenderer.swift new file mode 120000 index
000000000..a1bce0cda --- /dev/null +++ b/ios/Classes/AudioRenderer.swift @@ -0,0 +1 @@ +../../shared_swift/AudioRenderer.swift \ No newline at end of file diff --git a/lib/livekit_client.dart b/lib/livekit_client.dart index 026b1c39c..6751907a3 100644 --- a/lib/livekit_client.dart +++ b/lib/livekit_client.dart @@ -14,6 +14,7 @@ export 'src/constants.dart'; export 'src/core/room.dart'; +export 'src/core/room_preconnect.dart'; export 'src/data_stream/stream_reader.dart'; export 'src/data_stream/stream_writer.dart'; export 'src/e2ee/e2ee_manager.dart'; @@ -31,6 +32,7 @@ export 'src/options.dart'; export 'src/participant/local.dart'; export 'src/participant/participant.dart'; export 'src/participant/remote.dart'; +export 'src/preconnect/pre_connect_audio_buffer.dart'; export 'src/publication/local.dart'; export 'src/publication/remote.dart'; export 'src/publication/track_publication.dart'; diff --git a/lib/src/core/room.dart b/lib/src/core/room.dart index 66ec89068..a458acf59 100644 --- a/lib/src/core/room.dart +++ b/lib/src/core/room.dart @@ -36,6 +36,7 @@ import '../options.dart'; import '../participant/local.dart'; import '../participant/participant.dart'; import '../participant/remote.dart'; +import '../preconnect/pre_connect_audio_buffer.dart'; import '../proto/livekit_models.pb.dart' as lk_models; import '../proto/livekit_rtc.pb.dart' as lk_rtc; import '../support/disposable.dart'; @@ -131,6 +132,9 @@ class Room extends DisposableChangeNotifier with EventsEmittable { final Map _textStreamHandlers = {}; + @internal + late final PreConnectAudioBuffer preConnectAudioBuffer; + // for testing @internal Map get rpcHandlers => _rpcHandlers; @@ -167,9 +171,13 @@ class Room extends DisposableChangeNotifier with EventsEmittable { _setupDataStreamListeners(); + preConnectAudioBuffer = PreConnectAudioBuffer(this); + onDispose(() async { // clean up routine await _cleanUp(); + // dispose preConnectAudioBuffer + await preConnectAudioBuffer.dispose(); // dispose events await events.dispose(); // dispose local participant @@ -421,6 +429,15 @@ class Room extends DisposableChangeNotifier with EventsEmittable { await _localParticipant!.updateFromInfo(event.response.participant); } + // Check if preconnect buffer is recording and publish its track + if (preConnectAudioBuffer.isRecording && preConnectAudioBuffer.localTrack != null) { + logger.info('Publishing preconnect audio track'); + await _localParticipant!.publishAudioTrack( + preConnectAudioBuffer.localTrack!, + publishOptions: roomOptions.defaultAudioPublishOptions.copyWith(preConnect: true), + ); + } + if (connectOptions.protocolVersion.index >= ProtocolVersion.v8.index && engine.fastConnectOptions != null && !engine.fullReconnectOnNext) { @@ -428,7 +445,9 @@ class Room extends DisposableChangeNotifier with EventsEmittable { final audio = options.microphone; final bool audioEnabled = audio.enabled == true || audio.track != null; - if (audioEnabled) { + + // Only enable microphone if preconnect buffer is not active + if (audioEnabled && !preConnectAudioBuffer.isRecording) { if (audio.track != null) { await _localParticipant!.publishAudioTrack(audio.track as LocalAudioTrack, publishOptions: roomOptions.defaultAudioPublishOptions); @@ -925,6 +944,8 @@ extension RoomPrivateMethods on Room { _activeSpeakers.clear(); + await preConnectAudioBuffer.reset(); + // clean up engine await engine.cleanUp(); diff --git a/lib/src/core/room_preconnect.dart b/lib/src/core/room_preconnect.dart new file mode 100644 index 000000000..8876d530a --- /dev/null +++ 
b/lib/src/core/room_preconnect.dart @@ -0,0 +1,42 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:async'; + +import '../logger.dart'; +import '../preconnect/pre_connect_audio_buffer.dart'; +import 'room.dart'; + +extension RoomPreConnect on Room { + /// Wraps an async operation while the pre-connect audio buffer records. + /// Stops the buffer and rethrows on error. + Future<T> withPreConnectAudio<T>( + Future<T> Function() operation, { + Duration timeout = const Duration(seconds: 10), + PreConnectOnError? onError, + }) async { + await preConnectAudioBuffer.startRecording(timeout: timeout); + try { + final result = await operation(); + await preConnectAudioBuffer.agentReadyFuture; + return result; + } catch (error) { + await preConnectAudioBuffer.stopRecording(withError: error); + logger.warning('[Preconnect] operation failed with error: $error'); + rethrow; + } finally { + await preConnectAudioBuffer.reset(); + } + } +}
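The extension is intended to wrap the connect call itself — a sketch under the assumption of a typical `Room.connect()` flow, with `url` and `token` as placeholders:

```dart
final room = Room();
// Microphone capture starts before the connection; once an agent
// participant becomes active, the buffered audio is flushed to it.
await room.withPreConnectAudio(() async {
  await room.connect(url, token);
}, timeout: const Duration(seconds: 10));
```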
diff --git a/lib/src/events.dart b/lib/src/events.dart index 31cbbaf09..dc251603e 100644 --- a/lib/src/events.dart +++ b/lib/src/events.dart @@ -382,7 +382,7 @@ class ParticipantStateUpdatedEvent with RoomEvent, ParticipantEvent { }); @override - String toString() => '${runtimeType}(participant: ${participant})'; + String toString() => '${runtimeType}(participant: ${participant}, state: ${state})'; } /// [Participant]'s [ConnectionQuality] has updated. @@ -588,3 +588,33 @@ class TrackProcessorUpdateEvent with TrackEvent { String toString() => '${runtimeType}' '(track: ${track})'; } + +/// Pre-connect audio buffer has started recording. +/// Emitted by [Room]. +class PreConnectAudioBufferStartedEvent with RoomEvent { + final int sampleRate; + final Duration timeout; + const PreConnectAudioBufferStartedEvent({ + required this.sampleRate, + required this.timeout, + }); + + @override + String toString() => '${runtimeType}' + '(sampleRate: ${sampleRate}, timeout: ${timeout})'; +} + +/// Pre-connect audio buffer has stopped recording. +/// Emitted by [Room]. +class PreConnectAudioBufferStoppedEvent with RoomEvent { + final int bufferedSize; + final bool isBufferSent; + const PreConnectAudioBufferStoppedEvent({ + required this.bufferedSize, + required this.isBufferSent, + }); + + @override + String toString() => '${runtimeType}' + '(bufferedSize: ${bufferedSize}, isBufferSent: ${isBufferSent})'; +} diff --git a/lib/src/options.dart b/lib/src/options.dart index 33bcc9051..af562af0c 100644 --- a/lib/src/options.dart +++ b/lib/src/options.dart @@ -324,12 +324,17 @@ class AudioPublishOptions extends PublishOptions { /// max audio bitrate final int audioBitrate; + /// Mark this audio as originating from a pre-connect buffer. + /// Used to populate protobuf audioFeatures (TF_PRECONNECT_BUFFER). + final bool preConnect; + const AudioPublishOptions({ super.name, super.stream, this.dtx = true, this.red = true, this.audioBitrate = AudioPreset.music, + this.preConnect = false, }); AudioPublishOptions copyWith({ @@ -338,6 +343,7 @@ class AudioPublishOptions extends PublishOptions { String? name, String? stream, bool? red, + bool? preConnect, }) => AudioPublishOptions( dtx: dtx ?? this.dtx, name: name ?? this.name, stream: stream ?? this.stream, red: red ?? this.red, + preConnect: preConnect ?? this.preConnect, ); @override - String toString() => '${runtimeType}(dtx: ${dtx}, audioBitrate: ${audioBitrate}, red: ${red})'; + String toString() => + '${runtimeType}(dtx: ${dtx}, audioBitrate: ${audioBitrate}, red: ${red}, preConnect: ${preConnect})'; } final backupCodecs = ['vp8', 'h264']; diff --git a/lib/src/participant/local.dart b/lib/src/participant/local.dart index 6de97347b..e7d52e47e 100644 --- a/lib/src/participant/local.dart +++ b/lib/src/participant/local.dart @@ -123,6 +123,12 @@ class LocalParticipant extends Participant { encryption: room.roomOptions.lkEncryptionType, ); + // Populate audio features (e.g., TF_NO_DTX, TF_PRECONNECT_BUFFER) + req.audioFeatures.addAll([ + if (!publishOptions.dtx) lk_models.AudioTrackFeature.TF_NO_DTX, + if (publishOptions.preConnect) lk_models.AudioTrackFeature.TF_PRECONNECT_BUFFER, + ]); + Future negotiate() async { track.transceiver = await room.engine.createTransceiverRTCRtpSender(track, publishOptions!, encodings); await room.engine.negotiate();
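To illustrate how the new flag flows through publishing — a hedged sketch (the track variable is hypothetical; the `copyWith(preConnect: true)` call matches the Room code above):

```dart
// preConnect: true adds AudioTrackFeature.TF_PRECONNECT_BUFFER to the
// AddTrackRequest's audioFeatures, marking the track as carrying
// buffered pre-connect audio.
await room.localParticipant!.publishAudioTrack(
  preBufferedTrack, // a LocalAudioTrack created while still disconnected
  publishOptions: room.roomOptions.defaultAudioPublishOptions
      .copyWith(preConnect: true),
);
```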
diff --git a/lib/src/preconnect/pre_connect_audio_buffer.dart b/lib/src/preconnect/pre_connect_audio_buffer.dart new file mode 100644 index 000000000..f5925da7a --- /dev/null +++ b/lib/src/preconnect/pre_connect_audio_buffer.dart @@ -0,0 +1,276 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:async'; +import 'dart:typed_data'; + +import 'package:flutter/services.dart'; + +import 'package:flutter_webrtc/flutter_webrtc.dart' as webrtc; +import 'package:uuid/uuid.dart'; + +import '../core/room.dart'; +import '../events.dart'; +import '../logger.dart'; +import '../participant/local.dart'; +import '../support/completer_manager.dart'; +import '../support/native.dart'; +import '../track/local/audio.dart'; +import '../types/data_stream.dart'; +import '../types/other.dart'; +import '../types/participant_state.dart'; + +typedef PreConnectOnError = void Function(Object error); + +class PreConnectAudioBuffer { + static const String dataTopic = 'lk.agent.pre-connect-audio-buffer'; + + static const int defaultMaxSize = 10 * 1024 * 1024; // 10MB + static const int defaultSampleRate = 24000; // Hz + static const int defaultChunkSize = 64 * 1024; // 64KB chunks for streaming + + // Reference to the room + final Room _room; + + // Internal states + bool _isRecording = false; + bool _isBufferSent = false; + String? _rendererId; + + LocalAudioTrack? _localTrack; + EventChannel? _eventChannel; + StreamSubscription? _streamSubscription; + + final PreConnectOnError? _onError; + final int _requestSampleRate; + int? _renderedSampleRate; + + final BytesBuilder _buffer = BytesBuilder(copy: false); + Timer? _timeoutTimer; + CancelListenFunc? _participantStateListener; + + final CompleterManager<void> _agentReadyManager = CompleterManager<void>(); + + PreConnectAudioBuffer( + this._room, { + PreConnectOnError? onError, + int sampleRate = defaultSampleRate, + }) : _onError = onError, + _requestSampleRate = sampleRate; + + // Getters + bool get isRecording => _isRecording; + int get bufferedSize => _buffer.length; + LocalAudioTrack? get localTrack => _localTrack; + + Future<LocalTrackPublishedEvent>? _localTrackPublishedEvent; + + /// Future that completes when an agent is ready. + Future<void> get agentReadyFuture => _agentReadyManager.future; + + Future<void> startRecording({ + Duration timeout = const Duration(seconds: 20), + }) async { + if (_isRecording) { + logger.warning('Already recording'); + return; + } + _isRecording = true; + + // Set up timeout for agent readiness + _agentReadyManager.setTimer(timeout, timeoutReason: 'Agent did not become ready within timeout'); + + _localTrack = await LocalAudioTrack.create(); + logger.fine('localTrack: ${_localTrack!.mediaStreamTrack.id}'); + + final rendererId = Uuid().v4(); + logger.info('Starting audio renderer with rendererId: $rendererId'); + + final result = await Native.startAudioRenderer( + trackId: _localTrack!.mediaStreamTrack.id!, + rendererId: rendererId, + format: { + 'commonFormat': 'int16', + 'sampleRate': _requestSampleRate, + 'channels': 1, + }, + ); + + await webrtc.NativeAudioManagement.startLocalRecording(); + + _rendererId = rendererId; + + logger.info('startAudioRenderer result: $result'); + + _eventChannel = EventChannel('io.livekit.audio.renderer/channel-$rendererId'); + _streamSubscription = _eventChannel?.receiveBroadcastStream().listen((event) { + try { + // Actual sample rate of the audio data, can differ from the request sample rate + _renderedSampleRate = event['sampleRate'] as int; + final dataChannels = event['data'] as List; + final monoData = dataChannels[0].cast<int>(); + // Convert Int16 values to bytes using typed data view + final int16List = Int16List.fromList(monoData); + final bytes = int16List.buffer.asUint8List(); + _buffer.add(bytes); + } catch (e) { + logger.warning('[Preconnect audio] Error parsing event: $e'); + } + }); + + // Listen for agent readiness; when active, attempt to send buffer once. + _participantStateListener = _room.events.on<ParticipantStateUpdatedEvent>( + filter: (event) => event.participant.kind == ParticipantKind.AGENT && event.state == ParticipantState.active, + (event) async { + logger.info('[Preconnect audio] Agent is active: ${event.participant.identity}'); + try { + await sendAudioData(agents: [event.participant.identity]); + _agentReadyManager.complete(); + } catch (error) { + _agentReadyManager.completeError(error); + _onError?.call(error); + } + }); + + _localTrackPublishedEvent = _room.events.waitFor<LocalTrackPublishedEvent>( + duration: Duration(seconds: 10), + filter: (event) => event.participant == _room.localParticipant, + ); + + // Emit the started event + _room.events.emit(PreConnectAudioBufferStartedEvent( + sampleRate: _requestSampleRate, + timeout: timeout, + )); + }
+ Future<void> stopRecording({Object? withError}) async { + if (!_isRecording) return; + _isRecording = false; + + // Cancel the stream subscription. + await _streamSubscription?.cancel(); + _streamSubscription = null; + + // Release the event channel reference. + _eventChannel = null; + + final rendererId = _rendererId; + if (rendererId != null) { + await Native.stopAudioRenderer( + rendererId: rendererId, + ); + } + + _rendererId = null; + + // Stop native audio when errored + if (withError != null) { + await webrtc.NativeAudioManagement.stopLocalRecording(); + } + + // Complete agent ready future if not already completed + _agentReadyManager.complete(); + + // Emit the stopped event + _room.events.emit(PreConnectAudioBufferStoppedEvent( + bufferedSize: _buffer.length, + isBufferSent: _isBufferSent, + )); + + logger.info('[Preconnect audio] stopped recording'); + } + + // Clean-up & reset for re-use + Future<void> reset() async { + await stopRecording(); + _timeoutTimer?.cancel(); + _participantStateListener?.call(); + _participantStateListener = null; + _buffer.clear(); + + // Don't stop the local track - it will continue to be used by the Room + _localTrack = null; + + _agentReadyManager.reset(); + _localTrackPublishedEvent = null; + + // Reset the _isBufferSent flag to allow data sending on next use + _isBufferSent = false; + + logger.info('[Preconnect audio] reset'); + } + + // Dispose the audio buffer and clean up all resources. + Future<void> dispose() async { + await reset(); + logger.info('[Preconnect audio] disposed'); + } + + Future<void> sendAudioData({ + required List<String> agents, + String topic = dataTopic, + }) async { + if (_isBufferSent) return; + if (agents.isEmpty) return; + + final sampleRate = _renderedSampleRate; + if (sampleRate == null) { + logger.severe('[Preconnect audio] renderedSampleRate is null'); + return; + } + + // Wait for local track published event + final localTrackPublishedEvent = await _localTrackPublishedEvent; + logger.info('[Preconnect audio] localTrackPublishedEvent: $localTrackPublishedEvent'); + + final localTrackSid = localTrackPublishedEvent?.publication.track?.sid; + if (localTrackSid == null) { + logger.severe('[Preconnect audio] localTrackSid is null'); + return; + } + + logger.info('[Preconnect audio] sending audio data to ${agents.join(', ')} agent(s)'); + + final data = _buffer.takeBytes(); + logger.info('[Preconnect audio] data.length: ${data.length}'); + + _isBufferSent = true; + + final streamOptions = StreamBytesOptions( + topic: topic, + attributes: { + 'sampleRate': sampleRate.toString(), + 'channels': '1', + 'trackId': localTrackSid, + }, + totalSize: data.length, + destinationIdentities: agents, + ); + + logger.info('[Preconnect audio] streamOptions: $streamOptions'); + + final writer = await _room.localParticipant!.streamBytes(streamOptions); + await writer.write(data); + await writer.close(); + + // Compute seconds of audio data sent + final int bytesPerSample = 2; // 16-bit PCM + final int totalSamples = data.length ~/ bytesPerSample; + final double secondsOfAudio = totalSamples / sampleRate; + + logger.info( + '[Preconnect audio] sent ${(data.length / 1024).toStringAsFixed(1)}KB of audio (${secondsOfAudio.toStringAsFixed(2)} seconds) to ${agents.length} agent(s)'); + } +}
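Both buffer lifecycle events land on the room's event bus, so applications can observe them — a small sketch using the events added in events.dart above (assumes the usual `createListener()` API):

```dart
final listener = room.createListener()
  ..on<PreConnectAudioBufferStartedEvent>((e) {
    print('pre-connect buffering at ${e.sampleRate} Hz (timeout: ${e.timeout})');
  })
  ..on<PreConnectAudioBufferStoppedEvent>((e) {
    print('buffered ${e.bufferedSize} bytes, sent: ${e.isBufferSent}');
  });
```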
diff --git a/lib/src/support/completer_manager.dart b/lib/src/support/completer_manager.dart new file mode 100644 index 000000000..8900f177e --- /dev/null +++ b/lib/src/support/completer_manager.dart @@ -0,0 +1,112 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:async'; + +/// A manager for Completer instances that provides safe completion and automatic lifecycle management. +/// +/// Features: +/// - Safe completion (prevents double completion exceptions) +/// - Automatic timeout handling +/// - Clean state management and reusability +/// - Only exposes Future, not the Completer itself +/// - Thread-safe operations +class CompleterManager<T> { + Completer<T>? _completer; + Timer? _timeoutTimer; + bool _isCompleted = false; + + /// Gets the current future. Creates a new completer if none exists or previous one was completed. + Future<T> get future { + if (_completer == null || _isCompleted) { + _reset(); + } + return _completer!.future; + } + + /// Whether the current completer is completed. + bool get isCompleted => _isCompleted; + + /// Whether there's an active completer waiting for completion. + bool get isActive => _completer != null && !_isCompleted; + + /// Completes the current completer with the given value. + /// Returns true if successfully completed, false if already completed. + bool complete([FutureOr<T>? value]) { + if (_completer == null || _isCompleted) { + return false; + } + + _isCompleted = true; + _timeoutTimer?.cancel(); + _timeoutTimer = null; + + _completer!.complete(value); + return true; + } + + /// Completes the current completer with an error. + /// Returns true if successfully completed with error, false if already completed. + bool completeError(Object error, [StackTrace? stackTrace]) { + if (_completer == null || _isCompleted) { + return false; + } + + _isCompleted = true; + _timeoutTimer?.cancel(); + _timeoutTimer = null; + + _completer!.completeError(error, stackTrace); + return true; + } + + /// Sets up a timeout for the current completer. + /// If the completer is not completed within the timeout, it will be completed with a TimeoutException. + void setTimer(Duration timeout, {String? timeoutReason}) { + if (_completer == null || _isCompleted) { + return; + } + + _timeoutTimer?.cancel(); + _timeoutTimer = Timer(timeout, () { + if (!_isCompleted) { + final reason = timeoutReason ?? 'Operation timed out after $timeout'; + completeError(TimeoutException(reason, timeout)); + } + }); + } + + /// Resets the manager, canceling any pending operations and preparing for reuse. + void reset() { + _reset(); + } + + void _reset() { + _timeoutTimer?.cancel(); + _timeoutTimer = null; + _isCompleted = false; + _completer = Completer<T>(); + } + + /// Disposes the manager, canceling any pending operations. + void dispose() { + _timeoutTimer?.cancel(); + _timeoutTimer = null; + if (_completer != null && !_isCompleted) { + _completer!.completeError(StateError('CompleterManager disposed')); + } + _completer = null; + _isCompleted = true; + } +}
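A brief usage sketch of the manager, mirroring how `PreConnectAudioBuffer` drives it above:

```dart
final agentReady = CompleterManager<void>();
agentReady.setTimer(const Duration(seconds: 20),
    timeoutReason: 'Agent did not become ready within timeout');
// Elsewhere: agentReady.complete() once the agent participant is active.
await agentReady.future; // throws TimeoutException if the timer fires first
```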
diff --git a/lib/src/support/native.dart b/lib/src/support/native.dart index dd5b9510a..753faef12 100644 --- a/lib/src/support/native.dart +++ b/lib/src/support/native.dart @@ -89,6 +89,44 @@ class Native { } } + @internal + static Future<bool> startAudioRenderer({ + required String trackId, + required String rendererId, + required Map<String, dynamic> format, + }) async { + try { + final result = await channel.invokeMethod( + 'startAudioRenderer', + { + 'trackId': trackId, + 'rendererId': rendererId, + 'format': format, + }, + ); + return result == true; + } catch (error) { + logger.warning('startAudioRenderer did throw $error'); + return false; + } + } + + @internal + static Future<void> stopAudioRenderer({ + required String rendererId, + }) async { + try { + await channel.invokeMethod( + 'stopAudioRenderer', + { + 'rendererId': rendererId, + }, + ); + } catch (error) { + logger.warning('stopAudioRenderer did throw $error'); + } + } + /// Returns OS's version as a string /// Currently only for iOS, macOS @internal diff --git a/lib/src/types/data_stream.dart b/lib/src/types/data_stream.dart index c64b250ce..a5963ff54 100644 --- a/lib/src/types/data_stream.dart +++ b/lib/src/types/data_stream.dart @@ -76,6 +76,12 @@ class StreamTextOptions { this.generated = false, this.attributes = const {}, }); + + @override + String toString() => '${runtimeType}' + '(topic: $topic, destinationIdentities: $destinationIdentities, ' + 'streamId: $streamId, totalSize: $totalSize, type: $type, version: $version, ' + 'replyToStreamId: $replyToStreamId, attachedStreamIds: $attachedStreamIds)'; } class StreamBytesOptions { @@ -87,6 +93,7 @@ class StreamBytesOptions { String? streamId; int? totalSize; Encryption_Type?
encryptionType; + StreamBytesOptions({ this.name, this.mimeType, @@ -97,6 +104,11 @@ class StreamBytesOptions { this.totalSize, this.encryptionType = Encryption_Type.NONE, }); + + @override + String toString() => '${runtimeType}' + '(name: $name, mimeType: $mimeType, topic: $topic, destinationIdentities: $destinationIdentities, ' + 'attributes: $attributes, streamId: $streamId, totalSize: $totalSize, encryptionType: $encryptionType)'; } class ChatMessage { @@ -176,6 +188,11 @@ class ByteStreamInfo extends BaseStreamInfo { sendingParticipantIdentity: sendingParticipantIdentity, encryptionType: encryptionType, ); + + @override + String toString() => '${runtimeType}' + '(name: $name, id: $id, mimeType: $mimeType, topic: $topic, ' + 'timestamp: $timestamp, size: $size, attributes: $attributes)'; } /// Operation types for text streams @@ -255,6 +272,11 @@ class TextStreamInfo extends BaseStreamInfo { encryptionType: encryptionType, sendingParticipantIdentity: sendingParticipantIdentity, ); + + @override + String toString() => '${runtimeType}' + '(id: $id, mimeType: $mimeType, topic: $topic, ' + 'timestamp: $timestamp, size: $size, attributes: $attributes)'; } abstract class StreamWriter { diff --git a/macos/Classes/AudioConverter.swift b/macos/Classes/AudioConverter.swift new file mode 120000 index 000000000..cc657a8ec --- /dev/null +++ b/macos/Classes/AudioConverter.swift @@ -0,0 +1 @@ +../../shared_swift/AudioConverter.swift \ No newline at end of file diff --git a/macos/Classes/AudioRenderer.swift b/macos/Classes/AudioRenderer.swift new file mode 120000 index 000000000..a1bce0cda --- /dev/null +++ b/macos/Classes/AudioRenderer.swift @@ -0,0 +1 @@ +../../shared_swift/AudioRenderer.swift \ No newline at end of file diff --git a/pubspec.lock b/pubspec.lock index 8567da2a1..ba94b337b 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -5,18 +5,18 @@ packages: dependency: transitive description: name: _fe_analyzer_shared - sha256: f0bb5d1648339c8308cc0b9838d8456b3cfe5c91f9dc1a735b4d003269e5da9a + sha256: dd3d2ad434b9510001d089e8de7556d50c834481b9abc2891a0184a8493a19dc url: "https://pub.dev" source: hosted - version: "88.0.0" + version: "89.0.0" analyzer: dependency: transitive description: name: analyzer - sha256: "0b7b9c329d2879f8f05d6c05b32ee9ec025f39b077864bdb5ac9a7b63418a98f" + sha256: c22b6e7726d1f9e5db58c7251606076a71ca0dbcf76116675edfadbec0c9e875 url: "https://pub.dev" source: hosted - version: "8.1.1" + version: "8.2.0" args: dependency: transitive description: @@ -654,5 +654,5 @@ packages: source: hosted version: "3.1.3" sdks: - dart: ">=3.8.0 <4.0.0" + dart: ">=3.9.0 <4.0.0" flutter: ">=3.29.0" diff --git a/shared_swift/AVAudioPCMBuffer.swift b/shared_swift/AVAudioPCMBuffer.swift index ce4ccab49..9694cae25 100644 --- a/shared_swift/AVAudioPCMBuffer.swift +++ b/shared_swift/AVAudioPCMBuffer.swift @@ -18,6 +18,28 @@ import Accelerate import AVFoundation public extension AVAudioPCMBuffer { + /// Copies a range of an AVAudioPCMBuffer. + func copySegment(from startFrame: AVAudioFramePosition, to endFrame: AVAudioFramePosition) -> AVAudioPCMBuffer { + let framesToCopy = AVAudioFrameCount(endFrame - startFrame) + let segment = AVAudioPCMBuffer(pcmFormat: format, frameCapacity: framesToCopy)! 
+ + let sampleSize = format.streamDescription.pointee.mBytesPerFrame + + let srcPtr = UnsafeMutableAudioBufferListPointer(mutableAudioBufferList) + let dstPtr = UnsafeMutableAudioBufferListPointer(segment.mutableAudioBufferList) + for (src, dst) in zip(srcPtr, dstPtr) { + memcpy(dst.mData, src.mData?.advanced(by: Int(startFrame) * Int(sampleSize)), Int(framesToCopy) * Int(sampleSize)) + } + + segment.frameLength = framesToCopy + return segment + } + + /// Copies a full segment from 0 to frameLength. frameCapacity will be equal to frameLength. + func copySegment() -> AVAudioPCMBuffer { + copySegment(from: 0, to: AVAudioFramePosition(frameLength)) + } + func resample(toSampleRate targetSampleRate: Double) -> AVAudioPCMBuffer? { let sourceFormat = format diff --git a/shared_swift/AudioConverter.swift b/shared_swift/AudioConverter.swift new file mode 100644 index 000000000..f4f470922 --- /dev/null +++ b/shared_swift/AudioConverter.swift @@ -0,0 +1,68 @@ +/* + * Copyright 2024 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@preconcurrency import AVFAudio + +final class AudioConverter: Sendable { + let inputFormat: AVAudioFormat + let outputFormat: AVAudioFormat + + private let converter: AVAudioConverter + private let outputBuffer: AVAudioPCMBuffer + + /// Computes required frame capacity for output buffer. + static func frameCapacity(from inputFormat: AVAudioFormat, to outputFormat: AVAudioFormat, inputFrameCount: AVAudioFrameCount) -> AVAudioFrameCount { + let inputSampleRate = inputFormat.sampleRate + let outputSampleRate = outputFormat.sampleRate + // Compute the output frame capacity based on sample rate ratio + return AVAudioFrameCount(Double(inputFrameCount) * (outputSampleRate / inputSampleRate)) + } + + init?(from inputFormat: AVAudioFormat, to outputFormat: AVAudioFormat, outputBufferCapacity: AVAudioFrameCount = 9600) { + guard let converter = AVAudioConverter(from: inputFormat, to: outputFormat), + let buffer = AVAudioPCMBuffer(pcmFormat: outputFormat, frameCapacity: outputBufferCapacity) + else { + return nil + } + + outputBuffer = buffer + self.converter = converter + self.inputFormat = inputFormat + self.outputFormat = outputFormat + } + + func convert(from inputBuffer: AVAudioPCMBuffer) -> AVAudioPCMBuffer { + var error: NSError? + #if swift(>=6.0) + // Won't be accessed concurrently, marking as nonisolated(unsafe) to avoid Atomics. 
+ nonisolated(unsafe) var bufferFilled = false + #else + var bufferFilled = false + #endif + + converter.convert(to: outputBuffer, error: &error) { _, outStatus in + if bufferFilled { + outStatus.pointee = .noDataNow + return nil + } + outStatus.pointee = .haveData + bufferFilled = true + return inputBuffer + } + + return outputBuffer.copySegment() + } +} diff --git a/shared_swift/AudioRenderer.swift b/shared_swift/AudioRenderer.swift new file mode 100644 index 000000000..5afc3afaf --- /dev/null +++ b/shared_swift/AudioRenderer.swift @@ -0,0 +1,169 @@ +/* + * Copyright 2024 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import AVFoundation +import WebRTC + +#if os(macOS) + import Cocoa + import FlutterMacOS +#else + import Flutter + import UIKit +#endif + +public class AudioRenderer: NSObject { + public let rendererId: String + public let format: AVAudioFormat // Target format + + private var eventSink: FlutterEventSink? + private var channel: FlutterEventChannel? + + private var converter: AudioConverter? + + // Weak ref + private weak var _track: AudioTrack? + + public init(track: AudioTrack, + binaryMessenger: FlutterBinaryMessenger, + rendererId: String, + format: AVAudioFormat) + { + _track = track + self.rendererId = rendererId + self.format = format + + super.init() + _track?.add(audioRenderer: self) + + let channelName = "io.livekit.audio.renderer/channel-" + rendererId + channel = FlutterEventChannel(name: channelName, binaryMessenger: binaryMessenger) + channel?.setStreamHandler(self) + } + + func detach() { + _track?.remove(audioRenderer: self) + } + + deinit { + detach() + } +} + +extension AudioRenderer: FlutterStreamHandler { + public func onListen(withArguments _: Any?, eventSink events: @escaping FlutterEventSink) -> FlutterError? { + eventSink = events + return nil + } + + public func onCancel(withArguments _: Any?) -> FlutterError? 
{ + eventSink = nil + return nil + } +} + +public extension AVAudioPCMBuffer { + func serialize() -> [String: Any] { + // The format of the data: + // { + // "sampleRate": 48000.0, + // "channelCount": 2, + // "frameLength": 480, + // "format": "float32", // or "int16", "int32", "unknown" + // "data": [ + // [/* channel 0 audio samples */], + // [/* channel 1 audio samples */] + // ] + // } + + // Create the result dictionary to send to Flutter + var result: [String: Any] = [ + "sampleRate": UInt(format.sampleRate), + "channels": UInt(format.channelCount), + "frameLength": UInt(frameLength), + ] + + // Extract audio data based on the buffer format + if let floatChannelData = floatChannelData { + // Buffer contains float data + var channelsData: [[Float]] = [] + + for channel in 0 ..< Int(format.channelCount) { + let channelPointer = floatChannelData[channel] + let channelArray = Array(UnsafeBufferPointer(start: channelPointer, count: Int(frameLength))) + channelsData.append(channelArray) + } + + result["data"] = channelsData + result["commonFormat"] = "float32" + } else if let int16ChannelData = int16ChannelData { + // Buffer contains int16 data + var channelsData: [[Int16]] = [] + + for channel in 0 ..< Int(format.channelCount) { + let channelPointer = int16ChannelData[channel] + let channelArray = Array(UnsafeBufferPointer(start: channelPointer, count: Int(frameLength))) + channelsData.append(channelArray) + } + + result["data"] = channelsData + result["commonFormat"] = "int16" + } else if let int32ChannelData = int32ChannelData { + // Buffer contains int32 data + var channelsData: [[Int32]] = [] + + for channel in 0 ..< Int(format.channelCount) { + let channelPointer = int32ChannelData[channel] + let channelArray = Array(UnsafeBufferPointer(start: channelPointer, count: Int(frameLength))) + channelsData.append(channelArray) + } + + result["data"] = channelsData + result["commonFormat"] = "int32" + } else { + // Fallback - send minimal info if no recognizable data format + result["data"] = [] + result["commonFormat"] = "unknown" + } + + return result + } +} + +extension AudioRenderer: RTCAudioRenderer { + public func render(pcmBuffer: AVAudioPCMBuffer) { + guard let eventSink = eventSink else { return } + + // Create or update converter if needed + if converter == nil || pcmBuffer.format != converter!.inputFormat || format != converter!.outputFormat { + converter = AudioConverter(from: pcmBuffer.format, to: format) + } + + let convertedBuffer = converter!.convert(from: pcmBuffer) + + guard convertedBuffer.frameLength == UInt32(format.sampleRate / 100) else { + print("Converted buffer frame length does not match target format sample rate: \(convertedBuffer.frameLength) != \(format.sampleRate / 100) skipping this frame...") + return + } + + let serializedBuffer = convertedBuffer.serialize() + + // Send the result to Flutter on the main thread + DispatchQueue.main.async { + eventSink(serializedBuffer) + } + } +} diff --git a/shared_swift/LiveKitPlugin.swift b/shared_swift/LiveKitPlugin.swift index aed2f4478..c73c293ef 100644 --- a/shared_swift/LiveKitPlugin.swift +++ b/shared_swift/LiveKitPlugin.swift @@ -12,36 +12,53 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-import WebRTC import flutter_webrtc +import WebRTC #if os(macOS) -import Cocoa -import FlutterMacOS + import Cocoa + import FlutterMacOS #else -import Flutter -import UIKit -import Combine + import Combine + import Flutter + import UIKit #endif +let trackIdKey = "trackId" +let visualizerIdKey = "visualizerId" +let rendererIdKey = "rendererId" +let formatKey = "format" + +let commonFormatKey = "commonFormat" +let sampleRateKey = "sampleRate" +let channelsKey = "channels" + +class AudioProcessors { + var track: AudioTrack + var visualizers: [String: Visualizer] = [:] + var renderers: [String: AudioRenderer] = [:] + + init(track: AudioTrack) { + self.track = track + } +} + @available(iOS 13.0, *) public class LiveKitPlugin: NSObject, FlutterPlugin { - - var processors: Dictionary<String, Visualizer> = [:] - var tracks: Dictionary<String, AudioTrack> = [:] + // TrackId: AudioProcessors + var audioProcessors: [String: AudioProcessors] = [:] var binaryMessenger: FlutterBinaryMessenger? #if os(iOS) - var cancellable = Set<AnyCancellable>() + var cancellable = Set<AnyCancellable>() #endif public static func register(with registrar: FlutterPluginRegistrar) { - #if os(macOS) - let messenger = registrar.messenger + let messenger = registrar.messenger #else - let messenger = registrar.messenger() + let messenger = registrar.messenger() #endif let channel = FlutterMethodChannel(name: "livekit_client", binaryMessenger: messenger) @@ -50,198 +67,303 @@ public class LiveKitPlugin: NSObject, FlutterPlugin { registrar.addMethodCallDelegate(instance, channel: channel) #if os(iOS) - BroadcastManager.shared.isBroadcastingPublisher - .sink { isBroadcasting in - channel.invokeMethod("broadcastStateChanged", arguments: isBroadcasting) - } - .store(in: &instance.cancellable) + BroadcastManager.shared.isBroadcastingPublisher + .sink { isBroadcasting in + channel.invokeMethod("broadcastStateChanged", arguments: isBroadcasting) + } + .store(in: &instance.cancellable) #endif } #if !os(macOS) - // https://developer.apple.com/documentation/avfaudio/avaudiosession/category - let categoryMap: [String: AVAudioSession.Category] = [ - "ambient": .ambient, - "multiRoute": .multiRoute, - "playAndRecord": .playAndRecord, - "playback": .playback, - "record": .record, - "soloAmbient": .soloAmbient - ] - - // https://developer.apple.com/documentation/avfaudio/avaudiosession/categoryoptions - let categoryOptionsMap: [String: AVAudioSession.CategoryOptions] = [ - "mixWithOthers": .mixWithOthers, - "duckOthers": .duckOthers, - "interruptSpokenAudioAndMixWithOthers": .interruptSpokenAudioAndMixWithOthers, - "allowBluetooth": .allowBluetooth, - "allowBluetoothA2DP": .allowBluetoothA2DP, - "allowAirPlay": .allowAirPlay, - "defaultToSpeaker": .defaultToSpeaker - // @available(iOS 14.5, *) - // "overrideMutedMicrophoneInterruption": .overrideMutedMicrophoneInterruption, - ] - - // https://developer.apple.com/documentation/avfaudio/avaudiosession/mode - let modeMap: [String: AVAudioSession.Mode] = [ - "default": .default, - "gameChat": .gameChat, - "measurement": .measurement, - "moviePlayback": .moviePlayback, - "spokenAudio": .spokenAudio, - "videoChat": .videoChat, - "videoRecording": .videoRecording, - "voiceChat": .voiceChat, - "voicePrompt": .voicePrompt - ] - - private func categoryOptions(fromFlutter options: [String]) -> AVAudioSession.CategoryOptions { - var result: AVAudioSession.CategoryOptions = [] - for option in categoryOptionsMap { - if options.contains(option.key) { - result.insert(option.value) - } - } - return result - } + // https://developer.apple.com/documentation/avfaudio/avaudiosession/category + let categoryMap:
[String: AVAudioSession.Category] = [ + "ambient": .ambient, + "multiRoute": .multiRoute, + "playAndRecord": .playAndRecord, + "playback": .playback, + "record": .record, + "soloAmbient": .soloAmbient, + ] + + // https://developer.apple.com/documentation/avfaudio/avaudiosession/categoryoptions + let categoryOptionsMap: [String: AVAudioSession.CategoryOptions] = [ + "mixWithOthers": .mixWithOthers, + "duckOthers": .duckOthers, + "interruptSpokenAudioAndMixWithOthers": .interruptSpokenAudioAndMixWithOthers, + "allowBluetooth": .allowBluetooth, + "allowBluetoothA2DP": .allowBluetoothA2DP, + "allowAirPlay": .allowAirPlay, + "defaultToSpeaker": .defaultToSpeaker, + // @available(iOS 14.5, *) + // "overrideMutedMicrophoneInterruption": .overrideMutedMicrophoneInterruption, + ] + + // https://developer.apple.com/documentation/avfaudio/avaudiosession/mode + let modeMap: [String: AVAudioSession.Mode] = [ + "default": .default, + "gameChat": .gameChat, + "measurement": .measurement, + "moviePlayback": .moviePlayback, + "spokenAudio": .spokenAudio, + "videoChat": .videoChat, + "videoRecording": .videoRecording, + "voiceChat": .voiceChat, + "voicePrompt": .voicePrompt, + ] + + private func categoryOptions(fromFlutter options: [String]) -> AVAudioSession.CategoryOptions { + var result: AVAudioSession.CategoryOptions = [] + for option in categoryOptionsMap { + if options.contains(option.key) { + result.insert(option.value) + } } + return result } - return result - } #endif - public func handleStartAudioVisualizer(args: [String: Any?], result: @escaping FlutterResult) { + private func audioProcessors(for trackId: String) -> AudioProcessors? { + if let existing = audioProcessors[trackId] { + return existing + } + let webrtc = FlutterWebRTCPlugin.sharedSingleton() - let trackId = args["trackId"] as? String - let visualizerId = args["visualizerId"] as? String + var audioTrack: AudioTrack? + if let track = webrtc?.localTracks![trackId] as? LocalAudioTrack { + audioTrack = LKLocalAudioTrack(name: trackId, track: track) + } else if let track = webrtc?.remoteTrack(forId: trackId) as? RTCAudioTrack { + audioTrack = LKRemoteAudioTrack(name: trackId, track: track) + } + + guard let audioTrack else { + return nil + } + + let processor = AudioProcessors(track: audioTrack) + audioProcessors[trackId] = processor + return processor + } + + public func handleStartAudioVisualizer(args: [String: Any?], result: @escaping FlutterResult) { + // Required params + let trackId = args[trackIdKey] as? String + let visualizerId = args[visualizerIdKey] as? String + + guard let trackId else { + result(FlutterError(code: trackIdKey, message: "\(trackIdKey) is required", details: nil)) + return + } + + guard let visualizerId else { + result(FlutterError(code: visualizerIdKey, message: "\(visualizerIdKey) is required", details: nil)) + return + } + + // Optional params let barCount = args["barCount"] as? Int ?? 7 let isCentered = args["isCentered"] as? Bool ?? true let smoothTransition = args["smoothTransition"] as? Bool ?? true - if visualizerId == nil { - result(FlutterError(code: "visualizerId", message: "visualizerId is required", details: nil)) + guard let processors = audioProcessors(for: trackId) else { + result(FlutterError(code: trackIdKey, message: "No such track", details: nil)) return } - if let unwrappedTrackId = trackId { - let unwrappedVisualizerId = visualizerId! - - let localTrack = webrtc?.localTracks![unwrappedTrackId] - if let audioTrack = localTrack as? 
+    private func audioProcessors(for trackId: String) -> AudioProcessors? {
+        if let existing = audioProcessors[trackId] {
+            return existing
+        }
+
         let webrtc = FlutterWebRTCPlugin.sharedSingleton()

-        let trackId = args["trackId"] as? String
-        let visualizerId = args["visualizerId"] as? String
+        var audioTrack: AudioTrack?
+        if let track = webrtc?.localTracks![trackId] as? LocalAudioTrack {
+            audioTrack = LKLocalAudioTrack(name: trackId, track: track)
+        } else if let track = webrtc?.remoteTrack(forId: trackId) as? RTCAudioTrack {
+            audioTrack = LKRemoteAudioTrack(name: trackId, track: track)
+        }
+
+        guard let audioTrack else {
+            return nil
+        }
+
+        let processor = AudioProcessors(track: audioTrack)
+        audioProcessors[trackId] = processor
+        return processor
+    }
+
+    public func handleStartAudioVisualizer(args: [String: Any?], result: @escaping FlutterResult) {
+        // Required params
+        let trackId = args[trackIdKey] as? String
+        let visualizerId = args[visualizerIdKey] as? String
+
+        guard let trackId else {
+            result(FlutterError(code: trackIdKey, message: "\(trackIdKey) is required", details: nil))
+            return
+        }
+
+        guard let visualizerId else {
+            result(FlutterError(code: visualizerIdKey, message: "\(visualizerIdKey) is required", details: nil))
+            return
+        }
+
+        // Optional params
         let barCount = args["barCount"] as? Int ?? 7
         let isCentered = args["isCentered"] as? Bool ?? true
         let smoothTransition = args["smoothTransition"] as? Bool ?? true

-        if visualizerId == nil {
-            result(FlutterError(code: "visualizerId", message: "visualizerId is required", details: nil))
+        guard let processors = audioProcessors(for: trackId) else {
+            result(FlutterError(code: trackIdKey, message: "No such track", details: nil))
             return
         }

-        if let unwrappedTrackId = trackId {
-            let unwrappedVisualizerId = visualizerId!
-
-            let localTrack = webrtc?.localTracks![unwrappedTrackId]
-            if let audioTrack = localTrack as? LocalAudioTrack {
-                let lkLocalTrack = LKLocalAudioTrack(name: unwrappedTrackId, track: audioTrack);
-                let processor = Visualizer(track: lkLocalTrack,
-                                           binaryMessenger: self.binaryMessenger!,
-                                           bandCount: barCount,
-                                           isCentered: isCentered,
-                                           smoothTransition: smoothTransition,
-                                           visualizerId: unwrappedVisualizerId)
-
-                tracks[unwrappedTrackId] = lkLocalTrack
-                processors[unwrappedVisualizerId] = processor
-
-            }
-
-            let track = webrtc?.remoteTrack(forId: unwrappedTrackId)
-            if let audioTrack = track as? RTCAudioTrack {
-                let lkRemoteTrack = LKRemoteAudioTrack(name: unwrappedTrackId, track: audioTrack);
-                let processor = Visualizer(track: lkRemoteTrack,
-                                           binaryMessenger: self.binaryMessenger!,
-                                           bandCount: barCount,
-                                           isCentered: isCentered,
-                                           smoothTransition: smoothTransition,
-                                           visualizerId: unwrappedVisualizerId)
-                tracks[unwrappedTrackId] = lkRemoteTrack
-                processors[unwrappedVisualizerId] = processor
-            }
+        // Already exists
+        if processors.visualizers[visualizerId] != nil {
+            result(true)
+            return
         }

+        let visualizer = Visualizer(track: processors.track,
+                                    binaryMessenger: binaryMessenger!,
+                                    bandCount: barCount,
+                                    isCentered: isCentered,
+                                    smoothTransition: smoothTransition,
+                                    visualizerId: visualizerId)
+        // Retain
+        processors.visualizers[visualizerId] = visualizer
+
         result(true)
     }

     public func handleStopAudioVisualizer(args: [String: Any?], result: @escaping FlutterResult) {
-        let trackId = args["trackId"] as? String
-        let visualizerId = args["visualizerId"] as? String
-        if let unwrappedTrackId = trackId {
-            for key in tracks.keys {
-                if key == unwrappedTrackId {
-                    tracks.removeValue(forKey: key)
-                }
-            }
+        // let trackId = args["trackId"] as? String
+        let visualizerId = args[visualizerIdKey] as? String
+
+        guard let visualizerId else {
+            result(FlutterError(code: visualizerIdKey, message: "\(visualizerIdKey) is required", details: nil))
+            return
         }
-        if let unwrappedVisualizerId = visualizerId {
-            processors.removeValue(forKey: unwrappedVisualizerId)
+
+        for processors in audioProcessors.values {
+            processors.visualizers.removeValue(forKey: visualizerId)
         }
+
         result(true)
     }

-    public func handleConfigureNativeAudio(args: [String: Any?], result: @escaping FlutterResult) {
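+    /// Builds an AVAudioFormat from the `format` map sent over the method
+    /// channel. Illustrative shape of the expected map (keys are the constants
+    /// defined above; values here are examples only):
+    ///   ["commonFormat": "int16", "sampleRate": 48000.0, "channels": 1]
+    /// Unknown commonFormat values yield nil instead of throwing.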
+    public func parseAudioFormat(args: [String: Any?]) -> AVAudioFormat? {
+        guard let commonFormatString = args[commonFormatKey] as? String,
+              let sampleRate = args[sampleRateKey] as? Double,
+              let channels = args[channelsKey] as? AVAudioChannelCount
+        else {
+            return nil
+        }
+
+        let commonFormat: AVAudioCommonFormat
+        switch commonFormatString {
+        case "float32":
+            commonFormat = .pcmFormatFloat32
+        case "int16":
+            commonFormat = .pcmFormatInt16
+        case "int32":
+            commonFormat = .pcmFormatInt32
+        default:
+            return nil
+        }
+
+        return AVAudioFormat(commonFormat: commonFormat, sampleRate: sampleRate, channels: channels, interleaved: false)
+    }
+
+    public func handleStartAudioRenderer(args: [String: Any?], result: @escaping FlutterResult) {
+        // Required params
+        let trackId = args[trackIdKey] as? String
+        let rendererId = args[rendererIdKey] as? String
+
+        let formatMap = args[formatKey] as? [String: Any?]
+
+        guard let formatMap else {
+            result(FlutterError(code: formatKey, message: "\(formatKey) is required", details: nil))
+            return
+        }
+
+        guard let format = parseAudioFormat(args: formatMap) else {
+            result(FlutterError(code: formatKey, message: "Failed to parse format", details: nil))
+            return
+        }
+
+        guard let trackId else {
+            result(FlutterError(code: trackIdKey, message: "\(trackIdKey) is required", details: nil))
+            return
+        }
+
+        guard let rendererId else {
+            result(FlutterError(code: rendererIdKey, message: "\(rendererIdKey) is required", details: nil))
+            return
+        }
+
+        guard let processors = audioProcessors(for: trackId) else {
+            result(FlutterError(code: trackIdKey, message: "No such track", details: nil))
+            return
+        }
+
+        // Already exists
+        if processors.renderers[rendererId] != nil {
+            result(true)
+            return
+        }
+
+        let renderer = AudioRenderer(track: processors.track,
+                                     binaryMessenger: binaryMessenger!,
+                                     rendererId: rendererId,
+                                     format: format)
+        // Retain
+        processors.renderers[rendererId] = renderer
+
+        result(true)
+    }
+
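+    /// Stops and detaches a renderer by id alone; renderer ids are expected to
+    /// be unique across tracks, so every track's container is searched.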
+    public func handleStopAudioRenderer(args: [String: Any?], result: @escaping FlutterResult) {
+        let rendererId = args[rendererIdKey] as? String
+
+        guard let rendererId else {
+            result(FlutterError(code: rendererIdKey, message: "\(rendererIdKey) is required", details: nil))
+            return
+        }
+
+        for processors in audioProcessors.values {
+            if let renderer = processors.renderers[rendererId] {
+                renderer.detach()
+                processors.renderers.removeValue(forKey: rendererId)
+            }
+        }
+
+        result(true)
+    }
+
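+    /// Applies the Apple audio session settings passed from Dart. Not
+    /// implemented on macOS; on iOS the shared RTCAudioSession is reconfigured
+    /// while holding its configuration lock, always released via `defer`.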
-        #if os(macOS)
-        result(FlutterMethodNotImplemented)
-        #else
-
-        let configuration = RTCAudioSessionConfiguration.webRTC()
-
-        // Category
-        if let string = args["appleAudioCategory"] as? String,
-           let category = categoryMap[string] {
-            configuration.category = category.rawValue
-            print("[LiveKit] Configuring category: ", configuration.category)
-        }
-
-        // CategoryOptions
-        if let strings = args["appleAudioCategoryOptions"] as? [String] {
-            configuration.categoryOptions = categoryOptions(fromFlutter: strings)
-            print("[LiveKit] Configuring categoryOptions: ", strings)
-        }
-
-        // Mode
-        if let string = args["appleAudioMode"] as? String,
-           let mode = modeMap[string] {
-            configuration.mode = mode.rawValue
-            print("[LiveKit] Configuring mode: ", configuration.mode)
-        }
-
-        // get `RTCAudioSession` and lock
-        let rtcSession = RTCAudioSession.sharedInstance()
-        rtcSession.lockForConfiguration()
-
-        var isLocked: Bool = true
-        let unlock = {
-            guard isLocked else {
-                print("[LiveKit] not locked, ignoring unlock")
-                return
-            }
-            rtcSession.unlockForConfiguration()
-            isLocked = false
-        }
-
-        // always `unlock()` when exiting scope, calling multiple times has no side-effect
-        defer {
-            unlock()
-        }
-
-        do {
-            try rtcSession.setConfiguration(configuration, active: true)
-            // unlock here before configuring `AVAudioSession`
-            // unlock()
-            print("[LiveKit] RTCAudioSession Configure success")
-
-            // also configure longFormAudio
-            // let avSession = AVAudioSession.sharedInstance()
-            // try avSession.setCategory(AVAudioSession.Category(rawValue: configuration.category),
-            //                           mode: AVAudioSession.Mode(rawValue: configuration.mode),
-            //                           policy: .default,
-            //                           options: configuration.categoryOptions)
-            // print("[LiveKit] AVAudioSession Configure success")
-
-            // preferSpeakerOutput
-            if let preferSpeakerOutput = args["preferSpeakerOutput"] as? Bool {
-                try rtcSession.overrideOutputAudioPort(preferSpeakerOutput ? .speaker : .none)
-            }
-            result(true)
-        } catch let error {
-            print("[LiveKit] Configure audio error: ", error)
-            result(FlutterError(code: "configure", message: error.localizedDescription, details: nil))
-        }
+    public func handleConfigureNativeAudio(args: [String: Any?], result: @escaping FlutterResult) {
+        #if os(macOS)
+            result(FlutterMethodNotImplemented)
+        #else
+
+            let configuration = RTCAudioSessionConfiguration.webRTC()
+
+            // Category
+            if let string = args["appleAudioCategory"] as? String,
+               let category = categoryMap[string]
+            {
+                configuration.category = category.rawValue
+                print("[LiveKit] Configuring category: ", configuration.category)
+            }
+
+            // CategoryOptions
+            if let strings = args["appleAudioCategoryOptions"] as? [String] {
+                configuration.categoryOptions = categoryOptions(fromFlutter: strings)
+                print("[LiveKit] Configuring categoryOptions: ", strings)
+            }
+
+            // Mode
+            if let string = args["appleAudioMode"] as? String,
+               let mode = modeMap[string]
+            {
+                configuration.mode = mode.rawValue
+                print("[LiveKit] Configuring mode: ", configuration.mode)
+            }
+
+            // get `RTCAudioSession` and lock
+            let rtcSession = RTCAudioSession.sharedInstance()
+            rtcSession.lockForConfiguration()
+
+            var isLocked = true
+            let unlock = {
+                guard isLocked else {
+                    print("[LiveKit] not locked, ignoring unlock")
+                    return
+                }
+                rtcSession.unlockForConfiguration()
+                isLocked = false
+            }
+
+            // always `unlock()` when exiting scope, calling multiple times has no side-effect
+            defer {
+                unlock()
+            }
+
+            do {
+                try rtcSession.setConfiguration(configuration, active: true)
+                // unlock here before configuring `AVAudioSession`
+                // unlock()
+                print("[LiveKit] RTCAudioSession Configure success")
+
+                // also configure longFormAudio
+                // let avSession = AVAudioSession.sharedInstance()
+                // try avSession.setCategory(AVAudioSession.Category(rawValue: configuration.category),
+                //                           mode: AVAudioSession.Mode(rawValue: configuration.mode),
+                //                           policy: .default,
+                //                           options: configuration.categoryOptions)
+                // print("[LiveKit] AVAudioSession Configure success")
+
+                // preferSpeakerOutput
+                if let preferSpeakerOutput = args["preferSpeakerOutput"] as? Bool {
+                    try rtcSession.overrideOutputAudioPort(preferSpeakerOutput ? .speaker : .none)
+                }
+                result(true)
+            } catch {
+                print("[LiveKit] Configure audio error: ", error)
+                result(FlutterError(code: "configure", message: error.localizedDescription, details: nil))
+            }
         #endif
     }
@@ -258,7 +380,7 @@ public class LiveKitPlugin: NSObject, FlutterPlugin {
         if osVersion.patchVersion != 0 {
             versions.append(osVersion.patchVersion)
         }
-        return versions.map({ String($0) }).joined(separator: ".")
+        return versions.map { String($0) }.joined(separator: ".")
     }

     public func handle(_ call: FlutterMethodCall, result: @escaping FlutterResult) {
@@ -275,15 +397,19 @@ public class LiveKitPlugin: NSObject, FlutterPlugin {
             handleStartAudioVisualizer(args: args, result: result)
         case "stopVisualizer":
             handleStopAudioVisualizer(args: args, result: result)
+        case "startAudioRenderer":
+            handleStartAudioRenderer(args: args, result: result)
+        case "stopAudioRenderer":
+            handleStopAudioRenderer(args: args, result: result)
         case "osVersionString":
             result(LiveKitPlugin.osVersionString())
         #if os(iOS)
-        case "broadcastRequestActivation":
-            BroadcastManager.shared.requestActivation()
-            result(true)
-        case "broadcastRequestStop":
-            BroadcastManager.shared.requestStop()
-            result(true)
+            case "broadcastRequestActivation":
+                BroadcastManager.shared.requestActivation()
+                result(true)
+            case "broadcastRequestStop":
+                BroadcastManager.shared.requestStop()
+                result(true)
         #endif
         default:
             print("[LiveKit] method not found: ", call.method)
diff --git a/test/support/completer_manager_test.dart b/test/support/completer_manager_test.dart
new file mode 100644
index 000000000..0f45455d7
--- /dev/null
+++ b/test/support/completer_manager_test.dart
@@ -0,0 +1,408 @@
+// Copyright 2025 LiveKit, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
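+
+// Exercises CompleterManager: a reusable, typed completer wrapper with
+// explicit state (isActive/isCompleted), optional timeout timers, reset for
+// reuse, and dispose semantics that fail any pending future with StateError.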
+
+import 'dart:async';
+
+import 'package:flutter_test/flutter_test.dart';
+
+import 'package:livekit_client/src/support/completer_manager.dart';
+
+void main() {
+  group('CompleterManager', () {
+    late CompleterManager<String> manager;
+
+    setUp(() {
+      manager = CompleterManager<String>();
+    });
+
+    tearDown(() {
+      // Only dispose if not already completed or disposed
+      try {
+        if (manager.isActive) {
+          manager.complete('teardown');
+        }
+        manager.dispose();
+      } catch (_) {
+        // Already disposed, ignore
+      }
+    });
+
+    group('Basic Functionality', () {
+      test('should provide a future when accessed', () async {
+        final future = manager.future;
+        expect(future, isA<Future<String>>());
+        expect(manager.isActive, isTrue);
+        expect(manager.isCompleted, isFalse);
+
+        // Complete it to avoid tearDown issues
+        manager.complete('test');
+        await expectLater(future, completion('test'));
+      });
+
+      test('should complete successfully with value', () async {
+        final future = manager.future;
+        final result = manager.complete('success');
+
+        expect(result, isTrue);
+        expect(manager.isCompleted, isTrue);
+        expect(manager.isActive, isFalse);
+        await expectLater(future, completion('success'));
+      });
+
+      test('should complete successfully without value', () async {
+        final manager = CompleterManager<String?>();
+        final future = manager.future;
+        final result = manager.complete();
+
+        expect(result, isTrue);
+        expect(manager.isCompleted, isTrue);
+        await expectLater(future, completion(isNull));
+        manager.dispose();
+      });
+
+      test('should complete with error', () async {
+        final future = manager.future;
+        final testError = Exception('test error');
+        final result = manager.completeError(testError);
+
+        expect(result, isTrue);
+        expect(manager.isCompleted, isTrue);
+        expect(manager.isActive, isFalse);
+        await expectLater(future, throwsA(testError));
+      });
+
+      test('should complete with error and stack trace', () async {
+        final future = manager.future;
+        final testError = Exception('test error');
+        final stackTrace = StackTrace.current;
+        final result = manager.completeError(testError, stackTrace);
+
+        expect(result, isTrue);
+        expect(manager.isCompleted, isTrue);
+
+        try {
+          await future;
+          fail('Should have thrown an error');
+        } catch (error, trace) {
+          expect(error, equals(testError));
+          expect(trace, equals(stackTrace));
+        }
+      });
+
+      test('should return false when completing already completed manager', () {
+        manager.complete('first');
+        final result1 = manager.complete('second');
+        final result2 = manager.completeError(Exception('error'));
+
+        expect(result1, isFalse);
+        expect(result2, isFalse);
+      });
+    });
+
+    group('State Properties', () {
+      test('initial state should be inactive and not completed', () {
+        expect(manager.isActive, isFalse);
+        expect(manager.isCompleted, isFalse);
+      });
+
+      test('should be active after accessing future', () async {
+        final future = manager.future;
+        expect(manager.isActive, isTrue);
+        expect(manager.isCompleted, isFalse);
+
+        // Complete it to avoid tearDown issues
+        manager.complete('test');
+        await expectLater(future, completion('test'));
+      });
+
+      test('should be completed after completion', () async {
+        final future = manager.future;
+        manager.complete('done');
+
+        expect(manager.isActive, isFalse);
+        expect(manager.isCompleted, isTrue);
+        await expectLater(future, completion('done'));
+      });
+
+      test('should be completed after error completion', () async {
+        final future = manager.future;
+        final testError = Exception('error');
+        manager.completeError(testError);
+
+        expect(manager.isActive, isFalse);
+        expect(manager.isCompleted, isTrue);
+        await expectLater(future, throwsA(testError));
+      });
+    });
+
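+    // After a completer finishes, the next `future` access starts a fresh
+    // cycle; reset() discards the current completer without completing it.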
+    group('Reusability', () {
+      test('should create new future after previous completion', () async {
+        // First use
+        final future1 = manager.future;
+        manager.complete('first');
+        await expectLater(future1, completion('first'));
+
+        // Second use - should get new future
+        final future2 = manager.future;
+        expect(future2, isNot(same(future1)));
+        expect(manager.isActive, isTrue);
+        expect(manager.isCompleted, isFalse);
+
+        manager.complete('second');
+        await expectLater(future2, completion('second'));
+      });
+
+      test('should reset and be reusable', () async {
+        // First use
+        final future1 = manager.future;
+        manager.complete('first');
+        await expectLater(future1, completion('first'));
+
+        // Reset - note that reset creates a new completer, so it's not active until future is accessed
+        manager.reset();
+        expect(manager.isCompleted, isFalse);
+        // After reset, manager is ready but not active until future is accessed
+
+        // Second use after reset
+        final future2 = manager.future;
+        expect(manager.isActive, isTrue);
+        manager.complete('second');
+        await expectLater(future2, completion('second'));
+      });
+
+      test('should reset even when active', () async {
+        final future1 = manager.future;
+        expect(manager.isActive, isTrue);
+
+        manager.reset();
+        expect(manager.isCompleted, isFalse);
+        // After reset, manager is ready but not active until future is accessed
+
+        final future2 = manager.future;
+        expect(manager.isActive, isTrue);
+        expect(future2, isNot(same(future1)));
+
+        // Complete it to avoid tearDown issues
+        manager.complete('test');
+        await expectLater(future2, completion('test'));
+      });
+    });
+
+    group('Timeout Functionality', () {
+      test('should timeout with default message', () async {
+        final future = manager.future;
+        manager.setTimer(Duration(milliseconds: 10));
+
+        await expectLater(
+          future,
+          throwsA(isA<TimeoutException>()),
+        );
+        expect(manager.isCompleted, isTrue);
+      });
+
+      test('should timeout with custom message', () async {
+        final future = manager.future;
+        const customMessage = 'Custom timeout message';
+        manager.setTimer(Duration(milliseconds: 10), timeoutReason: customMessage);
+
+        try {
+          await future;
+          fail('Should have thrown TimeoutException');
+        } catch (error) {
+          expect(error, isA<TimeoutException>());
+          expect((error as TimeoutException).message, contains(customMessage));
+        }
+      });
+
+      test('should cancel timeout on manual completion', () async {
+        final future = manager.future;
+        manager.setTimer(Duration(milliseconds: 100));
+
+        // Complete before timeout
+        manager.complete('completed');
+        await expectLater(future, completion('completed'));
+
+        // Wait longer than timeout to ensure it was cancelled
+        await Future.delayed(Duration(milliseconds: 150));
+        // If we get here without additional errors, timeout was cancelled
+      });
+
+      test('should cancel timeout on error completion', () async {
+        final future = manager.future;
+        manager.setTimer(Duration(milliseconds: 100));
+
+        // Complete with error before timeout
+        final testError = Exception('test error');
+        manager.completeError(testError);
+        await expectLater(future, throwsA(testError));
+
+        // Wait longer than timeout to ensure it was cancelled
+        await Future.delayed(Duration(milliseconds: 150));
+        // If we get here without additional errors, timeout was cancelled
+      });
+
+      test('should replace previous timeout when setting new one', () async {
+        final future = manager.future;
+        manager.setTimer(Duration(milliseconds: 200));
+        manager.setTimer(Duration(milliseconds: 10)); // This should replace the previous one
+
+        await expectLater(
+          future,
+          throwsA(isA<TimeoutException>()),
+        );
+      });
+
+      test('should not set timeout on completed manager', () async {
+        final future = manager.future;
+        manager.complete('done');
+        await expectLater(future, completion('done'));
+
+        // This should not throw or affect anything
+        manager.setTimer(Duration(milliseconds: 10));
+
+        // Verify still completed
+        expect(manager.isCompleted, isTrue);
+      });
+
+      test('should not set timeout when no active completer', () {
+        // Should not throw
+        manager.setTimer(Duration(milliseconds: 10));
+        expect(manager.isActive, isFalse);
+      });
+    });
+
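+    // dispose() must settle any pending future (with StateError) and cancel
+    // outstanding timers, so nothing leaks after the manager is torn down.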
+    group('Disposal', () {
+      test('should complete with error when disposed while active', () async {
+        final future = manager.future;
+        expect(manager.isActive, isTrue);
+
+        manager.dispose();
+
+        await expectLater(
+          future,
+          throwsA(isA<StateError>()),
+        );
+        expect(manager.isCompleted, isTrue);
+      });
+
+      test('should not affect already completed manager', () async {
+        final future = manager.future;
+        manager.complete('done');
+        await expectLater(future, completion('done'));
+
+        // Dispose should not throw or change state
+        manager.dispose();
+        expect(manager.isCompleted, isTrue);
+      });
+
+      test('should cancel timeout on dispose', () async {
+        final future = manager.future;
+        manager.setTimer(Duration(milliseconds: 10));
+
+        manager.dispose();
+
+        // Should complete with StateError, not TimeoutException
+        await expectLater(
+          future,
+          throwsA(isA<StateError>()),
+        );
+      });
+
+      test('should not allow operations after dispose', () {
+        manager.dispose();
+
+        final result1 = manager.complete('test');
+        final result2 = manager.completeError(Exception('error'));
+
+        expect(result1, isFalse);
+        expect(result2, isFalse);
+        expect(manager.isCompleted, isTrue);
+      });
+    });
+
+    group('Edge Cases', () {
+      test('should handle multiple future accesses for same completer', () async {
+        final future1 = manager.future;
+        final future2 = manager.future;
+
+        expect(identical(future1, future2), isTrue);
+        expect(manager.isActive, isTrue);
+
+        // Complete it to avoid tearDown issues
+        manager.complete('test');
+        await expectLater(future1, completion('test'));
+      });
+
+      test('should handle rapid complete/reset cycles', () async {
+        for (int i = 0; i < 5; i++) {
+          final future = manager.future;
+          manager.complete('value_$i');
+          await expectLater(future, completion('value_$i'));
+          if (i < 4) {
+            // Don't reset on the last iteration
+            manager.reset();
+          }
+        }
+      });
+
+      test('should work with different generic types', () async {
+        final intManager = CompleterManager<int>();
+        final intFuture = intManager.future;
+        intManager.complete(42);
+        await expectLater(intFuture, completion(42));
+        intManager.dispose();
+
+        final boolManager = CompleterManager<bool>();
+        final boolFuture = boolManager.future;
+        boolManager.complete(true);
+        await expectLater(boolFuture, completion(isTrue));
+        boolManager.dispose();
+      });
+
+      test('should handle Future values in complete', () async {
+        final future = manager.future;
+        final futureValue = Future.value('async_result');
+        manager.complete(futureValue);
+
+        await expectLater(future, completion('async_result'));
+      });
+    });
+
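+    // Only the first complete() wins; concurrent readers of `future` all
+    // observe the single completion value.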
+    group('Thread Safety', () {
+      test('should handle concurrent operations safely', () async {
+        final futures = <Future<String>>[];
+
+        // Start multiple concurrent operations
+        for (int i = 0; i < 10; i++) {
+          futures.add(Future(() async {
+            final future = manager.future;
+            if (i == 0) {
+              // Only the first one should succeed in completing
+              await Future.delayed(Duration(milliseconds: 1));
+              manager.complete('winner');
+            }
+            return future;
+          }));
+        }
+
+        final results = await Future.wait(futures, eagerError: false);
+
+        // All should complete with the same value
+        for (final result in results) {
+          expect(result, equals('winner'));
+        }
+      });
+    });
+  });
+}