Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed a few issues with the OpenGroupPoller #1241

Merged
merged 2 commits into from
Jun 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import java.util.concurrent.Executors

object OpenGroupManager {
private val executorService = Executors.newScheduledThreadPool(4)
private var pollers = mutableMapOf<String, OpenGroupPoller>() // One for each server
private val pollers = mutableMapOf<String, OpenGroupPoller>() // One for each server
private var isPolling = false
private val pollUpdaterLock = Any()

Expand All @@ -41,11 +41,11 @@ object OpenGroupManager {
isPolling = true
val storage = MessagingModuleConfiguration.shared.storage
val servers = storage.getAllOpenGroups().values.map { it.server }.toSet()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

useless .toSet()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the conversion to Set<T> is in order to remove duplicates from the collection - getAllOpenGroups() returns every community you are a member of, so if you are in multiple rooms on the same server (eg. Session, Oxen, Session Updates all on the getsession.org server) the result would have 3 entries for getsession.org

The poll request actually makes a single /batch API call to the server which includes a call for each room on that server (so without the toSet() the forEach loop would end up redundantly initialising and triggering a poll for each copy of the server)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ohhh... cool.

servers.forEach { server ->
pollers[server]?.stop() // Shouldn't be necessary
val poller = OpenGroupPoller(server, executorService)
poller.startIfNeeded()
pollers[server] = poller
synchronized(pollUpdaterLock) {
mpretty-cyro marked this conversation as resolved.
Show resolved Hide resolved
servers.forEach { server ->
pollers[server]?.stop() // Shouldn't be necessary
pollers[server] = OpenGroupPoller(server, executorService).apply { startIfNeeded() }
}
}
}

Expand All @@ -60,7 +60,7 @@ object OpenGroupManager {
@WorkerThread
fun add(server: String, room: String, publicKey: String, context: Context): OpenGroupApi.RoomInfo? {
val openGroupID = "$server.$room"
var threadID = GroupManager.getOpenGroupThreadID(openGroupID, context)
val threadID = GroupManager.getOpenGroupThreadID(openGroupID, context)
val storage = MessagingModuleConfiguration.shared.storage
val threadDB = DatabaseComponent.get(context).lokiThreadDatabase()
// Check it it's added already
Expand All @@ -76,13 +76,16 @@ object OpenGroupManager {
// Get capabilities & room info
val (capabilities, info) = OpenGroupApi.getCapabilitiesAndRoomInfo(room, server).get()
storage.setServerCapabilities(server, capabilities.capabilities)
storage.setUserCount(room, server, info.activeUsers)
// Create the group locally if not available already
if (threadID < 0) {
threadID = GroupManager.createOpenGroup(openGroupID, context, null, info.name).threadId
GroupManager.createOpenGroup(openGroupID, context, null, info.name)
}
val openGroup = OpenGroup(server = server, room = room, publicKey = publicKey, name = info.name, imageId = info.imageId, canWrite = info.write, infoUpdates = info.infoUpdates)
threadDB.setOpenGroupChat(openGroup, threadID)
OpenGroupPoller.handleRoomPollInfo(
server = server,
roomToken = room,
pollInfo = info.toPollInfo(),
createGroupIfMissingWithPublicKey = publicKey
)
return info
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,7 @@ class BackgroundGroupAddJob(val joinUrl: String): Job {
delegate?.handleJobFailed(this, dispatcherName, DuplicateGroupException())
return
}
// get image
storage.setOpenGroupPublicKey(openGroup.server, openGroup.serverPublicKey)
val info = storage.addOpenGroup(openGroup.joinUrl())
val imageId = info?.imageId
if (imageId != null && storage.getGroupAvatarDownloadJob(openGroup.server, openGroup.room, imageId) == null) {
JobQueue.shared.add(GroupAvatarDownloadJob(openGroup.server, openGroup.room, imageId))
}
storage.addOpenGroup(openGroup.joinUrl())
Log.d(KEY, "onOpenGroupAdded(${openGroup.server})")
storage.onOpenGroupAdded(openGroup.server)
} catch (e: Exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,24 @@ object OpenGroupApi {
val defaultWrite: Boolean = false,
val upload: Boolean = false,
val defaultUpload: Boolean = false,
)
) {
fun toPollInfo() = RoomPollInfo(
token = token,
activeUsers = activeUsers,
admin = admin,
globalAdmin = globalAdmin,
moderator = moderator,
globalModerator = globalModerator,
read = read,
defaultRead = defaultRead,
defaultAccessible = defaultAccessible,
write = write,
defaultWrite = defaultWrite,
upload = upload,
defaultUpload = defaultUpload,
details = this
)
}

@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy::class)
data class PinnedMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.session.libsignal.protos.SignalServiceProtos
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.successBackground
import java.util.UUID
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
Expand All @@ -39,15 +40,101 @@ class OpenGroupPoller(private val server: String, private val executorService: S
var isCaughtUp = false
var secondToLastJob: MessageReceiveJob? = null
private var future: ScheduledFuture<*>? = null
@Volatile private var runId: UUID = UUID.randomUUID()

companion object {
private const val pollInterval: Long = 4000L
const val maxInactivityPeriod = 14 * 24 * 60 * 60 * 1000

public fun handleRoomPollInfo(
server: String,
roomToken: String,
pollInfo: OpenGroupApi.RoomPollInfo,
createGroupIfMissingWithPublicKey: String? = null
) {
val storage = MessagingModuleConfiguration.shared.storage
val groupId = "$server.$roomToken"
val dbGroupId = GroupUtil.getEncodedOpenGroupID(groupId.toByteArray())
val existingOpenGroup = storage.getOpenGroup(roomToken, server)

// If we don't have an existing group and don't have a 'createGroupIfMissingWithPublicKey'
// value then don't process the poll info
val publicKey = existingOpenGroup?.publicKey ?: createGroupIfMissingWithPublicKey
val name = pollInfo.details?.name ?: existingOpenGroup?.name
val infoUpdates = pollInfo.details?.infoUpdates ?: existingOpenGroup?.infoUpdates

if (publicKey == null) return

val openGroup = OpenGroup(
server = server,
room = pollInfo.token,
name = name ?: "",
publicKey = publicKey,
imageId = (pollInfo.details?.imageId ?: existingOpenGroup?.imageId),
canWrite = pollInfo.write,
infoUpdates = infoUpdates ?: 0
)
// - Open Group changes
storage.updateOpenGroup(openGroup)

// - User Count
storage.setUserCount(roomToken, server, pollInfo.activeUsers)

// - Moderators
pollInfo.details?.moderators?.let { moderatorList ->
storage.setGroupMemberRoles(moderatorList.map {
GroupMember(groupId, it, GroupMemberRole.MODERATOR)
})
}
pollInfo.details?.hiddenModerators?.let { moderatorList ->
storage.setGroupMemberRoles(moderatorList.map {
GroupMember(groupId, it, GroupMemberRole.HIDDEN_MODERATOR)
})
}
// - Admins
pollInfo.details?.admins?.let { moderatorList ->
storage.setGroupMemberRoles(moderatorList.map {
GroupMember(groupId, it, GroupMemberRole.ADMIN)
})
}
pollInfo.details?.hiddenAdmins?.let { moderatorList ->
storage.setGroupMemberRoles(moderatorList.map {
GroupMember(groupId, it, GroupMemberRole.HIDDEN_ADMIN)
})
}

// Update the group avatar
if (
(
pollInfo.details != null &&
pollInfo.details.imageId != null && (
pollInfo.details.imageId != existingOpenGroup?.imageId ||
!storage.hasDownloadedProfilePicture(dbGroupId)
) &&
storage.getGroupAvatarDownloadJob(openGroup.server, openGroup.room, pollInfo.details.imageId) == null
) || (
pollInfo.details == null &&
existingOpenGroup?.imageId != null &&
!storage.hasDownloadedProfilePicture(dbGroupId) &&
storage.getGroupAvatarDownloadJob(openGroup.server, openGroup.room, existingOpenGroup.imageId) == null
)
) {
JobQueue.shared.add(GroupAvatarDownloadJob(server, roomToken, openGroup.imageId))
}
else if (
pollInfo.details != null &&
pollInfo.details.imageId == null &&
existingOpenGroup?.imageId != null
) {
storage.removeProfilePicture(dbGroupId)
}
}
}

fun startIfNeeded() {
if (hasStarted) { return }
hasStarted = true
runId = UUID.randomUUID()
future = executorService?.schedule(::poll, 0, TimeUnit.MILLISECONDS)
}

Expand All @@ -57,6 +144,7 @@ class OpenGroupPoller(private val server: String, private val executorService: S
}

fun poll(isPostCapabilitiesRetry: Boolean = false): Promise<Unit, Exception> {
val currentRunId = runId
val storage = MessagingModuleConfiguration.shared.storage
val rooms = storage.getAllOpenGroups().values.filter { it.server == server }.map { it.room }

Expand Down Expand Up @@ -86,22 +174,30 @@ class OpenGroupPoller(private val server: String, private val executorService: S
isCaughtUp = true
}
}
executorService?.schedule(this@OpenGroupPoller::poll, pollInterval, TimeUnit.MILLISECONDS)

// Only poll again if it's the same poller run
if (currentRunId == runId) {
future = executorService?.schedule(this@OpenGroupPoller::poll, pollInterval, TimeUnit.MILLISECONDS)
}
}.fail {
updateCapabilitiesIfNeeded(isPostCapabilitiesRetry, it)
updateCapabilitiesIfNeeded(isPostCapabilitiesRetry, currentRunId, it)
}.map { }

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

useless map{}, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The map was there to convert the return value from Promise<List<OpenGroupApi.BatchResponse<*>>, Exception> to Promise<Unit, Exception>

}

private fun updateCapabilitiesIfNeeded(isPostCapabilitiesRetry: Boolean, exception: Exception) {
private fun updateCapabilitiesIfNeeded(isPostCapabilitiesRetry: Boolean, currentRunId: UUID, exception: Exception) {
if (exception is OnionRequestAPI.HTTPRequestFailedBlindingRequiredException) {
if (!isPostCapabilitiesRetry) {
OpenGroupApi.getCapabilities(server).map {
handleCapabilities(server, it)
}
executorService?.schedule({ poll(isPostCapabilitiesRetry = true) }, pollInterval, TimeUnit.MILLISECONDS)

// Only poll again if it's the same poller run
if (currentRunId == runId) {
future = executorService?.schedule({ poll(isPostCapabilitiesRetry = true) }, pollInterval, TimeUnit.MILLISECONDS)
}
}
} else {
executorService?.schedule(this@OpenGroupPoller::poll, pollInterval, TimeUnit.MILLISECONDS)
} else if (currentRunId == runId) {
future = executorService?.schedule(this@OpenGroupPoller::poll, pollInterval, TimeUnit.MILLISECONDS)
}
}

Expand All @@ -110,82 +206,6 @@ class OpenGroupPoller(private val server: String, private val executorService: S
storage.setServerCapabilities(server, capabilities.capabilities)
}

private fun handleRoomPollInfo(
server: String,
roomToken: String,
pollInfo: OpenGroupApi.RoomPollInfo
) {
val storage = MessagingModuleConfiguration.shared.storage
val groupId = "$server.$roomToken"
val dbGroupId = GroupUtil.getEncodedOpenGroupID(groupId.toByteArray())

val existingOpenGroup = storage.getOpenGroup(roomToken, server)
val publicKey = existingOpenGroup?.publicKey ?: return
val openGroup = OpenGroup(
server = server,
room = pollInfo.token,
name = if (pollInfo.details != null) { pollInfo.details.name } else { existingOpenGroup.name },
publicKey = publicKey,
imageId = if (pollInfo.details != null) { pollInfo.details.imageId } else { existingOpenGroup.imageId },
canWrite = pollInfo.write,
infoUpdates = if (pollInfo.details != null) { pollInfo.details.infoUpdates } else { existingOpenGroup.infoUpdates }
)
// - Open Group changes
storage.updateOpenGroup(openGroup)

// - User Count
storage.setUserCount(roomToken, server, pollInfo.activeUsers)

// - Moderators
pollInfo.details?.moderators?.let { moderatorList ->
storage.setGroupMemberRoles(moderatorList.map {
GroupMember(groupId, it, GroupMemberRole.MODERATOR)
})
}
pollInfo.details?.hiddenModerators?.let { moderatorList ->
storage.setGroupMemberRoles(moderatorList.map {
GroupMember(groupId, it, GroupMemberRole.HIDDEN_MODERATOR)
})
}
// - Admins
pollInfo.details?.admins?.let { moderatorList ->
storage.setGroupMemberRoles(moderatorList.map {
GroupMember(groupId, it, GroupMemberRole.ADMIN)
})
}
pollInfo.details?.hiddenAdmins?.let { moderatorList ->
storage.setGroupMemberRoles(moderatorList.map {
GroupMember(groupId, it, GroupMemberRole.HIDDEN_ADMIN)
})
}

// Update the group avatar
if (
(
pollInfo.details != null &&
pollInfo.details.imageId != null && (
pollInfo.details.imageId != existingOpenGroup.imageId ||
!storage.hasDownloadedProfilePicture(dbGroupId)
) &&
storage.getGroupAvatarDownloadJob(openGroup.server, openGroup.room, pollInfo.details.imageId) == null
) || (
pollInfo.details == null &&
existingOpenGroup.imageId != null &&
!storage.hasDownloadedProfilePicture(dbGroupId) &&
storage.getGroupAvatarDownloadJob(openGroup.server, openGroup.room, existingOpenGroup.imageId) == null
)
) {
JobQueue.shared.add(GroupAvatarDownloadJob(server, roomToken, existingOpenGroup.imageId))
}
else if (
pollInfo.details != null &&
pollInfo.details.imageId == null &&
existingOpenGroup.imageId != null
) {
storage.removeProfilePicture(dbGroupId)
}
}

private fun handleMessages(
server: String,
roomToken: String,
Expand Down