Skip to content

Commit 3680e80

Browse files
committed
few more optimization
1 parent af34eac commit 3680e80

15 files changed

+115
-94
lines changed

pkg/controllers/auth_token.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/mynaparrot/plugnmeet-server/pkg/models"
1414
"github.com/mynaparrot/plugnmeet-server/pkg/services/db"
1515
natsservice "github.com/mynaparrot/plugnmeet-server/pkg/services/nats"
16-
"github.com/mynaparrot/plugnmeet-server/pkg/services/redis"
1716
"github.com/mynaparrot/plugnmeet-server/version"
1817
"google.golang.org/protobuf/encoding/protojson"
1918
"google.golang.org/protobuf/proto"
@@ -79,8 +78,8 @@ func HandleGenerateJoinToken(c *fiber.Ctx) error {
7978
}
8079

8180
// don't generate token if user is blocked
82-
rs := redisservice.New(config.GetConfig().RDS)
83-
exist := rs.IsUserExistInBlockList(req.RoomId, req.UserInfo.UserId)
81+
nts := natsservice.New(config.GetConfig())
82+
exist := nts.IsUserExistInBlockList(req.RoomId, req.UserInfo.UserId)
8483
if exist {
8584
return utils.SendCommonProtoJsonResponse(c, false, "this user is blocked to join this session")
8685
}
@@ -130,8 +129,7 @@ func HandleVerifyToken(c *fiber.Ctx) error {
130129
}
131130
}
132131

133-
rs := redisservice.New(app.RDS)
134-
exist := rs.IsUserExistInBlockList(roomId.(string), requestedUserId.(string))
132+
exist := nts.IsUserExistInBlockList(roomId.(string), requestedUserId.(string))
135133
if exist {
136134
return utils.SendCommonProtobufResponse(c, false, "notifications.you-are-blocked")
137135
}

pkg/controllers/bbb_api_wrapper.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ func HandleBBBJoin(c *fiber.Ctx) error {
233233
return c.XML(bbbapiwrapper.CommonResponseMsg("FAILED", "validationError", err.Error()))
234234
}
235235

236-
exist := rs.IsUserExistInBlockList(req.RoomId, req.UserInfo.UserId)
236+
exist := nts.IsUserExistInBlockList(req.RoomId, req.UserInfo.UserId)
237237
if exist {
238238
return c.XML(bbbapiwrapper.CommonResponseMsg("FAILED", "validationError", "this user is blocked to join this session"))
239239
}

pkg/helpers/webhook_notifier.go

+16-13
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/mynaparrot/plugnmeet-protocol/webhook"
77
"github.com/mynaparrot/plugnmeet-server/pkg/config"
88
"github.com/mynaparrot/plugnmeet-server/pkg/services/db"
9+
natsservice "github.com/mynaparrot/plugnmeet-server/pkg/services/nats"
910
"github.com/mynaparrot/plugnmeet-server/pkg/services/redis"
1011
log "github.com/sirupsen/logrus"
1112
"time"
@@ -15,6 +16,7 @@ type WebhookNotifier struct {
1516
ds *dbservice.DatabaseService
1617
rs *redisservice.RedisService
1718
app *config.AppConfig
19+
natsService *natsservice.NatsService
1820
isEnabled bool
1921
enabledForPerMeeting bool
2022
defaultUrl string
@@ -26,13 +28,13 @@ type webhookRedisFields struct {
2628
PerformDeleting bool `json:"perform_deleting"`
2729
}
2830

29-
func newWebhookNotifier(app *config.AppConfig, ds *dbservice.DatabaseService, rs *redisservice.RedisService) *WebhookNotifier {
31+
func newWebhookNotifier(app *config.AppConfig) *WebhookNotifier {
3032
notifier := webhook.GetWebhookNotifier(config.DefaultWebhookQueueSize, app.Client.Debug, config.GetLogger())
3133

3234
w := &WebhookNotifier{
33-
ds: ds,
34-
rs: rs,
3535
app: app,
36+
ds: dbservice.New(app.DB),
37+
natsService: natsservice.New(app),
3638
isEnabled: app.Client.WebhookConf.Enable,
3739
enabledForPerMeeting: app.Client.WebhookConf.EnableForPerMeeting,
3840
defaultUrl: app.Client.WebhookConf.Url,
@@ -84,16 +86,16 @@ func (w *WebhookNotifier) DeleteWebhook(roomId string) error {
8486
return err
8587
}
8688
if d == nil {
87-
// this meeting do not have any webhook url
89+
// this meeting does not have any webhook url
8890
return nil
8991
}
9092

9193
if !d.PerformDeleting {
92-
// this mean may be new session has been started for same room
94+
// this mean may be new session has been started for the same room
9395
return nil
9496
}
9597

96-
return w.rs.DeleteWebhookData(roomId)
98+
return w.natsService.DeleteWebhookData(roomId)
9799
}
98100

99101
func (w *WebhookNotifier) SendWebhookEvent(event *plugnmeet.CommonNotifyEvent) error {
@@ -135,7 +137,8 @@ func (w *WebhookNotifier) SendWebhookEvent(event *plugnmeet.CommonNotifyEvent) e
135137
}
136138

137139
// ForceToPutInQueue can be used to force checking meeting table to get url
138-
// this method will not do further validation. We should not use this method always because fetching data mysql will be slower than redis
140+
// this method will not do further validation.
141+
// We should not use this method always because fetching data mysql will be slower than redis
139142
func (w *WebhookNotifier) ForceToPutInQueue(event *plugnmeet.CommonNotifyEvent) {
140143
if !w.isEnabled {
141144
return
@@ -171,7 +174,7 @@ func (w *WebhookNotifier) saveData(roomId string, d *webhookRedisFields) error {
171174
}
172175

173176
// we'll simply override any existing value & put new
174-
err = w.rs.AddWebhookData(roomId, marshal)
177+
err = w.natsService.AddWebhookData(roomId, marshal)
175178
if err != nil {
176179
return err
177180
}
@@ -180,17 +183,17 @@ func (w *WebhookNotifier) saveData(roomId string, d *webhookRedisFields) error {
180183
}
181184

182185
func (w *WebhookNotifier) getData(roomId string) (*webhookRedisFields, error) {
183-
result, err := w.rs.GetWebhookData(roomId)
186+
data, err := w.natsService.GetWebhookData(roomId)
184187
if err != nil {
185188
return nil, err
186189
}
187190

188-
if result == "" {
191+
if data == nil {
189192
return nil, nil
190193
}
191194

192195
d := new(webhookRedisFields)
193-
err = json.Unmarshal([]byte(result), d)
196+
err = json.Unmarshal(data, d)
194197
if err != nil {
195198
return nil, err
196199
}
@@ -200,11 +203,11 @@ func (w *WebhookNotifier) getData(roomId string) (*webhookRedisFields, error) {
200203

201204
var webhookNotifier *WebhookNotifier
202205

203-
func GetWebhookNotifier(app *config.AppConfig, ds *dbservice.DatabaseService, rs *redisservice.RedisService) *WebhookNotifier {
206+
func GetWebhookNotifier(app *config.AppConfig) *WebhookNotifier {
204207
if webhookNotifier != nil {
205208
return webhookNotifier
206209
}
207-
webhookNotifier = newWebhookNotifier(app, ds, rs)
210+
webhookNotifier = newWebhookNotifier(app)
208211

209212
return webhookNotifier
210213
}

pkg/models/analytics_export.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ func (m *AnalyticsModel) buildEventInfo(ekey string, eventInfo *plugnmeet.Analyt
255255
}
256256

257257
func (m *AnalyticsModel) sendToWebhookNotifier(roomId, roomSid, task, fileId string) {
258-
n := helpers.GetWebhookNotifier(m.app, m.ds, m.rs)
258+
n := helpers.GetWebhookNotifier(m.app)
259259
if n != nil {
260260
msg := &plugnmeet.CommonNotifyEvent{
261261
Event: &task,

pkg/models/recording.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func NewRecordingModel(app *config.AppConfig, ds *dbservice.DatabaseService, rs
3838
ds: ds,
3939
rs: rs,
4040
analyticsModel: NewAnalyticsModel(app, ds, rs),
41-
webhookNotifier: helpers.GetWebhookNotifier(app, ds, rs),
41+
webhookNotifier: helpers.GetWebhookNotifier(app),
4242
natsService: natsservice.New(app),
4343
}
4444
}

pkg/models/room_end.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,7 @@ func (m *RoomModel) OnAfterRoomEnded(roomId, roomSid, metadata string) {
6969
// now we'll perform a few service related tasks
7070
time.Sleep(config.WaitBeforeTriggerOnAfterRoomEnded)
7171
// delete blocked users list
72-
_, err = m.rs.DeleteRoomBlockList(roomId)
73-
if err != nil {
74-
log.Errorln(err)
75-
}
72+
m.natsService.DeleteRoomUsersBlockList(roomId)
7673

7774
// remove from progress, if existed. no need to log if error
7875
_, _ = m.rs.RoomCreationProgressList(roomId, "del")

pkg/models/speechtotext.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func NewSpeechToTextModel(app *config.AppConfig, ds *dbservice.DatabaseService,
3737
ds: ds,
3838
rs: rs,
3939
analyticsModel: NewAnalyticsModel(app, ds, rs),
40-
webhookNotifier: helpers.GetWebhookNotifier(app, ds, rs),
40+
webhookNotifier: helpers.GetWebhookNotifier(app),
4141
natsService: natsservice.New(app),
4242
}
4343
}

pkg/models/user_update.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func (m *UserModel) RemoveParticipant(r *plugnmeet.RemoveParticipantReq) error {
3131

3232
// finally, check if requested to block as well as
3333
if r.BlockUser {
34-
_, _ = m.rs.AddUserToBlockList(r.RoomId, r.UserId)
34+
_, _ = m.natsService.AddUserToBlockList(r.RoomId, r.UserId)
3535
}
3636

3737
return nil

pkg/models/webhook.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func NewWebhookModel(app *config.AppConfig, ds *dbservice.DatabaseService, rs *r
4444
lk: lk,
4545
rm: NewRoomModel(app, ds, rs, lk),
4646
analyticsModel: NewAnalyticsModel(app, ds, rs),
47-
webhookNotifier: helpers.GetWebhookNotifier(app, ds, rs),
47+
webhookNotifier: helpers.GetWebhookNotifier(app),
4848
natsService: natsservice.New(app),
4949
}
5050
}

pkg/models/webhook_room_end.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (m *WebhookModel) roomFinished(event *livekit.WebhookEvent) {
5555
m.sendToWebhookNotifier(event)
5656

5757
// now clean up webhook for this room
58-
err = m.webhookNotifier.DeleteWebhook(rInfo.RoomSid)
58+
err = m.webhookNotifier.DeleteWebhook(rInfo.RoomId)
5959
if err != nil {
6060
log.Errorln(err)
6161
}

pkg/services/nats/user_info.go

+15
Original file line numberDiff line numberDiff line change
@@ -239,3 +239,18 @@ func (s *NatsService) HasOnlineUser(roomId string) bool {
239239

240240
return false
241241
}
242+
243+
func (s *NatsService) IsUserExistInBlockList(roomId, userId string) bool {
244+
kv, err := s.js.KeyValue(s.ctx, fmt.Sprintf(RoomUsersBlockList, roomId))
245+
if err != nil {
246+
return false
247+
}
248+
get, err := kv.Get(s.ctx, userId)
249+
if err != nil {
250+
return false
251+
}
252+
if get != nil {
253+
return true
254+
}
255+
return false
256+
}

pkg/services/nats/user_modify.go

+16
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ const (
1717
userInfoBucketPrefix = Prefix + "userInfo-"
1818
UserInfoBucket = userInfoBucketPrefix + "r_%s-u_%s"
1919

20+
RoomUsersBlockList = Prefix + "usersBlockList-%s"
21+
2022
UserOnlineMaxPingDiff = time.Second * 30 // after 30 seconds we'll treat user as offline
2123

2224
UserIdKey = "id"
@@ -215,3 +217,17 @@ func (s *NatsService) UpdateUserKeyValue(roomId, userId, key, val string) error
215217

216218
return nil
217219
}
220+
221+
func (s *NatsService) AddUserToBlockList(roomId, userId string) (uint64, error) {
222+
kv, err := s.js.CreateOrUpdateKeyValue(s.ctx, jetstream.KeyValueConfig{
223+
Bucket: fmt.Sprintf(RoomUsersBlockList, roomId),
224+
})
225+
if err != nil {
226+
return 0, err
227+
}
228+
return kv.PutString(s.ctx, userId, fmt.Sprintf("%d", time.Now().UnixMilli()))
229+
}
230+
231+
func (s *NatsService) DeleteRoomUsersBlockList(roomId string) {
232+
_ = s.js.DeleteKeyValue(s.ctx, fmt.Sprintf(RoomUsersBlockList, roomId))
233+
}

pkg/services/nats/webhook.go

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package natsservice
2+
3+
import (
4+
"errors"
5+
"github.com/nats-io/nats.go/jetstream"
6+
)
7+
8+
const WebhookKvKey = Prefix + "webhookData"
9+
10+
func (s *NatsService) AddWebhookData(roomId string, val []byte) error {
11+
kv, err := s.js.CreateOrUpdateKeyValue(s.ctx, jetstream.KeyValueConfig{
12+
Bucket: WebhookKvKey,
13+
})
14+
if err != nil {
15+
return err
16+
}
17+
18+
_, err = kv.Put(s.ctx, roomId, val)
19+
if err != nil {
20+
return err
21+
}
22+
23+
return nil
24+
}
25+
26+
func (s *NatsService) GetWebhookData(roomId string) ([]byte, error) {
27+
kv, err := s.js.KeyValue(s.ctx, WebhookKvKey)
28+
switch {
29+
case errors.Is(err, jetstream.ErrBucketNotFound):
30+
return nil, nil
31+
case err != nil:
32+
return nil, err
33+
}
34+
35+
entry, err := kv.Get(s.ctx, roomId)
36+
if err != nil {
37+
return nil, err
38+
}
39+
40+
if entry == nil {
41+
return nil, nil
42+
}
43+
44+
return entry.Value(), nil
45+
}
46+
47+
func (s *NatsService) DeleteWebhookData(roomId string) error {
48+
kv, err := s.js.KeyValue(s.ctx, WebhookKvKey)
49+
switch {
50+
case errors.Is(err, jetstream.ErrBucketNotFound):
51+
return nil
52+
case err != nil:
53+
return err
54+
}
55+
56+
return kv.Delete(s.ctx, roomId)
57+
}

pkg/services/redis/user.go

-27
This file was deleted.

pkg/services/redis/webhook_notifier.go

-38
This file was deleted.

0 commit comments

Comments
 (0)