Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7419.wip
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[Voice Broadcast] Live listening support
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ fun Event.getRelationContent(): RelationDefaultContent? {
when (getClearType()) {
EventType.STICKER -> getClearContent().toModel<MessageStickerContent>()?.relatesTo
in EventType.BEACON_LOCATION_DATA -> getClearContent().toModel<MessageBeaconLocationDataContent>()?.relatesTo
else -> null
else -> getClearContent()?.get("m.relates_to")?.toContent().toModel()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import io.realm.Sort
import org.matrix.android.sdk.api.session.events.model.getRelationContent
import org.matrix.android.sdk.api.session.events.model.isImageMessage
import org.matrix.android.sdk.api.session.events.model.isVideoMessage
import org.matrix.android.sdk.api.session.events.model.toModel
import org.matrix.android.sdk.api.session.room.model.message.MessageContent
import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent
import org.matrix.android.sdk.api.util.Optional
import org.matrix.android.sdk.internal.database.RealmSessionProvider
Expand Down Expand Up @@ -74,7 +76,13 @@ internal class TimelineEventDataSource @Inject constructor(
.distinct(TimelineEventEntityFields.EVENT_ID)
.findAll()
.mapNotNull {
timelineEventMapper.map(it).takeIf { it.root.getRelationContent()?.takeIf { it.type == eventType && it.eventId == eventId } != null }
timelineEventMapper.map(it)
.takeIf {
val isEventRelatedTo = it.root.getRelationContent()?.takeIf { it.type == eventType && it.eventId == eventId } != null
val isContentRelatedTo = it.root.getClearContent()?.toModel<MessageContent>()
?.relatesTo?.takeIf { it.type == eventType && it.eventId == eventId } != null
isEventRelatedTo || isContentRelatedTo
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class VoiceBroadcastHelper @Inject constructor(

suspend fun stopVoiceBroadcast(roomId: String) = stopVoiceBroadcastUseCase.execute(roomId)

fun playOrResumePlayback(roomId: String, eventId: String) = voiceBroadcastPlayer.play(roomId, eventId)
fun playOrResumePlayback(roomId: String, eventId: String) = voiceBroadcastPlayer.playOrResume(roomId, eventId)

fun pausePlayback() = voiceBroadcastPlayer.pause()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,105 +18,194 @@ package im.vector.app.features.voicebroadcast

import android.media.AudioAttributes
import android.media.MediaPlayer
import im.vector.app.core.di.ActiveSessionHolder
import im.vector.app.features.home.room.detail.timeline.helper.AudioMessagePlaybackTracker
import im.vector.app.features.home.room.detail.timeline.helper.AudioMessagePlaybackTracker.Listener.State
import im.vector.app.features.voice.VoiceFailure
import im.vector.app.features.voicebroadcast.model.VoiceBroadcastState
import im.vector.app.features.voicebroadcast.model.asVoiceBroadcastEvent
import im.vector.app.features.voicebroadcast.usecase.GetVoiceBroadcastUseCase
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import org.matrix.android.sdk.api.extensions.orFalse
import org.matrix.android.sdk.api.session.Session
import org.matrix.android.sdk.api.session.events.model.RelationType
import org.matrix.android.sdk.api.session.events.model.getRelationContent
import org.matrix.android.sdk.api.session.getRoom
import org.matrix.android.sdk.api.session.room.Room
import org.matrix.android.sdk.api.session.room.model.message.MessageAudioContent
import org.matrix.android.sdk.api.session.room.model.message.MessageAudioEvent
import org.matrix.android.sdk.api.session.room.model.message.asMessageAudioEvent
import org.matrix.android.sdk.api.session.room.timeline.Timeline
import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent
import org.matrix.android.sdk.api.session.room.timeline.TimelineSettings
import timber.log.Timber
import javax.inject.Inject
import javax.inject.Singleton

@Singleton
class VoiceBroadcastPlayer @Inject constructor(
private val session: Session,
private val sessionHolder: ActiveSessionHolder,
private val playbackTracker: AudioMessagePlaybackTracker,
private val getVoiceBroadcastUseCase: GetVoiceBroadcastUseCase,
) {
private val session
get() = sessionHolder.getActiveSession()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use getSafeActiveSession instead? The getActiveSession may throw an exception. If we keep, maybe we should try/catch in the call sites of this variable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will be handled by #7423


private val mediaPlayerScope = CoroutineScope(Dispatchers.IO)
private val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a thought: you will see but you may have a hard time when trying to add unit tests on this class due to the usage of this coroutineScope. Maybe in the future we may have to change the implementation of some methods.

private var voiceBroadcastStateJob: Job? = null
private var currentTimeline: Timeline? = null
set(value) {
field?.removeAllListeners()
field?.dispose()
field = value
}

private val mediaPlayerListener = MediaPlayerListener()
private var timelineListener: TimelineListener? = null

private var currentMediaPlayer: MediaPlayer? = null
private var currentPlayingIndex: Int = -1
private var nextMediaPlayer: MediaPlayer? = null
set(value) {
field = value
currentMediaPlayer?.setNextMediaPlayer(value)
}
private var currentSequence: Int? = null

private var playlist = emptyList<MessageAudioEvent>()
private val currentVoiceBroadcastEventId
private val currentVoiceBroadcastId
get() = playlist.firstOrNull()?.root?.getRelationContent()?.eventId

private val mediaPlayerListener = MediaPlayerListener()

fun play(roomId: String, eventId: String) {
val room = session.getRoom(roomId) ?: error("Unknown roomId: $roomId")
private var state: State = State.IDLE
set(value) {
Timber.w("## VoiceBroadcastPlayer state: $field -> $value")
field = value
}
private var currentRoomId: String? = null

fun playOrResume(roomId: String, eventId: String) {
val hasChanged = currentVoiceBroadcastId != eventId
when {
currentVoiceBroadcastEventId != eventId -> {
stop()
updatePlaylist(room, eventId)
startPlayback()
}
playbackTracker.getPlaybackState(eventId) is State.Playing -> pause()
else -> resumePlayback()
hasChanged -> startPlayback(roomId, eventId)
state == State.PAUSED -> resumePlayback()
else -> Unit
}
}

fun pause() {
currentMediaPlayer?.pause()
currentVoiceBroadcastEventId?.let { playbackTracker.pausePlayback(it) }
currentVoiceBroadcastId?.let { playbackTracker.pausePlayback(it) }
state = State.PAUSED
}

fun stop() {
// Stop playback
currentMediaPlayer?.stop()
currentMediaPlayer?.release()
currentMediaPlayer?.setOnInfoListener(null)
currentVoiceBroadcastId?.let { playbackTracker.stopPlayback(it) }

// Release current player
release(currentMediaPlayer)
currentMediaPlayer = null
currentVoiceBroadcastEventId?.let { playbackTracker.stopPlayback(it) }

// Release next player
release(nextMediaPlayer)
nextMediaPlayer = null

// Do not observe anymore voice broadcast state changes
voiceBroadcastStateJob?.cancel()
voiceBroadcastStateJob = null

// In case of live broadcast, stop observing new chunks
currentTimeline = null
timelineListener = null

// Update state
state = State.IDLE

// Clear playlist
playlist = emptyList()
currentPlayingIndex = -1
currentSequence = null
currentRoomId = null
}

private fun updatePlaylist(room: Room, eventId: String) {
val timelineEvents = room.timelineService().getTimelineEventsRelatedTo(RelationType.REFERENCE, eventId)
val audioEvents = timelineEvents.mapNotNull { it.root.asMessageAudioEvent() }
playlist = audioEvents.sortedBy { it.getVoiceBroadcastChunk()?.sequence?.toLong() ?: it.root.originServerTs }
private fun startPlayback(roomId: String, eventId: String) {
val room = session.getRoom(roomId) ?: error("Unknown roomId: $roomId")
currentRoomId = roomId

// Stop listening previous voice broadcast if any
if (state != State.IDLE) stop()

state = State.BUFFERING

val voiceBroadcastState = getVoiceBroadcastUseCase.execute(roomId, eventId)?.content?.voiceBroadcastState
if (voiceBroadcastState == VoiceBroadcastState.STOPPED) {
// Get static playlist
updatePlaylist(getExistingChunks(room, eventId))
startPlayback(false)
} else {
playLiveVoiceBroadcast(room, eventId)
}
}

private fun startPlayback() {
val content = playlist.firstOrNull()?.content ?: run { Timber.w("## VoiceBroadcastPlayer: No content to play"); return }
mediaPlayerScope.launch {
private fun startPlayback(isLive: Boolean) {
val event = if (isLive) playlist.lastOrNull() else playlist.firstOrNull()
val content = event?.content ?: run { Timber.w("## VoiceBroadcastPlayer: No content to play"); return }
val sequence = event.getVoiceBroadcastChunk()?.sequence
coroutineScope.launch {
try {
currentMediaPlayer = prepareMediaPlayer(content)
currentMediaPlayer?.start()
currentPlayingIndex = 0
currentVoiceBroadcastEventId?.let { playbackTracker.startPlayback(it) }
prepareNextFile()
currentVoiceBroadcastId?.let { playbackTracker.startPlayback(it) }
currentSequence = sequence
state = State.PLAYING
nextMediaPlayer = prepareNextMediaPlayer()
} catch (failure: Throwable) {
Timber.e(failure, "Unable to start playback")
throw VoiceFailure.UnableToPlay(failure)
}
}
}

private fun playLiveVoiceBroadcast(room: Room, eventId: String) {
room.timelineService().getTimelineEvent(eventId)?.root?.asVoiceBroadcastEvent() ?: error("Cannot retrieve voice broadcast $eventId")
updatePlaylist(getExistingChunks(room, eventId))
startPlayback(true)
observeIncomingEvents(room, eventId)
}

private fun getExistingChunks(room: Room, eventId: String): List<MessageAudioEvent> {
return room.timelineService().getTimelineEventsRelatedTo(RelationType.REFERENCE, eventId)
.mapNotNull { it.root.asMessageAudioEvent() }
.filter { it.isVoiceBroadcast() }
}

private fun observeIncomingEvents(room: Room, eventId: String) {
currentTimeline = room.timelineService().createTimeline(null, TimelineSettings(5)).also { timeline ->
timelineListener = TimelineListener(eventId).also { timeline.addListener(it) }
timeline.start()
}
}

private fun resumePlayback() {
currentMediaPlayer?.start()
currentVoiceBroadcastEventId?.let { playbackTracker.startPlayback(it) }
currentVoiceBroadcastId?.let { playbackTracker.startPlayback(it) }
state = State.PLAYING
}

private suspend fun prepareNextFile() {
val nextContent = playlist.getOrNull(currentPlayingIndex + 1)?.content
if (nextContent == null) {
currentMediaPlayer?.setOnCompletionListener(mediaPlayerListener)
} else {
val nextMediaPlayer = prepareMediaPlayer(nextContent)
currentMediaPlayer?.setNextMediaPlayer(nextMediaPlayer)
}
private fun updatePlaylist(playlist: List<MessageAudioEvent>) {
this.playlist = playlist.sortedBy { it.getVoiceBroadcastChunk()?.sequence?.toLong() ?: it.root.originServerTs }
}

private fun getNextAudioContent(): MessageAudioContent? {
val nextSequence = currentSequence?.plus(1)
?: timelineListener?.let { playlist.lastOrNull()?.sequence }
?: 1
return playlist.find { it.getVoiceBroadcastChunk()?.sequence == nextSequence }?.content
}

private suspend fun prepareNextMediaPlayer(): MediaPlayer? {
val nextContent = getNextAudioContent() ?: return null
return prepareMediaPlayer(nextContent)
}

private suspend fun prepareMediaPlayer(messageAudioContent: MessageAudioContent): MediaPlayer {
Expand All @@ -140,28 +229,78 @@ class VoiceBroadcastPlayer @Inject constructor(
setDataSource(fis.fd)
setOnInfoListener(mediaPlayerListener)
setOnErrorListener(mediaPlayerListener)
setOnCompletionListener(mediaPlayerListener)
prepare()
}
}
}

inner class MediaPlayerListener : MediaPlayer.OnInfoListener, MediaPlayer.OnCompletionListener, MediaPlayer.OnErrorListener {
private fun release(mp: MediaPlayer?) {
mp?.apply {
release()
setOnInfoListener(null)
setOnCompletionListener(null)
setOnErrorListener(null)
}
}

private inner class TimelineListener(private val voiceBroadcastId: String) : Timeline.Listener {
override fun onTimelineUpdated(snapshot: List<TimelineEvent>) {
val currentSequences = playlist.map { it.sequence }
val newChunks = snapshot
.mapNotNull { timelineEvent ->
timelineEvent.root.asMessageAudioEvent()
?.takeIf { it.isVoiceBroadcast() && it.getVoiceBroadcastEventId() == voiceBroadcastId && it.sequence !in currentSequences }
}
if (newChunks.isEmpty()) return
updatePlaylist(playlist + newChunks)

when (state) {
State.PLAYING -> {
if (nextMediaPlayer == null) {
coroutineScope.launch { nextMediaPlayer = prepareNextMediaPlayer() }
}
}
State.PAUSED -> {
if (nextMediaPlayer == null) {
coroutineScope.launch { nextMediaPlayer = prepareNextMediaPlayer() }
}
}
State.BUFFERING -> {
val newMediaContent = getNextAudioContent()
if (newMediaContent != null) startPlayback(true)
}
State.IDLE -> startPlayback(true)
}
}
}

private inner class MediaPlayerListener : MediaPlayer.OnInfoListener, MediaPlayer.OnCompletionListener, MediaPlayer.OnErrorListener {

override fun onInfo(mp: MediaPlayer, what: Int, extra: Int): Boolean {
when (what) {
MediaPlayer.MEDIA_INFO_STARTED_AS_NEXT -> {
release(currentMediaPlayer)
currentMediaPlayer = mp
currentPlayingIndex++
mediaPlayerScope.launch { prepareNextFile() }
currentSequence = currentSequence?.plus(1)
coroutineScope.launch { nextMediaPlayer = prepareNextMediaPlayer() }
}
}
return false
}

override fun onCompletion(mp: MediaPlayer) {
// Verify that a new media has not been set in the mean time
if (!currentMediaPlayer?.isPlaying.orFalse()) {
if (nextMediaPlayer != null) return
val roomId = currentRoomId ?: return
val voiceBroadcastId = currentVoiceBroadcastId ?: return
val voiceBroadcastEventContent = getVoiceBroadcastUseCase.execute(roomId, voiceBroadcastId)?.content ?: return
val isLive = voiceBroadcastEventContent.voiceBroadcastState != null && voiceBroadcastEventContent.voiceBroadcastState != VoiceBroadcastState.STOPPED

if (!isLive && voiceBroadcastEventContent.lastChunkSequence == currentSequence) {
// We'll not receive new chunks anymore so we can stop the live listening
stop()
} else {
state = State.BUFFERING
}
}

Expand All @@ -170,4 +309,11 @@ class VoiceBroadcastPlayer @Inject constructor(
return true
}
}

enum class State {
PLAYING,
PAUSED,
BUFFERING,
IDLE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.io.File
interface VoiceBroadcastRecorder : VoiceRecorder {

var listener: Listener?
var currentSequence: Int

fun startRecord(roomId: String, chunkLength: Int)

Expand Down
Loading