Skip to content
This repository has been archived by the owner on Nov 3, 2023. It is now read-only.

Commit

Permalink
Use PgSQL locker implementation (#201)
Browse files Browse the repository at this point in the history
* use PgSQL locker implementation

* updated CHANGELOG.md
  • Loading branch information
ortuman authored Jan 26, 2022
1 parent 92c2042 commit 7097fb4
Show file tree
Hide file tree
Showing 17 changed files with 234 additions and 188 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
## jackal - main / unreleased

* [ENHANCEMENT] Added memory ballast. #198
* [CHANGE] Introduced measured repository transaction type. #200
* [CHANGE] Introduced measured repository transaction type. #200
* [CHANGE] Use PgSQL locker. #201
* [BUGFIX] Fix S2S db key check when nop KV is used. #199
83 changes: 0 additions & 83 deletions pkg/cluster/etcd/locker.go

This file was deleted.

11 changes: 1 addition & 10 deletions pkg/jackal/jackal.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
clusterconnmanager "github.com/ortuman/jackal/pkg/cluster/connmanager"
"github.com/ortuman/jackal/pkg/cluster/etcd"
"github.com/ortuman/jackal/pkg/cluster/kv"
"github.com/ortuman/jackal/pkg/cluster/locker"
"github.com/ortuman/jackal/pkg/cluster/memberlist"
clusterrouter "github.com/ortuman/jackal/pkg/cluster/router"
clusterserver "github.com/ortuman/jackal/pkg/cluster/server"
Expand Down Expand Up @@ -105,8 +104,7 @@ type Jackal struct {
peppers *pepper.Keys
hk *hook.Hooks

locker locker.Locker
kv kv.KV
kv kv.KV

rep repository.Repository
memberList *memberlist.MemberList
Expand Down Expand Up @@ -138,7 +136,6 @@ func New(output io.Writer, args []string) *Jackal {
output: output,
args: args,
waitStopCh: make(chan os.Signal, 1),
locker: locker.NewNopLocker(),
kv: kv.NewNopKV(),
}
}
Expand Down Expand Up @@ -223,7 +220,6 @@ func (j *Jackal) Run() error {
if err := j.checkEtcdHealth(cfg.Cluster.Etcd.Endpoints); err != nil {
return err
}
j.initLocker(cfg.Cluster.Etcd)
j.initKVStore(cfg.Cluster.Etcd)
}

Expand Down Expand Up @@ -307,11 +303,6 @@ func (j *Jackal) checkEtcdHealth(endpoints []string) error {
return nil
}

func (j *Jackal) initLocker(cfg etcd.Config) {
j.locker = etcd.NewLocker(cfg, j.logger)
j.registerStartStopper(j.locker)
}

func (j *Jackal) initKVStore(cfg etcd.Config) {
etcdKV := etcd.NewKV(cfg, j.logger)
j.kv = kv.NewMeasured(etcdKV)
Expand Down
2 changes: 1 addition & 1 deletion pkg/jackal/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ var modFns = map[string]func(a *Jackal, cfg *ModulesConfig) module.Module{
// Offline
// (https://xmpp.org/extensions/xep-0160.html)
offline.ModuleName: func(j *Jackal, cfg *ModulesConfig) module.Module {
return offline.New(cfg.Offline, j.router, j.hosts, j.resMng, j.rep, j.locker, j.hk, j.logger)
return offline.New(cfg.Offline, j.router, j.hosts, j.resMng, j.rep, j.hk, j.logger)
},
// XEP-0012: Last Activity
// (https://xmpp.org/extensions/xep-0012.html)
Expand Down
18 changes: 3 additions & 15 deletions pkg/module/offline/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ import (
"context"

c2smodel "github.com/ortuman/jackal/pkg/model/c2s"

"github.com/ortuman/jackal/pkg/cluster/locker"
"github.com/ortuman/jackal/pkg/router"
"github.com/ortuman/jackal/pkg/storage/repository"
)

//go:generate moq -out repository.mock_test.go . offlineRepository:repositoryMock
type offlineRepository interface {
repository.Offline
//go:generate moq -out repository.mock_test.go . globalRepository:repositoryMock
type globalRepository interface {
repository.Repository
}

//go:generate moq -out router.mock_test.go . globalRouter:routerMock
Expand All @@ -39,16 +37,6 @@ type hosts interface {
IsLocalHost(h string) bool
}

//go:generate moq -out locker.mock_test.go . clusterLocker:lockerMock
type clusterLocker interface {
locker.Locker
}

//go:generate moq -out lock.mock_test.go . clusterLock:lockMock
type clusterLock interface {
locker.Lock
}

//go:generate moq -out resourcemanager.mock_test.go . resourceManager
type resourceManager interface {
GetResources(ctx context.Context, username string) ([]c2smodel.ResourceDesc, error)
Expand Down
31 changes: 15 additions & 16 deletions pkg/module/offline/offline.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/jackal-xmpp/stravaganza/v2"
stanzaerror "github.com/jackal-xmpp/stravaganza/v2/errors/stanza"
"github.com/ortuman/jackal/pkg/c2s"
"github.com/ortuman/jackal/pkg/cluster/locker"
"github.com/ortuman/jackal/pkg/hook"
"github.com/ortuman/jackal/pkg/host"
"github.com/ortuman/jackal/pkg/router"
Expand Down Expand Up @@ -53,8 +52,7 @@ type Offline struct {
hosts hosts
router router.Router
resMng resourceManager
rep repository.Offline
locker locker.Locker
rep repository.Repository
hk *hook.Hooks
logger kitlog.Logger
}
Expand All @@ -65,8 +63,7 @@ func New(
router router.Router,
hosts *host.Hosts,
resMng *c2s.ResourceManager,
rep repository.Offline,
locker locker.Locker,
rep repository.Repository,
hk *hook.Hooks,
logger kitlog.Logger,
) *Offline {
Expand All @@ -76,7 +73,6 @@ func New(
hosts: hosts,
resMng: resMng,
rep: rep,
locker: locker,
hk: hk,
logger: kitlog.With(logger, "module", ModuleName),
}
Expand Down Expand Up @@ -166,21 +162,23 @@ func (m *Offline) onC2SPresenceRecv(ctx context.Context, execCtx *hook.Execution
func (m *Offline) onUserDeleted(ctx context.Context, execCtx *hook.ExecutionContext) error {
inf := execCtx.Info.(*hook.UserInfo)

lock, err := m.locker.AcquireLock(ctx, offlineQueueLockID(inf.Username))
if err != nil {
lockID := offlineQueueLockID(inf.Username)

if err := m.rep.Lock(ctx, lockID); err != nil {
return err
}
defer func() { _ = lock.Release(ctx) }()
defer func() { _ = m.rep.Unlock(ctx, lockID) }()

return m.rep.DeleteOfflineMessages(ctx, inf.Username)
}

func (m *Offline) deliverOfflineMessages(ctx context.Context, username string) error {
lock, err := m.locker.AcquireLock(ctx, offlineQueueLockID(username))
if err != nil {
lockID := offlineQueueLockID(username)

if err := m.rep.Lock(ctx, lockID); err != nil {
return err
}
defer func() { _ = lock.Release(ctx) }()
defer func() { _ = m.rep.Unlock(ctx, lockID) }()

ms, err := m.rep.FetchOfflineMessages(ctx, username)
if err != nil {
Expand All @@ -206,11 +204,12 @@ func (m *Offline) archiveMessage(ctx context.Context, msg *stravaganza.Message)
toJID := msg.ToJID()
username := toJID.Node()

lock, err := m.locker.AcquireLock(ctx, offlineQueueLockID(username))
if err != nil {
lockID := offlineQueueLockID(username)

if err := m.rep.Lock(ctx, lockID); err != nil {
return err
}
defer func() { _ = lock.Release(ctx) }()
defer func() { _ = m.rep.Unlock(ctx, lockID) }()

qSize, err := m.rep.CountOfflineMessages(ctx, username)
if err != nil {
Expand Down Expand Up @@ -253,5 +252,5 @@ func isMessageArchievable(msg *stravaganza.Message) bool {
}

func offlineQueueLockID(username string) string {
return fmt.Sprintf("offline:queue:%s", username)
return fmt.Sprintf("offline:lock:%s", username)
}
37 changes: 9 additions & 28 deletions pkg/module/offline/offline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
kitlog "github.com/go-kit/log"
"github.com/jackal-xmpp/stravaganza/v2"
"github.com/jackal-xmpp/stravaganza/v2/jid"
"github.com/ortuman/jackal/pkg/cluster/locker"
"github.com/ortuman/jackal/pkg/hook"
c2smodel "github.com/ortuman/jackal/pkg/model/c2s"
xmpputil "github.com/ortuman/jackal/pkg/util/xmpp"
Expand All @@ -31,15 +30,10 @@ import (

func TestOffline_ArchiveOfflineMessage(t *testing.T) {
// given
lockMock := &lockMock{}
lockMock.ReleaseFunc = func(ctx context.Context) error {
return nil
}
lockerMock := &lockerMock{}
lockerMock.AcquireLockFunc = func(ctx context.Context, lockID string) (locker.Lock, error) {
return lockMock, nil
}
repMock := &repositoryMock{}
repMock.LockFunc = func(ctx context.Context, lockID string) error { return nil }
repMock.UnlockFunc = func(ctx context.Context, lockID string) error { return nil }

repMock.CountOfflineMessagesFunc = func(ctx context.Context, username string) (int, error) {
return 0, nil
}
Expand All @@ -59,7 +53,6 @@ func TestOffline_ArchiveOfflineMessage(t *testing.T) {
hosts: hostsMock,
resMng: resManagerMock,
rep: repMock,
locker: lockerMock,
hk: hk,
logger: kitlog.NewNopLogger(),
}
Expand Down Expand Up @@ -100,15 +93,10 @@ func TestOffline_ArchiveOfflineMessageQueueFull(t *testing.T) {
hostsMock := &hostsMock{}
hostsMock.IsLocalHostFunc = func(h string) bool { return h == "jackal.im" }

lockMock := &lockMock{}
lockMock.ReleaseFunc = func(ctx context.Context) error {
return nil
}
lockerMock := &lockerMock{}
lockerMock.AcquireLockFunc = func(ctx context.Context, lockID string) (locker.Lock, error) {
return lockMock, nil
}
repMock := &repositoryMock{}
repMock.LockFunc = func(ctx context.Context, lockID string) error { return nil }
repMock.UnlockFunc = func(ctx context.Context, lockID string) error { return nil }

repMock.CountOfflineMessagesFunc = func(ctx context.Context, username string) (int, error) {
return 100, nil
}
Expand All @@ -127,7 +115,6 @@ func TestOffline_ArchiveOfflineMessageQueueFull(t *testing.T) {
hosts: hostsMock,
resMng: resManagerMock,
rep: repMock,
locker: lockerMock,
hk: hk,
logger: kitlog.NewNopLogger(),
}
Expand Down Expand Up @@ -173,15 +160,10 @@ func TestOffline_DeliverOfflineMessages(t *testing.T) {
hostsMock := &hostsMock{}
hostsMock.IsLocalHostFunc = func(h string) bool { return h == "jackal.im" }

lockMock := &lockMock{}
lockMock.ReleaseFunc = func(ctx context.Context) error {
return nil
}
lockerMock := &lockerMock{}
lockerMock.AcquireLockFunc = func(ctx context.Context, lockID string) (locker.Lock, error) {
return lockMock, nil
}
repMock := &repositoryMock{}
repMock.LockFunc = func(ctx context.Context, lockID string) error { return nil }
repMock.UnlockFunc = func(ctx context.Context, lockID string) error { return nil }

repMock.CountOfflineMessagesFunc = func(ctx context.Context, username string) (int, error) {
return 1, nil
}
Expand All @@ -208,7 +190,6 @@ func TestOffline_DeliverOfflineMessages(t *testing.T) {
router: routerMock,
hosts: hostsMock,
rep: repMock,
locker: lockerMock,
hk: hk,
logger: kitlog.NewNopLogger(),
}
Expand Down
Loading

0 comments on commit 7097fb4

Please sign in to comment.