Skip to content
This repository has been archived by the owner on Nov 3, 2023. It is now read-only.

Use PgSQL locker implementation #201

Merged
merged 2 commits into from
Jan 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
## jackal - main / unreleased

* [ENHANCEMENT] Added memory ballast. #198
* [CHANGE] Introduced measured repository transaction type. #200
* [CHANGE] Introduced measured repository transaction type. #200
* [CHANGE] Use PgSQL locker. #201
* [BUGFIX] Fix S2S db key check when nop KV is used. #199
83 changes: 0 additions & 83 deletions pkg/cluster/etcd/locker.go

This file was deleted.

11 changes: 1 addition & 10 deletions pkg/jackal/jackal.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
clusterconnmanager "github.com/ortuman/jackal/pkg/cluster/connmanager"
"github.com/ortuman/jackal/pkg/cluster/etcd"
"github.com/ortuman/jackal/pkg/cluster/kv"
"github.com/ortuman/jackal/pkg/cluster/locker"
"github.com/ortuman/jackal/pkg/cluster/memberlist"
clusterrouter "github.com/ortuman/jackal/pkg/cluster/router"
clusterserver "github.com/ortuman/jackal/pkg/cluster/server"
Expand Down Expand Up @@ -105,8 +104,7 @@ type Jackal struct {
peppers *pepper.Keys
hk *hook.Hooks

locker locker.Locker
kv kv.KV
kv kv.KV

rep repository.Repository
memberList *memberlist.MemberList
Expand Down Expand Up @@ -138,7 +136,6 @@ func New(output io.Writer, args []string) *Jackal {
output: output,
args: args,
waitStopCh: make(chan os.Signal, 1),
locker: locker.NewNopLocker(),
kv: kv.NewNopKV(),
}
}
Expand Down Expand Up @@ -223,7 +220,6 @@ func (j *Jackal) Run() error {
if err := j.checkEtcdHealth(cfg.Cluster.Etcd.Endpoints); err != nil {
return err
}
j.initLocker(cfg.Cluster.Etcd)
j.initKVStore(cfg.Cluster.Etcd)
}

Expand Down Expand Up @@ -307,11 +303,6 @@ func (j *Jackal) checkEtcdHealth(endpoints []string) error {
return nil
}

func (j *Jackal) initLocker(cfg etcd.Config) {
j.locker = etcd.NewLocker(cfg, j.logger)
j.registerStartStopper(j.locker)
}

func (j *Jackal) initKVStore(cfg etcd.Config) {
etcdKV := etcd.NewKV(cfg, j.logger)
j.kv = kv.NewMeasured(etcdKV)
Expand Down
2 changes: 1 addition & 1 deletion pkg/jackal/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ var modFns = map[string]func(a *Jackal, cfg *ModulesConfig) module.Module{
// Offline
// (https://xmpp.org/extensions/xep-0160.html)
offline.ModuleName: func(j *Jackal, cfg *ModulesConfig) module.Module {
return offline.New(cfg.Offline, j.router, j.hosts, j.resMng, j.rep, j.locker, j.hk, j.logger)
return offline.New(cfg.Offline, j.router, j.hosts, j.resMng, j.rep, j.hk, j.logger)
},
// XEP-0012: Last Activity
// (https://xmpp.org/extensions/xep-0012.html)
Expand Down
18 changes: 3 additions & 15 deletions pkg/module/offline/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ import (
"context"

c2smodel "github.com/ortuman/jackal/pkg/model/c2s"

"github.com/ortuman/jackal/pkg/cluster/locker"
"github.com/ortuman/jackal/pkg/router"
"github.com/ortuman/jackal/pkg/storage/repository"
)

//go:generate moq -out repository.mock_test.go . offlineRepository:repositoryMock
type offlineRepository interface {
repository.Offline
//go:generate moq -out repository.mock_test.go . globalRepository:repositoryMock
type globalRepository interface {
repository.Repository
}

//go:generate moq -out router.mock_test.go . globalRouter:routerMock
Expand All @@ -39,16 +37,6 @@ type hosts interface {
IsLocalHost(h string) bool
}

//go:generate moq -out locker.mock_test.go . clusterLocker:lockerMock
type clusterLocker interface {
locker.Locker
}

//go:generate moq -out lock.mock_test.go . clusterLock:lockMock
type clusterLock interface {
locker.Lock
}

//go:generate moq -out resourcemanager.mock_test.go . resourceManager
type resourceManager interface {
GetResources(ctx context.Context, username string) ([]c2smodel.ResourceDesc, error)
Expand Down
31 changes: 15 additions & 16 deletions pkg/module/offline/offline.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/jackal-xmpp/stravaganza/v2"
stanzaerror "github.com/jackal-xmpp/stravaganza/v2/errors/stanza"
"github.com/ortuman/jackal/pkg/c2s"
"github.com/ortuman/jackal/pkg/cluster/locker"
"github.com/ortuman/jackal/pkg/hook"
"github.com/ortuman/jackal/pkg/host"
"github.com/ortuman/jackal/pkg/router"
Expand Down Expand Up @@ -53,8 +52,7 @@ type Offline struct {
hosts hosts
router router.Router
resMng resourceManager
rep repository.Offline
locker locker.Locker
rep repository.Repository
hk *hook.Hooks
logger kitlog.Logger
}
Expand All @@ -65,8 +63,7 @@ func New(
router router.Router,
hosts *host.Hosts,
resMng *c2s.ResourceManager,
rep repository.Offline,
locker locker.Locker,
rep repository.Repository,
hk *hook.Hooks,
logger kitlog.Logger,
) *Offline {
Expand All @@ -76,7 +73,6 @@ func New(
hosts: hosts,
resMng: resMng,
rep: rep,
locker: locker,
hk: hk,
logger: kitlog.With(logger, "module", ModuleName),
}
Expand Down Expand Up @@ -166,21 +162,23 @@ func (m *Offline) onC2SPresenceRecv(ctx context.Context, execCtx *hook.Execution
func (m *Offline) onUserDeleted(ctx context.Context, execCtx *hook.ExecutionContext) error {
inf := execCtx.Info.(*hook.UserInfo)

lock, err := m.locker.AcquireLock(ctx, offlineQueueLockID(inf.Username))
if err != nil {
lockID := offlineQueueLockID(inf.Username)

if err := m.rep.Lock(ctx, lockID); err != nil {
return err
}
defer func() { _ = lock.Release(ctx) }()
defer func() { _ = m.rep.Unlock(ctx, lockID) }()

return m.rep.DeleteOfflineMessages(ctx, inf.Username)
}

func (m *Offline) deliverOfflineMessages(ctx context.Context, username string) error {
lock, err := m.locker.AcquireLock(ctx, offlineQueueLockID(username))
if err != nil {
lockID := offlineQueueLockID(username)

if err := m.rep.Lock(ctx, lockID); err != nil {
return err
}
defer func() { _ = lock.Release(ctx) }()
defer func() { _ = m.rep.Unlock(ctx, lockID) }()

ms, err := m.rep.FetchOfflineMessages(ctx, username)
if err != nil {
Expand All @@ -206,11 +204,12 @@ func (m *Offline) archiveMessage(ctx context.Context, msg *stravaganza.Message)
toJID := msg.ToJID()
username := toJID.Node()

lock, err := m.locker.AcquireLock(ctx, offlineQueueLockID(username))
if err != nil {
lockID := offlineQueueLockID(username)

if err := m.rep.Lock(ctx, lockID); err != nil {
return err
}
defer func() { _ = lock.Release(ctx) }()
defer func() { _ = m.rep.Unlock(ctx, lockID) }()

qSize, err := m.rep.CountOfflineMessages(ctx, username)
if err != nil {
Expand Down Expand Up @@ -253,5 +252,5 @@ func isMessageArchievable(msg *stravaganza.Message) bool {
}

func offlineQueueLockID(username string) string {
return fmt.Sprintf("offline:queue:%s", username)
return fmt.Sprintf("offline:lock:%s", username)
}
37 changes: 9 additions & 28 deletions pkg/module/offline/offline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
kitlog "github.com/go-kit/log"
"github.com/jackal-xmpp/stravaganza/v2"
"github.com/jackal-xmpp/stravaganza/v2/jid"
"github.com/ortuman/jackal/pkg/cluster/locker"
"github.com/ortuman/jackal/pkg/hook"
c2smodel "github.com/ortuman/jackal/pkg/model/c2s"
xmpputil "github.com/ortuman/jackal/pkg/util/xmpp"
Expand All @@ -31,15 +30,10 @@ import (

func TestOffline_ArchiveOfflineMessage(t *testing.T) {
// given
lockMock := &lockMock{}
lockMock.ReleaseFunc = func(ctx context.Context) error {
return nil
}
lockerMock := &lockerMock{}
lockerMock.AcquireLockFunc = func(ctx context.Context, lockID string) (locker.Lock, error) {
return lockMock, nil
}
repMock := &repositoryMock{}
repMock.LockFunc = func(ctx context.Context, lockID string) error { return nil }
repMock.UnlockFunc = func(ctx context.Context, lockID string) error { return nil }

repMock.CountOfflineMessagesFunc = func(ctx context.Context, username string) (int, error) {
return 0, nil
}
Expand All @@ -59,7 +53,6 @@ func TestOffline_ArchiveOfflineMessage(t *testing.T) {
hosts: hostsMock,
resMng: resManagerMock,
rep: repMock,
locker: lockerMock,
hk: hk,
logger: kitlog.NewNopLogger(),
}
Expand Down Expand Up @@ -100,15 +93,10 @@ func TestOffline_ArchiveOfflineMessageQueueFull(t *testing.T) {
hostsMock := &hostsMock{}
hostsMock.IsLocalHostFunc = func(h string) bool { return h == "jackal.im" }

lockMock := &lockMock{}
lockMock.ReleaseFunc = func(ctx context.Context) error {
return nil
}
lockerMock := &lockerMock{}
lockerMock.AcquireLockFunc = func(ctx context.Context, lockID string) (locker.Lock, error) {
return lockMock, nil
}
repMock := &repositoryMock{}
repMock.LockFunc = func(ctx context.Context, lockID string) error { return nil }
repMock.UnlockFunc = func(ctx context.Context, lockID string) error { return nil }

repMock.CountOfflineMessagesFunc = func(ctx context.Context, username string) (int, error) {
return 100, nil
}
Expand All @@ -127,7 +115,6 @@ func TestOffline_ArchiveOfflineMessageQueueFull(t *testing.T) {
hosts: hostsMock,
resMng: resManagerMock,
rep: repMock,
locker: lockerMock,
hk: hk,
logger: kitlog.NewNopLogger(),
}
Expand Down Expand Up @@ -173,15 +160,10 @@ func TestOffline_DeliverOfflineMessages(t *testing.T) {
hostsMock := &hostsMock{}
hostsMock.IsLocalHostFunc = func(h string) bool { return h == "jackal.im" }

lockMock := &lockMock{}
lockMock.ReleaseFunc = func(ctx context.Context) error {
return nil
}
lockerMock := &lockerMock{}
lockerMock.AcquireLockFunc = func(ctx context.Context, lockID string) (locker.Lock, error) {
return lockMock, nil
}
repMock := &repositoryMock{}
repMock.LockFunc = func(ctx context.Context, lockID string) error { return nil }
repMock.UnlockFunc = func(ctx context.Context, lockID string) error { return nil }

repMock.CountOfflineMessagesFunc = func(ctx context.Context, username string) (int, error) {
return 1, nil
}
Expand All @@ -208,7 +190,6 @@ func TestOffline_DeliverOfflineMessages(t *testing.T) {
router: routerMock,
hosts: hostsMock,
rep: repMock,
locker: lockerMock,
hk: hk,
logger: kitlog.NewNopLogger(),
}
Expand Down
Loading