From 7097fb4fae90b62d1095de3478cb3a938263a13a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miguel=20=C3=81ngel=20Ortu=C3=B1o?= Date: Wed, 26 Jan 2022 10:37:06 +0100 Subject: [PATCH] Use PgSQL locker implementation (#201) * use PgSQL locker implementation * updated CHANGELOG.md --- CHANGELOG.md | 3 +- pkg/cluster/etcd/locker.go | 83 ------------------- pkg/jackal/jackal.go | 11 +-- pkg/jackal/modules.go | 2 +- pkg/module/offline/interface.go | 18 +--- pkg/module/offline/offline.go | 31 ++++--- pkg/module/offline/offline_test.go | 37 ++------- .../measured/locker.go} | 38 +++++---- pkg/storage/measured/locker_test.go | 52 ++++++++++++ pkg/storage/measured/measured.go | 3 + pkg/storage/measured/tx.go | 2 + pkg/storage/pgsql/locker.go | 52 ++++++++++++ pkg/storage/pgsql/locker_test.go | 62 ++++++++++++++ pkg/storage/pgsql/repository.go | 2 + pkg/storage/pgsql/tx.go | 2 + .../locker => storage/repository}/locker.go | 23 ++--- pkg/storage/repository/repository.go | 1 + 17 files changed, 234 insertions(+), 188 deletions(-) delete mode 100644 pkg/cluster/etcd/locker.go rename pkg/{cluster/locker/nop_locker.go => storage/measured/locker.go} (50%) create mode 100644 pkg/storage/measured/locker_test.go create mode 100644 pkg/storage/pgsql/locker.go create mode 100644 pkg/storage/pgsql/locker_test.go rename pkg/{cluster/locker => storage/repository}/locker.go (54%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 951ece074..2c432a131 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,5 +3,6 @@ ## jackal - main / unreleased * [ENHANCEMENT] Added memory ballast. #198 -* [CHANGE] Introduced measured repository transaction type. #200 +* [CHANGE] Introduced measured repository transaction type. #200 +* [CHANGE] Use PgSQL locker. #201 * [BUGFIX] Fix S2S db key check when nop KV is used. #199 diff --git a/pkg/cluster/etcd/locker.go b/pkg/cluster/etcd/locker.go deleted file mode 100644 index 521de0e9a..000000000 --- a/pkg/cluster/etcd/locker.go +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright 2020 The jackal Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package etcd - -import ( - "context" - - kitlog "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/ortuman/jackal/pkg/cluster/locker" - etcdv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/client/v3/concurrency" -) - -type etcdLock struct { - mu *concurrency.Mutex -} - -func (m *etcdLock) Release(ctx context.Context) error { return m.mu.Unlock(ctx) } - -// Locker defines etcd locker.Locker implementation. -type Locker struct { - cfg Config - logger kitlog.Logger - cli *etcdv3.Client - ss *concurrency.Session -} - -// NewLocker returns a new initialized etcd locker. -func NewLocker(cfg Config, logger kitlog.Logger) *Locker { - return &Locker{cfg: cfg, logger: logger} -} - -// AcquireLock acquires and returns an etcd locker. -func (l *Locker) AcquireLock(ctx context.Context, lockID string) (locker.Lock, error) { - mu := concurrency.NewMutex(l.ss, lockID) - if err := mu.Lock(ctx); err != nil { - return nil, err - } - return &etcdLock{mu: mu}, nil -} - -// Start starts etcd locker. -func (l *Locker) Start(_ context.Context) error { - // perform dialing - cli, err := dial(l.cfg) - if err != nil { - return err - } - l.cli = cli - - ss, err := concurrency.NewSession(l.cli) - if err != nil { - return err - } - l.ss = ss - level.Info(l.logger).Log("msg", "started etcd locker") - return nil -} - -// Stop stops etcd locker. -func (l *Locker) Stop(_ context.Context) error { - if err := l.ss.Close(); err != nil { - return err - } - if err := l.cli.Close(); err != nil { - return err - } - level.Info(l.logger).Log("msg", "stopped etcd locker") - return nil -} diff --git a/pkg/jackal/jackal.go b/pkg/jackal/jackal.go index e6bfc5081..8ad1ba6ff 100644 --- a/pkg/jackal/jackal.go +++ b/pkg/jackal/jackal.go @@ -38,7 +38,6 @@ import ( clusterconnmanager "github.com/ortuman/jackal/pkg/cluster/connmanager" "github.com/ortuman/jackal/pkg/cluster/etcd" "github.com/ortuman/jackal/pkg/cluster/kv" - "github.com/ortuman/jackal/pkg/cluster/locker" "github.com/ortuman/jackal/pkg/cluster/memberlist" clusterrouter "github.com/ortuman/jackal/pkg/cluster/router" clusterserver "github.com/ortuman/jackal/pkg/cluster/server" @@ -105,8 +104,7 @@ type Jackal struct { peppers *pepper.Keys hk *hook.Hooks - locker locker.Locker - kv kv.KV + kv kv.KV rep repository.Repository memberList *memberlist.MemberList @@ -138,7 +136,6 @@ func New(output io.Writer, args []string) *Jackal { output: output, args: args, waitStopCh: make(chan os.Signal, 1), - locker: locker.NewNopLocker(), kv: kv.NewNopKV(), } } @@ -223,7 +220,6 @@ func (j *Jackal) Run() error { if err := j.checkEtcdHealth(cfg.Cluster.Etcd.Endpoints); err != nil { return err } - j.initLocker(cfg.Cluster.Etcd) j.initKVStore(cfg.Cluster.Etcd) } @@ -307,11 +303,6 @@ func (j *Jackal) checkEtcdHealth(endpoints []string) error { return nil } -func (j *Jackal) initLocker(cfg etcd.Config) { - j.locker = etcd.NewLocker(cfg, j.logger) - j.registerStartStopper(j.locker) -} - func (j *Jackal) initKVStore(cfg etcd.Config) { etcdKV := etcd.NewKV(cfg, j.logger) j.kv = kv.NewMeasured(etcdKV) diff --git a/pkg/jackal/modules.go b/pkg/jackal/modules.go index f88b96772..d33892812 100644 --- a/pkg/jackal/modules.go +++ b/pkg/jackal/modules.go @@ -55,7 +55,7 @@ var modFns = map[string]func(a *Jackal, cfg *ModulesConfig) module.Module{ // Offline // (https://xmpp.org/extensions/xep-0160.html) offline.ModuleName: func(j *Jackal, cfg *ModulesConfig) module.Module { - return offline.New(cfg.Offline, j.router, j.hosts, j.resMng, j.rep, j.locker, j.hk, j.logger) + return offline.New(cfg.Offline, j.router, j.hosts, j.resMng, j.rep, j.hk, j.logger) }, // XEP-0012: Last Activity // (https://xmpp.org/extensions/xep-0012.html) diff --git a/pkg/module/offline/interface.go b/pkg/module/offline/interface.go index 73cd301d6..1936660ce 100644 --- a/pkg/module/offline/interface.go +++ b/pkg/module/offline/interface.go @@ -18,15 +18,13 @@ import ( "context" c2smodel "github.com/ortuman/jackal/pkg/model/c2s" - - "github.com/ortuman/jackal/pkg/cluster/locker" "github.com/ortuman/jackal/pkg/router" "github.com/ortuman/jackal/pkg/storage/repository" ) -//go:generate moq -out repository.mock_test.go . offlineRepository:repositoryMock -type offlineRepository interface { - repository.Offline +//go:generate moq -out repository.mock_test.go . globalRepository:repositoryMock +type globalRepository interface { + repository.Repository } //go:generate moq -out router.mock_test.go . globalRouter:routerMock @@ -39,16 +37,6 @@ type hosts interface { IsLocalHost(h string) bool } -//go:generate moq -out locker.mock_test.go . clusterLocker:lockerMock -type clusterLocker interface { - locker.Locker -} - -//go:generate moq -out lock.mock_test.go . clusterLock:lockMock -type clusterLock interface { - locker.Lock -} - //go:generate moq -out resourcemanager.mock_test.go . resourceManager type resourceManager interface { GetResources(ctx context.Context, username string) ([]c2smodel.ResourceDesc, error) diff --git a/pkg/module/offline/offline.go b/pkg/module/offline/offline.go index abc9b2096..6632532f0 100644 --- a/pkg/module/offline/offline.go +++ b/pkg/module/offline/offline.go @@ -24,7 +24,6 @@ import ( "github.com/jackal-xmpp/stravaganza/v2" stanzaerror "github.com/jackal-xmpp/stravaganza/v2/errors/stanza" "github.com/ortuman/jackal/pkg/c2s" - "github.com/ortuman/jackal/pkg/cluster/locker" "github.com/ortuman/jackal/pkg/hook" "github.com/ortuman/jackal/pkg/host" "github.com/ortuman/jackal/pkg/router" @@ -53,8 +52,7 @@ type Offline struct { hosts hosts router router.Router resMng resourceManager - rep repository.Offline - locker locker.Locker + rep repository.Repository hk *hook.Hooks logger kitlog.Logger } @@ -65,8 +63,7 @@ func New( router router.Router, hosts *host.Hosts, resMng *c2s.ResourceManager, - rep repository.Offline, - locker locker.Locker, + rep repository.Repository, hk *hook.Hooks, logger kitlog.Logger, ) *Offline { @@ -76,7 +73,6 @@ func New( hosts: hosts, resMng: resMng, rep: rep, - locker: locker, hk: hk, logger: kitlog.With(logger, "module", ModuleName), } @@ -166,21 +162,23 @@ func (m *Offline) onC2SPresenceRecv(ctx context.Context, execCtx *hook.Execution func (m *Offline) onUserDeleted(ctx context.Context, execCtx *hook.ExecutionContext) error { inf := execCtx.Info.(*hook.UserInfo) - lock, err := m.locker.AcquireLock(ctx, offlineQueueLockID(inf.Username)) - if err != nil { + lockID := offlineQueueLockID(inf.Username) + + if err := m.rep.Lock(ctx, lockID); err != nil { return err } - defer func() { _ = lock.Release(ctx) }() + defer func() { _ = m.rep.Unlock(ctx, lockID) }() return m.rep.DeleteOfflineMessages(ctx, inf.Username) } func (m *Offline) deliverOfflineMessages(ctx context.Context, username string) error { - lock, err := m.locker.AcquireLock(ctx, offlineQueueLockID(username)) - if err != nil { + lockID := offlineQueueLockID(username) + + if err := m.rep.Lock(ctx, lockID); err != nil { return err } - defer func() { _ = lock.Release(ctx) }() + defer func() { _ = m.rep.Unlock(ctx, lockID) }() ms, err := m.rep.FetchOfflineMessages(ctx, username) if err != nil { @@ -206,11 +204,12 @@ func (m *Offline) archiveMessage(ctx context.Context, msg *stravaganza.Message) toJID := msg.ToJID() username := toJID.Node() - lock, err := m.locker.AcquireLock(ctx, offlineQueueLockID(username)) - if err != nil { + lockID := offlineQueueLockID(username) + + if err := m.rep.Lock(ctx, lockID); err != nil { return err } - defer func() { _ = lock.Release(ctx) }() + defer func() { _ = m.rep.Unlock(ctx, lockID) }() qSize, err := m.rep.CountOfflineMessages(ctx, username) if err != nil { @@ -253,5 +252,5 @@ func isMessageArchievable(msg *stravaganza.Message) bool { } func offlineQueueLockID(username string) string { - return fmt.Sprintf("offline:queue:%s", username) + return fmt.Sprintf("offline:lock:%s", username) } diff --git a/pkg/module/offline/offline_test.go b/pkg/module/offline/offline_test.go index ddd45007f..1d086cc60 100644 --- a/pkg/module/offline/offline_test.go +++ b/pkg/module/offline/offline_test.go @@ -22,7 +22,6 @@ import ( kitlog "github.com/go-kit/log" "github.com/jackal-xmpp/stravaganza/v2" "github.com/jackal-xmpp/stravaganza/v2/jid" - "github.com/ortuman/jackal/pkg/cluster/locker" "github.com/ortuman/jackal/pkg/hook" c2smodel "github.com/ortuman/jackal/pkg/model/c2s" xmpputil "github.com/ortuman/jackal/pkg/util/xmpp" @@ -31,15 +30,10 @@ import ( func TestOffline_ArchiveOfflineMessage(t *testing.T) { // given - lockMock := &lockMock{} - lockMock.ReleaseFunc = func(ctx context.Context) error { - return nil - } - lockerMock := &lockerMock{} - lockerMock.AcquireLockFunc = func(ctx context.Context, lockID string) (locker.Lock, error) { - return lockMock, nil - } repMock := &repositoryMock{} + repMock.LockFunc = func(ctx context.Context, lockID string) error { return nil } + repMock.UnlockFunc = func(ctx context.Context, lockID string) error { return nil } + repMock.CountOfflineMessagesFunc = func(ctx context.Context, username string) (int, error) { return 0, nil } @@ -59,7 +53,6 @@ func TestOffline_ArchiveOfflineMessage(t *testing.T) { hosts: hostsMock, resMng: resManagerMock, rep: repMock, - locker: lockerMock, hk: hk, logger: kitlog.NewNopLogger(), } @@ -100,15 +93,10 @@ func TestOffline_ArchiveOfflineMessageQueueFull(t *testing.T) { hostsMock := &hostsMock{} hostsMock.IsLocalHostFunc = func(h string) bool { return h == "jackal.im" } - lockMock := &lockMock{} - lockMock.ReleaseFunc = func(ctx context.Context) error { - return nil - } - lockerMock := &lockerMock{} - lockerMock.AcquireLockFunc = func(ctx context.Context, lockID string) (locker.Lock, error) { - return lockMock, nil - } repMock := &repositoryMock{} + repMock.LockFunc = func(ctx context.Context, lockID string) error { return nil } + repMock.UnlockFunc = func(ctx context.Context, lockID string) error { return nil } + repMock.CountOfflineMessagesFunc = func(ctx context.Context, username string) (int, error) { return 100, nil } @@ -127,7 +115,6 @@ func TestOffline_ArchiveOfflineMessageQueueFull(t *testing.T) { hosts: hostsMock, resMng: resManagerMock, rep: repMock, - locker: lockerMock, hk: hk, logger: kitlog.NewNopLogger(), } @@ -173,15 +160,10 @@ func TestOffline_DeliverOfflineMessages(t *testing.T) { hostsMock := &hostsMock{} hostsMock.IsLocalHostFunc = func(h string) bool { return h == "jackal.im" } - lockMock := &lockMock{} - lockMock.ReleaseFunc = func(ctx context.Context) error { - return nil - } - lockerMock := &lockerMock{} - lockerMock.AcquireLockFunc = func(ctx context.Context, lockID string) (locker.Lock, error) { - return lockMock, nil - } repMock := &repositoryMock{} + repMock.LockFunc = func(ctx context.Context, lockID string) error { return nil } + repMock.UnlockFunc = func(ctx context.Context, lockID string) error { return nil } + repMock.CountOfflineMessagesFunc = func(ctx context.Context, username string) (int, error) { return 1, nil } @@ -208,7 +190,6 @@ func TestOffline_DeliverOfflineMessages(t *testing.T) { router: routerMock, hosts: hostsMock, rep: repMock, - locker: lockerMock, hk: hk, logger: kitlog.NewNopLogger(), } diff --git a/pkg/cluster/locker/nop_locker.go b/pkg/storage/measured/locker.go similarity index 50% rename from pkg/cluster/locker/nop_locker.go rename to pkg/storage/measured/locker.go index 0a96c9c3e..29d464b1b 100644 --- a/pkg/cluster/locker/nop_locker.go +++ b/pkg/storage/measured/locker.go @@ -12,28 +12,30 @@ // See the License for the specific language governing permissions and // limitations under the License. -package locker +package measuredrepository -import "context" +import ( + "context" + "time" -// NewNopLocker returns a Locker that doesn't do anything. -func NewNopLocker() Locker { return &nopLocker{} } + "github.com/ortuman/jackal/pkg/storage/repository" +) -// IsNop tells whether l is nop locker. -func IsNop(l Locker) bool { - _, ok := l.(*nopLocker) - return ok +type measuredLocker struct { + rep repository.Locker + inTx bool } -type nopLocker struct{} - -func (l *nopLocker) AcquireLock(_ context.Context, _ string) (Lock, error) { - return &nopLock{}, nil +func (m *measuredLocker) Lock(ctx context.Context, lockID string) error { + t0 := time.Now() + err := m.rep.Lock(ctx, lockID) + reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx) + return err } -func (l *nopLocker) Start(_ context.Context) error { return nil } -func (l *nopLocker) Stop(_ context.Context) error { return nil } - -type nopLock struct{} - -func (l *nopLock) Release(_ context.Context) error { return nil } +func (m *measuredLocker) Unlock(ctx context.Context, lockID string) error { + t0 := time.Now() + err := m.rep.Unlock(ctx, lockID) + reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx) + return err +} diff --git a/pkg/storage/measured/locker_test.go b/pkg/storage/measured/locker_test.go new file mode 100644 index 000000000..4dda540d0 --- /dev/null +++ b/pkg/storage/measured/locker_test.go @@ -0,0 +1,52 @@ +// Copyright 2021 The jackal Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package measuredrepository + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMeasuredLocker_Lock(t *testing.T) { + // given + repMock := &repositoryMock{} + repMock.LockFunc = func(ctx context.Context, lockID string) error { + return nil + } + m := &measuredLocker{rep: repMock} + + // when + _ = m.Lock(context.Background(), "l1") + + // then + require.Len(t, repMock.LockCalls(), 1) +} + +func TestMeasuredLocker_Unlock(t *testing.T) { + // given + repMock := &repositoryMock{} + repMock.UnlockFunc = func(ctx context.Context, lockID string) error { + return nil + } + m := &measuredLocker{rep: repMock} + + // when + _ = m.Unlock(context.Background(), "l1") + + // then + require.Len(t, repMock.UnlockCalls(), 1) +} diff --git a/pkg/storage/measured/measured.go b/pkg/storage/measured/measured.go index 5f5404085..628b6d2e0 100644 --- a/pkg/storage/measured/measured.go +++ b/pkg/storage/measured/measured.go @@ -27,6 +27,7 @@ const ( upsertOp = "upsert" fetchOp = "fetch" deleteOp = "delete" + lockOp = "lock" ) // Measured is measured Repository implementation. @@ -39,6 +40,7 @@ type Measured struct { measuredPrivateRep measuredRosterRep measuredVCardRep + measuredLocker rep repository.Repository } @@ -53,6 +55,7 @@ func New(rep repository.Repository) repository.Repository { measuredPrivateRep: measuredPrivateRep{rep: rep}, measuredRosterRep: measuredRosterRep{rep: rep}, measuredVCardRep: measuredVCardRep{rep: rep}, + measuredLocker: measuredLocker{rep: rep}, rep: rep, } } diff --git a/pkg/storage/measured/tx.go b/pkg/storage/measured/tx.go index e53571029..6f90241ed 100644 --- a/pkg/storage/measured/tx.go +++ b/pkg/storage/measured/tx.go @@ -25,6 +25,7 @@ type measuredTx struct { repository.Private repository.Roster repository.VCard + repository.Locker } func newMeasuredTx(tx repository.Transaction) *measuredTx { @@ -37,5 +38,6 @@ func newMeasuredTx(tx repository.Transaction) *measuredTx { Private: &measuredPrivateRep{rep: tx, inTx: true}, Roster: &measuredRosterRep{rep: tx, inTx: true}, VCard: &measuredVCardRep{rep: tx, inTx: true}, + Locker: &measuredLocker{rep: tx, inTx: true}, } } diff --git a/pkg/storage/pgsql/locker.go b/pkg/storage/pgsql/locker.go new file mode 100644 index 000000000..68f44e79e --- /dev/null +++ b/pkg/storage/pgsql/locker.go @@ -0,0 +1,52 @@ +// Copyright 2021 The jackal Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pgsqlrepository + +import ( + "context" + "time" +) + +const waitForLockDelay = time.Millisecond * 10 + +type pgSQLLocker struct { + conn conn +} + +func (l *pgSQLLocker) Lock(ctx context.Context, lockID string) error { + for { + if err := ctx.Err(); err != nil { + return err + } + var acquired bool + + err := l.conn.QueryRowContext(ctx, "SELECT pg_try_advisory_lock(hashtext($1))", lockID).Scan(&acquired) + switch err { + case nil: + if acquired { + return nil + } + time.Sleep(waitForLockDelay) // wait and retry + + default: + return err + } + } +} + +func (l *pgSQLLocker) Unlock(ctx context.Context, lockID string) error { + _, err := l.conn.ExecContext(ctx, "SELECT pg_advisory_unlock(hashtext($1))", lockID) + return err +} diff --git a/pkg/storage/pgsql/locker_test.go b/pkg/storage/pgsql/locker_test.go new file mode 100644 index 000000000..acf69816c --- /dev/null +++ b/pkg/storage/pgsql/locker_test.go @@ -0,0 +1,62 @@ +// Copyright 2021 The jackal Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pgsqlrepository + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/stretchr/testify/require" +) + +func TestPgSQLLocker_Lock(t *testing.T) { + // given + s, mock := newLockerMock() + + mock.ExpectQuery(`SELECT pg_try_advisory_lock\(hashtext\(\$1\)\)`). + WithArgs("l1"). + WillReturnRows( + sqlmock.NewRows([]string{"pg_try_advisory_lock"}).AddRow(true), + ) + + // when + err := s.Lock(context.Background(), "l1") + + // then + require.Nil(t, mock.ExpectationsWereMet()) + require.Nil(t, err) +} + +func TestPgSQLLocker_Unlock(t *testing.T) { + // given + s, mock := newLockerMock() + + mock.ExpectExec(`SELECT pg_advisory_unlock\(hashtext\(\$1\)\)`). + WithArgs("l1"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // when + err := s.Unlock(context.Background(), "l1") + + // then + require.Nil(t, mock.ExpectationsWereMet()) + require.Nil(t, err) +} + +func newLockerMock() (*pgSQLLocker, sqlmock.Sqlmock) { + s, sqlMock := newPgSQLMock() + return &pgSQLLocker{conn: s}, sqlMock +} diff --git a/pkg/storage/pgsql/repository.go b/pkg/storage/pgsql/repository.go index a822f4550..e2533605f 100644 --- a/pkg/storage/pgsql/repository.go +++ b/pkg/storage/pgsql/repository.go @@ -55,6 +55,7 @@ type Repository struct { repository.Private repository.Roster repository.VCard + repository.Locker host string dsn string @@ -117,6 +118,7 @@ func (r *Repository) Start(ctx context.Context) error { r.Private = &pgSQLPrivateRep{conn: db, logger: r.logger} r.Roster = &pgSQLRosterRep{conn: db, logger: r.logger} r.VCard = &pgSQLVCardRep{conn: db, logger: r.logger} + r.Locker = &pgSQLLocker{conn: db} return nil } diff --git a/pkg/storage/pgsql/tx.go b/pkg/storage/pgsql/tx.go index cc14cd84e..973aa45bb 100644 --- a/pkg/storage/pgsql/tx.go +++ b/pkg/storage/pgsql/tx.go @@ -29,6 +29,7 @@ type repTx struct { repository.Private repository.Roster repository.VCard + repository.Locker } func newRepTx(tx *sql.Tx) *repTx { @@ -41,5 +42,6 @@ func newRepTx(tx *sql.Tx) *repTx { Private: &pgSQLPrivateRep{conn: tx}, Roster: &pgSQLRosterRep{conn: tx}, VCard: &pgSQLVCardRep{conn: tx}, + Locker: &pgSQLLocker{conn: tx}, } } diff --git a/pkg/cluster/locker/locker.go b/pkg/storage/repository/locker.go similarity index 54% rename from pkg/cluster/locker/locker.go rename to pkg/storage/repository/locker.go index 2768fe913..4bdab73e0 100644 --- a/pkg/cluster/locker/locker.go +++ b/pkg/storage/repository/locker.go @@ -1,4 +1,4 @@ -// Copyright 2020 The jackal Authors +// Copyright 2021 The jackal Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,24 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -package locker +package repository import "context" -// Locker defines distributed locking interface. +// Locker defines repository locker interface. type Locker interface { - // AcquireLock acquires lockID distributed lock. - AcquireLock(ctx context.Context, lockID string) (Lock, error) + // Lock obtains an exclusive lock. + Lock(ctx context.Context, lockID string) error - // Start initializes locker. - Start(ctx context.Context) error - - // Stop releases all locker underlying resources. - Stop(ctx context.Context) error -} - -// Lock defines distributed lock object. -type Lock interface { - // Release releases a previously acquired lock. - Release(ctx context.Context) error + // Unlock releases an exclusive lock. + Unlock(ctx context.Context, lockID string) error } diff --git a/pkg/storage/repository/repository.go b/pkg/storage/repository/repository.go index 5960849c3..0c640a2c6 100644 --- a/pkg/storage/repository/repository.go +++ b/pkg/storage/repository/repository.go @@ -46,4 +46,5 @@ type baseRepository interface { Private Roster VCard + Locker }