@@ -15,46 +15,50 @@ public final class BackgroundPoller {
15
15
completionHandler: @escaping ( UIBackgroundFetchResult ) -> Void ,
16
16
using dependencies: Dependencies = Dependencies ( )
17
17
) {
18
+ let ( groupIds, servers) : ( Set < String > , Set < String > ) = Storage . shared. read { db in
19
+ (
20
+ try ClosedGroup
21
+ . select ( . threadId)
22
+ . joining (
23
+ required: ClosedGroup . members
24
+ . filter ( GroupMember . Columns. profileId == getUserHexEncodedPublicKey ( db) )
25
+ )
26
+ . asRequest ( of: String . self)
27
+ . fetchSet ( db) ,
28
+ /// The default room promise creates an OpenGroup with an empty `roomToken` value, we
29
+ /// don't want to start a poller for this as the user hasn't actually joined a room
30
+ ///
31
+ /// We also want to exclude any rooms which have failed to poll too many times in a row from
32
+ /// the background poll as they are likely to fail again
33
+ try OpenGroup
34
+ . select ( . server)
35
+ . filter (
36
+ OpenGroup . Columns. roomToken != " " &&
37
+ OpenGroup . Columns. isActive &&
38
+ OpenGroup . Columns. pollFailureCount < OpenGroupAPI . Poller. maxRoomFailureCountForBackgroundPoll
39
+ )
40
+ . distinct ( )
41
+ . asRequest ( of: String . self)
42
+ . fetchSet ( db)
43
+ )
44
+ }
45
+ . defaulting ( to: ( [ ] , [ ] ) )
46
+
47
+ Log . info ( " [BackgroundPoller] Fetching 1 User, \( groupIds. count) \( " group " , number: groupIds. count) , \( servers. count) \( " communit " , number: servers. count, singular: " y " , plural: " ies " ) . " )
18
48
Publishers
19
49
. MergeMany (
20
50
[ pollForMessages ( using: dependencies) ]
21
- . appending ( contentsOf: pollForClosedGroupMessages ( using: dependencies) )
22
- . appending (
23
- contentsOf: Storage . shared
24
- . read { db in
25
- /// The default room promise creates an OpenGroup with an empty `roomToken` value, we
26
- /// don't want to start a poller for this as the user hasn't actually joined a room
27
- ///
28
- /// We also want to exclude any rooms which have failed to poll too many times in a row from
29
- /// the background poll as they are likely to fail again
30
- try OpenGroup
31
- . select ( . server)
32
- . filter (
33
- OpenGroup . Columns. roomToken != " " &&
34
- OpenGroup . Columns. isActive &&
35
- OpenGroup . Columns. pollFailureCount < OpenGroupAPI . Poller. maxRoomFailureCountForBackgroundPoll
36
- )
37
- . distinct ( )
38
- . asRequest ( of: String . self)
39
- . fetchSet ( db)
40
- }
41
- . defaulting ( to: [ ] )
42
- . map { server -> AnyPublisher < Void , Error > in
43
- let poller : OpenGroupAPI . Poller = OpenGroupAPI . Poller ( for: server)
44
- poller. stop ( )
45
-
46
- return poller. poll (
47
- calledFromBackgroundPoller: true ,
48
- isBackgroundPollerValid: { BackgroundPoller . isValid } ,
49
- isPostCapabilitiesRetry: false ,
50
- using: dependencies
51
- )
52
- }
53
- )
51
+ . appending ( contentsOf: pollForClosedGroupMessages ( groupIds: groupIds, using: dependencies) )
52
+ . appending ( contentsOf: pollForCommunityMessages ( servers: servers, using: dependencies) )
54
53
)
55
54
. subscribe ( on: DispatchQueue . global ( qos: . background) , using: dependencies)
56
55
. receive ( on: DispatchQueue . main, using: dependencies)
57
56
. collect ( )
57
+ . handleEvents (
58
+ receiveOutput: { _ in
59
+ Log . info ( " [BackgroundPoller] Finished polling. " )
60
+ }
61
+ )
58
62
. sinkUntilComplete (
59
63
receiveCompletion: { result in
60
64
// If we have already invalidated the timer then do nothing (we essentially timed out)
@@ -63,7 +67,7 @@ public final class BackgroundPoller {
63
67
switch result {
64
68
case . finished: completionHandler ( . newData)
65
69
case . failure( let error) :
66
- SNLog ( " Background poll failed due to error: \( error) " )
70
+ Log . error ( " [BackgroundPoller] Failed due to error: \( error) . " )
67
71
completionHandler ( . failed)
68
72
}
69
73
}
@@ -83,39 +87,55 @@ public final class BackgroundPoller {
83
87
drainBehaviour: . alwaysRandom,
84
88
using: dependencies
85
89
)
90
+ . handleEvents (
91
+ receiveOutput: { _, _, validMessageCount, _ in
92
+ Log . info ( " [BackgroundPoller] Received \( validMessageCount) valid \( " message " , number: validMessageCount) . " )
93
+ }
94
+ )
86
95
. map { _ in ( ) }
87
96
. eraseToAnyPublisher ( )
88
97
}
89
98
90
99
private static func pollForClosedGroupMessages(
100
+ groupIds: Set < String > ,
91
101
using dependencies: Dependencies
92
102
) -> [ AnyPublisher < Void , Error > ] {
93
103
// Fetch all closed groups (excluding any don't contain the current user as a
94
104
// GroupMemeber as the user is no longer a member of those)
95
- return Storage . shared
96
- . read { db in
97
- try ClosedGroup
98
- . select ( . threadId)
99
- . joining (
100
- required: ClosedGroup . members
101
- . filter ( GroupMember . Columns. profileId == getUserHexEncodedPublicKey ( db) )
102
- )
103
- . asRequest ( of: String . self)
104
- . fetchAll ( db)
105
- }
106
- . defaulting ( to: [ ] )
107
- . map { groupPublicKey in
108
- return ClosedGroupPoller ( )
109
- . poll (
110
- namespaces: ClosedGroupPoller . namespaces,
111
- for: groupPublicKey,
112
- calledFromBackgroundPoller: true ,
113
- isBackgroundPollValid: { BackgroundPoller . isValid } ,
114
- drainBehaviour: . alwaysRandom,
115
- using: dependencies
116
- )
117
- . map { _ in ( ) }
118
- . eraseToAnyPublisher ( )
119
- }
105
+ return groupIds. map { groupPublicKey in
106
+ return ClosedGroupPoller ( )
107
+ . poll (
108
+ namespaces: ClosedGroupPoller . namespaces,
109
+ for: groupPublicKey,
110
+ calledFromBackgroundPoller: true ,
111
+ isBackgroundPollValid: { BackgroundPoller . isValid } ,
112
+ drainBehaviour: . alwaysRandom,
113
+ using: dependencies
114
+ )
115
+ . handleEvents (
116
+ receiveOutput: { _, _, validMessageCount, _ in
117
+ Log . info ( " [BackgroundPoller] Received \( validMessageCount) valid \( " message " , number: validMessageCount) for group: \( groupPublicKey) . " )
118
+ }
119
+ )
120
+ . map { _ in ( ) }
121
+ . eraseToAnyPublisher ( )
122
+ }
123
+ }
124
+
125
+ private static func pollForCommunityMessages(
126
+ servers: Set < String > ,
127
+ using dependencies: Dependencies
128
+ ) -> [ AnyPublisher < Void , Error > ] {
129
+ return servers. map { server -> AnyPublisher < Void , Error > in
130
+ let poller : OpenGroupAPI . Poller = OpenGroupAPI . Poller ( for: server)
131
+ poller. stop ( )
132
+
133
+ return poller. poll (
134
+ calledFromBackgroundPoller: true ,
135
+ isBackgroundPollerValid: { BackgroundPoller . isValid } ,
136
+ isPostCapabilitiesRetry: false ,
137
+ using: dependencies
138
+ )
139
+ }
120
140
}
121
141
}
0 commit comments