diff --git a/build/charts/yorkie-cluster/templates/deployment.yaml b/build/charts/yorkie-cluster/templates/deployment.yaml index f3d31f518..f40008693 100644 --- a/build/charts/yorkie-cluster/templates/deployment.yaml +++ b/build/charts/yorkie-cluster/templates/deployment.yaml @@ -41,6 +41,8 @@ spec: "--mongo-connection-uri", "mongodb://{{ .Values.yorkie.args.dbUrl }}:{{ .Values.yorkie.args.dbPort }}/yorkie-meta", "--enable-pprof", + "--housekeeping-leader-election", + "true", ] ports: - containerPort: {{ .Values.yorkie.ports.rpcPort }} diff --git a/cmd/yorkie/server.go b/cmd/yorkie/server.go index 583188698..5bbb974c2 100644 --- a/cmd/yorkie/server.go +++ b/cmd/yorkie/server.go @@ -40,6 +40,7 @@ var ( adminTokenDuration time.Duration housekeepingInterval time.Duration + housekeepingLeaseDuration time.Duration clientDeactivateThreshold string mongoConnectionURI string @@ -74,6 +75,7 @@ func newServerCmd() *cobra.Command { conf.Backend.AuthWebhookCacheUnauthTTL = authWebhookCacheUnauthTTL.String() conf.Housekeeping.Interval = housekeepingInterval.String() + conf.Housekeeping.LeaseDuration = housekeepingLeaseDuration.String() if mongoConnectionURI != "" { conf.Mongo = &mongo.Config{ @@ -227,6 +229,18 @@ func init() { server.DefaultHousekeepingCandidatesLimitPerProject, "candidates limit per project for a single housekeeping run", ) + cmd.Flags().BoolVar( + &conf.Housekeeping.LeaderElection, + "housekeeping-leader-election", + server.DefaultHousekeepingLeaderElection, + "Enable leader election to run housekeeping only on the leader.", + ) + cmd.Flags().DurationVar( + &housekeepingLeaseDuration, + "housekeeping-lease-duration", + server.DefaultHousekeepingLeaseDuration, + "lease duration for a leader election in housekeeping", + ) cmd.Flags().StringVar( &mongoConnectionURI, "mongo-connection-uri", diff --git a/server/backend/backend.go b/server/backend/backend.go index 19c5b19b3..c4ce0626d 100644 --- a/server/backend/backend.go +++ b/server/backend/backend.go @@ -33,6 +33,8 @@ import ( "github.com/yorkie-team/yorkie/server/backend/database" memdb "github.com/yorkie-team/yorkie/server/backend/database/memory" "github.com/yorkie-team/yorkie/server/backend/database/mongo" + "github.com/yorkie-team/yorkie/server/backend/election" + mongoelection "github.com/yorkie-team/yorkie/server/backend/election/mongo" "github.com/yorkie-team/yorkie/server/backend/housekeeping" "github.com/yorkie-team/yorkie/server/backend/sync" memsync "github.com/yorkie-team/yorkie/server/backend/sync/memory" @@ -48,6 +50,7 @@ type Backend struct { DB database.Database Coordinator sync.Coordinator + Elector election.Elector Metrics *prometheus.Metrics Background *background.Background Housekeeping *housekeeping.Housekeeping @@ -64,11 +67,12 @@ func New( ) (*Backend, error) { hostname := conf.Hostname if hostname == "" { - hostname, err := os.Hostname() + osHostname, err := os.Hostname() if err != nil { return nil, fmt.Errorf("os.Hostname: %w", err) } - conf.Hostname = hostname + conf.Hostname = osHostname + hostname = osHostname } serverInfo := &sync.ServerInfo{ @@ -103,10 +107,13 @@ func New( return nil, err } + elector := mongoelection.NewElector(hostname, db) + keeping, err := housekeeping.Start( housekeepingConf, db, coordinator, + elector, ) if err != nil { return nil, err @@ -118,8 +125,9 @@ func New( } logging.DefaultLogger().Infof( - "backend created: id: %s, rpc: %s", + "backend created: id: %s, rpc: %s, db: %s", serverInfo.ID, + serverInfo.Hostname, dbInfo, ) @@ -141,6 +149,7 @@ func New( Metrics: metrics, DB: db, Coordinator: coordinator, + Elector: elector, Housekeeping: keeping, AuthWebhookCache: authWebhookCache, @@ -155,6 +164,10 @@ func (b *Backend) Shutdown() error { return err } + if err := b.Elector.Stop(); err != nil { + return err + } + if err := b.Coordinator.Close(); err != nil { logging.DefaultLogger().Error(err) } diff --git a/server/backend/config.go b/server/backend/config.go index 038b861c9..d39ef28d3 100644 --- a/server/backend/config.go +++ b/server/backend/config.go @@ -42,7 +42,7 @@ type Config struct { // we are using server as single-tenant mode, this should be set to true. UseDefaultProject bool `yaml:"UseDefaultProject"` - // ClientDeactivateThreshold is deactivate threshold of clients in specific project for housekeeping. + // ClientDeactivateThreshold is deactivation threshold of clients in specific project for housekeeping. ClientDeactivateThreshold string `yaml:"ClientDeactivateThreshold"` // SnapshotThreshold is the threshold that determines if changes should be diff --git a/server/backend/database/database.go b/server/backend/database/database.go index 2d4922fee..6fec4b234 100644 --- a/server/backend/database/database.go +++ b/server/backend/database/database.go @@ -20,6 +20,7 @@ package database import ( "context" "errors" + gotime "time" "github.com/yorkie-team/yorkie/api/types" "github.com/yorkie-team/yorkie/pkg/document" @@ -253,4 +254,32 @@ type Database interface { docID types.ID, excludeClientID types.ID, ) (bool, error) + + // CreateTTLIndex creates a TTL index. + CreateTTLIndex( + ctx context.Context, + leaseDuration gotime.Duration, + ) error + + // TryToAcquireLeaderLease tries to acquire the leader lease. + TryToAcquireLeaderLease( + ctx context.Context, + hostname string, + leaseLockName string, + leaseDuration gotime.Duration, + ) (bool, error) + + // RenewLeaderLease renews the leader lease. + RenewLeaderLease( + ctx context.Context, + hostname string, + leaseLockName string, + leaseDuration gotime.Duration, + ) error + + // FindLeader returns the leader hostname for the given leaseLockName. + FindLeader( + ctx context.Context, + leaseLockName string, + ) (*string, error) } diff --git a/server/backend/database/memory/database.go b/server/backend/database/memory/database.go index f77bdf7d6..6b7a50bfb 100644 --- a/server/backend/database/memory/database.go +++ b/server/backend/database/memory/database.go @@ -1308,6 +1308,40 @@ func (d *DB) findTicketByServerSeq( ), nil } +// CreateTTLIndex creates a TTL index. +func (d *DB) CreateTTLIndex( + ctx context.Context, + leaseDuration gotime.Duration, +) error { + return nil +} + +// TryToAcquireLeaderLease tries to acquire the leader lease. +func (d *DB) TryToAcquireLeaderLease( + ctx context.Context, + hostname string, + leaseLockName string, + leaseDuration gotime.Duration, +) (bool, error) { + // In memory database, leader is always myself. + return true, nil +} + +// RenewLeaderLease renews the leader lease. +func (d *DB) RenewLeaderLease( + ctx context.Context, + hostname string, + leaseLockName string, + leaseDuration gotime.Duration, +) error { + return nil +} + +// FindLeader returns the leader hostname for the given leaseLockName. +func (d *DB) FindLeader(ctx context.Context, leaseLockName string) (*string, error) { + return nil, nil +} + func newID() types.ID { return types.ID(primitive.NewObjectID().Hex()) } diff --git a/server/backend/database/mongo/client.go b/server/backend/database/mongo/client.go index 25feb5e3e..609769e0e 100644 --- a/server/backend/database/mongo/client.go +++ b/server/backend/database/mongo/client.go @@ -1397,6 +1397,99 @@ func (c *Client) findTicketByServerSeq( ), nil } +// CreateTTLIndex creates a TTL index. +func (c *Client) CreateTTLIndex( + ctx context.Context, + leaseDuration gotime.Duration, +) error { + ttlIndexModel := mongo.IndexModel{ + Keys: bson.M{"lease_expire_at": 1}, + Options: options.Index().SetExpireAfterSeconds(int32(leaseDuration.Seconds())), + } + _, err := c.collection(colElections).Indexes().CreateOne(ctx, ttlIndexModel) + if err != nil { + return err + } + + return nil +} + +// TryToAcquireLeaderLease tries to acquire the leader lease. +func (c *Client) TryToAcquireLeaderLease( + ctx context.Context, + hostname string, + leaseLockName string, + leaseDuration gotime.Duration, +) (bool, error) { + updated := false + result, err := c.collection(colElections).UpdateOne(ctx, bson.M{ + "election_id": leaseLockName, + "lease_expire_at": bson.M{"$lt": gotime.Now()}, + }, bson.M{ + "$set": bson.M{ + "leader_id": hostname, + "lease_expire_at": gotime.Now().Add(leaseDuration), + }}, + options.Update().SetUpsert(true), + ) + if err != nil { + return false, err + } + + if result.ModifiedCount == 1 || result.UpsertedCount == 1 { + updated = true + } + return updated, nil +} + +// RenewLeaderLease renews the leader lease. +func (c *Client) RenewLeaderLease( + ctx context.Context, + hostname string, + leaseLockName string, + leaseDuration gotime.Duration, +) error { + _, err := c.collection(colElections).UpdateOne(ctx, bson.M{ + "election_id": leaseLockName, + "leader_id": hostname, + }, bson.M{ + "$set": bson.M{ + "lease_expire_at": gotime.Now().Add(leaseDuration), + }}, + ) + if err != nil { + return err + } + + return nil +} + +// FindLeader returns the leader hostname for the given leaseLockName. +func (c *Client) FindLeader(ctx context.Context, leaseLockName string) (*string, error) { + electionInfo := &struct { + ElectionID string `bson:"election_id"` + LeaderID string `bson:"leader_id"` + LeaseExpireAt gotime.Time `bson:"lease_expire_at"` + }{} + + result := c.collection(colElections).FindOne(ctx, bson.M{ + "election_id": leaseLockName, + }) + if result.Err() == mongo.ErrNoDocuments { + return nil, nil + } + if result.Err() != nil { + logging.From(ctx).Error(result.Err()) + return nil, fmt.Errorf("find leader: %w", result.Err()) + } + + if err := result.Decode(&electionInfo); err != nil { + return nil, fmt.Errorf("decode leader: %w", err) + } + + return &electionInfo.LeaderID, nil +} + func (c *Client) collection( name string, opts ...*options.CollectionOptions, diff --git a/server/backend/database/mongo/client_test.go b/server/backend/database/mongo/client_test.go index ab1719d8d..bc6c0f4c7 100644 --- a/server/backend/database/mongo/client_test.go +++ b/server/backend/database/mongo/client_test.go @@ -17,7 +17,9 @@ package mongo_test import ( + "context" "testing" + "time" "github.com/stretchr/testify/assert" @@ -100,4 +102,35 @@ func TestClient(t *testing.T) { t.Run("IsDocumentAttached test", func(t *testing.T) { testcases.RunIsDocumentAttachedTest(t, cli, dummyProjectID) }) + + t.Run("leader lease acquisition test", func(t *testing.T) { + ctx := context.Background() + leaseDuration := 5 * time.Second + node1 := "node1" + node2 := "node2" + + err := cli.CreateTTLIndex(ctx, leaseDuration) + assert.NoError(t, err) + + // node 1 try to acquire leader lease + acquired, err := cli.TryToAcquireLeaderLease(ctx, node1, t.Name(), leaseDuration) + assert.NoError(t, err) + assert.True(t, acquired) + + // node 2 try to acquire leader lease, but it will fail + acquired, err = cli.TryToAcquireLeaderLease(ctx, node2, t.Name(), leaseDuration) + assert.Error(t, err) + assert.False(t, acquired) + + // after lease duration, node 2 can acquire leader lease + time.Sleep(leaseDuration) + + acquired, err = cli.TryToAcquireLeaderLease(ctx, node2, t.Name(), leaseDuration) + assert.NoError(t, err) + assert.True(t, acquired) + + // node 2 renew leader lease + err = cli.RenewLeaderLease(ctx, node2, t.Name(), leaseDuration) + assert.NoError(t, err) + }) } diff --git a/server/backend/database/mongo/indexes.go b/server/backend/database/mongo/indexes.go index 953659b3f..598653e38 100644 --- a/server/backend/database/mongo/indexes.go +++ b/server/backend/database/mongo/indexes.go @@ -33,6 +33,7 @@ const ( colChanges = "changes" colSnapshots = "snapshots" colSyncedSeqs = "syncedseqs" + colElections = "elections" ) type collectionInfo struct { @@ -131,6 +132,20 @@ var collectionInfos = []collectionInfo{ {Key: "actor_id", Value: bsonx.Int32(1)}, }, }}, + }, { + name: colElections, + indexes: []mongo.IndexModel{{ + Keys: bsonx.Doc{ + {Key: "election_id", Value: bsonx.Int32(1)}, + }, + Options: options.Index().SetUnique(true), + }, { + Keys: bsonx.Doc{ + {Key: "election_id", Value: bsonx.Int32(1)}, + {Key: "leader_id", Value: bsonx.Int32(1)}, + {Key: "lease_expire_at", Value: bsonx.Int32(1)}, + }, + }}, }, } diff --git a/server/backend/election/election.go b/server/backend/election/election.go new file mode 100644 index 000000000..08a370966 --- /dev/null +++ b/server/backend/election/election.go @@ -0,0 +1,39 @@ +/* + * Copyright 2023 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package election provides leader election between server cluster. It is used to +// elect leader among server cluster and run tasks only on the leader. +package election + +import ( + "context" + "time" +) + +// Elector provides leader election between server cluster. It is used to +// elect leader among server cluster and run tasks only on the leader. +type Elector interface { + // StartElection starts leader election. + StartElection( + leaseLockName string, + leaseDuration time.Duration, + onStartLeading func(ctx context.Context), + onStoppedLeading func(), + ) error + + // Stop stops all leader elections. + Stop() error +} diff --git a/server/backend/election/mongo/election.go b/server/backend/election/mongo/election.go new file mode 100644 index 000000000..237e6ebb6 --- /dev/null +++ b/server/backend/election/mongo/election.go @@ -0,0 +1,134 @@ +/* + * Copyright 2023 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package mongo is a mongo based implementation of election package. +package mongo + +import ( + "context" + "sync" + "time" + + "github.com/yorkie-team/yorkie/server/backend/database" + "github.com/yorkie-team/yorkie/server/logging" +) + +// Elector is a database-based implementation of election.Elector. +type Elector struct { + database database.Database + + hostname string + + ctx context.Context + cancelFunc context.CancelFunc + + wg sync.WaitGroup +} + +// NewElector creates a new elector instance. +func NewElector( + hostname string, + database database.Database, +) *Elector { + ctx, cancelFunc := context.WithCancel(context.Background()) + + return &Elector{ + database: database, + + hostname: hostname, + + ctx: ctx, + cancelFunc: cancelFunc, + } +} + +// StartElection starts leader election. +func (e *Elector) StartElection( + leaseLockName string, + leaseDuration time.Duration, + onStartLeading func(ctx context.Context), + onStoppedLeading func(), +) error { + if err := e.database.CreateTTLIndex(context.Background(), leaseDuration); err != nil { + return err + } + + go e.run(leaseLockName, leaseDuration, onStartLeading, onStoppedLeading) + return nil +} + +// Stop stops all leader elections. +func (e *Elector) Stop() error { + e.cancelFunc() + e.wg.Wait() + + return nil +} + +// run starts leader election loop. +// run will not return before leader election loop is stopped by ctx, +// or it has stopped holding the leader lease +func (e *Elector) run( + leaseLockName string, + leaseDuration time.Duration, + onStartLeading func(ctx context.Context), + onStoppedLeading func(), +) { + for { + ctx, cancelFunc := context.WithCancel(e.ctx) + acquired, err := e.database.TryToAcquireLeaderLease(ctx, e.hostname, leaseLockName, leaseDuration) + if err != nil { + continue + } + + if acquired { + go func() { + e.wg.Add(1) + onStartLeading(ctx) + e.wg.Done() + }() + logging.From(ctx).Infof( + "leader elected: %s", e.hostname, + ) + + for { + err = e.database.RenewLeaderLease(ctx, e.hostname, leaseLockName, leaseDuration) + if err != nil { + break + } + + select { + case <-time.After(leaseDuration / 2): + case <-e.ctx.Done(): + cancelFunc() + return + } + } + } else { + onStoppedLeading() + logging.From(ctx).Infof( + "leader lost: %s", e.hostname, + ) + } + + select { + case <-time.After(leaseDuration): + case <-e.ctx.Done(): + cancelFunc() + return + } + } +} diff --git a/server/backend/election/mongo/election_test.go b/server/backend/election/mongo/election_test.go new file mode 100644 index 000000000..02c3693d0 --- /dev/null +++ b/server/backend/election/mongo/election_test.go @@ -0,0 +1,124 @@ +package mongo_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/yorkie-team/yorkie/server/backend/database/mongo" + mongoelection "github.com/yorkie-team/yorkie/server/backend/election/mongo" + "github.com/yorkie-team/yorkie/test/helper" +) + +var ( + normalTask = func(ctx context.Context) {} + stopTask = func() {} +) + +func setupTestWithDummyData(t *testing.T) *mongo.Client { + config := &mongo.Config{ + ConnectionTimeout: "5s", + ConnectionURI: "mongodb://localhost:27017", + YorkieDatabase: helper.TestDBName(), + PingTimeout: "5s", + } + assert.NoError(t, config.Validate()) + + db, err := mongo.Dial(config) + assert.NoError(t, err) + + return db +} + +func TestElection(t *testing.T) { + db := setupTestWithDummyData(t) + + t.Run("leader election with multiple electors test", func(t *testing.T) { + leaseLockName := t.Name() + + electorA := mongoelection.NewElector("A", db) + electorB := mongoelection.NewElector("B", db) + electorC := mongoelection.NewElector("C", db) + + assert.NoError(t, electorA.StartElection(leaseLockName, helper.LeaseDuration, normalTask, stopTask)) + time.Sleep(helper.LeaseDuration) + + assert.NoError(t, electorB.StartElection(leaseLockName, helper.LeaseDuration, normalTask, stopTask)) + assert.NoError(t, electorC.StartElection(leaseLockName, helper.LeaseDuration, normalTask, stopTask)) + time.Sleep(helper.LeaseDuration) + + // elector A will be the leader because it is the first to start the election. + leader, err := db.FindLeader(context.Background(), leaseLockName) + assert.NoError(t, err) + + assert.Equal(t, "A", *leader) + + // wait for lease expiration and check the leader again + // elector A is still the leader because it has renewed the lease. + time.Sleep(helper.LeaseDuration) + assert.Equal(t, "A", *leader) + + // stop electorA and electorB, then wait for the next leader election + // elector C will be the leader because other electors are stopped. + assert.NoError(t, electorA.Stop()) + assert.NoError(t, electorB.Stop()) + + time.Sleep(helper.LeaseDuration) + + leader, err = db.FindLeader(context.Background(), leaseLockName) + assert.NoError(t, err) + + assert.Equal(t, "C", *leader) + }) + + t.Run("lease renewal while handling a a long task test", func(t *testing.T) { + leaseLockName := t.Name() + longTask := func(ctx context.Context) { + time.Sleep(helper.LeaseDuration * 4) + } + + electorA := mongoelection.NewElector("A", db) + electorB := mongoelection.NewElector("B", db) + electorC := mongoelection.NewElector("C", db) + + assert.NoError(t, electorA.StartElection(leaseLockName, helper.LeaseDuration, longTask, stopTask)) + time.Sleep(helper.LeaseDuration) + + assert.NoError(t, electorB.StartElection(leaseLockName, helper.LeaseDuration, longTask, stopTask)) + assert.NoError(t, electorC.StartElection(leaseLockName, helper.LeaseDuration, longTask, stopTask)) + + // wait for lease expiration and check if elector A is still the leader while handling a long task + time.Sleep(helper.LeaseDuration) + + leader, err := db.FindLeader(context.Background(), leaseLockName) + assert.NoError(t, err) + + assert.Equal(t, "A", *leader) + }) + + t.Run("handle background routines when shutting down the server test", func(t *testing.T) { + shutdownCh := make(chan struct{}) + + isTaskDone := false + longTask := func(ctx context.Context) { + close(shutdownCh) + time.Sleep(helper.LeaseDuration) + isTaskDone = true + } + + elector := mongoelection.NewElector("A", db) + assert.NoError(t, elector.StartElection(t.Name(), helper.LeaseDuration, longTask, stopTask)) + + // if receive shutdown signal, stop elector + select { + case <-shutdownCh: + assert.NoError(t, elector.Stop()) + } + + // check if the task is done + // this means that the background routine is handled properly after server(elector) is stopped + assert.Equal(t, true, isTaskDone) + }) +} diff --git a/server/backend/housekeeping/housekeeping.go b/server/backend/housekeeping/housekeeping.go index d08abcbdc..23bbeb196 100644 --- a/server/backend/housekeeping/housekeeping.go +++ b/server/backend/housekeeping/housekeeping.go @@ -25,6 +25,7 @@ import ( "time" "github.com/yorkie-team/yorkie/server/backend/database" + "github.com/yorkie-team/yorkie/server/backend/election" "github.com/yorkie-team/yorkie/server/backend/sync" "github.com/yorkie-team/yorkie/server/clients" "github.com/yorkie-team/yorkie/server/logging" @@ -32,6 +33,7 @@ import ( const ( deactivateCandidatesKey = "housekeeping/deactivateCandidates" + lockLeaseName = "housekeeping" ) // Config is the configuration for the housekeeping service. @@ -41,6 +43,12 @@ type Config struct { // CandidatesLimitPerProject is the maximum number of candidates to be returned per project. CandidatesLimitPerProject int `yaml:"CandidatesLimitPerProject"` + + // LeaderElection is the flag to enable leader election for performing housekeeping only on leader node. + LeaderElection bool `yaml:"LeaderElection"` + + // LeaseDuration is the duration that non-leader candidates will wait to force acquire leadership. + LeaseDuration string `yaml:"LeaseDuration"` } // Validate validates the configuration. @@ -53,6 +61,14 @@ func (c *Config) Validate() error { ) } + if _, err := time.ParseDuration(c.LeaseDuration); err != nil { + return fmt.Errorf( + `invalid argument %s for "--housekeeping-lease-duration" flag: %w`, + c.LeaseDuration, + err, + ) + } + return nil } @@ -62,9 +78,12 @@ func (c *Config) Validate() error { type Housekeeping struct { database database.Database coordinator sync.Coordinator + elector election.Elector interval time.Duration candidatesLimitPerProject int + leaderElection bool + leaseDuration time.Duration ctx context.Context cancelFunc context.CancelFunc @@ -75,8 +94,9 @@ func Start( conf *Config, database database.Database, coordinator sync.Coordinator, + elector election.Elector, ) (*Housekeeping, error) { - h, err := New(conf, database, coordinator) + h, err := New(conf, database, coordinator, elector) if err != nil { return nil, err } @@ -92,20 +112,28 @@ func New( conf *Config, database database.Database, coordinator sync.Coordinator, + elector election.Elector, ) (*Housekeeping, error) { interval, err := time.ParseDuration(conf.Interval) if err != nil { return nil, fmt.Errorf("parse interval %s: %w", conf.Interval, err) } + leaseDuration, err := time.ParseDuration(conf.LeaseDuration) + if err != nil { + return nil, fmt.Errorf("parse lease duration %s: %w", conf.LeaseDuration, err) + } ctx, cancelFunc := context.WithCancel(context.Background()) return &Housekeeping{ database: database, coordinator: coordinator, + elector: elector, interval: interval, candidatesLimitPerProject: conf.CandidatesLimitPerProject, + leaderElection: conf.LeaderElection, + leaseDuration: leaseDuration, ctx: ctx, cancelFunc: cancelFunc, @@ -114,7 +142,20 @@ func New( // Start starts the housekeeping service. func (h *Housekeeping) Start() error { - go h.run() + if h.leaderElection { + err := h.elector.StartElection( + lockLeaseName, + h.leaseDuration, + func(ctx context.Context) { h.run() }, + func() { h.cancelFunc() }, + ) + if err != nil { + return err + } + } else { + go h.run() + } + return nil } diff --git a/server/config.go b/server/config.go index c875102af..98aa2f47a 100644 --- a/server/config.go +++ b/server/config.go @@ -42,6 +42,8 @@ const ( DefaultHousekeepingInterval = 30 * time.Second DefaultHousekeepingCandidatesLimitPerProject = 500 + DefaultHousekeepingLeaderElection = false + DefaultHousekeepingLeaseDuration = 60 * time.Second DefaultMongoConnectionURI = "mongodb://localhost:27017" DefaultMongoConnectionTimeout = 5 * time.Second @@ -231,6 +233,8 @@ func newConfig(port int, profilingPort int) *Config { Housekeeping: &housekeeping.Config{ Interval: DefaultHousekeepingInterval.String(), CandidatesLimitPerProject: DefaultHousekeepingCandidatesLimitPerProject, + LeaderElection: DefaultHousekeepingLeaderElection, + LeaseDuration: DefaultHousekeepingLeaseDuration.String(), }, Backend: &backend.Config{ ClientDeactivateThreshold: DefaultClientDeactivateThreshold, diff --git a/server/config.sample.yml b/server/config.sample.yml index 4c48b7128..ff4dbf772 100644 --- a/server/config.sample.yml +++ b/server/config.sample.yml @@ -36,6 +36,12 @@ Housekeeping: # CandidatesLimitPerProject is the maximum number of candidates to be returned per project (default: 100). CandidatesLimitPerProject: 100 + # LeaderElection is the flag to enable leader election for performing housekeeping only on leader node. + LeaderElection: false + + # LeaseDuration is the duration that non-leader candidates will wait to force acquire leadership. + LeaseDuration: "15s" + # Backend is the configuration for the backend of Yorkie. Backend: # UseDefaultProject is whether to use the default project (default: true). @@ -43,7 +49,7 @@ Backend: # used. If we are using server as single-tenant mode, this should be set to true. UseDefaultProject: true - # ClientDeactivateThreshold is deactivate threshold of clients in specific project for housekeeping. + # ClientDeactivateThreshold is deactivation threshold of clients in specific project for housekeeping. ClientDeactivateThreshold: "24h" # SnapshotThreshold is the threshold that determines if changes should be diff --git a/server/rpc/server_test.go b/server/rpc/server_test.go index 36de756d4..b369e7d2c 100644 --- a/server/rpc/server_test.go +++ b/server/rpc/server_test.go @@ -85,6 +85,7 @@ func TestMain(m *testing.M) { }, &housekeeping.Config{ Interval: helper.HousekeepingInterval.String(), CandidatesLimitPerProject: helper.HousekeepingCandidatesLimitPerProject, + LeaseDuration: helper.HousekeepingLeaseDuration.String(), }, met) if err != nil { log.Fatal(err) diff --git a/test/helper/helper.go b/test/helper/helper.go index b324d5f27..9fa799b49 100644 --- a/test/helper/helper.go +++ b/test/helper/helper.go @@ -59,6 +59,8 @@ var ( AdminPassword = server.DefaultAdminPassword HousekeepingInterval = 10 * gotime.Second HousekeepingCandidatesLimitPerProject = 10 + HousekeepingLeaderElection = false + HousekeepingLeaseDuration = 10 * gotime.Second AdminTokenDuration = "10s" ClientDeactivateThreshold = "10s" @@ -72,6 +74,8 @@ var ( MongoConnectionURI = "mongodb://localhost:27017" MongoConnectionTimeout = "5s" MongoPingTimeout = "5s" + + LeaseDuration = 2 * gotime.Second ) func init() { @@ -222,6 +226,8 @@ func TestConfig() *server.Config { Housekeeping: &housekeeping.Config{ Interval: HousekeepingInterval.String(), CandidatesLimitPerProject: HousekeepingCandidatesLimitPerProject, + LeaderElection: HousekeepingLeaderElection, + LeaseDuration: HousekeepingLeaseDuration.String(), }, Backend: &backend.Config{ AdminUser: server.DefaultAdminUser,