@@ -10,6 +10,7 @@ import (
10
10
"github.com/matrix-org/dendrite/internal"
11
11
"github.com/matrix-org/dendrite/internal/pushgateway"
12
12
"github.com/matrix-org/dendrite/pushserver/api"
13
+ "github.com/matrix-org/dendrite/pushserver/internal/pushrules"
13
14
"github.com/matrix-org/dendrite/pushserver/storage"
14
15
rsapi "github.com/matrix-org/dendrite/roomserver/api"
15
16
"github.com/matrix-org/dendrite/setup/config"
@@ -96,7 +97,7 @@ func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, ore *rsapi
96
97
"event_type" : event .Type (),
97
98
}).Infof ("Received event from room server: %#v" , ore )
98
99
99
- members , err := s .localRoomMembers (ctx , event .RoomID ())
100
+ members , roomSize , err := s .localRoomMembers (ctx , event .RoomID ())
100
101
if err != nil {
101
102
return err
102
103
}
@@ -112,10 +113,10 @@ func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, ore *rsapi
112
113
// TODO: does it have to be set? It's not required, and
113
114
// removing it means we can send all notifications to
114
115
// e.g. Element's Push gateway in one go.
115
- for _ , localpart := range members {
116
- if err := s .notifyLocal (ctx , event , localpart ); err != nil {
116
+ for _ , mem := range members {
117
+ if err := s .notifyLocal (ctx , event , mem , roomSize ); err != nil {
117
118
log .WithFields (log.Fields {
118
- "localpart" : localpart ,
119
+ "localpart" : mem . Localpart ,
119
120
}).WithError (err ).Errorf ("Unable to evaluate push rules" )
120
121
continue
121
122
}
@@ -124,8 +125,15 @@ func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, ore *rsapi
124
125
return nil
125
126
}
126
127
127
- // localRoomMembers fetches the current local members of a room.
128
- func (s * OutputRoomEventConsumer ) localRoomMembers (ctx context.Context , roomID string ) ([]string , error ) {
128
+ type localMembership struct {
129
+ gomatrixserverlib.MemberContent
130
+ UserID string
131
+ Localpart string
132
+ }
133
+
134
+ // localRoomMembers fetches the current local members of a room, and
135
+ // the total number of members.
136
+ func (s * OutputRoomEventConsumer ) localRoomMembers (ctx context.Context , roomID string ) ([]* localMembership , int , error ) {
129
137
req := & rsapi.QueryMembershipsForRoomRequest {
130
138
RoomID : roomID ,
131
139
JoinedOnly : true ,
@@ -135,24 +143,27 @@ func (s *OutputRoomEventConsumer) localRoomMembers(ctx context.Context, roomID s
135
143
// XXX: This could potentially race if the state for the event is not known yet
136
144
// e.g. the event came over federation but we do not have the full state persisted.
137
145
if err := s .rsAPI .QueryMembershipsForRoom (ctx , req , & res ); err != nil {
138
- return nil , err
146
+ return nil , 0 , err
139
147
}
140
148
141
- var members []string
149
+ var members []* localMembership
150
+ var ntotal int
142
151
for _ , event := range res .JoinEvents {
143
152
if event .StateKey == nil {
144
153
continue
145
154
}
146
155
147
- var member gomatrixserverlib. MemberContent
148
- if err := json .Unmarshal (event .Content , & member ); err != nil {
156
+ var member localMembership
157
+ if err := json .Unmarshal (event .Content , & member . MemberContent ); err != nil {
149
158
log .WithError (err ).Errorf ("Parsing MemberContent" )
150
159
continue
151
160
}
152
161
if member .Membership != gomatrixserverlib .Join {
153
162
continue
154
163
}
155
164
165
+ ntotal ++
166
+
156
167
localpart , domain , err := gomatrixserverlib .SplitID ('@' , * event .StateKey )
157
168
if err != nil {
158
169
log .WithFields (log.Fields {
@@ -164,37 +175,39 @@ func (s *OutputRoomEventConsumer) localRoomMembers(ctx context.Context, roomID s
164
175
continue
165
176
}
166
177
167
- members = append (members , localpart )
178
+ member .UserID = * event .StateKey
179
+ member .Localpart = localpart
180
+ members = append (members , & member )
168
181
}
169
182
170
- return members , nil
183
+ return members , ntotal , nil
171
184
}
172
185
173
186
// notifyLocal finds the right push actions for a local user, given an event.
174
- func (s * OutputRoomEventConsumer ) notifyLocal (ctx context.Context , event * gomatrixserverlib.HeaderedEvent , localpart string ) error {
175
- ok , tweaks , err := s .evaluatePushRules (ctx , event , localpart )
187
+ func (s * OutputRoomEventConsumer ) notifyLocal (ctx context.Context , event * gomatrixserverlib.HeaderedEvent , mem * localMembership , roomSize int ) error {
188
+ ok , tweaks , err := s .evaluatePushRules (ctx , event , mem , roomSize )
176
189
if err != nil {
177
190
return err
178
191
} else if ! ok {
179
192
return nil
180
193
}
181
194
182
- devicesByURL , err := s .localPushDevices (ctx , localpart , tweaks )
195
+ devicesByURL , err := s .localPushDevices (ctx , mem . Localpart , tweaks )
183
196
if err != nil {
184
197
return err
185
198
}
186
199
187
200
log .WithFields (log.Fields {
188
201
"room_id" : event .RoomID (),
189
- "localpart" : localpart ,
202
+ "localpart" : mem . Localpart ,
190
203
"num_urls" : len (devicesByURL ),
191
204
}).Infof ("Notifying push gateways" )
192
205
193
206
var rejected []* pushgateway.Device
194
207
for url , devices := range devicesByURL {
195
208
log .WithFields (log.Fields {
196
209
"room_id" : event .RoomID (),
197
- "localpart" : localpart ,
210
+ "localpart" : mem . Localpart ,
198
211
"url" : url ,
199
212
}).Infof ("Notifying push gateway" )
200
213
@@ -203,29 +216,104 @@ func (s *OutputRoomEventConsumer) notifyLocal(ctx context.Context, event *gomatr
203
216
continue
204
217
}
205
218
206
- rej , err := s .notifyHTTP (ctx , event , url , devices , localpart )
219
+ rej , err := s .notifyHTTP (ctx , event , url , devices , mem . Localpart )
207
220
if err != nil {
208
221
log .WithFields (log.Fields {
209
222
"event_id" : event .EventID (),
210
- "localpart" : localpart ,
223
+ "localpart" : mem . Localpart ,
211
224
}).WithError (err ).Errorf ("Unable to notify HTTP pusher" )
212
225
continue
213
226
}
214
227
rejected = append (rejected , rej ... )
215
228
}
216
229
217
230
if len (rejected ) > 0 {
218
- return s .deleteRejectedPushers (ctx , rejected , localpart )
231
+ return s .deleteRejectedPushers (ctx , rejected , mem . Localpart )
219
232
}
220
233
221
234
return nil
222
235
}
223
236
224
237
// evaluatePushRules fetches and evaluates the push rules of a local
225
238
// user. Returns true if the event should be pushed.
226
- func (s * OutputRoomEventConsumer ) evaluatePushRules (ctx context.Context , event * gomatrixserverlib.HeaderedEvent , localpart string ) (ok bool , tweaks map [string ]interface {}, err error ) {
227
- // TODO: evaluate push rules
228
- return true , nil , nil
239
+ func (s * OutputRoomEventConsumer ) evaluatePushRules (ctx context.Context , event * gomatrixserverlib.HeaderedEvent , mem * localMembership , roomSize int ) (bool , map [string ]interface {}, error ) {
240
+ if event .Sender () == mem .UserID {
241
+ // SPEC: Homeservers MUST NOT notify the Push Gateway for
242
+ // events that the user has sent themselves.
243
+ return false , nil , nil
244
+ }
245
+
246
+ // TODO: fetch the user's push rules.
247
+ ruleSet := pushrules .DefaultRuleSet (mem .Localpart , s .cfg .Matrix .ServerName )
248
+
249
+ ec := & ruleSetEvalContext {
250
+ ctx : ctx ,
251
+ rsAPI : s .rsAPI ,
252
+ mem : mem ,
253
+ roomID : event .RoomID (),
254
+ roomSize : roomSize ,
255
+ }
256
+ eval := pushrules .NewRuleSetEvaluator (ec , ruleSet )
257
+ rule , err := eval .MatchEvent (event .Event )
258
+ if err != nil {
259
+ return false , nil , err
260
+ }
261
+ if rule == nil {
262
+ // SPEC: If no rules match an event, the homeserver MUST NOT
263
+ // notify the Push Gateway for that event.
264
+ return false , nil , err
265
+ }
266
+
267
+ log .WithFields (log.Fields {
268
+ "room_id" : event .RoomID (),
269
+ "localpart" : mem .Localpart ,
270
+ "rule_id" : rule .RuleID ,
271
+ }).Infof ("Matched a push rule" )
272
+
273
+ a , tweaks , err := pushrules .ActionsToTweaks (rule .Actions )
274
+ if err != nil {
275
+ return false , nil , err
276
+ }
277
+
278
+ // TODO: support coalescing.
279
+ return a == pushrules .NotifyAction || a == pushrules .CoalesceAction , tweaks , nil
280
+ }
281
+
282
+ type ruleSetEvalContext struct {
283
+ ctx context.Context
284
+ rsAPI rsapi.RoomserverInternalAPI
285
+ mem * localMembership
286
+ roomID string
287
+ roomSize int
288
+ }
289
+
290
+ func (rse * ruleSetEvalContext ) UserDisplayName () string { return rse .mem .DisplayName }
291
+
292
+ func (rse * ruleSetEvalContext ) RoomMemberCount () (int , error ) { return rse .roomSize , nil }
293
+
294
+ func (rse * ruleSetEvalContext ) HasPowerLevel (userID , levelKey string ) (bool , error ) {
295
+ req := & rsapi.QueryLatestEventsAndStateRequest {
296
+ RoomID : rse .roomID ,
297
+ StateToFetch : []gomatrixserverlib.StateKeyTuple {
298
+ {EventType : "m.room.power_levels" },
299
+ },
300
+ }
301
+ var res rsapi.QueryLatestEventsAndStateResponse
302
+ if err := rse .rsAPI .QueryLatestEventsAndState (rse .ctx , req , & res ); err != nil {
303
+ return false , err
304
+ }
305
+ for _ , ev := range res .StateEvents {
306
+ if ev .Type () != gomatrixserverlib .MRoomPowerLevels {
307
+ continue
308
+ }
309
+
310
+ plc , err := gomatrixserverlib .NewPowerLevelContentFromEvent (ev .Event )
311
+ if err != nil {
312
+ return false , err
313
+ }
314
+ return plc .UserLevel (userID ) >= plc .NotificationLevel (levelKey ), nil
315
+ }
316
+ return true , nil
229
317
}
230
318
231
319
// localPushDevices pushes to the configured devices of a local user.
0 commit comments