@@ -20,8 +20,9 @@ import android.content.Context
20
20
import android.net.Uri
21
21
import androidx.lifecycle.Observer
22
22
import androidx.test.internal.runner.junit4.statement.UiThreadStatement
23
+ import kotlinx.coroutines.CoroutineScope
23
24
import kotlinx.coroutines.Dispatchers
24
- import kotlinx.coroutines.GlobalScope
25
+ import kotlinx.coroutines.SupervisorJob
25
26
import kotlinx.coroutines.delay
26
27
import kotlinx.coroutines.launch
27
28
import kotlinx.coroutines.runBlocking
@@ -55,6 +56,7 @@ import java.util.concurrent.TimeUnit
55
56
class CommonTestHelper (context : Context ) {
56
57
57
58
val matrix: TestMatrix
59
+ val coroutineScope = CoroutineScope (SupervisorJob () + Dispatchers .Main )
58
60
59
61
fun getTestInterceptor (session : Session ): MockOkHttpInterceptor ? = TestModule .interceptorForSession(session.sessionId) as ? MockOkHttpInterceptor
60
62
@@ -93,30 +95,45 @@ class CommonTestHelper(context: Context) {
93
95
*
94
96
* @param session the session to sync
95
97
*/
96
- @Suppress(" EXPERIMENTAL_API_USAGE" )
97
- fun syncSession (session : Session , timeout : Long = TestConstants .timeOutMillis) {
98
+ fun syncSession (session : Session , timeout : Long = TestConstants .timeOutMillis * 10) {
98
99
val lock = CountDownLatch (1 )
99
-
100
- val job = GlobalScope .launch(Dispatchers .Main ) {
101
- session.open()
100
+ coroutineScope.launch {
101
+ session.startSync(true )
102
+ val syncLiveData = session.getSyncStateLive()
103
+ val syncObserver = object : Observer <SyncState > {
104
+ override fun onChanged (t : SyncState ? ) {
105
+ if (session.hasAlreadySynced()) {
106
+ lock.countDown()
107
+ syncLiveData.removeObserver(this )
108
+ }
109
+ }
110
+ }
111
+ syncLiveData.observeForever(syncObserver)
102
112
}
103
- runBlocking { job.join() }
104
-
105
- session.startSync(true )
113
+ await(lock, timeout)
114
+ }
106
115
107
- val syncLiveData = runBlocking(Dispatchers .Main ) {
108
- session.getSyncStateLive()
109
- }
110
- val syncObserver = object : Observer <SyncState > {
111
- override fun onChanged (t : SyncState ? ) {
112
- if (session.hasAlreadySynced()) {
113
- lock.countDown()
114
- syncLiveData.removeObserver(this )
116
+ /* *
117
+ * This methods clear the cache and waits for initialSync
118
+ *
119
+ * @param session the session to sync
120
+ */
121
+ fun clearCacheAndSync (session : Session , timeout : Long = TestConstants .timeOutMillis) {
122
+ val lock = CountDownLatch (1 )
123
+ coroutineScope.launch {
124
+ session.clearCache()
125
+ val syncLiveData = session.getSyncStateLive()
126
+ val syncObserver = object : Observer <SyncState > {
127
+ override fun onChanged (t : SyncState ? ) {
128
+ if (session.hasAlreadySynced()) {
129
+ lock.countDown()
130
+ syncLiveData.removeObserver(this )
131
+ }
115
132
}
116
133
}
134
+ syncLiveData.observeForever(syncObserver)
135
+ session.startSync(true )
117
136
}
118
- GlobalScope .launch(Dispatchers .Main ) { syncLiveData.observeForever(syncObserver) }
119
-
120
137
await(lock, timeout)
121
138
}
122
139
@@ -130,41 +147,40 @@ class CommonTestHelper(context: Context) {
130
147
fun sendTextMessage (room : Room , message : String , nbOfMessages : Int , timeout : Long = TestConstants .timeOutMillis): List <TimelineEvent > {
131
148
val timeline = room.createTimeline(null , TimelineSettings (10 ))
132
149
val sentEvents = ArrayList <TimelineEvent >(nbOfMessages)
133
- val latch = CountDownLatch ( 1 )
134
- val timelineListener = object : Timeline .Listener {
135
- override fun onTimelineFailure (throwable : Throwable ) {
136
- }
150
+ waitWithLatch(timeout ) { latch ->
151
+ val timelineListener = object : Timeline .Listener {
152
+ override fun onTimelineFailure (throwable : Throwable ) {
153
+ }
137
154
138
- override fun onNewTimelineEvents (eventIds : List <String >) {
139
- // noop
140
- }
155
+ override fun onNewTimelineEvents (eventIds : List <String >) {
156
+ // noop
157
+ }
141
158
142
- override fun onTimelineUpdated (snapshot : List <TimelineEvent >) {
143
- val newMessages = snapshot
144
- .filter { it.root.sendState == SendState .SYNCED }
145
- .filter { it.root.getClearType() == EventType .MESSAGE }
146
- .filter { it.root.getClearContent().toModel<MessageContent >()?.body?.startsWith(message) == true }
147
-
148
- if (newMessages.size == nbOfMessages) {
149
- sentEvents.addAll(newMessages)
150
- // Remove listener now, if not at the next update sendEvents could change
151
- timeline.removeListener(this )
152
- latch.countDown()
159
+ override fun onTimelineUpdated (snapshot : List <TimelineEvent >) {
160
+ val newMessages = snapshot
161
+ .filter { it.root.sendState == SendState .SYNCED }
162
+ .filter { it.root.getClearType() == EventType .MESSAGE }
163
+ .filter { it.root.getClearContent().toModel<MessageContent >()?.body?.startsWith(message) == true }
164
+
165
+ if (newMessages.size == nbOfMessages) {
166
+ sentEvents.addAll(newMessages)
167
+ // Remove listener now, if not at the next update sendEvents could change
168
+ timeline.removeListener(this )
169
+ latch.countDown()
170
+ }
153
171
}
154
172
}
173
+ timeline.start()
174
+ timeline.addListener(timelineListener)
175
+ for (i in 0 until nbOfMessages) {
176
+ room.sendTextMessage(message + " #" + (i + 1 ))
177
+ // Sleep a bit otherwise database will be flowed and sync won't be live (we might end up with gap then...)
178
+ delay(50 )
179
+ }
155
180
}
156
- timeline.start()
157
- timeline.addListener(timelineListener)
158
- for (i in 0 until nbOfMessages) {
159
- room.sendTextMessage(message + " #" + (i + 1 ))
160
- }
161
- // Wait 3 second more per message
162
- await(latch, timeout = timeout + 3_000L * nbOfMessages)
163
181
timeline.dispose()
164
-
165
182
// Check that all events has been created
166
183
assertEquals(" Message number do not match $sentEvents " , nbOfMessages.toLong(), sentEvents.size.toLong())
167
-
168
184
return sentEvents
169
185
}
170
186
@@ -237,10 +253,10 @@ class CommonTestHelper(context: Context) {
237
253
238
254
assertTrue(registrationResult is RegistrationResult .Success )
239
255
val session = (registrationResult as RegistrationResult .Success ).session
256
+ session.open()
240
257
if (sessionTestParams.withInitialSync) {
241
258
syncSession(session, 60_000 )
242
259
}
243
-
244
260
return session
245
261
}
246
262
@@ -265,7 +281,7 @@ class CommonTestHelper(context: Context) {
265
281
.getLoginWizard()
266
282
.login(userName, password, " myDevice" )
267
283
}
268
-
284
+ session.open()
269
285
if (sessionTestParams.withInitialSync) {
270
286
syncSession(session)
271
287
}
@@ -331,21 +347,21 @@ class CommonTestHelper(context: Context) {
331
347
}
332
348
333
349
@Suppress(" EXPERIMENTAL_API_USAGE" )
334
- fun retryPeriodicallyWithLatch (latch : CountDownLatch , condition : (() -> Boolean )) {
335
- GlobalScope .launch {
336
- while (true ) {
337
- delay(1000 )
338
- if (condition()) {
339
- latch.countDown()
340
- return @launch
341
- }
350
+ suspend fun retryPeriodicallyWithLatch (latch : CountDownLatch , condition : (() -> Boolean )) {
351
+ while (true ) {
352
+ delay(1000 )
353
+ if (condition()) {
354
+ latch.countDown()
355
+ return
342
356
}
343
357
}
344
358
}
345
359
346
- fun waitWithLatch (timeout : Long? = TestConstants .timeOutMillis, block : (CountDownLatch ) -> Unit ) {
360
+ fun waitWithLatch (timeout : Long? = TestConstants .timeOutMillis, block : suspend (CountDownLatch ) -> Unit ) {
347
361
val latch = CountDownLatch (1 )
348
- block(latch)
362
+ coroutineScope.launch {
363
+ block(latch)
364
+ }
349
365
await(latch, timeout)
350
366
}
351
367
0 commit comments