From 89d019051d9e0d7d134e0adac051732d6630f98b Mon Sep 17 00:00:00 2001 From: fanchao Date: Wed, 19 Jun 2024 10:55:11 +1000 Subject: [PATCH 1/5] Remove wrapping of config message --- libsession-util/libsession-util | 2 +- .../messaging/jobs/ConfigurationSyncJob.kt | 2 +- .../sending_receiving/MessageSender.kt | 9 ++++ .../sending_receiving/pollers/Poller.kt | 44 +++++++------------ .../org/session/libsession/snode/SnodeAPI.kt | 4 +- 5 files changed, 30 insertions(+), 31 deletions(-) diff --git a/libsession-util/libsession-util b/libsession-util/libsession-util index 626b6628a2a..4b6f595fdbd 160000 --- a/libsession-util/libsession-util +++ b/libsession-util/libsession-util @@ -1 +1 @@ -Subproject commit 626b6628a2af8fff798042416b3b469b8bfc6ecf +Subproject commit 4b6f595fdbd3b5f6fba380253e560d8ee296b734 diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/ConfigurationSyncJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/ConfigurationSyncJob.kt index ec8de441631..04acb670352 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/ConfigurationSyncJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/ConfigurationSyncJob.kt @@ -67,7 +67,7 @@ data class ConfigurationSyncJob(val destination: Destination): Job { SharedConfigurationMessage(config.protoKindFor(), data, seqNo) to config }.map { (message, config) -> // return a list of batch request objects - val snodeMessage = MessageSender.buildWrappedMessageToSnode(destination, message, true) + val snodeMessage = MessageSender.buildConfigMessageToSnode(destination.destinationPublicKey(), message) val authenticated = SnodeAPI.buildAuthenticatedStoreBatchInfo( destination.destinationPublicKey(), config.configNamespace(), diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt index 0968db27e2d..8081fe1d927 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt @@ -81,6 +81,15 @@ object MessageSender { } } + fun buildConfigMessageToSnode(destinationPubKey: String, message: SharedConfigurationMessage): SnodeMessage { + return SnodeMessage( + destinationPubKey, + Base64.encodeBytes(message.data), + ttl = message.ttl, + SnodeAPI.nowWithOffset + ) + } + // One-on-One Chats & Closed Groups @Throws(Exception::class) fun buildWrappedMessageToSnode(destination: Destination, message: Message, isSyncMessage: Boolean): SnodeMessage { diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt index 39ed79de1e6..a4ed822f465 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt @@ -25,6 +25,7 @@ import org.session.libsession.snode.RawResponse import org.session.libsession.snode.SnodeAPI import org.session.libsession.snode.SnodeModule import org.session.libsession.utilities.ConfigFactoryProtocol +import org.session.libsignal.utilities.Base64 import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Namespace import org.session.libsignal.utilities.Snode @@ -126,37 +127,26 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti private fun processConfig(snode: Snode, rawMessages: RawResponse, namespace: Int, forConfigObject: ConfigBase?) { if (forConfigObject == null) return - val messages = SnodeAPI.parseRawMessagesResponse( - rawMessages, - snode, - userPublicKey, - namespace, - updateLatestHash = true, - updateStoredHashes = true, - ) + val messages = rawMessages["messages"] as? List<*> + val processed = if (messages != null && messages.isNotEmpty()) { + SnodeAPI.updateLastMessageHashValueIfPossible(snode, userPublicKey, messages, namespace) + SnodeAPI.removeDuplicates(userPublicKey, messages, namespace, true).mapNotNull { messageBody -> + val rawMessageAsJSON = messageBody as? Map<*, *> ?: return@mapNotNull null + val hashValue = rawMessageAsJSON["hash"] as? String ?: return@mapNotNull null + val b64EncodedBody = rawMessageAsJSON["data"] as? String ?: return@mapNotNull null + val timestamp = rawMessageAsJSON["t"] as? Long ?: SnodeAPI.nowWithOffset + val body = Base64.decode(b64EncodedBody) + Triple(body, hashValue, timestamp) + } + } else emptyList() - if (messages.isEmpty()) { - // no new messages to process - return - } + if (processed.isEmpty()) return var latestMessageTimestamp: Long? = null - messages.forEach { (envelope, hash) -> + processed.forEach { (body, hash, timestamp) -> try { - val (message, _) = MessageReceiver.parse(data = envelope.toByteArray(), - // assume no groups in personal poller messages - openGroupServerID = null, currentClosedGroups = emptySet() - ) - // sanity checks - if (message !is SharedConfigurationMessage) { - Log.w("Loki", "shared config message handled in configs wasn't SharedConfigurationMessage but was ${message.javaClass.simpleName}") - return@forEach - } - val merged = forConfigObject.merge(hash!! to message.data).firstOrNull { it == hash } - if (merged != null) { - // We successfully merged the hash, we can now update the timestamp - latestMessageTimestamp = if ((message.sentTimestamp ?: 0L) > (latestMessageTimestamp ?: 0L)) { message.sentTimestamp } else { latestMessageTimestamp } - } + forConfigObject.merge(hash to body) + latestMessageTimestamp = if (timestamp > (latestMessageTimestamp ?: 0L)) { timestamp } else { latestMessageTimestamp } } catch (e: Exception) { Log.e("Loki", e) } diff --git a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt index 0f996bacacb..d2cfa2de35c 100644 --- a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt +++ b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt @@ -829,7 +829,7 @@ object SnodeAPI { } } - private fun updateLastMessageHashValueIfPossible(snode: Snode, publicKey: String, rawMessages: List<*>, namespace: Int) { + fun updateLastMessageHashValueIfPossible(snode: Snode, publicKey: String, rawMessages: List<*>, namespace: Int) { val lastMessageAsJSON = rawMessages.lastOrNull() as? Map<*, *> val hashValue = lastMessageAsJSON?.get("hash") as? String if (hashValue != null) { @@ -839,7 +839,7 @@ object SnodeAPI { } } - private fun removeDuplicates(publicKey: String, rawMessages: List<*>, namespace: Int, updateStoredHashes: Boolean): List<*> { + fun removeDuplicates(publicKey: String, rawMessages: List<*>, namespace: Int, updateStoredHashes: Boolean): List<*> { val originalMessageHashValues = database.getReceivedMessageHashValues(publicKey, namespace)?.toMutableSet() ?: mutableSetOf() val receivedMessageHashValues = originalMessageHashValues.toMutableSet() val result = rawMessages.filter { rawMessage -> From a617cba58a13b7568d1a253244a601a30a34a754 Mon Sep 17 00:00:00 2001 From: fanchao Date: Thu, 20 Jun 2024 10:55:06 +1000 Subject: [PATCH 2/5] Addresses feedback --- .../libsession/messaging/sending_receiving/pollers/Poller.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt index a4ed822f465..d05290a5fd2 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt @@ -128,7 +128,7 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti if (forConfigObject == null) return val messages = rawMessages["messages"] as? List<*> - val processed = if (messages != null && messages.isNotEmpty()) { + val processed = if (!messages.isNullOrEmpty()) { SnodeAPI.updateLastMessageHashValueIfPossible(snode, userPublicKey, messages, namespace) SnodeAPI.removeDuplicates(userPublicKey, messages, namespace, true).mapNotNull { messageBody -> val rawMessageAsJSON = messageBody as? Map<*, *> ?: return@mapNotNull null From fac01a8f511865aa3c2317392b4c91228432b44e Mon Sep 17 00:00:00 2001 From: fanchao Date: Mon, 24 Jun 2024 09:33:35 +1000 Subject: [PATCH 3/5] Merged in ThreadUtils fix --- .../libsignal/utilities/ThreadUtils.kt | 29 +++---------------- 1 file changed, 4 insertions(+), 25 deletions(-) diff --git a/libsignal/src/main/java/org/session/libsignal/utilities/ThreadUtils.kt b/libsignal/src/main/java/org/session/libsignal/utilities/ThreadUtils.kt index e920d85b473..6485babe808 100644 --- a/libsignal/src/main/java/org/session/libsignal/utilities/ThreadUtils.kt +++ b/libsignal/src/main/java/org/session/libsignal/utilities/ThreadUtils.kt @@ -1,11 +1,13 @@ package org.session.libsignal.utilities import android.os.Process +import kotlinx.coroutines.Dispatchers import java.util.concurrent.ExecutorService import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.SynchronousQueue import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit +import kotlin.coroutines.EmptyCoroutineContext object ThreadUtils { @@ -13,39 +15,16 @@ object ThreadUtils { const val PRIORITY_IMPORTANT_BACKGROUND_THREAD = Process.THREAD_PRIORITY_DEFAULT + Process.THREAD_PRIORITY_LESS_FAVORABLE - // Paraphrased from: https://www.baeldung.com/kotlin/create-thread-pool - // "A cached thread pool such as one created via: - // `val executorPool: ExecutorService = Executors.newCachedThreadPool()` - // will utilize resources according to the requirements of submitted tasks. It will try to reuse - // existing threads for submitted tasks but will create as many threads as it needs if new tasks - // keep pouring in (with a memory usage of at least 1MB per created thread). These threads will - // live for up to 60 seconds of idle time before terminating by default. As such, it presents a - // very sharp tool that doesn't include any backpressure mechanism - and a sudden peak in load - // can bring the system down with an OutOfMemory error. We can achieve a similar effect but with - // better control by creating a ThreadPoolExecutor manually." - - private val corePoolSize = Runtime.getRuntime().availableProcessors() // Default thread pool size is our CPU core count - private val maxPoolSize = corePoolSize * 4 // Allow a maximum pool size of up to 4 threads per core - private val keepAliveTimeSecs = 100L // How long to keep idle threads in the pool before they are terminated - private val workQueue = SynchronousQueue() - val executorPool: ExecutorService = ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTimeSecs, TimeUnit.SECONDS, workQueue) - // Note: To see how many threads are running in our app at any given time we can use: // val threadCount = getAllStackTraces().size @JvmStatic fun queue(target: Runnable) { - executorPool.execute { - try { - target.run() - } catch (e: Exception) { - Log.e(TAG, e) - } - } + queue(target::run) } fun queue(target: () -> Unit) { - executorPool.execute { + Dispatchers.IO.dispatch(EmptyCoroutineContext) { try { target() } catch (e: Exception) { From 9e217f98025f28f17fb8fce84a227751e421bca3 Mon Sep 17 00:00:00 2001 From: fanchao Date: Mon, 24 Jun 2024 10:38:26 +1000 Subject: [PATCH 4/5] JDK installation --- .drone.jsonnet | 2 +- app/src/main/java/org/thoughtcrime/securesms/AppContext.kt | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/.drone.jsonnet b/.drone.jsonnet index dc81115ce92..9fe54fd8301 100644 --- a/.drone.jsonnet +++ b/.drone.jsonnet @@ -78,7 +78,7 @@ local ci_dep_mirror(want_mirror) = (if want_mirror then ' -DLOCAL_MIRROR=https:/ pull: 'always', environment: { SSH_KEY: { from_secret: 'SSH_KEY' }, ANDROID_HOME: '/usr/lib/android-sdk' }, commands: [ - 'apt-get install -y ninja-build', + 'apt-get install -y ninja-build openjdk-17-jdk-headless', './gradlew assemblePlayDebug', './scripts/drone-static-upload.sh' ], diff --git a/app/src/main/java/org/thoughtcrime/securesms/AppContext.kt b/app/src/main/java/org/thoughtcrime/securesms/AppContext.kt index 2588618b729..34ab960021b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/AppContext.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/AppContext.kt @@ -1,9 +1,10 @@ package org.thoughtcrime.securesms +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.asExecutor import nl.komponents.kovenant.Kovenant import nl.komponents.kovenant.jvm.asDispatcher import org.session.libsignal.utilities.Log -import org.session.libsignal.utilities.ThreadUtils import java.util.concurrent.Executors object AppContext { @@ -11,7 +12,7 @@ object AppContext { fun configureKovenant() { Kovenant.context { callbackContext.dispatcher = Executors.newSingleThreadExecutor().asDispatcher() - workerContext.dispatcher = ThreadUtils.executorPool.asDispatcher() + workerContext.dispatcher = Dispatchers.IO.asExecutor().asDispatcher() multipleCompletion = { v1, v2 -> Log.d("Loki", "Promise resolved more than once (first with $v1, then with $v2); ignoring $v2.") } From f560513f9f247ce141b453f23425b5133bd56333 Mon Sep 17 00:00:00 2001 From: fanchao Date: Thu, 27 Jun 2024 09:52:42 +1000 Subject: [PATCH 5/5] Revert JDK change --- .drone.jsonnet | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.drone.jsonnet b/.drone.jsonnet index 9fe54fd8301..dc81115ce92 100644 --- a/.drone.jsonnet +++ b/.drone.jsonnet @@ -78,7 +78,7 @@ local ci_dep_mirror(want_mirror) = (if want_mirror then ' -DLOCAL_MIRROR=https:/ pull: 'always', environment: { SSH_KEY: { from_secret: 'SSH_KEY' }, ANDROID_HOME: '/usr/lib/android-sdk' }, commands: [ - 'apt-get install -y ninja-build openjdk-17-jdk-headless', + 'apt-get install -y ninja-build', './gradlew assemblePlayDebug', './scripts/drone-static-upload.sh' ],