-
Notifications
You must be signed in to change notification settings - Fork 171
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixed a few issues with the OpenGroupPoller #1241
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ import org.session.libsignal.protos.SignalServiceProtos | |
import org.session.libsignal.utilities.Base64 | ||
import org.session.libsignal.utilities.Log | ||
import org.session.libsignal.utilities.successBackground | ||
import java.util.UUID | ||
import java.util.concurrent.ScheduledExecutorService | ||
import java.util.concurrent.ScheduledFuture | ||
import java.util.concurrent.TimeUnit | ||
|
@@ -39,15 +40,101 @@ class OpenGroupPoller(private val server: String, private val executorService: S | |
var isCaughtUp = false | ||
var secondToLastJob: MessageReceiveJob? = null | ||
private var future: ScheduledFuture<*>? = null | ||
@Volatile private var runId: UUID = UUID.randomUUID() | ||
|
||
companion object { | ||
private const val pollInterval: Long = 4000L | ||
const val maxInactivityPeriod = 14 * 24 * 60 * 60 * 1000 | ||
|
||
public fun handleRoomPollInfo( | ||
server: String, | ||
roomToken: String, | ||
pollInfo: OpenGroupApi.RoomPollInfo, | ||
createGroupIfMissingWithPublicKey: String? = null | ||
) { | ||
val storage = MessagingModuleConfiguration.shared.storage | ||
val groupId = "$server.$roomToken" | ||
val dbGroupId = GroupUtil.getEncodedOpenGroupID(groupId.toByteArray()) | ||
val existingOpenGroup = storage.getOpenGroup(roomToken, server) | ||
|
||
// If we don't have an existing group and don't have a 'createGroupIfMissingWithPublicKey' | ||
// value then don't process the poll info | ||
val publicKey = existingOpenGroup?.publicKey ?: createGroupIfMissingWithPublicKey | ||
val name = pollInfo.details?.name ?: existingOpenGroup?.name | ||
val infoUpdates = pollInfo.details?.infoUpdates ?: existingOpenGroup?.infoUpdates | ||
|
||
if (publicKey == null) return | ||
|
||
val openGroup = OpenGroup( | ||
server = server, | ||
room = pollInfo.token, | ||
name = name ?: "", | ||
publicKey = publicKey, | ||
imageId = (pollInfo.details?.imageId ?: existingOpenGroup?.imageId), | ||
canWrite = pollInfo.write, | ||
infoUpdates = infoUpdates ?: 0 | ||
) | ||
// - Open Group changes | ||
storage.updateOpenGroup(openGroup) | ||
|
||
// - User Count | ||
storage.setUserCount(roomToken, server, pollInfo.activeUsers) | ||
|
||
// - Moderators | ||
pollInfo.details?.moderators?.let { moderatorList -> | ||
storage.setGroupMemberRoles(moderatorList.map { | ||
GroupMember(groupId, it, GroupMemberRole.MODERATOR) | ||
}) | ||
} | ||
pollInfo.details?.hiddenModerators?.let { moderatorList -> | ||
storage.setGroupMemberRoles(moderatorList.map { | ||
GroupMember(groupId, it, GroupMemberRole.HIDDEN_MODERATOR) | ||
}) | ||
} | ||
// - Admins | ||
pollInfo.details?.admins?.let { moderatorList -> | ||
storage.setGroupMemberRoles(moderatorList.map { | ||
GroupMember(groupId, it, GroupMemberRole.ADMIN) | ||
}) | ||
} | ||
pollInfo.details?.hiddenAdmins?.let { moderatorList -> | ||
storage.setGroupMemberRoles(moderatorList.map { | ||
GroupMember(groupId, it, GroupMemberRole.HIDDEN_ADMIN) | ||
}) | ||
} | ||
|
||
// Update the group avatar | ||
if ( | ||
( | ||
pollInfo.details != null && | ||
pollInfo.details.imageId != null && ( | ||
pollInfo.details.imageId != existingOpenGroup?.imageId || | ||
!storage.hasDownloadedProfilePicture(dbGroupId) | ||
) && | ||
storage.getGroupAvatarDownloadJob(openGroup.server, openGroup.room, pollInfo.details.imageId) == null | ||
) || ( | ||
pollInfo.details == null && | ||
existingOpenGroup?.imageId != null && | ||
!storage.hasDownloadedProfilePicture(dbGroupId) && | ||
storage.getGroupAvatarDownloadJob(openGroup.server, openGroup.room, existingOpenGroup.imageId) == null | ||
) | ||
) { | ||
JobQueue.shared.add(GroupAvatarDownloadJob(server, roomToken, openGroup.imageId)) | ||
} | ||
else if ( | ||
pollInfo.details != null && | ||
pollInfo.details.imageId == null && | ||
existingOpenGroup?.imageId != null | ||
) { | ||
storage.removeProfilePicture(dbGroupId) | ||
} | ||
} | ||
} | ||
|
||
fun startIfNeeded() { | ||
if (hasStarted) { return } | ||
hasStarted = true | ||
runId = UUID.randomUUID() | ||
future = executorService?.schedule(::poll, 0, TimeUnit.MILLISECONDS) | ||
} | ||
|
||
|
@@ -57,6 +144,7 @@ class OpenGroupPoller(private val server: String, private val executorService: S | |
} | ||
|
||
fun poll(isPostCapabilitiesRetry: Boolean = false): Promise<Unit, Exception> { | ||
val currentRunId = runId | ||
val storage = MessagingModuleConfiguration.shared.storage | ||
val rooms = storage.getAllOpenGroups().values.filter { it.server == server }.map { it.room } | ||
|
||
|
@@ -86,22 +174,30 @@ class OpenGroupPoller(private val server: String, private val executorService: S | |
isCaughtUp = true | ||
} | ||
} | ||
executorService?.schedule(this@OpenGroupPoller::poll, pollInterval, TimeUnit.MILLISECONDS) | ||
|
||
// Only poll again if it's the same poller run | ||
if (currentRunId == runId) { | ||
future = executorService?.schedule(this@OpenGroupPoller::poll, pollInterval, TimeUnit.MILLISECONDS) | ||
} | ||
}.fail { | ||
updateCapabilitiesIfNeeded(isPostCapabilitiesRetry, it) | ||
updateCapabilitiesIfNeeded(isPostCapabilitiesRetry, currentRunId, it) | ||
}.map { } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. useless There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The map was there to convert the return value from |
||
} | ||
|
||
private fun updateCapabilitiesIfNeeded(isPostCapabilitiesRetry: Boolean, exception: Exception) { | ||
private fun updateCapabilitiesIfNeeded(isPostCapabilitiesRetry: Boolean, currentRunId: UUID, exception: Exception) { | ||
if (exception is OnionRequestAPI.HTTPRequestFailedBlindingRequiredException) { | ||
if (!isPostCapabilitiesRetry) { | ||
OpenGroupApi.getCapabilities(server).map { | ||
handleCapabilities(server, it) | ||
} | ||
executorService?.schedule({ poll(isPostCapabilitiesRetry = true) }, pollInterval, TimeUnit.MILLISECONDS) | ||
|
||
// Only poll again if it's the same poller run | ||
if (currentRunId == runId) { | ||
future = executorService?.schedule({ poll(isPostCapabilitiesRetry = true) }, pollInterval, TimeUnit.MILLISECONDS) | ||
} | ||
} | ||
} else { | ||
executorService?.schedule(this@OpenGroupPoller::poll, pollInterval, TimeUnit.MILLISECONDS) | ||
} else if (currentRunId == runId) { | ||
future = executorService?.schedule(this@OpenGroupPoller::poll, pollInterval, TimeUnit.MILLISECONDS) | ||
} | ||
} | ||
|
||
|
@@ -110,82 +206,6 @@ class OpenGroupPoller(private val server: String, private val executorService: S | |
storage.setServerCapabilities(server, capabilities.capabilities) | ||
} | ||
|
||
private fun handleRoomPollInfo( | ||
server: String, | ||
roomToken: String, | ||
pollInfo: OpenGroupApi.RoomPollInfo | ||
) { | ||
val storage = MessagingModuleConfiguration.shared.storage | ||
val groupId = "$server.$roomToken" | ||
val dbGroupId = GroupUtil.getEncodedOpenGroupID(groupId.toByteArray()) | ||
|
||
val existingOpenGroup = storage.getOpenGroup(roomToken, server) | ||
val publicKey = existingOpenGroup?.publicKey ?: return | ||
val openGroup = OpenGroup( | ||
server = server, | ||
room = pollInfo.token, | ||
name = if (pollInfo.details != null) { pollInfo.details.name } else { existingOpenGroup.name }, | ||
publicKey = publicKey, | ||
imageId = if (pollInfo.details != null) { pollInfo.details.imageId } else { existingOpenGroup.imageId }, | ||
canWrite = pollInfo.write, | ||
infoUpdates = if (pollInfo.details != null) { pollInfo.details.infoUpdates } else { existingOpenGroup.infoUpdates } | ||
) | ||
// - Open Group changes | ||
storage.updateOpenGroup(openGroup) | ||
|
||
// - User Count | ||
storage.setUserCount(roomToken, server, pollInfo.activeUsers) | ||
|
||
// - Moderators | ||
pollInfo.details?.moderators?.let { moderatorList -> | ||
storage.setGroupMemberRoles(moderatorList.map { | ||
GroupMember(groupId, it, GroupMemberRole.MODERATOR) | ||
}) | ||
} | ||
pollInfo.details?.hiddenModerators?.let { moderatorList -> | ||
storage.setGroupMemberRoles(moderatorList.map { | ||
GroupMember(groupId, it, GroupMemberRole.HIDDEN_MODERATOR) | ||
}) | ||
} | ||
// - Admins | ||
pollInfo.details?.admins?.let { moderatorList -> | ||
storage.setGroupMemberRoles(moderatorList.map { | ||
GroupMember(groupId, it, GroupMemberRole.ADMIN) | ||
}) | ||
} | ||
pollInfo.details?.hiddenAdmins?.let { moderatorList -> | ||
storage.setGroupMemberRoles(moderatorList.map { | ||
GroupMember(groupId, it, GroupMemberRole.HIDDEN_ADMIN) | ||
}) | ||
} | ||
|
||
// Update the group avatar | ||
if ( | ||
( | ||
pollInfo.details != null && | ||
pollInfo.details.imageId != null && ( | ||
pollInfo.details.imageId != existingOpenGroup.imageId || | ||
!storage.hasDownloadedProfilePicture(dbGroupId) | ||
) && | ||
storage.getGroupAvatarDownloadJob(openGroup.server, openGroup.room, pollInfo.details.imageId) == null | ||
) || ( | ||
pollInfo.details == null && | ||
existingOpenGroup.imageId != null && | ||
!storage.hasDownloadedProfilePicture(dbGroupId) && | ||
storage.getGroupAvatarDownloadJob(openGroup.server, openGroup.room, existingOpenGroup.imageId) == null | ||
) | ||
) { | ||
JobQueue.shared.add(GroupAvatarDownloadJob(server, roomToken, existingOpenGroup.imageId)) | ||
} | ||
else if ( | ||
pollInfo.details != null && | ||
pollInfo.details.imageId == null && | ||
existingOpenGroup.imageId != null | ||
) { | ||
storage.removeProfilePicture(dbGroupId) | ||
} | ||
} | ||
|
||
private fun handleMessages( | ||
server: String, | ||
roomToken: String, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
useless
.toSet()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the conversion to
Set<T>
is in order to remove duplicates from the collection -getAllOpenGroups()
returns every community you are a member of, so if you are in multiple rooms on the same server (eg.Session
,Oxen
,Session Updates
all on thegetsession.org
server) the result would have 3 entries forgetsession.org
The
poll
request actually makes a single/batch
API call to the server which includes a call for each room on that server (so without thetoSet()
theforEach
loop would end up redundantly initialising and triggering apoll
for each copy of the server)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ohhh... cool.