Skip to content

Commit

Permalink
Merge pull request #359 from matrix-org/dmr/state-redaction-payload
Browse files Browse the repository at this point in the history
Add a separate payload for redacting state
  • Loading branch information
David Robertson authored Nov 2, 2023
2 parents 759146e + f595aed commit 0e8d4a8
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 12 deletions.
15 changes: 14 additions & 1 deletion pubsub/v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type V2Listener interface {
OnDeviceMessages(p *V2DeviceMessages)
OnExpiredToken(p *V2ExpiredToken)
OnInvalidateRoom(p *V2InvalidateRoom)
OnStateRedaction(p *V2StateRedaction)
}

type V2Initialise struct {
Expand Down Expand Up @@ -130,7 +131,17 @@ type V2ExpiredToken struct {

func (*V2ExpiredToken) Type() string { return "V2ExpiredToken" }

// V2InvalidateRoom is emitted after a non-incremental state change to a room.
// V2StateRedaction is emitted when a timeline is seen that contains one or more
// redaction events targeting a piece of room state. The redaction will be emitted
// before its corresponding V2Accumulate payload is emitted.
type V2StateRedaction struct {
RoomID string
}

func (*V2StateRedaction) Type() string { return "V2StateRedaction" }

// V2InvalidateRoom is emitted after a non-incremental state change to a room, in place
// of a V2Initialise payload.
type V2InvalidateRoom struct {
RoomID string
}
Expand Down Expand Up @@ -183,6 +194,8 @@ func (v *V2Sub) onMessage(p Payload) {
v.receiver.OnExpiredToken(pl)
case *V2InvalidateRoom:
v.receiver.OnInvalidateRoom(pl)
case *V2StateRedaction:
v.receiver.OnStateRedaction(pl)
default:
logger.Warn().Str("type", p.Type()).Msg("V2Sub: unhandled payload type")
}
Expand Down
9 changes: 4 additions & 5 deletions state/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,9 @@ type AccumulateResult struct {
// TimelineNIDs is the list of event nids seen in a sync v2 timeline. Some of these
// may already be known to the proxy.
TimelineNIDs []int64
// RequiresReload is set to true when we have accumulated a non-incremental state
// change (typically a redaction) that requires consumers to reload the room state
// from the latest snapshot.
RequiresReload bool
// IncludesStateRedaction is set to true when we have accumulated a redaction to a
// piece of room state.
IncludesStateRedaction bool
}

// Accumulate internal state from a user's sync response. The timeline order MUST be in the order
Expand Down Expand Up @@ -546,7 +545,7 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, timeline s
if err != nil {
return AccumulateResult{}, err
}
result.RequiresReload = currentStateRedactions > 0
result.IncludesStateRedaction = currentStateRedactions > 0
}

if err = a.invitesTable.RemoveSupersededInvites(txn, roomID, postInsertEvents); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions state/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func TestAccumulatorPromptsCacheInvalidation(t *testing.T) {
t.Log("We expect 3 new events and no reload required.")
assertValue(t, "accResult.NumNew", accResult.NumNew, 3)
assertValue(t, "len(accResult.TimelineNIDs)", len(accResult.TimelineNIDs), 3)
assertValue(t, "accResult.RequiresReload", accResult.RequiresReload, false)
assertValue(t, "accResult.IncludesStateRedaction", accResult.IncludesStateRedaction, false)

t.Log("Redact the old state event and the message.")
timeline = []json.RawMessage{
Expand All @@ -274,7 +274,7 @@ func TestAccumulatorPromptsCacheInvalidation(t *testing.T) {
t.Log("We expect 2 new events and no reload required.")
assertValue(t, "accResult.NumNew", accResult.NumNew, 2)
assertValue(t, "len(accResult.TimelineNIDs)", len(accResult.TimelineNIDs), 2)
assertValue(t, "accResult.RequiresReload", accResult.RequiresReload, false)
assertValue(t, "accResult.IncludesStateRedaction", accResult.IncludesStateRedaction, false)

t.Log("Redact the latest state event.")
timeline = []json.RawMessage{
Expand All @@ -291,7 +291,7 @@ func TestAccumulatorPromptsCacheInvalidation(t *testing.T) {
t.Log("We expect 1 new event and a reload required.")
assertValue(t, "accResult.NumNew", accResult.NumNew, 1)
assertValue(t, "len(accResult.TimelineNIDs)", len(accResult.TimelineNIDs), 1)
assertValue(t, "accResult.RequiresReload", accResult.RequiresReload, true)
assertValue(t, "accResult.IncludesStateRedaction", accResult.IncludesStateRedaction, true)
}

func TestAccumulatorMembershipLogs(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions sync2/handler2/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,9 +302,9 @@ func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID strin
return err
}

// Consumers should reload state before processing new timeline events.
if accResult.RequiresReload {
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2InvalidateRoom{
// Consumers should reload state content before processing new timeline events.
if accResult.IncludesStateRedaction {
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2StateRedaction{
RoomID: roomID,
})
}
Expand Down
8 changes: 8 additions & 0 deletions sync3/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,14 @@ func (h *SyncLiveHandler) OnExpiredToken(p *pubsub.V2ExpiredToken) {
h.ConnMap.CloseConnsForDevice(p.UserID, p.DeviceID)
}

func (h *SyncLiveHandler) OnStateRedaction(p *pubsub.V2StateRedaction) {
// We only need to reload the global metadata here: mercifully, there isn't anything
// in the user cache that needs to be reloaded after state gets redacted.
ctx, task := internal.StartTask(context.Background(), "OnStateRedaction")
defer task.End()
h.GlobalCache.OnInvalidateRoom(ctx, p.RoomID)
}

func (h *SyncLiveHandler) OnInvalidateRoom(p *pubsub.V2InvalidateRoom) {
ctx, task := internal.StartTask(context.Background(), "OnInvalidateRoom")
defer task.End()
Expand Down

0 comments on commit 0e8d4a8

Please sign in to comment.