Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement /_matrix/client/v1/rooms/{roomId}/threads #3404

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
13 changes: 13 additions & 0 deletions syncapi/routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,18 @@ func Setup(
}, httputil.WithAllowGuests()),
).Methods(http.MethodGet, http.MethodOptions)

v1unstablemux.Handle("/rooms/{roomId}/threads",
httputil.MakeAuthAPI("threads", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
}

return Threads(
req, device, syncDB, rsAPI, vars["roomId"],
)
})).Methods(http.MethodGet)

v3mux.Handle("/search",
httputil.MakeAuthAPI("search", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if !cfg.Fulltext.Enabled {
Expand Down Expand Up @@ -200,4 +212,5 @@ func Setup(
return GetMemberships(req, device, vars["roomID"], syncDB, rsAPI, membership, notMembership, at)
}, httputil.WithAllowGuests()),
).Methods(http.MethodGet, http.MethodOptions)

}
107 changes: 107 additions & 0 deletions syncapi/routing/threads.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file needs to be formatted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package routing

import (
rstypes "github.com/matrix-org/dendrite/roomserver/types"
"net/http"
"strconv"

"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/synctypes"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)

type ThreadsResponse struct {
Chunk []synctypes.ClientEvent `json:"chunk"`
NextBatch string `json:"next_batch,omitempty"`
}

func Threads(
req *http.Request,
device *userapi.Device,
syncDB storage.Database,
rsAPI api.SyncRoomserverAPI,
rawRoomID string) util.JSONResponse {
var err error
roomID, err := spec.NewRoomID(rawRoomID)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: spec.InvalidParam("invalid room ID"),
}
}

limit, err := strconv.ParseUint(req.URL.Query().Get("limit"), 10, 64)
if err != nil {
limit = 50
}
if limit > 100 {
limit = 100
}

var from types.StreamPosition
if f := req.URL.Query().Get("from"); f != "" {
if from, err = types.NewStreamPositionFromString(f); err != nil {
return util.ErrorResponse(err)
}
}

include := req.URL.Query().Get("include")

snapshot, err := syncDB.NewDatabaseSnapshot(req.Context())
if err != nil {
logrus.WithError(err).Error("Failed to get snapshot for relations")
return util.JSONResponse{
Code: http.StatusInternalServerError,
JSON: spec.InternalServerError{},
}
}
var succeeded bool
defer sqlutil.EndTransactionWithCheck(snapshot, &succeeded, &err)

res := &ThreadsResponse{
Chunk: []synctypes.ClientEvent{},
}

var userID string
if include == "participated" {
_, err := spec.NewUserID(device.UserID, true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_, err := spec.NewUserID(device.UserID, true)
_, err = spec.NewUserID(device.UserID, true)

if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("device.UserID invalid")
return util.JSONResponse{
Code: http.StatusInternalServerError,
JSON: spec.Unknown("internal server error"),
}
}
userID = device.UserID
} else {
userID = ""
}
var headeredEvents []*rstypes.HeaderedEvent
headeredEvents, _, res.NextBatch, err = snapshot.ThreadsFor(
req.Context(), roomID.String(), userID, from, limit,
)
if err != nil {
return util.ErrorResponse(err)
}

for _, event := range headeredEvents {
ce, err := synctypes.ToClientEvent(event, synctypes.FormatAll, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the roomID is not needed, as we're already requesting for a specific room?

Suggested change
ce, err := synctypes.ToClientEvent(event, synctypes.FormatAll, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
ce, err := synctypes.ToClientEvent(event, synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {

return rsAPI.QueryUserIDForSender(req.Context(), roomID, senderID)
})
if err != nil {
return util.ErrorResponse(err)
}
res.Chunk = append(res.Chunk, *ce)
}

return util.JSONResponse{
Code: http.StatusOK,
JSON: res,
}
}
64 changes: 64 additions & 0 deletions syncapi/storage/postgres/relations_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,32 @@ const selectRelationsInRangeDescSQL = "" +
" AND id >= $5 AND id < $6" +
" ORDER BY id DESC LIMIT $7"

const selectThreadsSQL = "" +
"SELECT syncapi_relations.id, syncapi_relations.event_id FROM syncapi_relations" +
" JOIN syncapi_output_room_events ON syncapi_output_room_events.event_id = syncapi_relations.event_id" +
" WHERE syncapi_relations.room_id = $1" +
" AND syncapi_relations.rel_type = 'm.thread'" +
" AND syncapi_relations.id >= $2" +
" ORDER BY syncapi_relations.id LIMIT $3"

const selectThreadsWithSenderSQL = "" +
"SELECT syncapi_relations.id, syncapi_relations.event_id FROM syncapi_relations" +
" JOIN syncapi_output_room_events ON syncapi_output_room_events.event_id = syncapi_relations.event_id" +
" WHERE syncapi_relations.room_id = $1" +
" AND syncapi_output_room_events.sender = $2" +
" AND syncapi_relations.rel_type = 'm.thread'" +
" AND syncapi_relations.id >= $3" +
" ORDER BY syncapi_relations.id LIMIT $4"

const selectMaxRelationIDSQL = "" +
"SELECT COALESCE(MAX(id), 0) FROM syncapi_relations"

type relationsStatements struct {
insertRelationStmt *sql.Stmt
selectRelationsInRangeAscStmt *sql.Stmt
selectRelationsInRangeDescStmt *sql.Stmt
selectThreadsStmt *sql.Stmt
selectThreadsWithSenderStmt *sql.Stmt
deleteRelationStmt *sql.Stmt
selectMaxRelationIDStmt *sql.Stmt
}
Expand All @@ -84,6 +103,8 @@ func NewPostgresRelationsTable(db *sql.DB) (tables.Relations, error) {
{&s.insertRelationStmt, insertRelationSQL},
{&s.selectRelationsInRangeAscStmt, selectRelationsInRangeAscSQL},
{&s.selectRelationsInRangeDescStmt, selectRelationsInRangeDescSQL},
{&s.selectThreadsStmt, selectThreadsSQL},
{&s.selectThreadsWithSenderStmt, selectThreadsWithSenderSQL},
{&s.deleteRelationStmt, deleteRelationSQL},
{&s.selectMaxRelationIDStmt, selectMaxRelationIDSQL},
}.Prepare(db)
Expand Down Expand Up @@ -149,6 +170,49 @@ func (s *relationsStatements) SelectRelationsInRange(
return result, lastPos, rows.Err()
}

func (s *relationsStatements) SelectThreads(
ctx context.Context,
txn *sql.Tx,
roomID, userID string,
from types.StreamPosition,
limit uint64,
) ([]string, types.StreamPosition, error) {
var lastPos types.StreamPosition
var stmt *sql.Stmt
var rows *sql.Rows
var err error

if userID == "" {
stmt = sqlutil.TxStmt(txn, s.selectThreadsStmt)
rows, err = stmt.QueryContext(ctx, roomID, from, limit)
} else {
stmt = sqlutil.TxStmt(txn, s.selectThreadsWithSenderStmt)
rows, err = stmt.QueryContext(ctx, roomID, userID, from, limit)
}
if err != nil {
return nil, lastPos, err
}

defer internal.CloseAndLogIfError(ctx, rows, "selectThreads: rows.close() failed")
var result []string
var (
id types.StreamPosition
eventId string
)

for rows.Next() {
if err = rows.Scan(&id, &eventId); err != nil {
return nil, lastPos, err
}
if id > lastPos {
lastPos = id
}
result = append(result, eventId)
}

return result, lastPos, rows.Err()
}

func (s *relationsStatements) SelectMaxRelationID(
ctx context.Context, txn *sql.Tx,
) (id int64, err error) {
Expand Down
36 changes: 36 additions & 0 deletions syncapi/storage/shared/storage_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,3 +811,39 @@ func (d *DatabaseTransaction) RelationsFor(ctx context.Context, roomID, eventID,

return events, prevBatch, nextBatch, nil
}

func (d *DatabaseTransaction) ThreadsFor(ctx context.Context, roomID, userID string, from types.StreamPosition, limit uint64) (
events []*rstypes.HeaderedEvent, prevBatch, nextBatch string, err error,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prevBatch is always empty, so can be removed.

) {
r := types.Range{
From: from,
}

if r.From == 0 {
// If we're working backwards (dir=b) and there's no ?from= specified then
// we will automatically want to work backwards from the current position,
// so find out what that is.
if r.From, err = d.MaxStreamPositionForRelations(ctx); err != nil {
return nil, "", "", fmt.Errorf("d.MaxStreamPositionForRelations: %w", err)
}
// The result normally isn't inclusive of the event *at* the ?from=
// position, so add 1 here so that we include the most recent relation.
r.From++
}

// First look up any threads from the database. We add one to the limit here
// so that we can tell if we're overflowing, as we will only set the "next_batch"
// in the response if we are.
eventIDs, pos, err := d.Relations.SelectThreads(ctx, d.txn, roomID, userID, from, limit+1)

if err != nil {
return nil, "", "", fmt.Errorf("d.Relations.SelectRelationsInRange: %w", err)
}

events, err = d.Events(ctx, eventIDs)
if err != nil {
return nil, "", "", fmt.Errorf("d.OutputEvents.SelectEvents: %w", err)
}

return events, prevBatch, fmt.Sprintf("%d", pos), nil
}
64 changes: 64 additions & 0 deletions syncapi/storage/sqlite3/relations_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,30 @@ const selectRelationsInRangeDescSQL = "" +
const selectMaxRelationIDSQL = "" +
"SELECT COALESCE(MAX(id), 0) FROM syncapi_relations"

const selectThreadsSQL = "" +
"SELECT syncapi_relations.id, syncapi_relations.event_id FROM syncapi_relations" +
" JOIN syncapi_output_room_events ON syncapi_output_room_events.event_id = syncapi_relations.event_id" +
" WHERE syncapi_relations.room_id = $1" +
" AND syncapi_relations.rel_type = 'm.thread'" +
" AND syncapi_relations.id >= $2" +
" ORDER BY syncapi_relations.id LIMIT $3"

const selectThreadsWithSenderSQL = "" +
"SELECT syncapi_relations.id, syncapi_relations.event_id FROM syncapi_relations" +
" JOIN syncapi_output_room_events ON syncapi_output_room_events.event_id = syncapi_relations.event_id" +
" WHERE syncapi_relations.room_id = $1" +
" AND syncapi_output_room_events.sender = $2" +
" AND syncapi_relations.rel_type = 'm.thread'" +
" AND syncapi_relations.id >= $3" +
" ORDER BY syncapi_relations.id LIMIT $4"

type relationsStatements struct {
streamIDStatements *StreamIDStatements
insertRelationStmt *sql.Stmt
selectRelationsInRangeAscStmt *sql.Stmt
selectRelationsInRangeDescStmt *sql.Stmt
selectThreadsStmt *sql.Stmt
selectThreadsWithSenderStmt *sql.Stmt
deleteRelationStmt *sql.Stmt
selectMaxRelationIDStmt *sql.Stmt
}
Expand All @@ -85,6 +104,8 @@ func NewSqliteRelationsTable(db *sql.DB, streamID *StreamIDStatements) (tables.R
{&s.insertRelationStmt, insertRelationSQL},
{&s.selectRelationsInRangeAscStmt, selectRelationsInRangeAscSQL},
{&s.selectRelationsInRangeDescStmt, selectRelationsInRangeDescSQL},
{&s.selectThreadsStmt, selectThreadsSQL},
{&s.selectThreadsWithSenderStmt, selectThreadsWithSenderSQL},
{&s.deleteRelationStmt, deleteRelationSQL},
{&s.selectMaxRelationIDStmt, selectMaxRelationIDSQL},
}.Prepare(db)
Expand Down Expand Up @@ -154,6 +175,49 @@ func (s *relationsStatements) SelectRelationsInRange(
return result, lastPos, rows.Err()
}

func (s *relationsStatements) SelectThreads(
ctx context.Context,
txn *sql.Tx,
roomID, userID string,
from types.StreamPosition,
limit uint64,
) ([]string, types.StreamPosition, error) {
var lastPos types.StreamPosition
var stmt *sql.Stmt
var rows *sql.Rows
var err error

if userID == "" {
stmt = sqlutil.TxStmt(txn, s.selectThreadsStmt)
rows, err = stmt.QueryContext(ctx, roomID, from, limit)
} else {
stmt = sqlutil.TxStmt(txn, s.selectThreadsWithSenderStmt)
rows, err = stmt.QueryContext(ctx, roomID, userID, from, limit)
}
if err != nil {
return nil, lastPos, err
}

defer internal.CloseAndLogIfError(ctx, rows, "selectThreads: rows.close() failed")
var result []string
var (
id types.StreamPosition
eventId string
)

for rows.Next() {
if err = rows.Scan(&id, &eventId); err != nil {
return nil, lastPos, err
}
if id > lastPos {
lastPos = id
}
result = append(result, eventId)
}

return result, lastPos, rows.Err()
}

func (s *relationsStatements) SelectMaxRelationID(
ctx context.Context, txn *sql.Tx,
) (id int64, err error) {
Expand Down
7 changes: 5 additions & 2 deletions syncapi/storage/tables/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,10 @@ type Presence interface {
}

type Relations interface {
// Inserts a relation which refers from the child event ID to the event ID in the given room.
// InsertRelation Inserts a relation which refers from the child event ID to the event ID in the given room.
// If the relation already exists then this function will do nothing and return no error.
InsertRelation(ctx context.Context, txn *sql.Tx, roomID, eventID, childEventID, childEventType, relType string) (err error)
// Deletes a relation which already exists as the result of an event redaction. If the relation
// DeleteRelation Deletes a relation which already exists as the result of an event redaction. If the relation
// does not exist then this function will do nothing and return no error.
DeleteRelation(ctx context.Context, txn *sql.Tx, roomID, childEventID string) error
// SelectRelationsInRange will return relations grouped by relation type within the given range.
Expand All @@ -235,6 +235,9 @@ type Relations interface {
// will be returned, inclusive of the "to" position but excluding the "from" position. The stream
// position returned is the maximum position of the returned results.
SelectRelationsInRange(ctx context.Context, txn *sql.Tx, roomID, eventID, relType, eventType string, r types.Range, limit int) (map[string][]types.RelationEntry, types.StreamPosition, error)
// SelectThreads will find threads from a room, if userID is not empty
// then it will only include the threads that the user has participated in.
SelectThreads(ctx context.Context, txn *sql.Tx, roomID, userID string, from types.StreamPosition, limit uint64) ([]string, types.StreamPosition, error)
// SelectMaxRelationID returns the maximum ID of all relations, used to determine what the boundaries
// should be if there are no boundaries supplied (i.e. we want to work backwards but don't have a
// "from" or want to work forwards and don't have a "to").
Expand Down
Loading